Hudi Source Code | Hudi Insert Source Code Analysis (Overall Flow)
2022-11-18 18:17
Preface
This is a summary analysis of the Apache Hudi insert source code, using the Java Client as the example. If you are not familiar with the Hudi Java Client, see: Hudi Java Client Summary | Example Code for Reading from Hive and Writing to Hudi.
Reasons for using the Java Client as the example:
1. I use the Java Client in production, so I am more familiar with it than with the Spark client.
2. The core logic of the Java Client is the same as that of the Spark and Flink clients; the difference is that, for example, Spark's entry points are DataFrames and SQL, adding an extra layer of API wrapping.
3. The Java Client sits closer to the core source code, so we can analyze the core logic directly without digging through Spark or Flink internals; this makes it easier to get started for readers unfamiliar with Spark or Flink source code.
4. After finishing the Java Client analysis, I will summarize the Spark client source code if time permits, which should make everything easier to understand.
Version
Hudi 0.9.0
Note: the core code is much the same across versions. I chose 0.9.0 for two reasons: for the Java Client it is the version I have used the most, which saves considerable time compared with the latest release; and my earlier Java Client write-ups, such as the analyses of the Hudi clean policy implementation and of the Hudi clean file-removal implementation, were also based on 0.9.0.
initTable
The first step is to initialize the Hudi table via initTable. As you can see, it mainly creates the .hoodie metadata directory based on the parameters we configured, and then persists those parameters into the hoodie.properties file; you can dig into the details yourself.
public HoodieTableMetaClient initTable(Configuration configuration, String basePath) throws IOException {
  return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
}

/**
 * Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
 *
 * @return Instance of HoodieTableMetaClient
 */
public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hadoopConf, String basePath,
    Properties props) throws IOException {
  LOG.info("Initializing " + basePath + " as hoodie table " + basePath);
  Path basePathDir = new Path(basePath);
  final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
  if (!fs.exists(basePathDir)) {
    fs.mkdirs(basePathDir);
  }
  Path metaPathDir = new Path(basePath, METAFOLDER_NAME);
  if (!fs.exists(metaPathDir)) {
    fs.mkdirs(metaPathDir);
  }
  // if anything other than default archive log folder is specified, create that too
  String archiveLogPropVal = new HoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);
  if (!StringUtils.isNullOrEmpty(archiveLogPropVal)) {
    Path archiveLogDir = new Path(metaPathDir, archiveLogPropVal);
    if (!fs.exists(archiveLogDir)) {
      fs.mkdirs(archiveLogDir);
    }
  }
  // Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
  final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
  if (!fs.exists(temporaryFolder)) {
    fs.mkdirs(temporaryFolder);
  }
  // Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
  final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
  if (!fs.exists(auxiliaryFolder)) {
    fs.mkdirs(auxiliaryFolder);
  }
  initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);
  HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
  // We should not use fs.getConf as this might be different from the original configuration
  // used to create the fs in unit tests
  HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
  LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
  return metaClient;
}
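For reference, here is a minimal usage sketch of initTable through the property builder, modeled on the Hudi Java client example; the table name and base path are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;

Configuration hadoopConf = new Configuration();
String tablePath = "/tmp/hudi/hudi_demo_table";              // placeholder base path
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
    .setTableType(HoodieTableType.COPY_ON_WRITE)
    .setTableName("hudi_demo_table")                         // placeholder table name
    .setPayloadClassName(HoodieAvroPayload.class.getName())
    .initTable(hadoopConf, tablePath);                       // creates .hoodie and hoodie.properties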
HoodieWriteConfig
This configuration is the one used when writing data, while the configuration passed to initTable above is the one persisted to hoodie.properties; the two should of course be kept consistent (the Spark client actually keeps them consistent). You can see parameters such as the schema, table name, payload, index, clean, and file sizes. Once you are familiar with these parameters, you can start tuning.
Properties indexProperties = new Properties();
indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(), 150000); // for 10 million records, saves about 1 minute overall
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
    .withSchema(writeSchema.toString())
    .withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism)
    .forTable(tableName)
    .withWritePayLoad(payloadClassName)
    .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build())
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BLOOM)
        // .bloomIndexPruneByRanges(false) // for 10 million records, saves about 1 minute overall
        .bloomFilterFPP(0.000001) // for 10 million records, saves about 3 minutes overall
        .fromProperties(indexProperties)
        .build())
    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150, 200)
        .compactionSmallFileSize(Long.parseLong(smallFileLimit))
        .approxRecordSize(Integer.parseInt(recordSizeEstimate))
        .retainCommits(100).build())
    .withStorageConfig(HoodieStorageConfig.newBuilder()
        .parquetMaxFileSize(Long.parseLong(maxFileSize)).build())
    .build();
HoodieJavaWriteClient
Create the writeClient:
HoodieJavaWriteClient<HoodieAvroPayload> writeClient =
    new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
startCommit
String newCommitTime = writeClient.startCommit();
The concrete implementation lives in the parent class AbstractHoodieWriteClient. It first calls rollbackFailedWrites to perform the rollback operation (the rollback analysis is out of scope for this article). It then creates a new instantTime via HoodieActiveTimeline.createNewInstantTime(). Finally, it creates a metaClient and generates the .commit.requested file via metaClient.getActiveTimeline().createNewInstant.
public String startCommit() {
  // First call rollbackFailedWrites to perform the rollback operation
  CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
      HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
  // Generate a new instantTime
  String instantTime = HoodieActiveTimeline.createNewInstantTime();
  // Create the metaClient
  HoodieTableMetaClient metaClient = createMetaClient(true);
  startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
  return instantTime;
}

private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
  LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
  // if there are pending compactions, their instantTime must not be greater than that of this instant time
  metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
      ValidationUtils.checkArgument(
          HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
          "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
              + latestPending + ", Ingesting at " + instantTime));
  if (config.getFailedWritesCleanPolicy().isLazy()) {
    this.heartbeatClient.start(instantTime);
  }
  // Create the .commit.requested file
  metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
      instantTime));
}
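For intuition, a minimal sketch of what startCommit leaves on the timeline; the instant value below is illustrative, assuming the 0.9.0 behavior where instant times are second-granularity timestamps:

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

String instantTime = HoodieActiveTimeline.createNewInstantTime(); // e.g. "20221118181700"
// After startCommit, the active timeline contains an empty requested marker file:
//   .hoodie/20221118181700.commit.requested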
generateRecord
This step mainly builds writeRecords, the data structure (List&lt;HoodieRecord&gt;) the writeClient needs for writing.
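A minimal sketch of building writeRecords, assuming an Avro writeSchema with uuid and ts fields; the field names, record key, and partition path here are made-up placeholders:

import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

GenericRecord rec = new GenericData.Record(writeSchema); // writeSchema: the Avro schema from the write config
rec.put("uuid", "id-0001");                              // hypothetical record key field
rec.put("ts", 1668766620000L);                           // hypothetical ordering field
HoodieKey key = new HoodieKey("id-0001", "2022/11/18");  // recordKey, partitionPath
List<HoodieRecord<HoodieAvroPayload>> writeRecords =
    Collections.singletonList(new HoodieRecord<>(key, new HoodieAvroPayload(Option.of(rec))));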
client.insert(writeRecords, newCommitTime)
First obtain the table; here a HoodieJavaCopyOnWriteTable is returned. Then validate the schema against the existing data for compatibility. preWrite performs a few pre-write steps, such as setting the operation type; table.insert then performs the complete write operation and returns the result. Finally, postWrite performs archive, clean, and related operations and returns the WriteStatuses.
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
  // First obtain the table; here it is a HoodieJavaCopyOnWriteTable
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
  // Validate the schema
  table.validateUpsertSchema();
  // Pre-write steps, such as setting the operation type
  preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
  // Call table.insert to perform the write and return the result
  HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  // Call postWrite and return the WriteStatuses
  return postWrite(result, instantTime, table);
}
postWrite
Let's look at the postWrite logic first. It checks whether the commit has already completed, i.e. whether the .commit file has been generated; if so, it performs archive, clean, and related operations. In other words, archive and clean run only after the write has completed and the .commit file exists.
/**
 * Checks whether the commit has completed (the .commit file exists); if so, performs archive, clean, etc.
 */
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
    String instantTime,
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
  }
  // Whether the commit has been performed; this mainly depends on auto-commit
  // (hoodie.auto.commit, default true).
  // If auto-commit is disabled, we have to run clean etc. and commit manually,
  // so by default this is true.
  // isCommitted means the .commit file has been generated, i.e. the write succeeded:
  // table.insert has already completed the entire write.
  if (result.isCommitted()) {
    // Perform post commit operations.
    if (result.getFinalizeDuration().isPresent()) {
      metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
          result.getWriteStats().get().size());
    }
    // postCommit mainly performs archive, clean, etc.; these operations happen only after
    // the write has completed and the .commit file has been generated.
    postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
    emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
  }
  return result.getWriteStatuses();
}
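If you want to drive the commit (and with it the post-commit archive/clean) yourself, auto-commit can be disabled in the write config. A minimal sketch, with tablePath, writeSchema, and tableName as in the config example above:

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(tablePath)
    .withSchema(writeSchema.toString())
    .withAutoCommit(false) // hoodie.auto.commit = false
    .forTable(tableName)
    .build();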
table.insert
table.insert first calls JavaInsertCommitActionExecutor.execute, which then calls JavaWriteHelper.newInstance().write.
public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
    String instantTime,
    List<HoodieRecord<T>> records) {
  return new JavaInsertCommitActionExecutor<>(context, config,
      this, instantTime, records).execute();
}

public HoodieWriteMetadata<List<WriteStatus>> execute() {
  return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
      config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
}
JavaWriteHelper.write
The write method is implemented in its parent class AbstractWriteHelper. It first checks whether deduplication is needed (controlled by the config hoodie.combine.before.insert); insert does not deduplicate by default (upsert/delete do). If deduplication is needed, combineOnCondition performs it first.
It then checks whether tagging is needed. Tagging uses the index information stored in the files (bloom index by default) to determine which records are new inserts and which are updates; updated records are additionally tagged with their file location so the corresponding parquet file can be found later during the update. Since this is an insert, no tagging is needed, which is one of the bigger differences between insert and upsert.
We will analyze how tagging is implemented when we cover the upsert source code; this article skips it. Finally, executor.execute performs the write and returns the result; the executor here is JavaInsertCommitActionExecutor.
public HoodieWriteMetadata<O> write(String instantTime,
    I inputRecords,
    HoodieEngineContext context,
    HoodieTable<T, I, K, O> table,
    boolean shouldCombine,
    int shuffleParallelism,
    BaseCommitActionExecutor<T, I, K, O, R> executor,
    boolean performTagging) {
  try {
    // De-dupe/merge if needed
    // If deduplication is enabled, dedupe first; insert does not dedupe by default
    // (config: hoodie.combine.before.insert)
    I dedupedRecords =
        combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
    Instant lookupBegin = Instant.now();
    I taggedRecords = dedupedRecords;
    if (performTagging) { // whether tagging is needed; false for insert
      // perform index loop up to get existing location of records
      // Tagging uses the index stored in the files (bloom index by default) to determine
      // which records are inserts and which are updates; updated records are also tagged
      // with their file location so the right parquet file can be found during the update
      taggedRecords = tag(dedupedRecords, context, table);
    }
    Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
    // executor.execute performs the write and returns the result;
    // the executor here is JavaInsertCommitActionExecutor
    HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
    result.setIndexLookupDuration(indexLookupDuration);
    return result;
  } catch (Throwable e) {
    if (e instanceof HoodieUpsertException) {
      throw (HoodieUpsertException) e;
    }
    throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
  }
}

public boolean shouldCombineBeforeInsert() {
  return getBoolean(COMBINE_BEFORE_INSERT);
}

public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
    .key("hoodie.combine.before.insert")
    .defaultValue("false")
    .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
        + " writing to storage.");
JavaInsertCommitActionExecutor.execute
JavaInsertCommitActionExecutor.execute actually calls execute of its parent class BaseJavaCommitActionExecutor.
It first builds a WorkloadProfile via buildProfile, mainly for getPartitioner to use. The WorkloadProfile holds the insert/upsert counts per partition path, plus the file locations of the upsert records. The counts drive bucketing, i.e. how many files to write, which is where small-file merging and file sizing come into play; the locations identify the files to update, i.e. the corresponding fileIds. Upsert records reuse their existing fileId, while insert records get new fileIds, split across multiple files when there are many records. The WorkloadProfile metadata is then persisted into the .inflight file (.commit.requested -> .commit.inflight); this mainly supports rollback for MOR tables, which reads the metadata back from the .inflight file. Next, getPartitioner builds a partitioner from the WorkloadProfile, and partition returns partitionedRecords (a map of bucket number -> HoodieRecords), where one bucket corresponds to one file (fileId). Finally it iterates over partitionedRecords, performing one write per bucket via handleInsertPartition/handleUpsertPartition, which in the end calls BoundedInMemoryExecutor.execute to write the data with a producer-consumer model. I already walked through this producer-consumer write in my earlier article on the bootstrap source code (writing to Hudi); the principle is identical, only the implementation classes differ, so take a look if you are interested.
This article only sketches tag (index-related), WorkloadProfile, getPartitioner, and handleInsertPartition/handleUpsertPartition, and may be imprecise in places; the article "17 diagrams to fully understand the Hudi upsert principle" is a good companion for these topics. Detailed source analysis is deferred to later articles for reasons of length and personal bandwidth, where any inaccuracies here will also be corrected.
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
  HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
  WorkloadProfile profile = null;
  if (isWorkloadProfileNeeded()) { // always true
    // Build the WorkloadProfile, mainly for use by getPartitioner.
    // The WorkloadProfile contains per-partition-path insert/upsert counts and the
    // file locations of the upsert records.
    // The counts are used for bucketing, i.e. how many files to write; this is where
    // small-file merging and file sizing come in.
    // The location information identifies the files to update:
    // upsert records reuse their original fileId;
    // insert records get new fileIds, and with many records multiple files are written.
    profile = new WorkloadProfile(buildProfile(inputRecords));
    LOG.info("Workload profile :" + profile);
    try {
      // Persist the WorkloadProfile metadata into the .inflight file
      // (.commit.requested -> .commit.inflight).
      // This is mainly for rollback of MOR tables, which reads the metadata back
      // from the .inflight file.
      saveWorkloadProfileMetadataToInflight(profile, instantTime);
    } catch (Exception e) {
      HoodieTableMetaClient metaClient = table.getMetaClient();
      HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
      try {
        if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
        }
      } catch (IOException ex) {
        LOG.error("Check file exists failed");
        throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
      }
    }
  }
  // Obtain the partitioner based on the WorkloadProfile
  final Partitioner partitioner = getPartitioner(profile);
  // <bucket number, corresponding HoodieRecords>; one bucket corresponds to one file (fileId)
  Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
  List<WriteStatus> writeStatuses = new LinkedList<>();
  // forEach: one write per bucket via handleInsertPartition/handleUpsertPartition,
  // ultimately writing the data through BoundedInMemoryExecutor.execute (producer-consumer model)
  partitionedRecords.forEach((partition, records) -> {
    // whether this operation changes (updates/deletes) existing records
    if (WriteOperationType.isChangingRecords(operationType)) {
      handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
    } else {
      handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
    }
  });
  updateIndex(writeStatuses, result);
  // Commit: generate the .commit file, which marks the completion of the write
  updateIndexAndCommitIfNeeded(writeStatuses, result);
  return result;
}
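To make the bucketing idea concrete, here is an illustrative-only sketch (not the real JavaUpsertPartitioner): insert records are split into buckets so that each bucket roughly fills one new data file, based on the configured max file size and the estimated record size; insertKeys is a hypothetical input standing in for the HoodieRecords of one partition path.

import java.util.ArrayList;
import java.util.List;

long maxFileSize = 120 * 1024 * 1024L; // cf. hoodie.parquet.max.file.size
long avgRecordSize = 1024L;            // cf. hoodie.copyonwrite.record.size.estimate
long recordsPerBucket = maxFileSize / avgRecordSize;

List<List<String>> buckets = new ArrayList<>(); // each bucket would map to a new fileId
List<String> current = new ArrayList<>();
for (String recordKey : insertKeys) {           // insertKeys: hypothetical insert input
  if (current.size() >= recordsPerBucket) {
    buckets.add(current);                       // bucket "full": start a new file
    current = new ArrayList<>();
  }
  current.add(recordKey);
}
if (!current.isEmpty()) {
  buckets.add(current);
}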
commit
handleInsertPartition/handleUpsertPartition above have actually already written the data. But the .commit metadata file still has to be generated to mark the commit as complete; if no .commit is generated, for example only .commit.requested or .commit.inflight exists, queries will not see the files written by this batch, and the next write will trigger a rollback to handle it.
Setting the index-related parts aside, the commit call chain is updateIndexAndCommitIfNeeded -> commitOnAutoCommit -> autoCommit -> commit -> commit; finally, the commit method of BaseJavaCommitActionExecutor generates the .commit file via activeTimeline.saveAsComplete.
As mentioned earlier, once the .commit file is generated, postWrite triggers archive, clean, and similar operations. A failure of archive or clean does not invalidate this write: if clean fails, it can simply run again next time. So once the commit has completed, a subsequent clean failure should not cause the batch to be placed in the failure queue by integration tools with failure handling, such as Apache NiFi in our case. (When the commit itself fails, we do need to put the batch in the failure queue to prevent data loss.) In fact, we can write our own code extending the Java client class to separate postWrite from table.insert, which makes it easy to tell a write failure from a clean failure; I will share that implementation later.
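A minimal sketch of the manual-commit flow behind that idea, assuming hoodie.auto.commit=false (see withAutoCommit(false) above); note that in 0.9.0 writeClient.commit still runs postCommit (archive/clean) internally, so fully separating the write from the table services requires extending the client as described:

String instantTime = writeClient.startCommit();
List<WriteStatus> statuses = writeClient.insert(writeRecords, instantTime);
// If insert threw, the write itself failed and the batch belongs in the failure queue.
boolean committed = writeClient.commit(instantTime, statuses); // generates the .commit file
if (!committed) {
  // commit did not complete: treat this batch as failed to avoid data loss
}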
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
  Instant indexStartTime = Instant.now();
  // Update the index back
  List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
  result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
  result.setWriteStatuses(statuses);
  result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
  commitOnAutoCommit(result);
}

protected void commitOnAutoCommit(HoodieWriteMetadata result) {
  // validate commit action before committing result
  runPrecommitValidators(result);
  if (config.shouldAutoCommit()) {
    LOG.info("Auto commit enabled: Committing " + instantTime);
    autoCommit(extraMetadata, result);
  } else {
    LOG.info("Auto commit disabled for " + instantTime);
  }
}

protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
  this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime)),
      lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
  try {
    TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
        result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner());
    commit(extraMetadata, result);
  } finally {
    this.txnManager.endTransaction();
  }
}

// BaseJavaCommitActionExecutor
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
  commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
}

protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
  String actionType = getCommitActionType();
  LOG.info("Committing " + instantTime + ", action Type " + actionType);
  result.setCommitted(true);
  result.setWriteStats(writeStats);
  // Finalize write
  finalizeWrite(instantTime, writeStats, result);
  try {
    LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    // Generate the .commit file via activeTimeline.saveAsComplete
    activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
        Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
    LOG.info("Committed " + instantTime);
    result.setCommitMetadata(Option.of(metadata));
  } catch (IOException e) {
    throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
        e);
  }
}
handleInsertPartition
Appendix: the call chain from handleInsertPartition down to the producer-consumer model: handleInsertPartition -> handleUpsertPartition -> handleInsert -> JavaLazyInsertIterable.computeNext -> BoundedInMemoryExecutor.execute.
protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
    Partitioner partitioner) {
  return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}

protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
    Partitioner partitioner) {
  JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
  BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
  BucketType btype = binfo.bucketType;
  try {
    if (btype.equals(BucketType.INSERT)) {
      return handleInsert(binfo.fileIdPrefix, recordItr);
    } else if (btype.equals(BucketType.UPDATE)) {
      return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
    } else {
      throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
    }
  } catch (Throwable t) {
    String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
    LOG.error(msg, t);
    throw new HoodieUpsertException(msg, t);
  }
}

public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
  // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
  if (!recordItr.hasNext()) {
    LOG.info("Empty partition");
    return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
  }
  return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
      taskContextSupplier, new CreateHandleFactory<>());
}

// JavaLazyInsertIterable
protected List<WriteStatus> computeNext() {
  // Executor service used for launching writer thread.
  BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
      null;
  try {
    final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
    bufferedIteratorExecutor =
        new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
    // bufferedIteratorExecutor.execute writes the data via the producer-consumer model
    final List<WriteStatus> result = bufferedIteratorExecutor.execute();
    assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
    return result;
  } catch (Exception e) {
    throw new HoodieException(e);
  } finally {
    if (null != bufferedIteratorExecutor) {
      bufferedIteratorExecutor.shutdownNow();
    }
  }
}
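To illustrate the producer-consumer pattern BoundedInMemoryExecutor is built around, here is a self-contained, illustrative-only sketch using a plain JDK BlockingQueue (the real executor uses a bounded in-memory queue sized by hoodie.write.buffer.limit.bytes and hands records to a write handle):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerSketch {
  private static final String POISON_PILL = "__EOF__";

  public static void main(String[] args) throws InterruptedException {
    List<String> records = Arrays.asList("r1", "r2", "r3"); // stand-in for HoodieRecords
    BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2); // bounded queue => back-pressure

    Thread producer = new Thread(() -> {
      try {
        for (String r : records) {
          buffer.put(r);          // blocks when the buffer is full
        }
        buffer.put(POISON_PILL);  // end-of-input marker
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        for (String r = buffer.take(); !POISON_PILL.equals(r); r = buffer.take()) {
          System.out.println("write " + r); // the real consumer hands records to a write handle
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }
}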
Summary
This article used the Java Client to walk through the overall logic of the Apache Hudi insert source code; I hope it helps. Due to limited bandwidth, WorkloadProfile, getPartitioner, handleInsertPartition/handleUpsertPartition, and the other pieces mentioned above will be summarized in later articles, and once the insert source analysis is complete I will move on to the upsert source code. Some points may not be entirely accurate; corrections are very welcome so we can improve together.
Annotated code
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments