Writing a Hadoop MapReduce task in Java

16 Aug 2013

Hadoop
Although the Hadoop framework itself is written in Java, MapReduce jobs can be written in many different languages. In this post I show how to create a MapReduce job in Java, set up as a Maven project like any other Java project.

  • Prepare the example input
  • Let's start with a fictional business case. In this case we need a CSV file with English words from a dictionary, with all translations into other languages appended to each word, separated by a '|' symbol. I have based this example on this post. So the job will read dictionaries of different languages and match each English word with its translations in the other languages. The input dictionaries for the job are taken from here. I downloaded a few files in different languages and put them together in one file (Hadoop handles one large file better than many small ones). My example file can be found here.
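    To make the data flow concrete: the job reads the merged file with KeyValueTextInputFormat, which by default splits each line on the first tab into a key and a value, and the mapper then splits the value on commas. The exact contents depend on the downloaded dictionaries, so the lines below are purely hypothetical examples of that shape (English word, a tab, then comma-separated translations):

    ```
    aardvark	erdvark,aardvarken
    zebra	sebra,zebra
    ```
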

  • Create the Java MapReduce project
  • The next step is creating the Java code for the MapReduce job. As I said before, I use a Maven project for this, so I created a new empty Maven project in my IDE, IntelliJ. I modified the default pom to add the necessary plugins and dependencies.
    The dependency I added:

    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>1.2.0</version>
       <scope>provided</scope>
    </dependency>
    

    The Hadoop dependency is necessary to make use of the Hadoop classes in my MapReduce job. Since I want to run the job on AWS EMR, I make sure I have a matching Hadoop version. Furthermore, the scope can be set to 'provided' since the Hadoop framework itself will be available on the cluster.
    Besides the dependency, I added the following two plugins to the pom.xml:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>net.pascalalma.hadoop.Dictionary</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
    

    The first plugin is used to create an executable JAR of our project. This makes running the JAR on the Hadoop cluster easier, since we don't have to specify the main class.
    The second plugin is necessary to make the created JAR compatible with the instances of the AWS EMR cluster. This AWS cluster comes with JDK 1.6. If you omit this plugin, the cluster will fail (I got a message like 'Unsupported major.minor version 51.0'). I will show in a later post how to set up this AWS EMR cluster.
    That is the basic project, just like a regular Java project. Let's implement the MapReduce classes next.

  • Implement the MapReduce classes
  • I have described in the first step the functionality that we want to implement. To achieve this I created three Java classes in my Hadoop project. The first class is the 'Mapper':

    package net.pascalalma.hadoop;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    /**
     * Created with IntelliJ IDEA.
     * User: pascal
     * Date: 16-07-13
     * Time: 12:07
     */
    public class WordMapper extends Mapper<Text,Text,Text,Text> {
    
        private Text word = new Text();
    
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString(),",");
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(key, word);
            }
        }
    }
    

    This class isn't very complicated. It receives a row from the input file, splits the value on commas, and writes a separate key/value pair for each token, so the same key can be emitted multiple times at this stage.
    The next class is the 'Reducer', which reduces the map output to the wanted result:
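    To see what the map step does in isolation, here is a small plain-Java sketch of the same tokenizing logic, with the Hadoop Text and Context types replaced by String and a List (the class and method names are made up for this example):

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.StringTokenizer;

    // Plain-Java sketch of WordMapper.map(): the Hadoop Text and Context types
    // are replaced by String and a List so the logic can run standalone.
    public class MapperDemo {

        // Splits a comma-separated translation list, as the mapper does,
        // and pairs each token with the key (here joined with a tab).
        static List<String> mapRecord(String key, String value) {
            List<String> emitted = new ArrayList<>();
            StringTokenizer itr = new StringTokenizer(value, ",");
            while (itr.hasMoreTokens()) {
                // stands in for context.write(key, word)
                emitted.add(key + "\t" + itr.nextToken());
            }
            return emitted;
        }

        public static void main(String[] args) {
            // Hypothetical record: key = English word, value = translations
            System.out.println(mapRecord("cat", "chat,gato,kat"));
        }
    }
    ```

    Each input record thus fans out into one pair per translation, all sharing the English word as key.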

    package net.pascalalma.hadoop;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Created with IntelliJ IDEA.
     * User: pascal
     * Date: 17-07-13
     * Time: 19:50
     */
    public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {
    
        private Text result = new Text();
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder translations = new StringBuilder();
    
            for (Text val : values) {
                translations.append("|").append(val.toString());
            }
    
            result.set(translations.toString());
            context.write(key, result);
        }
    }
    

    This reduce step collects all values for a given key and puts them after each other, separated by a '|' symbol.
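    The same concatenation logic can be sketched outside Hadoop; this hypothetical helper joins the values for one key the way the reducer does (note that the result starts with a '|', because the separator is prepended to every value):

    ```java
    import java.util.Arrays;

    // Plain-Java sketch of AllTranslationsReducer's concatenation logic,
    // with the Hadoop Text values replaced by Strings. Names are made up.
    public class ReducerDemo {

        // Joins all values with a '|' prefix, exactly as the reduce() loop does.
        static String joinTranslations(Iterable<String> values) {
            StringBuilder translations = new StringBuilder();
            for (String val : values) {
                translations.append("|").append(val);
            }
            return translations.toString();
        }

        public static void main(String[] args) {
            // Hypothetical translations collected for the key "cat"
            System.out.println(joinTranslations(Arrays.asList("chat", "gato", "kat")));
            // prints "|chat|gato|kat"
        }
    }
    ```
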

    The final class is the one that puts it all together into a runnable job:

    package net.pascalalma.hadoop;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * Created with IntelliJ IDEA.
     * User: pascal
     * Date: 16-07-13
     * Time: 12:07
     */
    public class Dictionary {
    
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "dictionary");
            job.setJarByClass(Dictionary.class);
            job.setMapperClass(WordMapper.class);
            job.setReducerClass(AllTranslationsReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0])); 
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }
    

    In this main method we put together a Job and run it. Please note that I simply expect args[0] and args[1] to be the name of the input file and the (non-existing) output directory; I didn't add any check for this. Here is my 'Run Configuration' in IntelliJ:
    [Screenshot: the job's Run Configuration in IntelliJ]
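    Since main() doesn't validate its arguments, a minimal guard could look like the sketch below. The ArgsCheck class name and the usage message are made up; in the real job this check would sit at the top of Dictionary.main():

    ```java
    // Hypothetical sketch of the argument check that Dictionary.main() omits.
    public class ArgsCheck {

        // The job needs exactly two arguments: an input file and an output directory.
        static boolean argsValid(String[] args) {
            return args != null && args.length == 2;
        }

        public static void main(String[] args) {
            if (!argsValid(args)) {
                System.err.println("Usage: Dictionary <input file> <output dir>");
                return; // in the real job one would exit with a non-zero status here
            }
            // ...the Job setup from Dictionary.main() would follow here...
        }
    }
    ```
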
    Just make sure the output directory does not exist when you run the class. The logging output created by the job looks like this:

    2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
    aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
    WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
    WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
    WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
    INFO: Total input paths to process : 1
    aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
    WARNING: Snappy native library not loaded
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
    INFO: Running job: job_local_0001
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
    INFO:  Using ResourceCalculatorPlugin : null
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
    INFO: io.sort.mb = 100
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
    INFO: data buffer = 79691776/99614720
    aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
    INFO: record buffer = 262144/327680
    aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
    INFO: Starting flush of map output
    aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
    INFO: Finished spill 0
    aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
    INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
    INFO:  map 0% reduce 0%
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
    INFO: 
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
    INFO: Task 'attempt_local_0001_m_000000_0' done.
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
    INFO:  Using ResourceCalculatorPlugin : null
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
    INFO: 
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
    INFO: Merging 1 sorted segments
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
    INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
    aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
    INFO: 
    aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
    INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
    INFO: 
    aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
    INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
    aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
    INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
    aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
    INFO:  map 100% reduce 0%
    aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
    INFO: reduce > reduce
    aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
    INFO: Task 'attempt_local_0001_r_000000_0' done.
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
    INFO:  map 100% reduce 100%
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
    INFO: Job complete: job_local_0001
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO: Counters: 17
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:   File Output Format Counters 
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Bytes Written=423039
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:   FileSystemCounters
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     FILE_BYTES_READ=1464626
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     FILE_BYTES_WRITTEN=1537251
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:   File Input Format Counters 
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Bytes Read=469941
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:   Map-Reduce Framework
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Reduce input groups=11820
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Map output materialized bytes=524414
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Combine output records=0
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Map input records=20487
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Reduce shuffle bytes=0
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Reduce output records=11820
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Spilled Records=43234
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Map output bytes=481174
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Total committed heap usage (bytes)=362676224
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Combine input records=0
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Map output records=21617
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     SPLIT_RAW_BYTES=108
    aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
    INFO:     Reduce input records=21617
    
    Process finished with exit code 0
    

    The output file created by this job can be found in the supplied output directory, as can be seen in the next screenshot:
    [Screenshot: the output file in the supplied output directory]
    As you have seen, we can run this main method in an IDE (or from the command line), but I would like to see some unit tests for the Mapper and Reducer before we go there. I will show how to do that in another post.

5 Responses to “Writing a Hadoop MapReduce task in Java”

  1. Dan 08/11/2013 at 15:50

    Hey,

    Could you post up a more complete look at your pom file? I am not sure where the correct place to put the plugin section is.

    Great guide though!

    • Dan 08/11/2013 at 15:53

      Figured out I had to place the plugin section inside of the <build> element.

Trackbacks/Pingbacks

  1. Links & reads for 2013 Week 33 | Martin's Weekly Curations - 19/08/2013

    […] Writing a Hadoop MapReduce task in Java […]

  2. Unit testing a Java Hadoop job | The Pragmatic Integrator - 26/08/2013

    […] my previous post I showed how to setup a complete Maven based project to create a Hadoop job in Java. Of course it […]

  3. Run your Hadoop MapReduce job on Amazon EMR | The Pragmatic Integrator - 03/09/2013

    […] input for the job (see this for more info about this example job) we have to make the dictionary contents available for the EMR cluster. […]

