Joins are an important aspect of any database, and Hadoop MapReduce also supports joining multiple datasets. Joins come into play whenever data from multiple tables/datasets has to be combined for presentation, for example fetching users' activities on social media, where two different datasets (user and user_activity) need to be combined to extract the information. We have already covered some of the most popular features of Hadoop MapReduce, such as Custom Data Types and Partitioner in MapReduce.
Joins in Hadoop MapReduce
Hadoop MapReduce supports two types of joins-
- Reduce Side Join
- Map Side Join (map side join using distributed cache)
Reduce Side Join
A reduce side join is simpler and easier to implement than a map side join, but it is also the more expensive of the two, because both datasets have to go through the shuffle & sort phase (for more about the internals of MapReduce and how it works, see how MapReduce works). A reduce side join requires some additional code, implemented in combination with a secondary sort, so do look at an example of secondary sort as well. Below are the steps needed to achieve a reduce side join.
- The key of the mapper output must be the join key (for example, a primary key), so that records with the same key go to the same reducer.
- While processing the datasets in the mapper phase, each record must be tagged with an identifier so that both datasets can be easily distinguished (the master dataset's records must reach the reducer before the records of the dataset being joined to it).
- A secondary sort must be implemented to guarantee the order in which the values reach the reducer.
- If the input datasets are in different formats, multiple mappers must be used with MultipleInputs to process the multiple input files.
For this example we have two different log files -
- user.log (tab separated)
- user_activity.log (pipe | separated)
Here is the tabular view of these datasets (a rough sketch of the record formats follows this list):
1. user.log
2. user_activity.log
3. Expected output
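Going only by the mapper and reducer code below, the record layouts look roughly like this (placeholder column names for illustration, not the real data):

user.log           (tab separated)  : <userId> TAB <userName>
user_activity.log  (pipe separated) : <col0> | <userId> | <activity detail 1> | <activity detail 2>
expected output    (tab separated)  : <userId> TAB <userName> TAB <activity detail 1> TAB <activity detail 2>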
Tools and Technologies we are using here:
- Java 8
- Eclipse Mars
- Hadoop 2.7.1
- Maven 3.3
- Ubuntu 14(Linux OS)
The ReduceSideJoin project consists of the following source files (a sketch of the Maven project layout follows this list):
- CustomKey.java (composite key custom datatype)
- KeyPartitioner.java (custom partitioner)
- GroupComparator.java (comparator)
- UserMapper.java (for the tab-separated dataset)
- UserActivityMapper.java (for the pipe | separated dataset)
- UserActivityReducer.java
- UserActivityDriver.java (with multiple input paths)
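Assuming the standard Maven source layout and the package names used in the code below, the project ends up looking roughly like this:

ReduceSideJoin
├── pom.xml
└── src/main/java/com/javamakeuse/bd/poc
    ├── driver/UserActivityDriver.java
    ├── mapper/UserMapper.java
    ├── mapper/UserActivityMapper.java
    ├── reducer/UserActivityReducer.java
    ├── util/CustomKey.java
    ├── util/GroupComparator.java
    └── util/KeyPartitioner.java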
Step 1. Create Maven project
Go to the File menu, then New -> Maven Project, and provide the required details.
Step 2. Edit pom.xml
Double click on your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javamakeuse.bd.poc</groupId>
    <artifactId>ReduceSideJoin</artifactId>
    <version>1.0</version>
    <name>ReduceSideJoin</name>
    <description>ReduceSideJoin Example in Map Reduce</description>
</project>

Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file; just copy and paste it and it will work.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javamakeuse.bd.poc</groupId>
    <artifactId>ReduceSideJoin</artifactId>
    <version>1.0</version>
    <name>ReduceSideJoin</name>
    <description>ReduceSideJoin Example</description>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Step 3. CustomKey.java
CustomKey is a custom datatype used as the key in the mapper and reducer phases. It contains userId and dataSetType, which identifies which dataset a record comes from (1 = user.log and 2 = user_activity.log).
package com.javamakeuse.bd.poc.util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CustomKey implements WritableComparable<CustomKey> {
    private Integer userId;
    private Integer dataSetType;

    public CustomKey() {
    }

    public CustomKey(Integer userId, Integer dataSetType) {
        super();
        this.userId = userId;
        this.dataSetType = dataSetType;
    }

    public Integer getUserId() {
        return userId;
    }

    public Integer getDataSetType() {
        return dataSetType;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(userId);
        out.writeInt(dataSetType);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readInt();
        dataSetType = in.readInt();
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((dataSetType == null) ? 0 : dataSetType.hashCode());
        result = prime * result + ((userId == null) ? 0 : userId.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        CustomKey other = (CustomKey) obj;
        if (dataSetType == null) {
            if (other.dataSetType != null)
                return false;
        } else if (!dataSetType.equals(other.dataSetType))
            return false;
        if (userId == null) {
            if (other.userId != null)
                return false;
        } else if (!userId.equals(other.userId))
            return false;
        return true;
    }

    @Override
    public int compareTo(CustomKey o) {
        // Sort by userId first (the natural key), then by dataSetType so that
        // user.log records (1) come before user_activity.log records (2).
        int returnValue = compare(userId, o.getUserId());
        if (returnValue != 0) {
            return returnValue;
        }
        return compare(dataSetType, o.getDataSetType());
    }

    public static int compare(int k1, int k2) {
        return (k1 < k2 ? -1 : (k1 == k2 ? 0 : 1));
    }

    @Override
    public String toString() {
        return "CustomKey [userId=" + userId + ", dataSetType=" + dataSetType + "]";
    }
}

Step 4. GroupComparator.java
GroupComparator groups the data reaching the reducer based on the natural key (userId) only.
package com.javamakeuse.bd.poc.util;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupComparator extends WritableComparator {

    protected GroupComparator() {
        super(CustomKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        // Group reducer values by userId only, ignoring dataSetType, so that the
        // user and user_activity records of a user reach the same reduce() call.
        CustomKey ck = (CustomKey) w1;
        CustomKey ck2 = (CustomKey) w2;
        return CustomKey.compare(ck.getUserId(), ck2.getUserId());
    }
}

Step 5. KeyPartitioner.java
KeyPartitioner is a custom partitioner that controls how the intermediate mapper output keys are partitioned (see the custom partitioner tutorial).
package com.javamakeuse.bd.poc.util;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class KeyPartitioner extends Partitioner<CustomKey, Text> {

    @Override
    public int getPartition(CustomKey key, Text value, int numPartitions) {
        // Partition on userId only so that all records of a user go to the same reducer.
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Step 6. UserMapper.java
UserMapper is the mapper class that processes the tab-separated user.log dataset, which is treated as the master dataset for this project. Here we tag the key with 1 for user.log records.
package com.javamakeuse.bd.poc.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.javamakeuse.bd.poc.util.CustomKey;

public class UserMapper extends Mapper<LongWritable, Text, CustomKey, Text> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CustomKey, Text>.Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        int userId = Integer.parseInt(columns[0]);
        // dataSetType 1 = user.log data
        context.write(new CustomKey(userId, 1), new Text(columns[1]));
    }
}

Step 7. UserActivityMapper.java
UserActivityMapper is the mapper class that processes the pipe (|) separated user_activity.log dataset. Here we tag the key with 2 for user_activity.log records.
package com.javamakeuse.bd.poc.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.javamakeuse.bd.poc.util.CustomKey;

public class UserActivityMapper extends Mapper<LongWritable, Text, CustomKey, Text> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CustomKey, Text>.Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\\|");
        if (columns != null && columns.length > 3) {
            // dataSetType 2 = user_activity.log data
            context.write(new CustomKey(Integer.parseInt(columns[1]), 2), new Text(columns[2] + "\t" + columns[3]));
        }
    }
}

Step 8. UserActivityReducer.java
This is our reducer class, where the two datasets are joined. Because we tagged user records with 1 and user_activity records with 2, the secondary sort guarantees that the first value for a key is the user name and the remaining values are that user's activity data.
package com.javamakeuse.bd.poc.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.javamakeuse.bd.poc.util.CustomKey;

public class UserActivityReducer extends Reducer<CustomKey, Text, IntWritable, Text> {

    @Override
    protected void reduce(CustomKey key, Iterable<Text> values, Reducer<CustomKey, Text, IntWritable, Text>.Context context)
            throws IOException, InterruptedException {
        Iterator<Text> itr = values.iterator();
        // Thanks to the secondary sort, the first value for a key is the user name (dataSetType 1).
        Text userName = new Text(itr.next());
        // The remaining values are user_activity records (dataSetType 2); join each one with the user name.
        while (itr.hasNext()) {
            Text activityInfo = itr.next();
            Text userActivityInfo = new Text(userName.toString() + "\t" + activityInfo.toString());
            context.write(new IntWritable(key.getUserId()), userActivityInfo);
        }
    }
}

Step 9. UserActivityDriver.java
UserActivityDriver is the main class that runs the reduce side join example. Here we use MultipleInputs to add the two different datasets, and we also set the partitioner and grouping comparator on the job instance.
package com.javamakeuse.bd.poc.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.javamakeuse.bd.poc.mapper.UserActivityMapper;
import com.javamakeuse.bd.poc.mapper.UserMapper;
import com.javamakeuse.bd.poc.reducer.UserActivityReducer;
import com.javamakeuse.bd.poc.util.CustomKey;
import com.javamakeuse.bd.poc.util.GroupComparator;
import com.javamakeuse.bd.poc.util.KeyPartitioner;

public class UserActivityDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new UserActivityDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.printf("Usage: %s [generic options] <input1> <input2> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        Job job = Job.getInstance();
        job.setJarByClass(getClass());
        job.setJobName("ReduceSideJoin Example");

        // input paths - a separate mapper for each dataset
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserActivityMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, UserMapper.class);
        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(UserActivityReducer.class);
        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(GroupComparator.class);

        job.setMapOutputKeyClass(CustomKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Done. Next, run the program; you can also run it from Eclipse, but below are the steps to run it from the terminal.
Step 10. Steps to execute ReduceSideJoin project
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh

ii. Verify whether Hadoop started or not with the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode

You can also verify with the web UI using the "http://localhost:50070/explorer.html#/" url.
iii. Create the input folder on HDFS using the below command.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input

The above command will create an input folder on HDFS; you can verify it using the web UI. Now it is time to move the input files which we need to process. Below is the command to copy the user.log and user_activity.log input files to HDFS inside the input folder.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/user.log user_activity.log /input/

Note - The user.log and user_activity.log files are available inside this project's source code; you will find the downloadable link at the end of this tutorial.
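To quickly confirm that both files landed on HDFS, you can list the input folder:

subodh@subodh-Inspiron-3520:~$ hadoop fs -ls /input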
Step 11. Create & Execute jar file
We are almost done. Now create the jar file of the ReduceSideJoin source code; you can create it using Eclipse or with the mvn package command (a sample command is shown below).
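For example, from the project's root directory (the folder containing pom.xml; the path below is only an assumption), the following should produce target/ReduceSideJoin-1.0.jar:

subodh@subodh-Inspiron-3520:~/workspace/ReduceSideJoin$ mvn clean package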
To execute the ReduceSideJoin-1.0.jar file, use the below command:
hadoop jar /home/subodh/ReduceSideJoin-1.0.jar com.javamakeuse.bd.poc.driver.UserActivityDriver /input/user_activity.log /input/user.log /output

The above will generate the below output and also create an output folder with the output of the ReduceSideJoin project.
16/03/27 22:08:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 16/03/27 22:08:31 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id 16/03/27 22:08:31 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 16/03/27 22:08:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 16/03/27 22:08:32 INFO input.FileInputFormat: Total input paths to process : 1 16/03/27 22:08:32 INFO input.FileInputFormat: Total input paths to process : 1 16/03/27 22:08:32 INFO mapreduce.JobSubmitter: number of splits:2 16/03/27 22:08:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local574821361_0001 16/03/27 22:08:32 INFO mapreduce.Job: The url to track the job: http://localhost:8080/ 16/03/27 22:08:32 INFO mapreduce.Job: Running job: job_local574821361_0001 16/03/27 22:08:32 INFO mapred.LocalJobRunner: OutputCommitter set in config null 16/03/27 22:08:32 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 16/03/27 22:08:32 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 16/03/27 22:08:32 INFO mapred.LocalJobRunner: Waiting for map tasks 16/03/27 22:08:32 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_m_000000_0 16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 16/03/27 22:08:33 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 16/03/27 22:08:33 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user_activity.log:0+641 16/03/27 22:08:33 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 16/03/27 22:08:33 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 16/03/27 22:08:33 INFO mapred.MapTask: soft limit at 83886080 16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 16/03/27 22:08:33 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 16/03/27 22:08:33 INFO mapred.LocalJobRunner: 16/03/27 22:08:33 INFO mapred.MapTask: Starting flush of map output 16/03/27 22:08:33 INFO mapred.MapTask: Spilling map output 16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufend = 632; bufvoid = 104857600 16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600 16/03/27 22:08:33 INFO mapred.MapTask: Finished spill 0 16/03/27 22:08:33 INFO mapred.Task: Task:attempt_local574821361_0001_m_000000_0 is done. And is in the process of committing 16/03/27 22:08:33 INFO mapred.LocalJobRunner: map 16/03/27 22:08:33 INFO mapred.Task: Task 'attempt_local574821361_0001_m_000000_0' done. 
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_m_000000_0 16/03/27 22:08:33 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_m_000001_0 16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 16/03/27 22:08:33 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 16/03/27 22:08:33 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user.log:0+157 16/03/27 22:08:33 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584) 16/03/27 22:08:33 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100 16/03/27 22:08:33 INFO mapred.MapTask: soft limit at 83886080 16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600 16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 16/03/27 22:08:33 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 16/03/27 22:08:33 INFO mapred.LocalJobRunner: 16/03/27 22:08:33 INFO mapred.MapTask: Starting flush of map output 16/03/27 22:08:33 INFO mapred.MapTask: Spilling map output 16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufend = 61; bufvoid = 104857600 16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600 16/03/27 22:08:33 INFO mapred.MapTask: Finished spill 0 16/03/27 22:08:33 INFO mapred.Task: Task:attempt_local574821361_0001_m_000001_0 is done. And is in the process of committing 16/03/27 22:08:33 INFO mapred.LocalJobRunner: map 16/03/27 22:08:33 INFO mapred.Task: Task 'attempt_local574821361_0001_m_000001_0' done. 16/03/27 22:08:33 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_m_000001_0 16/03/27 22:08:33 INFO mapred.LocalJobRunner: map task executor complete. 
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Waiting for reduce tasks 16/03/27 22:08:33 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_r_000000_0 16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 16/03/27 22:08:33 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ] 16/03/27 22:08:33 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@4e12ef32 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10 16/03/27 22:08:33 INFO reduce.EventFetcher: attempt_local574821361_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events 16/03/27 22:08:33 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local574821361_0001_m_000000_0 decomp: 652 len: 656 to MEMORY 16/03/27 22:08:33 INFO reduce.InMemoryMapOutput: Read 652 bytes from map-output for attempt_local574821361_0001_m_000000_0 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 652, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->652 16/03/27 22:08:33 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local574821361_0001_m_000001_0 decomp: 71 len: 75 to MEMORY 16/03/27 22:08:33 INFO reduce.InMemoryMapOutput: Read 71 bytes from map-output for attempt_local574821361_0001_m_000001_0 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 71, inMemoryMapOutputs.size() -> 2, commitMemory -> 652, usedMemory ->723 16/03/27 22:08:33 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning 16/03/27 22:08:33 INFO mapred.LocalJobRunner: 2 / 2 copied. 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs 16/03/27 22:08:33 INFO mapred.Merger: Merging 2 sorted segments 16/03/27 22:08:33 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 703 bytes 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merged 2 segments, 723 bytes to disk to satisfy reduce memory limit 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merging 1 files, 725 bytes from disk 16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce 16/03/27 22:08:33 INFO mapred.Merger: Merging 1 sorted segments 16/03/27 22:08:33 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 711 bytes 16/03/27 22:08:33 INFO mapred.LocalJobRunner: 2 / 2 copied. 16/03/27 22:08:33 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords 16/03/27 22:08:33 INFO mapreduce.Job: Job job_local574821361_0001 running in uber mode : false 16/03/27 22:08:33 INFO mapreduce.Job: map 100% reduce 0% 16/03/27 22:08:34 INFO mapred.Task: Task:attempt_local574821361_0001_r_000000_0 is done. And is in the process of committing 16/03/27 22:08:34 INFO mapred.LocalJobRunner: 2 / 2 copied. 
16/03/27 22:08:34 INFO mapred.Task: Task attempt_local574821361_0001_r_000000_0 is allowed to commit now
16/03/27 22:08:34 INFO output.FileOutputCommitter: Saved output of task 'attempt_local574821361_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local574821361_0001_r_000000
16/03/27 22:08:34 INFO mapred.LocalJobRunner: reduce > reduce
16/03/27 22:08:34 INFO mapred.Task: Task 'attempt_local574821361_0001_r_000000_0' done.
16/03/27 22:08:34 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_r_000000_0
16/03/27 22:08:34 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/27 22:08:34 INFO mapreduce.Job: map 100% reduce 100%
16/03/27 22:08:34 INFO mapreduce.Job: Job job_local574821361_0001 completed successfully
16/03/27 22:08:34 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=36102
  FILE: Number of bytes written=869331
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=2237
  HDFS: Number of bytes written=669
  HDFS: Number of read operations=28
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=5
 Map-Reduce Framework
  Map input records=13
  Map output records=13
  Map output bytes=693
  Map output materialized bytes=731
  Input split bytes=519
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=731
  Reduce input records=13
  Reduce output records=9
  Spilled Records=26
  Shuffled Maps =2
  Failed Shuffles=0
  Merged Map outputs=2
  GC time elapsed (ms)=0
  Total committed heap usage (bytes)=938999808
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters
  Bytes Read=0
 File Output Format Counters
  Bytes Written=669

Step 12. Verify the output
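To inspect the joined records, read the reducer's part file from the output folder on HDFS (assuming the /output path used in the command above and the default single reducer):

subodh@subodh-Inspiron-3520:~$ hadoop fs -cat /output/part-r-00000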
That's it.
Download the complete example from here: Source Code