Hudi 源码 | Hudi 索引:Parquet 布隆过滤器写入过程

共 18574字,需浏览 38分钟

 ·

2024-07-25 14:16

前言

上篇文章提到 :索引的逻辑主要是根据 parquet 文件中保存的索引信息,判断记录是否存在,如果不存在,代表是新增数据,如果记录存在则代表是更新数据,需要找到并设置 currentLocation。对于布隆索引来说,这里的索引信息其实是布隆过滤器,本篇文章主要是先总结布隆过滤器是如何保存到 parquet 文件中的(主要是源码调用逻辑)。

我们新写一个表的时候,最开始是没有parquet文件的,所以顺序应该是先将布隆过滤器写到 parquet 文件中,下次写数据的时候,先读取 parquet 文件中的布隆过滤器来验证表中是否存在该记录,有助于实现高效的更新和删除操作。

索引

索引是一个关键的步骤,它验证表中是否存在记录,并有助于实现高效的更新和删除操作。请注意本文中涵盖的索引是为写入端准备的,这与读取端索引不同。

对于有的索引类型是不涉及将索引信息保存到文件中的,比如 简单索引(simple index),而对于Bloom索引(Bloom Index)则需要将布隆过滤器以及最大值最小值等信息写到 parquet 文件中的元数据中。

Bloom索引(Bloom Index)

Bloom Index 最大限度地减少了用于查找的键和文件的数量,同时保持了较低的读取成本。

Bloom Index 采用 2 阶段过滤来减少用于查找的键和文件的数量。

  • 第一阶段涉及将输入键与使用存储在基本文件页脚中的最小和最大记录键值构建的间隔树进行比较。超出这些范围的键代表新插入,而其余键则被视为下一阶段的候选键。

  • 第二阶段根据反序列化的 Bloom 过滤器检查候选键,这有助于确定明确不存在的键和可能存在的键。然后使用筛选后的键和关联的基文件执行实际的文件查找,这些基文件随后返回用于标记的键和位置元组。

请注意,查找之前的过滤过程仅涉及读取文件页脚,因此读取成本较低。

源码分析

主要步骤包含布隆过滤器(BloomFilter)的创建、更新、和持久化。

fileWriter

HoodieCreateHandle 和 HoodieMergeHandle 中的 fileWriter 均为 HoodieParquetWriter 。
都是在构造函数中调用 HoodieFileWriterFactory.getFileWriter 创建

HoodieFileWriterFactory

  public static <T extends HoodieRecordPayload, R extends IndexedRecord, I, K, O> HoodieFileWriter<R> getFileWriter(
      String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config, Schema schema,
      TaskContextSupplier taskContextSupplier)
 throws IOException 
{
    final String extension = FSUtils.getFileExtension(path.getName());
    if (PARQUET.getFileExtension().equals(extension)) {
      // config.populateMetaFields() 默认 true
      return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, config.populateMetaFields());
    }
    if (HFILE.getFileExtension().equals(extension)) {
      return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
    }
    if (ORC.getFileExtension().equals(extension)) {
      return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
    }
    throw new UnsupportedOperationException(extension + " format not supported yet.");
  }

  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
      TaskContextSupplier taskContextSupplier, boolean populateMetaFields)
 throws IOException 
{
    // enableBloomFilter 和 populateMetaFields 值一样,都为 true
    return newParquetFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier, populateMetaFields, populateMetaFields);
  }

  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newParquetFileWriter(
      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
      TaskContextSupplier taskContextSupplier, boolean populateMetaFields, boolean enableBloomFilter)
 throws IOException 
{
    // 创建布隆过滤器
    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);

    HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
        config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
        hoodieTable.getHadoopConf(), config.getParquetCompressionRatio());

    return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
  } 

populateMetaFields

启用时,填充所有元字段。禁用时,不会填充任何元字段,增量查询也不会起作用。这仅用于批处理的仅附加/不可变数据

简单说就是是否填充所有的元数据字段。
从上面的代码可以看到是否开启布隆过滤器和 populateMetaFields 的值一样。

  public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
      .key("hoodie.populate.meta.fields")
      .defaultValue("true")
      .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
          + "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");

  public boolean populateMetaFields() {
    return Boolean.parseBoolean(getStringOrDefault(HoodieTableConfig.POPULATE_META_FIELDS,
        HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
  }          

创建布隆过滤器

HoodieFileWriterFactory.createBloomFilter

  private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
    return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
            config.getDynamicBloomFilterMaxNumEntries(),
            config.getBloomFilterType());
  }

几个参数:

  • config.getBloomFilterNumEntries : BLOOM_FILTER_NUM_ENTRIES_VALUE  hoodie.index.bloom.num_entries 默认值 6000

  • config.getBloomFilterFPP :BLOOM_FILTER_FPP_VALUE hoodie.index.bloom.fpp 默认值 0.000000001

  • config.getDynamicBloomFilterMaxNumEntries :BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES hoodie.bloom.index.filter.dynamic.max.entries 默认值 100000

  • config.getBloomFilterType() :BLOOM_FILTER_TYPE hoodie.bloom.index.filter.type 默认值 BloomFilterTypeCode.DYNAMIC_V0.name

BloomFilterFactory.createBloomFilter

  public static BloomFilter createBloomFilter(int numEntries, double errorRate, int maxNumberOfEntries,
                                              String bloomFilterTypeCode)
 
{
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
      return new SimpleBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH);
    } else if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) {
      // 默认值 BloomFilterTypeCode.DYNAMIC_V0.name
      return new HoodieDynamicBoundedBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH, maxNumberOfEntries);
    } else {
      throw new IllegalArgumentException("Bloom Filter type code not recognizable " + bloomFilterTypeCode);
    }
  }

