Multiple Input Files - Example Program




We also offer online and classroom training.
We also provide support for POCs.
author: Bharat (sree ram)
contact : 9640892992
________________________________________________________________________

File1:emp1.txt
................
101,f,3000
102,m,4000
103,f,5000
104,m,5000
105,m,9000
................
File2:emp2.txt
................
201,aaaa,m,10000,11
202,b,m,30000,11
203,c,f,6000,14
204,dd,f,90000,14
205,ee,m,10000,13
206,ff,f,10000,13
207,mm,m,30000,15

.............................................................................
MapReduce program:
.............................................................................
package my.map.red;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MultipleFiles
{
  public static class Map1 extends Mapper<LongWritable,Text,Text,IntWritable>
  {
              public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
              {
                           String line=v.toString();
                           String[] words=line.split(",");
                           String sex=words[1];
                           int sal=Integer.parseInt(words[2]);
                           con.write(new Text(sex), new IntWritable(sal));
              }
  }
  public static class Map2 extends Mapper<LongWritable,Text,Text,IntWritable>
  {
              public void map(LongWritable k, Text v, Context con) throws IOException, InterruptedException
              {
                           String line=v.toString();
                           String[] words=line.split(",");
                           String sex=words[2];
                           int sal=Integer.parseInt(words[3]);
                           con.write(new Text(sex), new IntWritable(sal));
              }
  }
  public static class Red extends Reducer<Text,IntWritable,Text,IntWritable>
  {
               public void reduce(Text sex, Iterable<IntWritable> salaries, Context con)
                throws IOException , InterruptedException
                {
                            int tot=0;
                            for(IntWritable sal:salaries)
                            {
                                    tot+=sal.get();
                            }
                            con.write(sex, new IntWritable(tot));
                       
                }
   }
  public static void main(String[] args) throws Exception
  {
              Configuration c=new Configuration();
              String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
              Path p1=new Path(files[0]);
              Path p2=new Path(files[1]);
              Path p3=new Path(files[2]);
              Job j = new Job(c,"multiple");
              j.setJarByClass(MultipleFiles.class);
              j.setMapperClass(Map1.class);
              j.setMapperClass(Map2.class);
              j.setReducerClass(Red.class);
              j.setOutputKeyClass(Text.class);
              j.setOutputValueClass(IntWritable.class);
              MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
              MultipleInputs.addInputPath(j,p2, TextInputFormat.class, Map2.class);
      FileOutputFormat.setOutputPath(j, p3);
      System.exit(j.waitForCompletion(true) ? 0:1);
             
  }

}

2 comments: