Multiple Input Files - Example Program
We also offer online and classroom training.
We also provide support for proofs of concept (POCs).
author: Bharat (sree ram)
contact : 9640892992
________________________________________________________________________
File1:emp1.txt
................
101,f,3000
102,m,4000
103,f,5000
104,m,5000
105,m,9000
................
File2:emp2.txt
................
201,aaaa,m,10000,11
202,b,m,30000,11
203,c,f,6000,14
204,dd,f,90000,14
205,ee,m,10000,13
206,ff,f,10000,13
207,mm,m,30000,15
.............................................................................
MapReduce program:
.............................................................................
package
my.map.red;
import
java.io.IOException;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.IntWritable;
import
org.apache.hadoop.io.LongWritable;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import
org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import
org.apache.hadoop.util.GenericOptionsParser;
public
class MultipleFiles
{
public static class Map1 extends
Mapper<LongWritable,Text,Text,IntWritable>
{
public void map(LongWritable k, Text v, Context con) throws IOException,
InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[1];
int sal=Integer.parseInt(words[2]);
con.write(new Text(sex), new
IntWritable(sal));
}
}
public static class Map2 extends
Mapper<LongWritable,Text,Text,IntWritable>
{
public void map(LongWritable k, Text v, Context con) throws IOException,
InterruptedException
{
String line=v.toString();
String[] words=line.split(",");
String sex=words[2];
int sal=Integer.parseInt(words[3]);
con.write(new Text(sex), new
IntWritable(sal));
}
}
public static class Red extends
Reducer<Text,IntWritable,Text,IntWritable>
{
public void reduce(Text sex, Iterable<IntWritable> salaries,
Context con)
throws IOException , InterruptedException
{
int tot=0;
for(IntWritable sal:salaries)
{
tot+=sal.get();
}
con.write(sex, new IntWritable(tot));
}
}
public static void main(String[] args) throws
Exception
{
Configuration c=new Configuration();
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
Job j = new Job(c,"multiple");
j.setJarByClass(MultipleFiles.class);
j.setMapperClass(Map1.class);
j.setMapperClass(Map2.class);
j.setReducerClass(Red.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
MultipleInputs.addInputPath(j, p1, TextInputFormat.class, Map1.class);
MultipleInputs.addInputPath(j,p2, TextInputFormat.class, Map2.class);
FileOutputFormat.setOutputPath(j, p3);
System.exit(j.waitForCompletion(true) ?
0:1);
}
}
Very nice, sir.
This is detailed and helped me a lot.