Example 2: For each city, what is the total salary? (Emp2.java)
_____________________________________________________________________________________
Input HDFS file: mr/emp
____________________
[training@localhost ~]$ hadoop fs -cat mr/emp
101,ravi,10000,hyd,m,11
102,rani,12000,pune,f,12
103,ravina,13000,hyd,f,13
104,rana,14000,hyd,m,11
105,roopa,15000,pune,f,12
106,razeena,16000,pune,f,12
107,susma,14000,hyd,f,12
108,sampurnesh,20000,delhi,m,13
109,samantha,18000,pune,f,12
110,kamal,19000,delhi,m,11
111,krupa,21000,delhi,m,11
112,kapoor,16000,pune,m,12
Schema: ecode, name, sal, city, sex, dno
Task: for each city, what is the total salary?
SQL: select city, sum(sal) from emp group by city;
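In MapReduce terms, the GROUP BY column becomes the map output key: the mapper emits a (city, sal) pair for each record, the shuffle phase groups all values by city, and the reducer sums each group. For example, the reducer call for the key hyd receives the value list (10000, 13000, 14000, 14000) and writes hyd 51000.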
__________
Emp2.java
__________
package bharath.sreeram.big.halitcs;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class Emp2
{
    // Mapper: for each record, emit (city, sal).
    public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        public void map(LongWritable k, Text v, Context con)
            throws IOException, InterruptedException
        {
            String line = v.toString();
            String[] w = line.split(",");
            String city = w[3];                   // 4th field: city
            int sal = Integer.parseInt(w[2]);     // 3rd field: sal
            con.write(new Text(city), new IntWritable(sal));
        }
    }

    // Reducer: for each city key, sum the grouped salary values.
    public static class MyRed extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
            throws IOException, InterruptedException
        {
            int tot = 0;
            for (IntWritable v : vlist)
                tot += v.get();
            con.write(k, new IntWritable(tot));   // (city, total salary)
        }
    }

    // Driver: configure and submit the job.
    public static void main(String[] args) throws Exception
    {
        Configuration c = new Configuration();
        Job j = new Job(c, "Test");
        j.setJarByClass(Emp2.class);
        j.setMapperClass(MyMap.class);
        j.setReducerClass(MyRed.class);
        j.setOutputKeyClass(Text.class);          // map/reduce output key type
        j.setOutputValueClass(IntWritable.class); // map/reduce output value type
        Path p1 = new Path(args[0]);              // input path (HDFS)
        Path p2 = new Path(args[1]);              // output path (must not already exist)
        FileInputFormat.addInputPath(j, p1);
        FileOutputFormat.setOutputPath(j, p2);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }
}
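Because integer addition is associative and commutative, the reducer above can also be registered as a combiner, so each map task pre-aggregates its local (city, sal) pairs and less data crosses the network during the shuffle. A minimal sketch of the one extra driver line (an optional optimization, not part of the original listing):

    // optional: reuse MyRed as a combiner for map-side pre-aggregation
    j.setCombinerClass(MyRed.class);

The final output is unchanged; only the intermediate data volume shrinks.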
Submitting the Hadoop job:
$ hadoop jar Desktop/mr.jar bharath.sreeram.big.halitcs.Emp2 mr/emp Result2
Output of the job:
$ hadoop fs -cat Result2/part-r-00000
delhi 60000
hyd 51000
pune 77000
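Note that the mapper assumes every line has six well-formed fields: a blank line would throw ArrayIndexOutOfBoundsException and a non-numeric salary would throw NumberFormatException, failing the task. A variant of the mapper that simply skips malformed records (an illustrative refinement, not part of the original example):

public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable>
{
    public void map(LongWritable k, Text v, Context con)
        throws IOException, InterruptedException
    {
        String[] w = v.toString().split(",");
        if (w.length < 6)
            return;                               // skip blank or short records
        try {
            con.write(new Text(w[3]), new IntWritable(Integer.parseInt(w[2])));
        } catch (NumberFormatException e) {
            // skip records whose salary field is not a number
        }
    }
}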