文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

2023-06-02 19:49

关注

小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

//map读入的键package hgs.combinefileinputformat.test;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;public class CombineFileKey implements  WritableComparable<CombineFileKey> {private String fileName;private long offset;public String getFileName() {return fileName;}public void setFileName(String fileName) {this.fileName = fileName;}public long getOffset() {return offset;}public void setOffset(long offset) {this.offset = offset;}@Overridepublic void readFields(DataInput input) throws IOException {this.fileName = Text.readString(input);this.offset = input.readLong();}@Overridepublic void write(DataOutput output) throws IOException {Text.writeString(output, fileName);output.writeLong(offset);}@Overridepublic int compareTo(CombineFileKey obj) {int f = this.fileName.compareTo(obj.fileName);if(f==0)return (int)Math.signum((double)(this.offset-obj.offset));return f;}@Overridepublic int hashCode() {//摘自于 http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/final int prime = 31;    int result = 1;    result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());    result = prime * result + (int) (offset ^ (offset >>> 32));    return result;}@Overridepublic boolean equals(Object o) {if(o instanceof CombineFileKey)return this.compareTo((CombineFileKey)o)==0;return false;}}
package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;import org.apache.hadoop.util.LineReader;public class CombineFileReader extends RecordReader<CombineFileKey, Text>{private long startOffset; //offset of the chunk;private long end; //end of the chunk;private long position; // current posprivate FileSystem fs;private Path path; private CombineFileKey key;private Text value;private FSDataInputStream input;private LineReader reader;public CombineFileReader(CombineFileSplit split,TaskAttemptContext context ,Integer index) throws IOException {//初始化path fs startOffset endthis.path = split.getPath(index);this.fs = this.path.getFileSystem(context.getConfiguration());this.startOffset = split.getOffset(index);this.end = split.getLength()+this.startOffset;//判断现在开始的位置是否在一行的内部boolean skipFirstLine = false;//open the filethis.input = fs.open(this.path);//不等于0说明读取位置在一行的内部if(this.startOffset !=0 ){skipFirstLine = true;--(this.startOffset);//定位到开始读取的位置this.input.seek(this.startOffset);}//初始化readerthis.reader = new LineReader(input);if(skipFirstLine){ // skip first line and re-establish "startOffset".//这里着这样做的原因是 一行可能包含了这个文件的所有的数据,猜测如果遇到一行的话,还是会读取一行//将其实位置调整到一行的开始,这样的话会舍弃部分数据this.startOffset += this.reader.readLine(new Text(), 0, (int)Math.min             ((long)Integer.MAX_VALUE, this.end - this.startOffset));}this.position = this.startOffset;}@Overridepublic void close() throws IOException {}@Overridepublic void initialize(InputSplit splite, TaskAttemptContext context) throws IOException, InterruptedException {}//返回当前的key@Overridepublic CombineFileKey getCurrentKey() throws IOException, InterruptedException {return key;}//返回当前的value@Overridepublic Text getCurrentValue() throws IOException, InterruptedException {return value;}//执行的进度@Overridepublic float getProgress() throws IOException, InterruptedException {//返回的类型为floatif(this.startOffset==this.end){return 0.0f;}else{return Math.min(1.0f, (this.position - this.startOffset)/(float)(this.end - this.startOffset));}}//该方法判断是否有下一个key value@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//对key和value初始化if(this.key == null){this.key = new CombineFileKey();this.key.setFileName(this.path.getName());}this.key.setOffset(this.position);if(this.value == null){this.value = new Text();}//读取一行数据,如果读取的newSieze=0说明split的数据已经处理完成int newSize = 0;if(this.position<this.end){newSize = reader.readLine(this.value);position += newSize;}//没有数据,将key value置位空if(newSize == 0){this.key = null;this.value = null;return false;}else{return true;}}}
package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;public class CustCombineInputFormat extends CombineFileInputFormat<CombineFileKey, Text> {public CustCombineInputFormat(){super();//最大切片大小this.setMaxSplitSize(67108864);//64 MB}@Overridepublic RecordReader<CombineFileKey, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {return new CombineFileRecordReader<CombineFileKey, Text>((CombineFileSplit)split,context,CombineFileReader.class);}@Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}}//驱动类package hgs.test;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.combinefileinputformat.test.CustCombineInputFormat;public class LetterCountDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();//conf.set("mapreduce.map.log.level", "INFO");///conf.set("mapreduce.reduce.log.level", "INFO");Job job = Job.getInstance(conf, "LetterCount");job.setJarByClass(hgs.test.LetterCountDriver.class);// TODO: specify a mapperjob.setMapperClass(LetterCountMapper.class);// TODO: specify a reducerjob.setReducerClass(LetterReducer.class);// TODO: specify output typesjob.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);if(args[0].equals("1"))job.setInputFormatClass(CustCombineInputFormat.class);else{}// TODO: specify input and output DIRECTORIES (not files)FileInputFormat.setInputPaths(job, new Path("/words"));FileOutputFormat.setOutputPath(job, new Path("/result"));if (!job.waitForCompletion(true))return;}}

hdfs文件:

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

运行结果:不使用自定义的:CustCombineInputFormat

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

运行结果:在使用自定义的:CustCombineInputFormat

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数

以上是“hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