Hudi 源码 | Hudi 索引:Tag 和 TagLocation

HBase技术社区

共 53695字,需浏览 108分钟

 ·

2024-07-25 14:16

前言

上篇文章和之前的总结的源码文章,本文总结源码 tag/tagLocation ,对应功能:根据索引信息判断记录是否存在,如果不存在,代表是新增数据,如果记录存在则代表是更新数据,需要找到并设置 currentLocation。

tag

AbstractWriteHelper.tag

  private I tag(
      I dedupedRecords, HoodieEngineContext context, HoodieTable<T, I, K, O> table)
 
{
    // perform index loop up to get existing location of records
    // 执行索引循环以获取记录的现有位置
    // 对于 Java Client 这里 table 为 HoodieJavaCopyOnWriteTable
    return table.getIndex().tagLocation(dedupedRecords, context, table);
  }

table.getIndex()

对于 Java Client 这里 table 为 HoodieJavaCopyOnWriteTable , HoodieJavaCopyOnWriteTable.getIndex() -> HoodieJavaTable.getIndex

  protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
    return JavaHoodieIndex.createIndex(config);
  }

  public static HoodieIndex createIndex(HoodieWriteConfig config) {
    // first use index class config to create index.
    if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
      Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
      if (!(instance instanceof HoodieIndex)) {
        throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
      }
      return (JavaHoodieIndex) instance;
    }

    // TODO more indexes to be added
    // 从这里看出,当前版本(0.9.0),Java Client 只支持两种索引类型:INMEMORY 和 BLOOM
    switch (config.getIndexType()) {
      case INMEMORY:
        return new JavaInMemoryHashIndex(config);
      case BLOOM:
        return new JavaHoodieBloomIndex(config);
      default:
        throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
    }
  } 

因为指定了索引类型为 BLOOM , 所以这里返回 JavaHoodieBloomIndex 。

JavaHoodieBloomIndex

JavaHoodieBloomIndex 的父类为 HoodieBaseBloomIndex ,其 tagLocation 在父类 HoodieBaseBloomIndex

public class JavaHoodieBloomIndex<T extends HoodieRecordPayloadextends HoodieBaseBloomIndex<T{
  public JavaHoodieBloomIndex(HoodieWriteConfig config) {
    super(config);
  }
}

tagLocation

  1. 提取映射:Map(partitionPath, List )

  2. lookupIndex :根据索引查找每个 recordKey 的 location,返回 每个 recordKey 和 location 的对应关系: Map<HoodieKey, HoodieRecordLocation>

  3. tagLocationBacktoRecords :根据 lookupIndex 返回的 recordKey 和 location 的对应关系对应关系 keyFilenamePair ,为每个 HoodieRecord 设置 currentLocation 。

  @Override
  public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
                                           HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
    // Step 1: Extract out thinner Map of (partitionPath, recordKey)
    // 第一步:提取更薄的映射(partitionPath,recordKey)
    // (partitionPath, List(recordKey))
    Map<String, List<String>> partitionRecordKeyMap = new HashMap<>();
    records.forEach(record -> {
      // 如果包含记录对应的分区路径
      if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) {
        // 现有分区中对应的 List 添加 recordKey
        partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey());
      } else {
        // 将 recordKey 添加到 recordKeys 中
        List<String> recordKeys = Lists.newArrayList();
        recordKeys.add(record.getRecordKey());
        // 添加新的分区路径和对应的  List(recordKey)
        partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys);
      }
    });

    // Step 2: Lookup indexes for all the partition/recordkey pair
    // 第二步:根据索引查找每个 recordKey 的 location,返回 每个 recordKey 和 location 的对应关系
    // (HoodieKey, HoodieRecordLocation)
    Map<HoodieKey, HoodieRecordLocation> keyFilenamePairMap =
        lookupIndex(partitionRecordKeyMap, context, hoodieTable);

    if (LOG.isDebugEnabled()) {
      long totalTaggedRecords = keyFilenamePairMap.values().size();
      LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
    }

    // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
    // 第三步:通过与现有recordKey连接,将传入记录标记为插入或更新
    List<HoodieRecord<T>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records);

    return taggedRecords;
  }

