Hudi 源码 | Hudi Insert 源码剖析(整体流程)

HBase技术社区

共 41944字,需浏览 84分钟

 · 2022-11-18

前言

Apache Hudi Insert源码分析总结,以Java Client为例,不了解Hudi Java Client的可以参考:Hudi Java Client总结|读取Hive写Hudi代码示例

以Java Client为例的原因:1、自己生产上用的Java Client,相比于Spark客户端更熟悉一点。
2、Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。
3、Java Client更贴近源码,可以直接分析核心逻辑。不用剖析Spark、Flink源码。对Sprk、Flink源码不熟悉的更容易上手。
4、等分析完Java Client源码后,有时间的话我会再总结一下Spark客户端的源码,这样大家会更容易理解。

版本

Hudi 0.9.0

备注:其实每个版本核心代码都差不多,之所以使用0.9.0,一个是因为对于Java Client,我用0.9.0用的比较多,相比于使用最新版可以节省不少时间,另一个原因是,之前总结的Java Client的源码也是基于0.9.0。比如Hudi Clean Policy 清理策略实现分析Hudi Clean 清理文件实现分析

initTable

首先是通过initTable初始化Hudi表,可以看出来主要就是根据我们配置的一些参数,创建.hoodie元数据目录,然后将这些参数持久化到hoodier.properties文件中,具体的细节可以自己研究。

    public HoodieTableMetaClient initTable(Configuration configuration, String basePath)
        throws IOException 
{
      return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
    }

  /**
   * Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
   *
   * @return Instance of HoodieTableMetaClient
   */

  public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf, String basePath,
      Properties props)
 throws IOException 
{
    LOG.info("Initializing " + basePath + " as hoodie table " + basePath);
    Path basePathDir = new Path(basePath);
    final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    if (!fs.exists(basePathDir)) {
      fs.mkdirs(basePathDir);
    }
    Path metaPathDir = new Path(basePath, METAFOLDER_NAME);
    if (!fs.exists(metaPathDir)) {
      fs.mkdirs(metaPathDir);
    }

    // if anything other than default archive log folder is specified, create that too
    String archiveLogPropVal = new HoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);
    if (!StringUtils.isNullOrEmpty(archiveLogPropVal)) {
      Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
      if (!fs.exists(archiveLogDir)) {
        fs.mkdirs(archiveLogDir);
      }
    }

    // Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
    final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
    if (!fs.exists(temporaryFolder)) {
      fs.mkdirs(temporaryFolder);
    }

    // Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
    final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
    if (!fs.exists(auxiliaryFolder)) {
      fs.mkdirs(auxiliaryFolder);
    }

    initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
    HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
    // We should not use fs.getConf as this might be different from the original configuration
    // used to create the fs in unit tests
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
    LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
    return metaClient;
  }    

HoodieWriteConfig

这里的配置是写数据时使用的配置,上面initTable的配置是持久化文件的配置,当然这俩配置要保持一致(实际上Spark客户端就是保持一致的)。可以看到有Schema、表名、payload、索引、clean、文件大小等一些参数。熟悉这些参数后就可以进行调优了。

            Properties indexProperties = new Properties();
            indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(), 150000); // 1000万总体时间提升1分钟
            HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                    .withSchema(writeSchema.toString())
                    .withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism)
                    .forTable(tableName)
                    .withWritePayLoad(payloadClassName)
                    .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
                    .withIndexConfig(HoodieIndexConfig.newBuilder()
                            .withIndexType(HoodieIndex.IndexType.BLOOM)
//                            .bloomIndexPruneByRanges(false) // 1000万总体时间提升1分钟
                            .bloomFilterFPP(0.000001)   // 1000万总体时间提升3分钟
                            .fromProperties(indexProperties)
                            .build())
                    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150200)
                            .compactionSmallFileSize(Long.parseLong(smallFileLimit))
                            .approxRecordSize(Integer.parseInt(recordSizeEstimate))
                            .retainCommits(100).build())
                    .withStorageConfig(HoodieStorageConfig.newBuilder()
                            .parquetMaxFileSize(Long.parseLong(maxFileSize)).build())
                    .build();

HoodieJavaWriteClient

创建writeClient

HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg)

startCommit

String newCommitTime = writeClient.startCommit();

具体的实现在其父类AbstractHoodieWriteClient中。首先调用rollbackFailedWrites执行rollback操作,关于rollback分析本文先不讲。然后通过HoodieActiveTimeline.createNewInstantTime()创建一个新的instantTime。最后创建metaClient,通过metaClient.getActiveTimeline().createNewInstant生成.commit.request文件

  public String startCommit() {
    // 首先调用rollbackFailedWrites执行rollback操作
    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
    // 生成新的instantTime
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    // 创建metaClient
    HoodieTableMetaClient metaClient = createMetaClient(true);
    startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
    return instantTime;
  }
  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
    LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
    // if there are pending compactions, their instantTime must not be greater than that of this instant time
    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
        ValidationUtils.checkArgument(
            HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
        "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
            + latestPending + ",  Ingesting at " + instantTime));
    if (config.getFailedWritesCleanPolicy().isLazy()) {
      this.heartbeatClient.start(instantTime);
    }
    // 创建.commit.request
    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
        instantTime));
  }  

generateRecord

主要是构造writeClient写数据所需的数据结构writeRecords:List<hoodierecord>,具体的可以参考我之前分享的文章。</hoodierecord

client.insert(writeRecords, newCommitTime)

首先获取table,这里返回HoodieJavaCopyOnWriteTable,接着验证一下schema和历史数据的兼容性。然后通过preWrite执行写之前的一些步骤,比如设置操作类型,接着调用table.insert执行完整的写数据操作,返回result。最后调用postWrite执行archive、clean等操作返回WriteStatuses。

  public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
    // 首先获取table,这里的table为HoodieJavaCopyOnWriteTable
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
    // 验证schema
    table.validateUpsertSchema();
    // 写之前的一些步骤,比如设置操作类型
    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
    // 调用table.insert执行写数据操作,返回result
    HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    // 调用postWrite返回WriteStatuses
    return postWrite(result, instantTime, table);
  }

postWrite

我们先看一下postWrite的逻辑,首先判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean,也就是archive、clean等操作是在写操作完成、生成.commit文件之后进行的。

  /**
   * 判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean
   */

  protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
                                        String instantTime,
                                        HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable)
 
{
    if (result.getIndexLookupDuration().isPresent()) {
      metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
    }
    // commit是否已经提交,这里主要考虑是否设置了自动提交,hoodie.auto.commit默认true
    // 如果不是自动提交的话,那么我们需要手动执行clean等操作,然后手动commit
    // 所以这里默认为true
    // isCommitted代表着已经生成了.commit文件,也就是写操作成功了,也就是通过table.insert已经完成了整个的写操作
    if (result.isCommitted()) {
      // Perform post commit operations.
      if (result.getFinalizeDuration().isPresent()) {
        metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
            result.getWriteStats().get().size());
      }
      // postCommit主要是执行archive、clean等操作。也就是archive、clean等操作是在写操作完成,生成.commit文件之后进行的。
      postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());

      emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
    }
    return result.getWriteStatuses();
  }

table.insert

先调用JavaInsertCommitActionExecutor.execute接着调用JavaWriteHelper.newInstance().write

  public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
                                                       String instantTime,
                                                       List<HoodieRecord<T>> records) {
    return new JavaInsertCommitActionExecutor<>(context, config,
        this, instantTime, records).execute();
  }

  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), thisfalse);
  }

JavaWriteHelper.write

它的write方法是在其父类AbstractWriteHelper中实现的,首先首先判断是否需要去重(通过配置项hoodie.combine.before.insert配置是否需要去重),insert默认不需要去重(upsert/delete默认需要)。如果需要去重的话调用方法combineOnCondition先进行去重。
然后判断是否需要tag, tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据,对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件。由于这里为insert所以不需要tag,这也是insert和upsert一个比较大的区别。
我们后面分析upsert源码时,会专门分析tag怎么实现的,本文先略过。然后通过调用executor.execute执行写操作,返回result,这里的executor为JavaInsertCommitActionExecutor。

  public HoodieWriteMetadata<O> write(String instantTime,
                                      I inputRecords,
                                      HoodieEngineContext context,
                                      HoodieTable<T, I, K, O> table,
                                      boolean shouldCombine,
                                      int shuffleParallelism,
                                      BaseCommitActionExecutor<T, I, K, O, R> executor,
                                      boolean performTagging)
 
