相关文章推荐
侠义非凡的硬盘  ·  在sed插入语句中输出制表位 \t_sed ...·  1 年前    · 
玉树临风的炒粉  ·  主流数据文件类型(.dat/.txt/.js ...·  1 年前    · 
冲动的鸵鸟  ·  ggplot2默认字体修改_ggplot2字 ...·  2 年前    · 
慷慨的柑橘  ·  「docker社区版本」相关问答|文档|产品 ...·  2 年前    · 
小百科  ›  [离线求解-Spark|Hive] HDFS大点文件预处理-腾讯云开发者社区-腾讯云
hive spark hdfs
谦逊的硬币
1 年前
作者头像
awwewwbbb
0 篇文章

[离线计算-Spark|Hive] HDFS小文件处理

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP
返回腾讯云官网
社区首页 > 专栏 > chaplinthink的专栏 > [离线计算-Spark|Hive] HDFS小文件处理

[离线计算-Spark|Hive] HDFS小文件处理

作者头像
awwewwbbb
发布 于 2022-04-27 09:08:00
451 0
发布 于 2022-04-27 09:08:00
举报

背景

HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.

大量的小文件也会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭.

小文件解决思路

通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。

总体流程如下:

TLYig1.png
TLYig1.png

该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.

Hudi小文件处理

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用

在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小

hudi 小文件处理流程:

7rIbi8.png
7rIbi8.png

每次写入都会遵循此过程,以确保Hudi表中没有小文件。

核心代码:

写入文件分配:

org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts

 //获取分区路径
 Set<String> partitionPaths = profile.getPartitionPaths();
 //根据先前提交期间写入的记录获取平均记录大小。用于估计有多少记录打包到一个文件中。
 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);
 LOG.info("AvgRecordSize => " + averageRecordSize);
 //获取每个分区文件路径下小文件
 Map<String, List<SmallFile>> partitionSmallFilesMap =
        getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc);
for (String partitionPath : partitionPaths) {
     List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
    //未分配的写入记录
    long totalUnassignedInserts = pStat.getNumInserts();  
    for (SmallFile smallFile : smallFiles) {
      //hoodie.parquet.max.file.size 数据文件最大大小,Hudi将试着维护文件大小到该指定值
      //算出数据文件大小 - 小文件 就是剩余可以写入文件大小, 除以平均记录大小就是插入的记录行数      
      long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);
        //分配记录到小文件中
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
            // create a new bucket or re-use an existing bucket
            int bucket;
            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
            } else {
              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
            bucketNumbers.add(bucket);
            recordsPerBucket.add(recordsToAppend);
            //减去已经分配的记录数
            totalUnassignedInserts -= recordsToAppend;
        //如果记录没有分配完
        if (totalUnassignedInserts > 0) {
            //hoodie.copyonwrite.insert.split.size 每个分区条数
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            //是否自动计算每个分区条数
            if (config.shouldAutoTuneInsertSplits()) {
                insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
           //计算要创建的bucket
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); 
          for (int b = 0; b < insertBuckets; b++) {
            bucketNumbers.add(totalBuckets);
            if (b == insertBuckets - 1) {
              //针对最后一个buket处理,就是写完剩下的记录
              recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
            } else {
              recordsPerBucket.add(insertRecordsPerBucket);
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.INSERT;
            bucketInfo.partitionPath = partitionPath;
            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
            bucketInfoMap.put(totalBuckets, bucketInfo);
            totalBuckets++;
}

获取每个分区路径下小文件: org.apache.hudi.table.action.commit.UpsertPartitioner#getSmallFiles

 if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
      for (HoodieBaseFile file : allFiles) {
        //获取小于 hoodie.parquet.small.file.limit 参数值就为小文件
        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
 
推荐文章
侠义非凡的硬盘  ·  在sed插入语句中输出制表位 \t_sed -i '5i-CSDN博客
1 年前
玉树临风的炒粉  ·  主流数据文件类型(.dat/.txt/.json/.csv)导入到python - KévinX - 博客园
1 年前
冲动的鸵鸟  ·  ggplot2默认字体修改_ggplot2字体改成newroman_schneesnow的博客-CSDN博客
2 年前
慷慨的柑橘  ·  「docker社区版本」相关问答|文档|产品|活动 - 七牛云
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
小百科 - 百科知识指南
© 2024 ~ 沪ICP备11025650号