lookupIndex

查找每个 recordKey 的 location,并返回已存在的所有 recordKey 和 location 的映射: Map<HoodieKey, HoodieRecordLocation>,如果不存在,则删除记录键。

  1. 在传入记录中获取每个分区的记录数 :recordsPerPartition

  2. loadInvolvedFiles :将所有涉及的文件加载为<Partition, filename>对

  3. explodeRecordsWithFileComparisons :利用区间树根据最大值最小值查找每个 HoodieKey 可能存在于哪个文件,返回:List<Pair<fileId, HoodieKey>> ,这里多个 fileId 对应一个 HoodieKey ,一个 fileId 对应多个HoodieKey,类似于笛卡尔积,多对多的关系,按照 fileId 排序

  4. findMatchingFilesForRecordKeys :遍历 explodeRecordsWithFileComparisons 返回的 List<Pair<fileId, HoodieKey>> ,以 fileId 为维度,利用布隆索引判断有哪些 HoodieKey 可能存在于该文件中,并添加到候选:candidateRecordKeys ,最后遍历 candidateRecordKeys ,去 parquet 数据文件中确认该 key 是否确实存在于该文件,最后返回确切的 recordKey 和 location 的映射关系 :Map<HoodieKey, HoodieRecordLocation>

  /**
   * Lookup the location for each record key and return the pair<record_key,location> for all record keys already
   * present and drop the record keys if not present.
   *
   * 查找每个 recordKey 的 location,并返回已存在的所有 recordKey 的 pair<record_key,location>,如果不存在,则删除记录键。
   */

  private Map<HoodieKey, HoodieRecordLocation> lookupIndex(
      Map<String, List<String>> partitionRecordKeyMap, final HoodieEngineContext context,
      final HoodieTable hoodieTable)
 
{
    // Obtain records per partition, in the incoming records
    // 在传入记录中获取每个分区的记录数
    Map<String, Long> recordsPerPartition = new HashMap<>();
    // (分区路径,每个分区路径对应的记录数)
    partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size())));
    // 所有的分区路径
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

    // Step 2: Load all involved files as <Partition, filename> pairs
    // 第二步:将所有涉及的文件加载为<Partition, filename>对
    // List(Partition, BloomIndexFileInfo) BloomIndexFileInfo 包含 fileID,minRecordKey,maxRecordKey
    List<Pair<String, BloomIndexFileInfo>> fileInfoList =
        loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable);
    // Map (Partition, List(BloomIndexFileInfo))
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));

    // Step 3: Obtain a List, for each incoming record, that already exists, with the file id,
    // that contains it.
    // 第三步:为每个已存在的传入记录获取一个列表,其中包含该列表的文件id。
    // List(fileId, HoodieKey) ,这里多个 fileId 对应一个 HoodieKey ,一个 fileId 对应多个HoodieKey
    // 类似于笛卡尔积,多对多的关系,按照 fileId 排序
    List<Pair<String, HoodieKey>> fileComparisons =
        explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap);
    return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable);
  }

loadInvolvedFiles

