文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

hadoop如何自定义格式化输出

2023-06-02 21:54

关注

这篇文章给大家分享的是有关hadoop如何自定义格式化输出的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

import java.io.IOException;import java.net.URI;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomizeOutputFormat {static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(CustomizeOutputFormat.class);job.setMapperClass(CustMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//此处这只自定义的格式化输出job.setOutputFormatClass(CustOutputFormat.class);String jobName = "Customize outputformat test!";job.setJobName(jobName);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean b = job.waitForCompletion(true);if(b) {LOG.info("Job "+ jobName +" is done.");}else {LOG.info("Job "+ jobName +"is going wrong,now exit.");System.exit(0);}}}class CustMapper extends Mapper<LongWritable, Text, Text, Text>{String[] textIn = null;Text outkey = new Text();Text outvalue = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {textIn = value.toString().split("\t");outkey.set(textIn[0]);outvalue.set(textIn[1]);context.write(outkey, outvalue);}}//自定义OutoutFormatclass CustOutputFormat extends FileOutputFormat<Text, Text>{@Overridepublic RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {//获得configrationConfiguration conf = context.getConfiguration();//获得FileSystemFileSystem fs =  FileSystem.newInstance(conf);//获得输出路径Path path = CustOutputFormat.getOutputPath(context);URI uri = path.toUri();//创建两个文件,得到写入流FSDataOutputStream foa = fs.create(new Path(uri.toString()+"/out.a"));FSDataOutputStream fob = fs.create(new Path(uri.toString()+"/out.b"));//创建自定义RecordWriter  传入 两个流CustRecordWriter rw = new CustRecordWriter(foa,fob);return rw;}class CustRecordWriter extends RecordWriter<Text, Text>{ FSDataOutputStream foa = null; FSDataOutputStream fob = null;CustRecordWriter(FSDataOutputStream foa,FSDataOutputStream fob){this.foa = foa;this.fob = fob;}@Overridepublic void write(Text key, Text value) throws IOException, InterruptedException {String mText  = key.toString();//根据可以长度的不同分别输入到不同的文件if(mText.length()>=5) {foa.writeUTF(mText+"\t"+value.toString()+"\n");}else {fob.writeUTF(mText+"\t"+value.toString()+"\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {//最后将两个写入流关闭if(foa!=null) {foa.close();}if(fob!=null) {fob.close();}}}}//使用MultipleInputs,c处理多个来源的文件package hgs.multipuleinput;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.custsort.SortBean;import hgs.custsort.SortDriver;import hgs.custsort.SortMapper;import hgs.custsort.SortReducer;public class MultipuleInputDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SortDriver.class);job.setMapperClass(SortMapper.class);job.setReducerClass(SortReducer.class);job.setOutputKeyClass(SortBean.class);job.setOutputValueClass(NullWritable.class);MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class,SortMapper.class);MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class,SortMapper.class);//FileInputFormat.setInputPaths(job, new Path("/sort"));FileOutputFormat.setOutputPath(job, new Path("/sortresult"));System.exit(job.waitForCompletion(true)==true?0:1);}}

感谢各位的阅读!关于“hadoop如何自定义格式化输出”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