The Combiner is one of the powerful features of Hadoop MapReduce programming and is used as an optimization technique. In my previous post, we saw an example of a custom data type in Hadoop using the WritableComparable interface.
In this tutorial, I am going to show you an example of MapReduce programming using a Combiner, although we have already seen an example of MapReduce programming in our wordcount tutorial.
What is a Combiner?
A Combiner in MapReduce programming is known as a local reducer, or mini reducer. It reduces the amount of data transferred between the map and reduce phases, and it is one of the key components for cutting network traffic between them. The Combiner runs on the output of the mapper task.
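Wiring a Combiner in takes a single line on the Job object; this is exactly the call our driver class makes later in Step 5:

job.setCombinerClass(ClickStreamReducer.class); // reuse the reducer as a map-side combiner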
Note that the Combiner may not run even if you set one in your MapReduce program, so treat it purely as an optimization. There is no guarantee it will execute; it depends on whether the map output spills to disk. If the mapper output never needs to be spilled, the Combiner will not execute even though it is configured.
Note - Don't use a Combiner blindly, because it can produce wrong output if your aggregation is not commutative and associative, for example an average function. Use it only for commutative and associative functions, such as sum or max.
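To see why, here is a minimal arithmetic sketch (with made-up numbers, not from our data set) showing that averaging partial groups on the map side changes the final answer, while summing does not:

public class AverageCombinerPitfall {
    public static void main(String[] args) {
        // Suppose one key receives the values 10, 20 and 30, and the map side
        // spills twice: (10, 20) in the first spill and (30) in the second.
        double trueAverage = (10 + 20 + 30) / 3.0;               // 20.0
        // An "average" combiner collapses the first spill to 15, so the
        // reducer averages 15 and 30 and gets the wrong answer.
        double withAvgCombiner = (((10 + 20) / 2.0) + 30) / 2.0; // 22.5
        // A sum combiner is safe: (10 + 20) + 30 == 10 + 20 + 30.
        double withSumCombiner = ((10 + 20) + 30) / 3.0;         // 20.0
        System.out.println(trueAverage + " / " + withAvgCombiner + " / " + withSumCombiner);
    }
}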
OK, let's implement it in a MapReduce program. I am going to apply a Combiner in our click stream analysis program. We have a click stream data set with these values in a tab-separated format (a hypothetical sample line follows the list):
- timestamp
- requested_url
- referring_url
- user_agent
- country
- mac_address
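For illustration, a single record might look like the (entirely made-up) line below; the real click_stream_data.txt file ships with the project source:

2016-03-08 22:10:15	http://example.com/home	http://referrer.example.org	Mozilla/5.0	India	a4:5e:60:c2:11:9f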
Tools and Technologies we are using here:
- Java 8
- Eclipse Mars
- Hadoop 2.7.1
- Maven 3.3
- Ubuntu 14 (Linux OS)
Overview of the project structure:
The main objects of this project are:
- ClickStreamDriver.java
- ClickStreamReducer.java
- ClickStreamMapper.java
- pom.xml
Step 1. Create a Maven project
Go to the File menu, then New -> Maven Project, and provide the required details.
Step 2. Edit pom.xml
Double-click your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.exampl</groupId>
  <artifactId>CombinerExample</artifactId>
  <version>1.0</version>
  <name>CombinerExample</name>
  <description>Combiner Example in Map Reduce</description>
</project>

Now edit this pom.xml file and add the Hadoop dependencies; below is the complete pom.xml file.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.exampl</groupId>
  <artifactId>CombinerExample</artifactId>
  <version>1.0</version>
  <name>CombinerExample</name>
  <description>Combiner Example in Map Reduce</description>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Step 3. ClickStreamMapper.java (Mapper)
This is our mapper class. It takes a LongWritable key and Text value as input, and emits a Text key with an IntWritable value.
package com.javamakeuse.bd;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable count = new IntWritable(1);
    Text country = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] columns = value.toString().split("\\t");
        if (columns.length > 4) {
            country.set(columns[4]);
            // India 1
            context.write(country, count);
        }
    }
}

Step 4. ClickStreamReducer.java (Reducer)
This class also plays the role of the Combiner. No specific implementation is required for a combiner; it is just a reducer class, which can additionally be set as the Combiner (its input key/value types must match the mapper's output types).
package com.javamakeuse.bd;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClickStreamReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int noOfClicks = 0;
        for (IntWritable value : values) {
            // calculating the number of clicks per country.
            noOfClicks += value.get();
        }
        // writing to the disk
        context.write(key, new IntWritable(noOfClicks));
    }
}

Step 5. ClickStreamDriver.java (Driver main)
This is the main class that runs the job and produces the output at the destination. It is also where we register the Combiner, via the job.setCombinerClass(...) call.
package com.javamakeuse.bd;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ClickStreamDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = Job.getInstance();
        job.setJarByClass(getClass());
        job.setJobName("ClickStream");

        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(ClickStreamMapper.class);
        job.setCombinerClass(ClickStreamReducer.class);
        job.setReducerClass(ClickStreamReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ClickStreamDriver(), args);
        System.exit(exitCode);
    }
}

Step 6. Steps to execute our Click Stream Program
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh

ii. Verify that Hadoop started, using the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode

You can also verify via the web UI at "http://localhost:50070/explorer.html#/".
iii. Create an input folder on HDFS with the command below.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input

The above command creates an input folder on HDFS; you can verify it using the web UI. Now it is time to move the input file we need to process. Below is the command to copy the click_stream_data.txt input file into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/click_stream_data.txt /input/

Note - The click_stream_data.txt file is available inside this project's source code; you can download it from the link at the end of this tutorial.
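Optionally, confirm that the file landed on HDFS with a quick listing (standard hadoop fs usage, same path as above):

subodh@subodh-Inspiron-3520:~$ hadoop fs -ls /input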
Step 7. Create & Execute jar file
We are almost done. Now create a jar file of the CombinerExample source code. You can create the jar using Eclipse or with the mvn package command.
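If you go the Maven route, the standard command below builds the jar; with the default Maven layout the artifact lands at target/CombinerExample-1.0.jar (the prompt path here is illustrative, run it from the project root):

subodh@subodh-Inspiron-3520:~/workspace/CombinerExample$ mvn clean package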
To execute the CombinerExample-1.0.jar file, use the command below:
hadoop jar /home/subodh/CombinerExample-1.0.jar com.javamakeuse.bd.ClickStreamDriver /input /output

The command above prints the log below and creates an output folder containing the results of the CombinerExample project.
16/03/08 23:36:20 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/03/08 23:36:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/08 23:36:21 INFO input.FileInputFormat: Total input paths to process : 1
16/03/08 23:36:21 INFO mapreduce.JobSubmitter: number of splits:1
16/03/08 23:36:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1706395962_0001
16/03/08 23:36:21 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/03/08 23:36:21 INFO mapreduce.Job: Running job: job_local1706395962_0001
16/03/08 23:36:21 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/03/08 23:36:21 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:21 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/03/08 23:36:21 INFO mapred.LocalJobRunner: Waiting for map tasks
16/03/08 23:36:21 INFO mapred.LocalJobRunner: Starting task: attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:21 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:21 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
16/03/08 23:36:21 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/click_stream_data.txt:0+159973
16/03/08 23:36:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/08 23:36:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/08 23:36:22 INFO mapred.MapTask: soft limit at 83886080
16/03/08 23:36:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/08 23:36:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/08 23:36:22 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/08 23:36:22 INFO mapred.LocalJobRunner:
16/03/08 23:36:22 INFO mapred.MapTask: Starting flush of map output
16/03/08 23:36:22 INFO mapred.MapTask: Spilling map output
16/03/08 23:36:22 INFO mapred.MapTask: bufstart = 0; bufend = 1120; bufvoid = 104857600
16/03/08 23:36:22 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213996(104855984); length = 401/6553600
16/03/08 23:36:22 INFO mapred.MapTask: Finished spill 0
16/03/08 23:36:22 INFO mapred.Task: Task:attempt_local1706395962_0001_m_000000_0 is done. And is in the process of committing
16/03/08 23:36:22 INFO mapred.LocalJobRunner: map
16/03/08 23:36:22 INFO mapred.Task: Task 'attempt_local1706395962_0001_m_000000_0' done.
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Finishing task: attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:22 INFO mapred.LocalJobRunner: map task executor complete.
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Starting task: attempt_local1706395962_0001_r_000000_0
16/03/08 23:36:22 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:22 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
16/03/08 23:36:22 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@30d6304e
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/03/08 23:36:22 INFO reduce.EventFetcher: attempt_local1706395962_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/03/08 23:36:22 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1706395962_0001_m_000000_0 decomp: 63 len: 67 to MEMORY
16/03/08 23:36:22 INFO reduce.InMemoryMapOutput: Read 63 bytes from map-output for attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 63, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->63
16/03/08 23:36:22 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/03/08 23:36:22 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/03/08 23:36:22 INFO mapred.Merger: Merging 1 sorted segments
16/03/08 23:36:22 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 55 bytes
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merged 1 segments, 63 bytes to disk to satisfy reduce memory limit
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merging 1 files, 67 bytes from disk
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/08 23:36:22 INFO mapred.Merger: Merging 1 sorted segments
16/03/08 23:36:22 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 55 bytes
16/03/08 23:36:22 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:22 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/03/08 23:36:22 INFO mapreduce.Job: Job job_local1706395962_0001 running in uber mode : false
16/03/08 23:36:22 INFO mapreduce.Job: map 100% reduce 0%
16/03/08 23:36:23 INFO mapred.Task: Task:attempt_local1706395962_0001_r_000000_0 is done. And is in the process of committing
16/03/08 23:36:23 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:23 INFO mapred.Task: Task attempt_local1706395962_0001_r_000000_0 is allowed to commit now
16/03/08 23:36:23 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1706395962_0001_r_000000_0' to hdfs://localhost:9000/user/subodh/outputc/_temporary/0/task_local1706395962_0001_r_000000
16/03/08 23:36:23 INFO mapred.LocalJobRunner: reduce > reduce
16/03/08 23:36:23 INFO mapred.Task: Task 'attempt_local1706395962_0001_r_000000_0' done.
16/03/08 23:36:23 INFO mapred.LocalJobRunner: Finishing task: attempt_local1706395962_0001_r_000000_0
16/03/08 23:36:23 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/08 23:36:23 INFO mapreduce.Job: map 100% reduce 100%
16/03/08 23:36:23 INFO mapreduce.Job: Job job_local1706395962_0001 completed successfully
16/03/08 23:36:23 INFO mapreduce.Job: Counters: 35
	File System Counters
		FILE: Number of bytes read=11380
		FILE: Number of bytes written=565119
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=319946
		HDFS: Number of bytes written=47
		HDFS: Number of read operations=13
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Map-Reduce Framework
		Map input records=101
		Map output records=101
		Map output bytes=1120
		Map output materialized bytes=67
		Input split bytes=114
		Combine input records=101
		Combine output records=4
		Reduce input groups=4
		Reduce shuffle bytes=67
		Reduce input records=4
		Reduce output records=4
		Spilled Records=8
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=5
		Total committed heap usage (bytes)=526385152
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=159973
	File Output Format Counters
		Bytes Written=47

Since we are using a Combiner, the reducer receives locally aggregated values from the map side; note Combine input records=101 versus Combine output records=4 in the counters above. Without the Combiner, the map output sent to the reducer would contain many repeated records, something like this:
China 1
China 1
China 1
Pakistan 1
Pakistan 1
Pakistan 1
Pakistan 1
United States 1
Step 8. Verify output
You can verify the output inside the output folder.
subodh@subodh-Inspiron-3520:~/programs$ hadoop fs -cat /output/pa*
16/03/08 23:52:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
China 84
Pakistan 4
United States 12
That's it :)
Download the complete example from here: Source Code