将所有涉及的文件加载为 List<Pair<partitionPath, BloomIndexFileInfo>>

  /**
   * Load all involved files as <Partition, filename> pair List.
   *
   * 将所有涉及的文件加载为<Partition,BloomIndexFileInfo>对列表。
   */

  //TODO duplicate code with spark, we can optimize this method later
  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieEngineContext context,
                                                           final HoodieTable hoodieTable) {
    // Obtain the latest data files from all the partitions.
    // 从所有分区中获取最新的数据文件。
    // List (partitionPath,FileId)
    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
        .collect(toList());

    // 是否需要根据最大值最小值进行第一阶段过滤
    if (config.getBloomIndexPruneByRanges()) {// 默认true
      // also obtain file ranges, if range pruning is enabled
      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
      return context.map(partitionPathFileIDList, pf -> {
        try {
          HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
          // 读取最大值最小值,具体为 parquet 文件元数据中的 hoodie_min_record_key 、hoodie_max_record_key
          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
          // 返回 (partitionPath, (fileId, hoodie_min_record_key, hoodie_max_record_key))
          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
        } catch (MetadataNotFoundException me) {
          LOG.warn("Unable to find range metadata in file :" + pf);
          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
        }
      }, Math.max(partitionPathFileIDList.size(), 1));
    } else {
      // 返回 (partitionPath, (fileId, null, null))
      return partitionPathFileIDList.stream()
          .map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
    }

Interval Tree

Interval Tree :区间树 ,翻译软件一般翻译为间隔树。

  • 百度百科:区间树是在平衡树基础上进行扩展得到的支持以区间为元素的动态集合的操作,其中每个节点的关键值是区间的左端点。

  • 博客:区间树是在红黑树基础上进行扩展得到的支持以区间为元素的动态集合的操作,其中每个节点的关键值是区间的左端点。通过建立这种特定的结构,可是使区间的元素的查找和插入都可以在O(lgn)的时间内完成。相比于基础的红黑树数据结构,增加了一个max[x],即以x为根的子树中所有区间的断点的最大值

  • 请注意:区间树和线段树不一样,线段树是一种特殊的区间树。区间树:Interval Tree , 线段树:Segment Tree 。网上有很多博客将区间树和线段树归为一种。

  • 线段树:Segment Tree ,线段树是一种二叉搜索树,与区间树相似,它将一个区间划分成一些单元区间,每个单元区间对应线段树中的一个叶结点。使用线段树可以快速的查找某一个节点在若干条线段中出现的次数,时间复杂度为O(logN)。而未优化的空间复杂度为2N,实际应用时一般还要开4N的数组以免越界,因此有时需要离散化让空间压缩。

explodeRecordsWithFileComparisons

  • 首先判断是否需要使用区间树基于最小和最大记录键值进行过滤,默认为true,则创建 IntervalTreeBasedIndexFileFilter 。

  • IntervalTreeBasedIndexFileFilter :基于区间树的索引查找。为每个分区构建一个{@link KeyRangeLookupTree},并使用它来搜索需要查找的任何给定recordKey的匹配索引文件。

  • 主要逻辑:利用区间树根据最大值最小值,返回可能包含 recordKey 的文件列表。利用区间树的原因主要是可以降低查询时间。

  • 查询逻辑:

    • 对于有最大值和最小值的文件,如果该 recordKey 在最大值最小之区间内,则认为该文件可能包含 recordKey

    • 对于没有最大值和最小值的文件,则认为该文件可能包含 recordKey

  • 返回值:List(fileId, HoodieKey) , 多个 fileId 对应一个 HoodieKey

  /**
   * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
   * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
   * to be compared gets cut down a lot from range pruning.
   *
   * 对于每个传入的记录,生成N个输出记录,每个文件1个,需要对照该记录的密钥进行检查。
   * 对于键有明确插入顺序的表(例如:时间戳作为前缀),要比较的文件数量会因范围修剪而大大减少。
   * <p>
   * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
   * recordKey ranges in the index info.
   *
   * 子分区,以确保可以根据文件查找记录,还可以根据索引信息中的recordKey范围修剪文件<=>记录比较。
   *
   * 主要逻辑:利用区间树根据最大值最小值,返回可能包含 recordKey 的文件列表。利用区间树的原因主要是可以降低查询时间。
   * 查询逻辑:1、对于有最大值和最小值的文件,如果该 recordKey 在最大值最小之区间内,则认为该文件可能包含 recordKey
   *         2、对于没有最大值和最小值的文件,则认为该文件可能包含 recordKey
   *
   * 返回值:List(fileId, HoodieKey)
   */

  List<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      Map<String, List<String>> partitionRecordKeyMap) {
    // 是否使用区间树基于最小和最大记录键值进行过滤,默认为true
    IndexFileFilter indexFileFilter =
        config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
    // List(fileId, HoodieKey)
    List<Pair<String, HoodieKey>> fileRecordPairs = new ArrayList<>();
    partitionRecordKeyMap.keySet().forEach(partitionPath ->  {
      List<String> hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath);
      hoodieRecordKeys.forEach(hoodieRecordKey -> {
        indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> {
          // (fileId, HoodieKey)
          fileRecordPairs.add(Pair.of(partitionFileIdPair.getRight(),
              new HoodieKey(hoodieRecordKey, partitionPath)));
        });
      });
    });
    return fileRecordPairs;
  }