{
    try {
      // De-dupe/merge if needed
      // 如果开启了去重,则先去重,insert默认不去重
      // 配置项hoodie.combine.before.insert
      I dedupedRecords =
          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

      Instant lookupBegin = Instant.now();
      I taggedRecords = dedupedRecords;
      if (performTagging) { // 是否需要tag,insert为false
        // perform index loop up to get existing location of records
        // tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据
        // 对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件
        taggedRecords = tag(dedupedRecords, context, table);
      }
      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

      // 通过调用executor.execute执行写操作,返回result。这里的executor为JavaInsertCommitActionExecutor
      HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
      result.setIndexLookupDuration(indexLookupDuration);
      return result;
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
      }
      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }
  }

  public boolean shouldCombineBeforeInsert() {
    return getBoolean(COMBINE_BEFORE_INSERT);
  }

  public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
      .key("hoodie.combine.before.insert")
      .defaultValue("false")
      .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
          + " writing to storage.");

JavaInsertCommitActionExecutor.execute

JavaInsertCommitActionExecutor.execute实际上调用的父类BaseJavaCommitActionExecutorexecute
首先通过buildProfile构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用。WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息。数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理,位置信息是为了获取要更新的文件,也就是对应的fileId。对于upsert数据,我们复用原来的fileId。对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写。然后将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight。这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息。然后通过getPartitioner根据WorkloadProfile获取partitioner,接着调用partition方法返回partitionedRecords(<桶号,对应的HoodieRecord>),一个桶对应一个文件 fileId。最后再遍历partitionedRecords,也就是每个桶执行一次写操作handleInsertPartition/handleUpsertPartition,最后调用BoundedInMemoryExecutor.execute,利用生产者消费者模式写数据,关于如何通过生产者消费者模式写数据,我已经在Hudi源码|bootstrap源码分析总结(写Hudi)分析过bootstrap的源码了,原理一样,不同的是实现类不一样,感兴趣的可以看看。

关于tag(索引相关)、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文讲个大概,可能有不准确的地方,大家可以先结合17张图带你彻底理解Hudi Upsert原理进行学习,至于具体的源码分析,限于篇幅及个人精力,本文先不涉及,会放在后面的文章单独讲解,对于本文可能不准确的地方,也会在后面的文章中更新。

  public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

    WorkloadProfile profile = null;
    if (isWorkloadProfileNeeded()) { // 始终为true
      // 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用
      // WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息
      // 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理
      // 位置信息是为了获取要更新的文件
      // 对于upsert数据,我们复用原来的fileId
      // 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
      profile = new WorkloadProfile(buildProfile(inputRecords));
      LOG.info("Workload profile :" + profile);
      try {
        // 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
        // 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
        saveWorkloadProfileMetadataToInflight(profile, instantTime);
      } catch (Exception e) {
        HoodieTableMetaClient metaClient = table.getMetaClient();
        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
        try {
          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
          }
        } catch (IOException ex) {
          LOG.error("Check file exists failed");
          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
        }
      }
    }


    // 根据WorkloadProfile获取partitioner
    final Partitioner partitioner = getPartitioner(profile);
    // <桶号,对应的HoodieRecord>,一个桶对应一个文件 fileId
    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);

    List<WriteStatus> writeStatuses = new LinkedList<>();
    // forEach,每个桶执行一次写操作handleInsertPartition/handleUpsertPartition
    // 最终通过BoundedInMemoryExecutor.execute 生产者消费者模式写数据
    partitionedRecords.forEach((partition, records) -> {
      // 是否更新、删除
      if (WriteOperationType.isChangingRecords(operationType)) {
        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
      } else {
        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
      }
    });
    updateIndex(writeStatuses, result);
    // commit生成.commit文件,.commit文件的生成标记着写数据的完成
    updateIndexAndCommitIfNeeded(writeStatuses, result);
    return result;
  }

commit

