文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

hadoop中mapreduce如何实现串联执行

2023-06-02 21:56

关注

小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

import java.io.IOException;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PickMain {private static final Log LOG = LogFactory.getLog(PickMain.class);public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//下面通过使用ContolledJob和JobControl来实现提交多个作业Configuration conf = new Configuration();Job job1 = Job.getInstance(conf);job1.setJarByClass(PickMain.class);job1.setMapperClass(FindMapper.class);job1.setReducerClass(FindReducer.class);job1.setOutputKeyClass(Text.class);job1.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job1, new Path(args[0]));FileOutputFormat.setOutputPath(job1, new Path(args[1]));Configuration conf2 = new Configuration();Job job2 = Job.getInstance(conf2);job2.setJarByClass(PickMain.class);job2.setMapperClass(SecondFindMapper.class);job2.setReducerClass(SecondFindReducer.class);job2.setOutputKeyClass(Text.class);job2.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job2, new Path(args[1]));FileOutputFormat.setOutputPath(job2, new Path(args[2]));//创建ControlledJob对job进行包装ControlledJob cjob1 = new ControlledJob(conf);ControlledJob cjob2 = new ControlledJob(conf2);cjob1.setJob(job1);cjob2.setJob(job2);//设置依赖关系,这个时候只有等到job1执行完成后job2才会执行cjob2.addDependingJob(cjob1);//JobControl该类相当于一个job控制器,它是一个线程,需要通过线程启动JobControl jc = new JobControl("my_jobcontrol");jc.addJob(cjob1);jc.addJob(cjob2);Thread th = new Thread(jc);th.start();//等到所有的job都执行完成后在退出while(!jc.allFinished()) {Thread.sleep(5000);}System.exit(0);}}class FindMapper extends Mapper<LongWritable, Text, Text, Text>{Text m1 = new Text();Text m2 = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {String line = value.toString();String[] tmp1 = line.split(":");String outval = tmp1[0];String[] outkeys = tmp1[1].split(",");for(int i = 0 ; i<outkeys.length;i++) {m1.set(outkeys[i]);m2.set(outval);context.write(m1,m2);}}}class FindReducer extends Reducer<Text, Text, Text, NullWritable>{StringBuilder sb = new StringBuilder();NullWritable nul = NullWritable.get();Text outval = new Text();String spector = ":";@Overrideprotected void reduce(Text txt, Iterable<Text> txtiter, Reducer<Text, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {sb.delete(0, sb.length());sb.append(txt.toString());Iterator<Text> it = txtiter.iterator();while(it.hasNext()) {sb.append(spector+it.next().toString());}outval.set(sb.toString());context.write(outval, nul);}}class SecondFindMapper extends Mapper<LongWritable, Text, Text, Text>{Text keyout = new Text();Text valueout = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {String[] fs = value.toString().split(":");valueout.set(fs[0]);if(fs.length>0) {for(int i = 1;i<fs.length-1;i++) {for(int j = i+1;j<fs.length;j++) {if((int)fs[i].toCharArray()[0]>(int)fs[j].toCharArray()[0]) {keyout.set(fs[j]+"-"+fs[i]);}else {keyout.set(fs[i]+"-"+fs[j]);}context.write(keyout, valueout);}}}}}class  SecondFindReducer extends Reducer<Text, Text, Text, Text>{StringBuilder sb = new StringBuilder();Text outvalue = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> iter, Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {sb.delete(0, sb.length());Iterator<Text> it =  iter.iterator();if(it.hasNext()) {sb.append(it.next().toString());}while(it.hasNext()) {sb.append(","+it.next().toString());}outvalue.set(sb.toString());context.write(key, outvalue);}}

以上是“hadoop中mapreduce如何实现串联执行”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