IntervalTreeBasedIndexFileFilter

基于区间树的索引查找。为每个分区构建一个{@link KeyRangeLookupTree},并使用它来搜索需要查找的任何给定recordKey的匹配索引文件。

  • KeyRangeLookupTree: 基于区间树实现的查找树,查询任意给定 Key 的时间复杂度为 (N logN)

  • 对于有有最大值最小值的文件,构造为区间树:KeyRangeLookupTree

  • 对于没有最大值最小值的文件,将 fileId 添加到 partitionToFilesWithNoRanges

  IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
    partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
      // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
      // 请注意,区间树实现没有自动平衡来确保logN搜索时间。
      // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be
      // skewed which could result in N search time instead of logN.
      // 所以,我们在这里打乱输入,希望树不会有任何倾斜。否则,树可能会倾斜,这可能导致搜索时间是N而不是logN。
      Collections.shuffle(bloomIndexFiles);
      KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
      bloomIndexFiles.forEach(indexFileInfo -> {
        if (indexFileInfo.hasKeyRanges()) { // 如果有最大值最小值
          // 将 最大值,最小值,fileId 插入到 lookUpTree
          // 构造间隔数
          lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(),
              indexFileInfo.getFileId()));
        } else {
          if (!partitionToFilesWithNoRanges.containsKey(partition)) {
            partitionToFilesWithNoRanges.put(partition, new HashSet<>());
          }
          // 将没有最大值最小值的 fileId 添加到 partitionToFilesWithNoRanges
          partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
        }
      });
      partitionToFileIndexLookUpTree.put(partition, lookUpTree);
    });
  }

lookUpTree.insert

区间树的具体构造逻辑。

  void insert(KeyRangeNode newNode) {
    root = insert(getRoot(), newNode);
  }

  /**
   * Inserts a new {@link KeyRangeNode} to this look up tree.
   *
   * If no root exists, make {@code newNode} as the root and return the new root.
   *
   * If current root and newNode matches with min record key and max record key, merge two nodes. In other words, add
   * files from {@code newNode} to current root. Return current root.
   *
   * If current root is < newNode if current root has no right sub tree update current root's right sub tree max and min
   * set newNode as right sub tree else update root's right sub tree min and max with newNode's min and max record key
   * as applicable recursively call insert() with root's right subtree as new root
   *
   * else // current root is >= newNode if current root has no left sub tree update current root's left sub tree max and
   * min set newNode as left sub tree else update root's left sub tree min and max with newNode's min and max record key
   * as applicable recursively call insert() with root's left subtree as new root
   *
   * @param root refers to the current root of the look up tree
   * @param newNode newNode the new {@link KeyRangeNode} to be inserted
   */

  private KeyRangeNode insert(KeyRangeNode root, KeyRangeNode newNode) {
    if (root == null) {
      root = newNode;
      return root;
    }

    if (root.compareTo(newNode) == 0) {
      root.addFiles(newNode.getFileNameList());
      return root;
    }

    if (root.compareTo(newNode) < 0) {
      if (root.getRight() == null) {
        root.setRightSubTreeMax(newNode.getMaxRecordKey());
        root.setRightSubTreeMin(newNode.getMinRecordKey());
        root.setRight(newNode);
      } else {
        if (root.getRightSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
          root.setRightSubTreeMax(newNode.getMaxRecordKey());
        }
        if (root.getRightSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
          root.setRightSubTreeMin(newNode.getMinRecordKey());
        }
        insert(root.getRight(), newNode);
      }
    } else {
      if (root.getLeft() == null) {
        root.setLeftSubTreeMax(newNode.getMaxRecordKey());
        root.setLeftSubTreeMin(newNode.getMinRecordKey());
        root.setLeft(newNode);
      } else {
        if (root.getLeftSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
          root.setLeftSubTreeMax(newNode.getMaxRecordKey());
        }
        if (root.getLeftSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
          root.setLeftSubTreeMin(newNode.getMinRecordKey());
        }
        insert(root.getLeft(), newNode);
      }
    }
    return root;
  }

