文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

[离线计算-Spark|Hive] HDFS小文件处理

2021-05-27 01:17

关注

[离线计算-Spark|Hive]  HDFS小文件处理

本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并处理.

背景

HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.

大量的小文件也会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭.

小文件解决思路

通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。

总体流程如下:

TLYig1.png

该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.

Hudi小文件处理

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用

在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小

hudi 小文件处理流程:

7rIbi8.png

每次写入都会遵循此过程,以确保Hudi表中没有小文件。

核心代码:

写入文件分配:

org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts

 //获取分区路径
 Set partitionPaths = profile.getPartitionPaths();

 //根据先前提交期间写入的记录获取平均记录大小。用于估计有多少记录打包到一个文件中。
 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);
 
 LOG.info("AvgRecordSize => " + averageRecordSize);

 //获取每个分区文件路径下小文件
 Map> partitionSmallFilesMap =
        getSmallFilesForPartitions(new ArrayList(partitionPaths), jsc);


for (String partitionPath : partitionPaths) {
     ...
    
     List smallFiles = partitionSmallFilesMap.get(partitionPath);
    //未分配的写入记录
    long totalUnassignedInserts = pStat.getNumInserts();  

    ...

    for (SmallFile smallFile : smallFiles) {
      //hoodie.parquet.max.file.size 数据文件最大大小,Hudi将试着维护文件大小到该指定值
      //算出数据文件大小 - 小文件 就是剩余可以写入文件大小, 除以平均记录大小就是插入的记录行数      
      long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);

        //分配记录到小文件中
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
            // create a new bucket or re-use an existing bucket
            int bucket;
            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
            } else {
              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
            }
            bucketNumbers.add(bucket);
            recordsPerBucket.add(recordsToAppend);
            //减去已经分配的记录数
            totalUnassignedInserts -= recordsToAppend;
          }  


        //如果记录没有分配完
        if (totalUnassignedInserts > 0) {
            //hoodie.copyonwrite.insert.split.size 每个分区条数
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            //是否自动计算每个分区条数
            if (config.shouldAutoTuneInsertSplits()) {
                insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
            }

           //计算要创建的bucket
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); 
           
          ...
          
          for (int b = 0; b < insertBuckets; b++) {
            bucketNumbers.add(totalBuckets);
            if (b == insertBuckets - 1) {
              //针对最后一个buket处理,就是写完剩下的记录
              recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
            } else {
              recordsPerBucket.add(insertRecordsPerBucket);
            }
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.INSERT;
            bucketInfo.partitionPath = partitionPath;
            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
            bucketInfoMap.put(totalBuckets, bucketInfo);
            totalBuckets++;
          }

        }

    }

}


获取每个分区路径下小文件:
org.apache.hudi.table.action.commit.UpsertPartitioner#getSmallFiles

 if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      List allFiles = table.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

      for (HoodieBaseFile file : allFiles) {

        //获取小于 hoodie.parquet.small.file.limit 参数值就为小文件
        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
          smallFileLocations.add(sf);
        }
      }
    }

UpsertPartitioner继承spark的Partitioner, hudi在写入的时候会利用spark 自定分区的机制优化记录分配到不同文件的能力, 从而达到在写入时不断优化解决小文件问题.

涉及到的关键配置:

在hudi写入时候如何使用、配置参数?

在写入hudi的代码中 .option中配置上述参数大小,如下:

.option(HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES, 120 * 1024 * 1024)

总结

本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并处理.

参考

  1. https://www.cnblogs.com/leesf456/p/14642991.html
本文作者: chaplinthink, 关注领域:大数据、基础架构、系统设计, 一个热爱学习、分享的大数据工程师
阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