上面通过handleInsertPartition/handleUpsertPartition实际上已经完成了数据的写入。但是最后还要生成.commit元数据文件,代表一次commit的完成,否则如果没有生成commit的话,比如只有.commit.request或者commit.inflight,这样在查询时候不会查到本地写数据生成的文件,而且下次写数据时会触发rollback来处理。

这里索引相关的先不看,commit调用链updateIndexAndCommitIfNeeded->commitOnAutoCommit->autoCommit->commit->commit,最终在BaseJavaCommitActionExecutor的commit方法中通过activeTimeline.saveAsComplete生成.commit文件。
前面讲过了,在生成.commit文件后,会调用postWrite方法触发archive、clean等操作。实际上archive、clean等操作的失败,不影响本次写数据的成功。比如clean失败了,可以下次再clean就可以了。所以当commit完成后,如果clean失败了,这样对于有失败机制的集成工具,比如我们使用的Apache NIFI,是不能将本批次数据放进失败队列的。PS:当本次commit不成功时,我们需要放进失败队列,目的是防止数据丢失。其实我们可以自己写代码继承JavaClient类,将postWrite方法和table.insert分开。这样便于判断是写数据失败还是clean失败,以后我会分享相关代码实现。

  public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
    Instant indexStartTime = Instant.now();
    // Update the index back
    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
    result.setWriteStatuses(statuses);
    result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
    commitOnAutoCommit(result);
  }

  protected void commitOnAutoCommit(HoodieWriteMetadata result) {
    // validate commit action before committing result
    runPrecommitValidators(result);
    // validate commit action before committing result
    if (config.shouldAutoCommit()) {
      LOG.info("Auto commit enabled: Committing " + instantTime);
      autoCommit(extraMetadata, result);
    } else {
      LOG.info("Auto commit disabled for " + instantTime);
    }
  }

  protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
    this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
        lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
    try {
      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
          result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
      commit(extraMetadata, result);
    } finally {
      this.txnManager.endTransaction();
    }
  }

  // BaseJavaCommitActionExecutor
  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
    commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
  }

  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
    String actionType = getCommitActionType();
    LOG.info("Committing " + instantTime + ", action Type " + actionType);
    result.setCommitted(true);
    result.setWriteStats(writeStats);
    // Finalize write
    finalizeWrite(instantTime, writeStats, result);
    try {
      LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
      HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

      // 通过activeTimeline.saveAsComplete生成.commit文件
      activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
      LOG.info("Committed " + instantTime);
      result.setCommitMetadata(Option.of(metadata));
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
          e);
    }
  } 

handleInsertPartition

handleInsertPartition到生产者消费者模式调用链,handleInsertPartition->handleUpsertPartition->handleInsert->JavaLazyInsertIterable.computeNext->BoundedInMemoryExecutor.execute

  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                              Partitioner partitioner) {
    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
  }

  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                              Partitioner partitioner) {
    JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
    BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
    BucketType btype = binfo.bucketType;
    try {
      if (btype.equals(BucketType.INSERT)) {
        return handleInsert(binfo.fileIdPrefix, recordItr);
      } else if (btype.equals(BucketType.UPDATE)) {
        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
      } else {
        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
      }
    } catch (Throwable t) {
      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
      LOG.error(msg, t);
      throw new HoodieUpsertException(msg, t);
    }
  }

  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
      LOG.info("Empty partition");
      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }
    return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
        taskContextSupplier, new CreateHandleFactory<>());
  }

  // JavaLazyInsertIterable
  protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
        null;
    try {
      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
      bufferedIteratorExecutor =
          new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
      // bufferedIteratorExecutor.execute通过生产者消费者模型实现写数据    
      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
      return result;
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (null != bufferedIteratorExecutor) {
        bufferedIteratorExecutor.shutdownNow();
      }
    }
  }  

总结

本文以Java Client为例,对Apache Hudi insert源码进行了整体逻辑的分析总结,希望能对大家有所帮助。由于精力有限,对于文中提到的WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition等,我会在后面的文章再进行总结。并且等insert相关源码分析完后,会再进行upsert的源码分析。可能有些地方不够准确,还请大家多多指正,让我们共同进步。

注释代码

github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments

相关阅读

浏览 49
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报