getMatchingFilesAndPartition

  • 对于有最大值最小值的文件,利用区间树 KeyRangeLookupTree 查找可能包含该 recordKey 的 fileId 列表 。

    • 根据文件的最大最小值判断,如果 recordKey 在最大值最小值区间,则可能存在该文件中

    • 如果不在最大值最小值区间,则不存在该文件中

    • 利用区间树的原因主要是可以降低查询时间。

  • 对于没有最大值最小值的文件,则认为都可能存在该 recordKey ,所以全部返回    

  • 返回值 Set(Pair(partitionPath, fileId))

  @Override
  public Set<Pair<String, String>> getMatchingFilesAndPartition(String partitionPath, String recordKey) {
    // (partitionPath, fileId)
    Set<Pair<String, String>> toReturn = new HashSet<>();
    // could be null, if there are no files in a given partition yet or if all index files have no ranges
    // 如果给定分区中还没有文件,或者所有索引文件都没有范围,则可能为null
    if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) {
      // 利用 KeyRangeLookupTree 查找该分区下有最大值最小值的文件中可能包含该 recordKey 的 fileId 列表
      // 查找逻辑:根据文件的最大最小值判断,如果 recordKey 在最大值最小值区间,则可能存在该文件中
      // 如果不在最大值最小值区间,则不存在该文件中
      partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    if (partitionToFilesWithNoRanges.containsKey(partitionPath)) {
      // 对于没有最大值最小值的文件,则认为都是可能存在该 recordKey ,所以全部返回
      partitionToFilesWithNoRanges.get(partitionPath).forEach(file ->
          toReturn.add(Pair.of(partitionPath, file)));
    }
    return toReturn;
  }

findMatchingFilesForRecordKeys

找出<RowKey,filename>对。

  /**
   * Find out <RowKey, filename> pair.
   * 找出<RowKey,filename>对。
   */

  Map<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      List<Pair<String, HoodieKey>> fileComparisons,
      HoodieTable hoodieTable)
 
{
    // 按照 fileId 排序
    fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1.getLeft().compareTo(o2.getLeft())).collect(toList());

    List<HoodieKeyLookupHandle.KeyLookupResult> keyLookupResults = new ArrayList<>();

    // 这里实际返回 LazyKeyCheckIterator,其父类的 LazyIterableIterator 的 next 方法会调用 computeNext
    Iterator<List<HoodieKeyLookupHandle.KeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator());
    while (iterator.hasNext()) {
      // 这里实际调用 LazyKeyCheckIterator.computeNext
      // 这里涉及读取保存在 Parquet文件中的布隆过滤器 BloomFilter
      keyLookupResults.addAll(iterator.next());
    }

    Map<HoodieKey, HoodieRecordLocation> hoodieRecordLocationMap = new HashMap<>();

    // 过滤掉 matchingRecordKeys 为空的,matchingRecordKeys 为空代表,没有一个 recordKey 存在于该文件中
    keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
    keyLookupResults.forEach(lookupResult -> {
      lookupResult.getMatchingRecordKeys().forEach(r -> {
        // (HoodieKey, HoodieRecordLocation) ,将 HoodieKey 和 HoodieRecordLocation 关联
        hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
      });
    });

    return hoodieRecordLocationMap;
  }