默认值 BloomFilterTypeCode.DYNAMIC_V0.name ,返回 HoodieDynamicBoundedBloomFilter

writeSupport

HoodieParquetWriter 中的 writeSupport 为 HoodieAvroWriteSupport

HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(hoodieTable.getHadoopConf()).convert(schema), schema, filter);

更新布隆过滤器

通过 HoodieAvroWriteSupport.add 更新布隆过滤器,将 recordKey 添加到 BloomFilter 中,调用链:

  • Insert : HoodieCreateHandle.write -> HoodieParquetWriter.writeAvro/writeAvroWithMetadata -> HoodieAvroWriteSupport.add

  • Upsert : HoodieMergeHandle.write -> HoodieParquetWriter.writeAvro/writeAvroWithMetadata-> HoodieAvroWriteSupport.add

  public void add(String recordKey) {
    if (bloomFilterOpt.isPresent()) {
      // HoodieDynamicBoundedBloomFilter.add -> InternalDynamicBloomFilter.add -> org.apache.hadoop.util.bloom.BloomFilter
      this.bloomFilterOpt.get().add(recordKey);
      if (minRecordKey != null) {
        minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
      } else {
        minRecordKey = recordKey;
      }

      if (maxRecordKey != null) {
        maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
      } else {
        maxRecordKey = recordKey;
      }
    }
  }

这里主要调用 HoodieDynamicBoundedBloomFilter.add 继而调用 InternalDynamicBloomFilter.add 最终将 recordKey 添加到 org.apache.hadoop.util.bloom.BloomFilter 中,然后更新最大值和最小值。

写布隆过滤器

Insert 和 Update 最终都通过调用 HoodieParquetWriter.close 实现将布隆过滤写到 Parquet Metadata :

writer.model.name : avro
org.apache.hudi.bloomfilter : bloomfilter
hoodie_min_record_key : minRecordKey
hoodie_max_record_key : maxRecordKey
hoodie_bloom_filter_type_code : DYNAMIC_V0

调用链:

  • Insert : BoundedInMemoryQueueConsumer.consume -> BoundedInMemoryQueueConsumer.finish -> CopyOnWriteInsertHandler.finish -> CopyOnWriteInsertHandler.closeOpenHandles -> HoodieCreateHandle.close -> HoodieParquetWriter.close
    -> org.apache.parquet.hadoop.ParquetWriter.close -> org.apache.parquet.hadoop.InternalParquetRecordWriter.close -> org.apache.parquet.hadoop.ParquetFileWriter.end -> ParquetFileWriter.serializeFooter

  • Upsert : BaseJavaCommitActionExecutor.handleUpdate -> handleUpdateInternal -> JavaMergeHelper.runMerge -> HoodieMergeHandle.close -> HoodieParquetWriter.close
    -> org.apache.parquet.hadoop.ParquetWriter.close -> org.apache.parquet.hadoop.InternalParquetRecordWriter.close -> org.apache.parquet.hadoop.ParquetFileWriter.end -> ParquetFileWriter.serializeFooter

HoodieParquetWriter.close

  public void close() throws IOException {
    try {
      writer.close();
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      // release after the writer closes in case it is used for a last flush
      codecFactory.release();
    }
  }

InternalParquetRecordWriter.close

  public void close() throws IOException, InterruptedException {
    if (!closed) {
      flushRowGroupToStore();
      // 这里调用 HoodieAvroWriteSupport.finalizeWrite 获取布隆过滤器、最大值、最小值等。
      FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
      Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
      // 返回 avro
      String modelName = writeSupport.getName();
      if (modelName != null) {
        // writer.model.name : avro
        finalMetadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, modelName);
      }
      // 将 finalWriteContext.getExtraMetaData 添加到 finalMetadata 中
      finalMetadata.putAll(finalWriteContext.getExtraMetaData());
      // 调用 ParquetFileWriter.end 方法将 finalMetadata 写到 parquet 页脚中
      parquetFileWriter.end(finalMetadata);
      closed = true;
    }
  }

HoodieAvroWriteSupport.finalizeWrite

  @Override
  public WriteSupport.FinalizedWriteContext finalizeWrite() {
    HashMap<String, String> extraMetaData = new HashMap<>();
    if (bloomFilterOpt.isPresent()) {
      // org.apache.hudi.bloomfilter : bloomfilter (序列化的)
      extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString());
      if (minRecordKey != null && maxRecordKey != null) {
        // hoodie_min_record_key : minRecordKey
        // hoodie_max_record_key : maxRecordKey
        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
      }
      // getBloomFilterTypeCode().name() : DYNAMIC_V0
      if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
        // hoodie_bloom_filter_type_code : DYNAMIC_V0
        extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name());
      }
    }
    // 将 extraMetaData 值赋给 WriteSupport.extraMetaData 并返回 FinalizedWriteContext
    return new WriteSupport.FinalizedWriteContext(extraMetaData);
  }
  }

writeSupport.getName -> HoodieAvroWriteSupport.getName -> AvroWriteSupport.getName

  @Override
  public String getName() {
    return "avro";
  }

ParquetFileWriter.end

  public void end(Map<String, String> extraMetaData) throws IOException {
    state = state.end();
    LOG.debug("{}: end", out.getPos());
    // 将 extraMetaData 赋给 FileMetaData.keyValueMetaData 
    this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
    serializeFooter(footer, out);
    out.close();
  }

ParquetFileWriter.serializeFooter

  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
    long footerIndex = out.getPos();
    // 将 footer 转换为 org.apache.parquet.format.FileMetaData
    org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
    // 调用 writeFileMetaData 将 parquetMetadata 写到 parquet 元数据中
    writeFileMetaData(parquetMetadata, out);
    LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
    out.write(MAGIC);
  }

🧐 分享、点赞、在看,给个3连击👇

浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报