In this article I'd like to share how to chain MapReduce jobs in Hadoop so that they run one after another. Many readers are probably not yet familiar with this technique, so I'm sharing this article for reference; I hope you get a lot out of it. Let's take a look together!
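The two chained jobs below implement the classic common-friends computation. Judging from the first mapper, each input line is expected to look like person:friend1,friend2,... As a made-up example, given the input lines

A:B,C
D:B

job1 inverts the relation and groups by friend, producing intermediate lines such as B:A:D (B is listed as a friend by both A and D); job2 then pairs up the people on each such line, so the final output contains tab-separated lines such as "A-D  B", meaning A and D share the common friend B.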
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PickMain {

    private static final Log LOG = LogFactory.getLog(PickMain.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Use ControlledJob and JobControl to submit multiple jobs with a dependency between them.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(PickMain.class);
        job1.setMapperClass(FindMapper.class);
        job1.setReducerClass(FindReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(PickMain.class);
        job2.setMapperClass(SecondFindMapper.class);
        job2.setReducerClass(SecondFindReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        // job2 reads job1's output directory as its input.
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap each Job in a ControlledJob.
        ControlledJob cjob1 = new ControlledJob(conf);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob1.setJob(job1);
        cjob2.setJob(job2);
        // Declare the dependency: job2 will only start after job1 has completed.
        cjob2.addDependingJob(cjob1);

        // JobControl acts as a job controller; it implements Runnable, so it is driven by a thread.
        JobControl jc = new JobControl("my_jobcontrol");
        jc.addJob(cjob1);
        jc.addJob(cjob2);
        Thread th = new Thread(jc);
        th.start();

        // Poll until every job has finished, then stop the controller and exit.
        while (!jc.allFinished()) {
            Thread.sleep(5000);
        }
        jc.stop();
        System.exit(0);
    }
}

class FindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text m1 = new Text();
    Text m2 = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Input line format: person:friend1,friend2,...
        // Invert the relation so that each friend becomes the key and the person the value.
        String line = value.toString();
        String[] tmp1 = line.split(":");
        String outval = tmp1[0];
        String[] outkeys = tmp1[1].split(",");
        for (int i = 0; i < outkeys.length; i++) {
            m1.set(outkeys[i]);
            m2.set(outval);
            context.write(m1, m2);
        }
    }
}

class FindReducer extends Reducer<Text, Text, Text, NullWritable> {

    StringBuilder sb = new StringBuilder();
    NullWritable nul = NullWritable.get();
    Text outval = new Text();
    String spector = ":";

    @Override
    protected void reduce(Text txt, Iterable<Text> txtiter, Reducer<Text, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Concatenate the friend with everyone who listed them: friend:personA:personB:...
        sb.delete(0, sb.length());
        sb.append(txt.toString());
        Iterator<Text> it = txtiter.iterator();
        while (it.hasNext()) {
            sb.append(spector + it.next().toString());
        }
        outval.set(sb.toString());
        context.write(outval, nul);
    }
}
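After the first job runs, every line of its output (and therefore of job2's input) has the form friend:personA:personB:..., i.e. one friend followed by everyone who listed them. The second job's mapper and reducer turn those lines into pairs of people together with all the friends they share: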
class SecondFindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text keyout = new Text();
    Text valueout = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // fs[0] is the shared friend; fs[1..] are the people who listed that friend.
        String[] fs = value.toString().split(":");
        valueout.set(fs[0]);
        // Emit every unordered pair of people, normalized so the key is always "smaller-larger".
        for (int i = 1; i < fs.length - 1; i++) {
            for (int j = i + 1; j < fs.length; j++) {
                // Compare the whole strings, not just the first character, so the
                // pair key stays canonical even when two names share a first letter.
                if (fs[i].compareTo(fs[j]) > 0) {
                    keyout.set(fs[j] + "-" + fs[i]);
                } else {
                    keyout.set(fs[i] + "-" + fs[j]);
                }
                context.write(keyout, valueout);
            }
        }
    }
}

class SecondFindReducer extends Reducer<Text, Text, Text, Text> {

    StringBuilder sb = new StringBuilder();
    Text outvalue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> iter, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // Join all of this pair's shared friends into a comma-separated list.
        sb.delete(0, sb.length());
        Iterator<Text> it = iter.iterator();
        if (it.hasNext()) {
            sb.append(it.next().toString());
        }
        while (it.hasNext()) {
            sb.append("," + it.next().toString());
        }
        outvalue.set(sb.toString());
        context.write(key, outvalue);
    }
}
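JobControl really pays off when the dependency graph contains more than two jobs or has fan-in/fan-out. For a straight two-job chain like this one, a simpler alternative is to submit the jobs back to back with Job.waitForCompletion, which blocks until the job finishes and reports whether it succeeded. A minimal sketch, reusing job1 and job2 exactly as configured above and replacing everything in PickMain.main from the ControlledJob wrapping onward:

// Alternative to JobControl: run the two jobs strictly in sequence.
// Submit job1, block until it finishes, and only submit job2 on success.
if (!job1.waitForCompletion(true)) {
    System.exit(1); // job1 failed, so job2 would have no valid input
}
System.exit(job2.waitForCompletion(true) ? 0 : 1);

Whichever style you pick, the driver takes three arguments: args[0] is the input directory, args[1] is the intermediate directory that job1 writes and job2 reads, and args[2] is the final output directory. Note that args[1] and args[2] must not already exist, otherwise Hadoop's output-path check will fail the corresponding job.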
That's all for "How to chain MapReduce jobs in Hadoop". Thanks for reading! I hope you now have a good grasp of the technique and that the content has been helpful. If you'd like to learn more, you're welcome to follow the 编程网 industry news channel!