LazyKeyCheckIterator.computeNext

    protected List<KeyLookupResult> computeNext() {
      List<KeyLookupResult> ret = new ArrayList<>();
      try {
        // process one file in each go.
        // 遍历 (fileId, HoodieKey)
        while (inputItr.hasNext()) {
          Pair<String, HoodieKey> currentTuple = inputItr.next();
          String fileId = currentTuple.getLeft();
          String partitionPath = currentTuple.getRight().getPartitionPath();
          String recordKey = currentTuple.getRight().getRecordKey();
          // (partitionPath, fileId)
          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

          // lazily init state
          // 延迟初始化状态
          if (keyLookupHandle == null) {
            // 在 HoodieKeyLookupHandle 的构造方法中会读取保存在Parquet文件中的布隆过滤器信息
            // 将其反序列化为 BloomFilter
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
          }

          // if continue on current file
          // 如果继续当前文件
          // (partitionPath, fileId) 确定一个文件,一个 fileId 对应多个HoodieKey,
          // 所以可能在一个文件上可能遍历多次
          // 前面已经按照 fileId 排序,所以可以保证一个 fileId 对应的记录是连续的。
          if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
            // 添加 recordKey
            // 这里利用布隆过滤器进行二次过滤,将命中(可能存在于该文件中)的 recordKey 添加到 candidateRecordKeys (候选RecordKeys)
            // bloomFilter.mightContain(recordKey) 判断该recordKey 是否可能存在于该文件
            keyLookupHandle.addKey(recordKey);
          } else { // 如果上一个文件结束
            // do the actual checking of file & break out
            // 进行文件的实际检查和分解
            // 将 keyLookupHandle.getLookupResult 查询结果添加到返回值 ret 中
            ret.add(keyLookupHandle.getLookupResult());
            // 新文件的 HoodieKeyLookupHandle
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            // 添加 recordKey
            // 这里利用布隆过滤器进行二次过滤,将命中(可能存在于该文件中)的 recordKey 添加到 candidateRecordKeys (候选RecordKeys)
            // bloomFilter.mightContain(recordKey) 判断该recordKey 是否可能存在于该文件
            keyLookupHandle.addKey(recordKey);
            break;
          }
        }

        // handle case, where we ran out of input, close pending work, update return val
        // 处理输入不足的情况,关闭待处理的工作,更新返回值
        if (!inputItr.hasNext()) {
          // 遍历结束,将 getLookupResult 的返回值添加到 ret 中
          ret.add(keyLookupHandle.getLookupResult());
        }
      } catch (Throwable e) {
        if (e instanceof HoodieException) {
          throw e;
        }
        throw new HoodieIndexException("Error checking bloom filter index. ", e);
      }
      return ret;
    }

createNewFileReader().readBloomFilter()

  this.bloomFilter = createNewFileReader().readBloomFilter();

  // HoodieParquetReader.readBloomFilter
  public BloomFilter readBloomFilter() {
    return parquetUtils.readBloomFilterFromMetadata(conf, path);
  }

  // BaseFileUtils.readBloomFilterFromMetadata
    /**
   * Read the bloom filter from the metadata of the given data file.
   * 从给定数据文件的元数据中读取布隆过滤器。
   * @param configuration Configuration
   * @param filePath The data file path
   * @return a BloomFilter object
   */

  public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
    Map<String, String> footerVals =
        readFooter(configuration, false, filePath,
            HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
            HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
            HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
    String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
    if (null == footerVal) {
      // We use old style key "com.uber.hoodie.bloomfilter"
      footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
    }
    BloomFilter toReturn = null;
    if (footerVal != null) {
      if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
        toReturn = BloomFilterFactory.fromString(footerVal,
            footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
      } else {
        toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
      }
    }
    return toReturn;
  }

keyLookupHandle.addKey

  public void addKey(String recordKey) {
    // check record key against bloom filter of current file & add to possible keys if needed
    // 根据当前文件的布隆过滤器检查记录键,并在需要时添加可能的键
    if (bloomFilter.mightContain(recordKey)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFilePair);
      }
      // 如果命中的话,添加到候选
      candidateRecordKeys.add(recordKey);
    }
    totalKeysChecked++;
  }

getLookupResult

在所有添加的键中,返回在文件组中实际找到的键的列表。

  /**
   * Of all the keys, that were added, return a list of keys that were actually found in the file group.
   *
   * 在所有添加的键中,返回在文件组中实际找到的键的列表。
   */

  public KeyLookupResult getLookupResult() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys);
    }

    HoodieBaseFile dataFile = getLatestDataFile();
    // 调用 checkCandidatesAgainstFile 返回在文件组中实际找到的 RecordKeys。
    List<String> matchingKeys =
        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
    LOG.info(
        String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
            candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
    return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
        dataFile.getCommitTime(), matchingKeys);
  }

