We have already seen an example of a Combiner in MapReduce programming and a custom Partitioner. In this tutorial, I am going to show you an example of a map-side join in Hadoop MapReduce. If you want to dig deeper into MapReduce and how it works, then you may like this article on how MapReduce works.
MapReduce processes big data sets, and processing large data sets very often requires joining datasets on a common key, much as we almost always do with an RDBMS database using the primary/foreign key concept.
Joins in Hadoop MapReduce
Hadoop MapReduce supports two types of joins:
- Map Side Join
- Reduce Side Join (Reduce side join example)
Map side Join
You can implement a map-side join in two different ways, depending on your datasets; the conditions are:
- Both datasets must be divided into the same number of partitions and must already be sorted by the same key, or
- One of the two datasets must be small (something like a master dataset) and able to fit into the memory of each node.
This tutorial follows the second approach: the small user.log file is shipped to every node through the distributed cache, loaded into memory, and joined against the larger user_activity.log inside the mapper.
OK, let's find the user activity on social media: what actions a user performed on a popular social network, such as commenting on a post, sharing something, liking something, and so on.
And for this we have two different log files:
- user.log (user master data: user id and user name)
- user_activity.log (activity records: a comment and a shared post per user id)
The layout of these files and the expected output are described below.
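The original sample files are not reproduced here, but their layout can be inferred from the mapper code (both files are tab-separated; user.log holds a user id and a user name, while user_activity.log holds a record id, a user id, a comment, and a shared-post URL) and from the final output in Step 9. The first lines might look roughly like this; the record ids in the first column of user_activity.log are made up purely for illustration:

user.log
1	Susan
2	Kathleen
3	Marilyn
4	Craig

user_activity.log
101	1	looking awesome:)	http://dummyimage.com/160x166.gif/5fa2dd/ffffff
102	2	full masti	http://dummyimage.com/250x142.png/ff4444/ffffff
103	3	wow gangnam style,cool	http://dummyimage.com/124x173.png/cc0000/ffffff
104	4	welcome to the heaven	http://dummyimage.com/148x156.png/ff4444/ffffff

The expected output is one joined record per activity (user id, user name, comment, shared post), exactly as shown in Step 9.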
Tools and Technologies we are using here:
- Java 8
- Eclipse Mars
- Hadoop 2.7.1
- Maven 3.3
- Ubuntu 14 (Linux OS)
Step 1. Create a Maven project
Go to the File menu, then New -> Maven Project, and provide the required details.
Step 2. Edit pom.xml
Double-click your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javamakeuse.bd.poc</groupId>
    <artifactId>MapSideJoin</artifactId>
    <version>1.0</version>
    <name>MapSideJoin</name>
    <description>MapSideJoin Example in Map Reduce</description>
</project>

Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file; just copy and paste it and it will work.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javamakeuse.bd.poc</groupId>
    <artifactId>MapSideJoin</artifactId>
    <version>1.0</version>
    <name>MapSideJoin</name>
    <description>MapSideJoin Example</description>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Step 3. UserActivityVO.java
This is our value object class; it contains the fields that need to be written as the output of the job.
package com.javamakeuse.bd.poc.vo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class UserActivityVO implements Writable {

    private int userId;
    private String userName;
    private String comments;
    private String postShared;

    public int getUserId() {
        return userId;
    }

    public void setUserId(int userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getComments() {
        return comments;
    }

    public void setComments(String comments) {
        this.comments = comments;
    }

    public String getPostShared() {
        return postShared;
    }

    public void setPostShared(String postShared) {
        this.postShared = postShared;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(userId);
        out.writeUTF(userName);
        out.writeUTF(comments);
        out.writeUTF(postShared);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readInt();
        userName = in.readUTF();
        comments = in.readUTF();
        postShared = in.readUTF();
    }

    @Override
    public String toString() {
        return "UserActivityVO [userId=" + userId + ", userName=" + userName + ", comments=" + comments
                + ", postShared=" + postShared + "]";
    }
}
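As a quick aside (not part of the original tutorial), the following minimal, illustrative snippet shows how this Writable round-trips through write() and readFields(); the class name UserActivityVORoundTrip and the sample values are just for demonstration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityVORoundTrip {
    public static void main(String[] args) throws Exception {
        UserActivityVO original = new UserActivityVO();
        original.setUserId(1);
        original.setUserName("Susan");
        original.setComments("looking awesome:)");
        original.setPostShared("http://dummyimage.com/160x166.gif/5fa2dd/ffffff");

        // write() serializes the fields in a fixed order ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // ... and readFields() must restore them in exactly the same order.
        UserActivityVO copy = new UserActivityVO();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // prints the same field values as 'original'
    }
}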
Step 4. UserActivityMapper.java (Mapper)
Inside this mapper class we set the properties of the UserActivityVO class; the user.log file comes from the distributed cache and is loaded into an in-memory map in the setup() method.
package com.javamakeuse.bd.poc.mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityMapper extends Mapper<LongWritable, Text, IntWritable, UserActivityVO> {

    // user map to keep the userId-userName
    private Map<Integer, String> userMap = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        if (columns != null && columns.length > 3) {
            UserActivityVO userActivityVO = new UserActivityVO();
            userActivityVO.setUserId(Integer.parseInt(columns[1]));
            userActivityVO.setComments(columns[2]);
            userActivityVO.setPostShared(columns[3]);
            userActivityVO.setUserName(userMap.get(userActivityVO.getUserId()));
            // writing into context
            context.write(new IntWritable(userActivityVO.getUserId()), userActivityVO);
        }
    }

    @Override
    protected void setup(Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context)
            throws IOException, InterruptedException {
        // loading user map in context
        loadUserInMemory(context);
    }

    private void loadUserInMemory(Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context) {
        // user.log is in distributed cache
        try (BufferedReader br = new BufferedReader(new FileReader("user.log"))) {
            String line;
            while ((line = br.readLine()) != null) {
                String columns[] = line.split("\t");
                userMap.put(Integer.parseInt(columns[0]), columns[1]);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Step 5. UserActivityReducer.java (Reducer)
The reducer class just iterates over the values and writes them into the context; the join itself has already been completed on the map side.
package com.javamakeuse.bd.poc.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityReducer extends Reducer<IntWritable, UserActivityVO, UserActivityVO, NullWritable> {

    NullWritable value = NullWritable.get();

    @Override
    protected void reduce(IntWritable key, Iterable<UserActivityVO> values,
            Reducer<IntWritable, UserActivityVO, UserActivityVO, NullWritable>.Context context)
            throws IOException, InterruptedException {
        for (UserActivityVO userActivityVO : values) {
            context.write(userActivityVO, value);
        }
    }
}

Step 6. UserActivityDriver.java (Driver)
Here we add the user.log file to the distributed cache so that it is available locally to every map task.
package com.javamakeuse.bd.poc.driver;

import java.net.URI;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.javamakeuse.bd.poc.mapper.UserActivityMapper;
import com.javamakeuse.bd.poc.reducer.UserActivityReducer;
import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new UserActivityDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input1> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = Job.getInstance();
        job.setJarByClass(getClass());
        job.setJobName("MapSideJoin Example");
        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(UserActivityMapper.class);
        job.setReducerClass(UserActivityReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(UserActivityVO.class);
        job.addCacheFile(new URI("hdfs://localhost:9000/input/user.log"));
        job.setOutputKeyClass(UserActivityVO.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Done. Next, to run this program you can also use Eclipse; below are the steps to run it from the terminal.
Step 7. Steps to execute MapSideJoin project
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
ii. Verify whether Hadoop has started with the jps command:
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can verify with the web UI also, using the "http://localhost:50070/explorer.html#/" URL.
iii. Create an input folder on HDFS with the below command.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI. Now it is time to move the input files we need to process. Below is the command to copy the user.log and user_activity.log input files into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/user.log user_activity.log /input/
Note - the user.log and user_activity.log files are available inside this project's source code; you can download them from the downloadable link at the end of this tutorial.
Step 8. Create & Execute jar file
We are almost done. Now create the jar file of the MapSideJoin source code; you can create the jar file using Eclipse or by using the mvn package command (a sample invocation is shown below).
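Assuming you run Maven from the project's root directory (the folder containing pom.xml; the path shown in the prompt below is only an example), the build produces target/MapSideJoin-1.0.jar:

subodh@subodh-Inspiron-3520:~/workspace/MapSideJoin$ mvn package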
To execute the MapSideJoin-1.0.jar file, use the below command.
hadoop jar /home/subodh/MapSideJoin-1.0.jar com.javamakeuse.bd.poc.driver.UserActivityDriver /input/user_activity.log /output
The above will generate the below output and also create an output folder containing the output of the MapSideJoin project.
16/03/24 17:49:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/24 17:49:06 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/03/24 17:49:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/03/24 17:49:06 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/24 17:49:06 INFO input.FileInputFormat: Total input paths to process : 1
16/03/24 17:49:06 INFO mapreduce.JobSubmitter: number of splits:1
16/03/24 17:49:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1445861261_0001
16/03/24 17:49:07 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-subodh/mapred/local/1458821947020/user.log <- /home/subodh/user.log
16/03/24 17:49:07 INFO mapred.LocalDistributedCacheManager: Localized hdfs://localhost:9000/input/user.log as file:/tmp/hadoop-subodh/mapred/local/1458821947020/user.log
16/03/24 17:49:07 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/03/24 17:49:07 INFO mapreduce.Job: Running job: job_local1445861261_0001
16/03/24 17:49:07 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Waiting for map tasks
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
16/03/24 17:49:07 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user_activity.log:0+282
16/03/24 17:49:07 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/24 17:49:07 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/24 17:49:07 INFO mapred.MapTask: soft limit at 83886080
16/03/24 17:49:07 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/24 17:49:07 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/24 17:49:07 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/24 17:49:07 INFO mapred.LocalJobRunner:
16/03/24 17:49:07 INFO mapred.MapTask: Starting flush of map output
16/03/24 17:49:07 INFO mapred.MapTask: Spilling map output
16/03/24 17:49:07 INFO mapred.MapTask: bufstart = 0; bufend = 339; bufvoid = 104857600
16/03/24 17:49:07 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
16/03/24 17:49:07 INFO mapred.MapTask: Finished spill 0
16/03/24 17:49:07 INFO mapred.Task: Task:attempt_local1445861261_0001_m_000000_0 is done. And is in the process of committing
16/03/24 17:49:07 INFO mapred.LocalJobRunner: map
16/03/24 17:49:07 INFO mapred.Task: Task 'attempt_local1445861261_0001_m_000000_0' done.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO mapred.LocalJobRunner: map task executor complete.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1445861261_0001_r_000000_0
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
16/03/24 17:49:07 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@396989af
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/03/24 17:49:07 INFO reduce.EventFetcher: attempt_local1445861261_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/03/24 17:49:07 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1445861261_0001_m_000000_0 decomp: 349 len: 353 to MEMORY
16/03/24 17:49:07 INFO reduce.InMemoryMapOutput: Read 349 bytes from map-output for attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 349, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->349
16/03/24 17:49:07 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/03/24 17:49:07 INFO mapred.Merger: Merging 1 sorted segments
16/03/24 17:49:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 343 bytes
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merged 1 segments, 349 bytes to disk to satisfy reduce memory limit
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merging 1 files, 353 bytes from disk
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/24 17:49:07 INFO mapred.Merger: Merging 1 sorted segments
16/03/24 17:49:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 343 bytes
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/03/24 17:49:07 INFO mapred.Task: Task:attempt_local1445861261_0001_r_000000_0 is done. And is in the process of committing
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO mapred.Task: Task attempt_local1445861261_0001_r_000000_0 is allowed to commit now
16/03/24 17:49:07 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1445861261_0001_r_000000_0' to hdfs://localhost:9000/ua6/_temporary/0/task_local1445861261_0001_r_000000
16/03/24 17:49:07 INFO mapred.LocalJobRunner: reduce > reduce
16/03/24 17:49:07 INFO mapred.Task: Task 'attempt_local1445861261_0001_r_000000_0' done.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1445861261_0001_r_000000_0
16/03/24 17:49:07 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/24 17:49:08 INFO mapreduce.Job: Job job_local1445861261_0001 running in uber mode : false
16/03/24 17:49:08 INFO mapreduce.Job: map 100% reduce 100%
16/03/24 17:49:08 INFO mapreduce.Job: Job job_local1445861261_0001 completed successfully
16/03/24 17:49:08 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=17570
        FILE: Number of bytes written=576233
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=854
        HDFS: Number of bytes written=527
        HDFS: Number of read operations=31
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=339
        Map output materialized bytes=353
        Input split bytes=110
        Combine input records=0
        Combine output records=0
        Reduce input groups=4
        Reduce shuffle bytes=353
        Reduce input records=4
        Reduce output records=4
        Spilled Records=8
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=534773760
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=282
    File Output Format Counters
        Bytes Written=527
Step 9. Verify output
subodh@subodh-Inspiron-3520:~/Downloads$ hadoop fs -cat /output/par*
16/03/24 17:49:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
UserActivityVO [userId=1, userName=Susan, comments=looking awesome:), postShared=http://dummyimage.com/160x166.gif/5fa2dd/ffffff]
UserActivityVO [userId=2, userName=Kathleen, comments=full masti, postShared=http://dummyimage.com/250x142.png/ff4444/ffffff]
UserActivityVO [userId=3, userName=Marilyn, comments=wow gangnam style,cool, postShared=http://dummyimage.com/124x173.png/cc0000/ffffff]
UserActivityVO [userId=4, userName=Craig, comments=welcome to the heaven, postShared=http://dummyimage.com/148x156.png/ff4444/ffffff]
That's it.
Download the complete example from here: Source Code
Happy data analytics :)
Hi There,
Can anybody please tell me the meaning of the below line (#49)?
userMap.put(Integer.parseInt(columns[0]), columns[1]);
It's a map that keeps the user_id as the key and the user_name as the value.