/**
   * Given a list of row keys and one file, return only row keys existing in that file.
   *
   * 给定一个行键列表和一个文件,只返回该文件中存在的行键。
   * 这里拿候选的 RecordKeys 去实际的 parquet文件中做一一比对,看是否确实存在于该 parquet文件中
   * 返回过滤后的实际存在于该 parquet文件中的 RecordKeys
   */

  public List<String> checkCandidatesAgainstFile(Configuration configuration, List<String> candidateRecordKeys,
                                                 Path filePath)
 throws HoodieIndexException 
{
    List<String> foundRecordKeys = new ArrayList<>();
    try {
      // Load all rowKeys from the file, to double-confirm
      if (!candidateRecordKeys.isEmpty()) {
        HoodieTimer timer = new HoodieTimer().startTimer();
        // 这里拿候选的 RecordKeys 去实际的 parquet文件中做一一比对,看是否确实存在于该 parquet文件中
        // 返回过滤后的实际存在于该 parquet文件中的 RecordKeys
        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new HashSet<>(candidateRecordKeys));
        foundRecordKeys.addAll(fileRowKeys);
        LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
            timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size()));
        if (LOG.isDebugEnabled()) {
          LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys);
        }
      }
    } catch (Exception e) {
      throw new HoodieIndexException("Error checking candidate keys against file.", e);
    }
    return foundRecordKeys;
  }

tagLocationBacktoRecords

在第二步已经通过 lookupIndex 获取的 HoodieKey 和 HoodieRecordLocation 的对应关系,tagLocationBacktoRecords 就是根据 lookupIndex 返回的对应关系 keyFilenamePair ,为每个 HoodieRecord 设置 currentLocation 。
有的 HoodieRecord 可能没有对应的 fileId,所以也就不会设置 currentLocation 。

  /**
   * Tag the <rowKey, filename> back to the original HoodieRecord List.
   *
   * 将<rowKey,filename>标记回原始的 HoodieRecord 列表。
   * 其实就是设置 HoodieRecord 的 currentLocation
   */

  protected List<HoodieRecord<T>> tagLocationBacktoRecords(
      Map<HoodieKey, HoodieRecordLocation> keyFilenamePair, List<HoodieRecord<T>> records) {
    // (HoodieKey, HoodieRecord)
    Map<HoodieKey, HoodieRecord<T>> keyRecordPairMap = new HashMap<>();
    records.forEach(r -> keyRecordPairMap.put(r.getKey(), r));
    // Here as the record might have more data than rowKey (some rowKeys' fileId is null),
    // so we do left outer join.
    // 在这里,由于记录可能比rowKey有更多的数据(一些rowKeys的fileId为空),因此我们进行了左外连接。
    List<Pair<HoodieRecord<T>, HoodieRecordLocation>> newList = new ArrayList<>();
    keyRecordPairMap.keySet().forEach(k -> {
      if (keyFilenamePair.containsKey(k)) { // 如果存在,代表该 key 已经存在于文件中
        //(HoodieRecord, HoodieRecordLocation) 根据该 key 获取对应的 HoodieRecordLocation
        newList.add(Pair.of(keyRecordPairMap.get(k), keyFilenamePair.get(k)));
      } else {
        // 否则,没有对应的文件,添加为 null
        newList.add(Pair.of(keyRecordPairMap.get(k), null));
      }
    });
    List<HoodieRecord<T>> res = Lists.newArrayList();
    for (Pair<HoodieRecord<T>, HoodieRecordLocation> v : newList) {
      // 通过 HoodieIndexUtils.getTaggedRecord 设置每个 HoodieRecord 的 currentLocation
      res.add(HoodieIndexUtils.getTaggedRecord(v.getLeft(), Option.ofNullable(v.getRight())));
    }
    return res;
  }

总结

  • tag/tagLocation :根据索引信息判断记录是否存在,如果不存在,代表是新增数据,如果记录存在则代表是更新数据,需要找到并设置 currentLocation。

  • tag : table.getIndex().tagLocation -> JavaHoodieBloomIndex.tagLocation -> HoodieBaseBloomIndex.tagLocation

  • tagLocation 会利用上篇文章讲的写到 parquet 文件中的 最大值最小值和布隆过滤器

  • 最大值最小值用在第一阶段的过滤:构造区间树 (Interval Tree),利用区间树查找 每个 recordKey 可能存在于哪些文件中,利用区间树的有点在于可以加速查找,时间复杂度为 O(logN)。

    • 对于有最大值和最小值的文件,如果该 recordKey 在最大值最小之区间内,则认为该文件可能包含 recordKey

    • 对于没有最大值和最小值的文件,则认为该文件可能包含 recordKey

    • 所以这里返回的是多对多的关系,类似于笛卡尔积。即一个文件可能保存多个 recordKey ,一个 recordKey 可能存在于多个文件中。

    • 所以对于 recordKey 有明确的顺序关系的(例如:时间戳作为前缀),要比较的文件数量会因范围修剪而大大减少。这样不仅可以加速查找时间,还会提高查询精确度,也就是返回的 recordKey 和 location 的关系数量会少许多。

    • 对于像字符串这种没有顺序关系的(hash值) recordKey ,会导致每个文件的最大值最小值区间范围都会比较大,这样 recordKey 就会可能存在于多个文件中,导致返回的对应关系特别大,不仅影响区间树的查询效率,还会影响后面的遍历性能。而且这种对应关系也会占比较大的内存,比如
      本批次recordKey 的数量有 10万 ,文件数有 1000个,那个最后返回的 List(fileId, HoodieKey) 的数量可能会达到 10w * 1000 = 1亿个,这是最坏的情况,但是一般情况下也会达到几千万个。

    • 所以对于没有顺序关系的 recordKey ,我们可能禁用第一阶段的利用区间树过滤,效率可能会更好一些。相关参数 :hoodie.bloom.index.use.treebased.filter = false , hoodie.bloom.index.prune.by.ranges = false

  • 布隆过滤器用在第二阶段的过滤,遍历第一阶段返回的 List(fileId, HoodieKey) ,利用从parquet文件中反序列化的来的布隆过滤器进行二次过滤,判断哪些 HoodieKey 有可能存在于该 fileId 中,如果可能存在则添加到候选:candidateRecordKeys

  • 布隆过滤器的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。所以只能判断有可能存在,还需要去和实际的数据文件去对比,进一步确认是否确实存在于该文件中。

  • 然后遍历 candidateRecordKeys ,去遍历每个parquet数据文件,和 parquet 文件中的 key 进行实际的比较,对于确实存在于该文件中的,返回实际的 HoodieKey 和 Location 的对应关系:Map<HoodieKey, HoodieRecordLocation>

  • 最后根据上一步返回的 HoodieKey 和 Location 的对应关系,为每个 HoodieRecord 设置 currentLocation ,有的 HoodieRecord 没有对应的 fileId ,所以不需要设置 currentLocation。

  • 无论是区间树查询最大值最小值,还是反序列化布隆过滤,都仅涉及读取文件页脚,所以读取成本较低

  • 随着表数据量的增加、数据文件数的增加,会导致两个问题,从而使索引性能越来越差。

    • 占用内存增加:前面提到有笛卡尔积,文件数越多,笛卡尔积越大,从而占用内存也会增加,遍历耗时也会增加。另外布隆过滤器 BitSet 也会随着每个文件的 recordKey 的数量的增加越来越大,从而导致布隆过滤器占用的内存也越来越大。

    • 我们在利用区间树和布隆过滤器过滤完每个key可能存在于哪些文件中之后,会使用筛选后的键和关联的base文件(这里为parquet) 执行实际的文件查找 。因为这里要加载所有涉及的整个parquet文件内容,随着文件数量的增大和文件大小的增加,都会导致遍历查询这些parquet文件的时间越来越长,从而导致索引性能越来越差。

🧐 分享、点赞、在看,给个3连击👇

浏览 9
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报