Flink SQL Lookup Join: The Ultimate Solution, and an Introduction to Flink Rules
Flink Join
Regular Join
For example, the commonly used inner join:
SELECT * FROM Orders
JOIN Product
ON Orders.productId = Product.id
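This kind of JOIN requires the data from both sides to be kept in Flink state forever in order to guarantee correct results, which makes the state grow without bound.
You can configure a state TTL (time-to-live: table.exec.state.ttl) to stop it from growing indefinitely, but note that this may affect the accuracy of the query results.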
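To make the trade-off concrete, here is a minimal Java sketch of a regular streaming inner join. It is a toy model, not Flink's join operator: the class RegularJoinSketch, its methods, and the simplified expiry rule (a row is forgotten once it has been stored for at least the TTL) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a regular streaming inner join. This is NOT Flink's operator. */
class RegularJoinSketch {

    /** A buffered row together with the time it was stored. */
    static final class Entry {
        final String row;
        final long storedAt;

        Entry(String row, long storedAt) {
            this.row = row;
            this.storedAt = storedAt;
        }
    }

    private final long ttlMillis; // 0 means "never expire"
    private final Map<Integer, List<Entry>> leftState = new HashMap<>();
    private final Map<Integer, List<Entry>> rightState = new HashMap<>();

    RegularJoinSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** A row arrives on the left input (Orders); returns the joined rows it produces. */
    List<String> onLeft(int key, String row, long now) {
        return process(leftState, rightState, key, row, now, true);
    }

    /** A row arrives on the right input (Product); returns the joined rows it produces. */
    List<String> onRight(int key, String row, long now) {
        return process(rightState, leftState, key, row, now, false);
    }

    private List<String> process(
            Map<Integer, List<Entry>> ownState,
            Map<Integer, List<Entry>> otherState,
            int key, String row, long now, boolean fromLeft) {
        // Every incoming row is buffered, because a matching row may arrive later.
        ownState.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(row, now));

        List<String> joined = new ArrayList<>();
        List<Entry> candidates = otherState.get(key);
        if (candidates == null) {
            return joined;
        }
        if (ttlMillis > 0) {
            // Simplified TTL (an assumption): forget rows stored at least ttlMillis ago.
            candidates.removeIf(e -> now - e.storedAt >= ttlMillis);
        }
        for (Entry e : candidates) {
            joined.add(fromLeft ? row + " | " + e.row : e.row + " | " + row);
        }
        return joined;
    }

    /** Number of rows currently held in state on both sides. */
    int bufferedRows() {
        int n = 0;
        for (List<Entry> rows : leftState.values()) n += rows.size();
        for (List<Entry> rows : rightState.values()) n += rows.size();
        return n;
    }

    public static void main(String[] args) {
        RegularJoinSketch unbounded = new RegularJoinSketch(0);
        unbounded.onRight(1, "product-1", 0L);
        System.out.println(unbounded.onLeft(1, "order-A", 10_000L));

        RegularJoinSketch withTtl = new RegularJoinSketch(5_000L);
        withTtl.onRight(1, "product-1", 0L);
        System.out.println(withTtl.onLeft(1, "order-A", 10_000L));
    }
}
```

Without a TTL the order finds its product but both rows stay buffered forever; with a 5-second TTL the product row has already been dropped when the order arrives 10 seconds later, so the match is silently lost.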
Interval Join
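A JOIN driven by a join condition plus a time constraint. It operates on two KeyedStreams and, according to the join condition, joins each record on one stream with the records on the other stream that fall within a time range relative to it.
For example, query orders together with their payments, where the payment was made within one hour before or after the order time: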
SELECT
...
FROM
Orders AS o JOIN Payment AS p ON
o.orderId = p.orderId AND
p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
orderTime + INTERVAL '1' HOUR
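The time constraint in the query above boils down to an inclusive range check per pair of rows. The following Java sketch shows only that predicate; it is not Flink's implementation, the names are hypothetical, and timestamps are assumed to be epoch milliseconds. The canEvictOrder helper illustrates the general idea that a bounded interval lets buffered rows be dropped once the watermark has moved past the range; treat the exact condition as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the interval-join predicate; timestamps are epoch milliseconds. */
class IntervalJoinSketch {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    /** Mirrors: payTime BETWEEN orderTime - INTERVAL '1' HOUR AND orderTime + INTERVAL '1' HOUR. */
    static boolean withinBounds(long orderTime, long payTime) {
        return payTime >= orderTime - ONE_HOUR_MS && payTime <= orderTime + ONE_HOUR_MS;
    }

    /** Returns the pay times (already filtered to the same orderId) that join with one order. */
    static List<Long> matchingPayments(long orderTime, List<Long> payTimes) {
        List<Long> matches = new ArrayList<>();
        for (long payTime : payTimes) {
            if (withinBounds(orderTime, payTime)) {
                matches.add(payTime);
            }
        }
        return matches;
    }

    /** Assumed cleanup rule: no payment with a later timestamp can still match this order. */
    static boolean canEvictOrder(long orderTime, long watermark) {
        return watermark > orderTime + ONE_HOUR_MS;
    }

    public static void main(String[] args) {
        long noon = 12 * ONE_HOUR_MS;
        List<Long> payTimes = new ArrayList<>();
        payTimes.add(noon + ONE_HOUR_MS / 2); // 12:30, inside the range
        payTimes.add(noon + 2 * ONE_HOUR_MS); // 14:00, outside the range
        System.out.println(matchingPayments(noon, payTimes));
    }
}
```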
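Temporal Join
First, the concept of a temporal table: it is a dynamic table that changes over time and may contain multiple snapshots of the table.
A table whose records' historical versions can be tracked and accessed is called a versioned table, for example a database changelog;
a table for which only the latest version can be tracked and accessed is called a regular table, for example a table in a database.
In Flink, a table that defines a primary key constraint and an event-time attribute is a versioned table.
A Temporal Join allows joining against a versioned table: the main table is enriched with details from a continuously updated versioned table, based on time and an equality join condition. Both tables must use the same time semantics, either event time or processing time.
With event time, the versioned table keeps all versions from the last watermark up to the current moment, and both the left and right tables need a properly configured watermark. The right table must be CDC data with a correctly configured primary key, and the primary key must appear in the equality join condition. For example: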
-- The left table is a regular append-only table.
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- The right table is a versioned table of exchange rates, backed by CDC data.
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
-- The primary key must be part of the join condition.
ON orders.currency = currency_rates.currency;
order_id  price  currency  conversion_rate  order_time
========  =====  ========  ===============  ==========
o_001     11.11  EUR       1.14             12:00:00
o_002     12.51  EUR       1.10             12:06:00
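One way to picture how the two orders above end up with different rates is to keep, per primary key, every version sorted by its update time and pick the newest version that is not later than the order time. The Java sketch below models exactly that and nothing more: the class VersionedRatesSketch and the two update times (11:00:00 and 12:05:00) are hypothetical values chosen so the result table is reproduced; watermarks and state cleanup are left out.

```java
import java.math.BigDecimal;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an event-time temporal join against a versioned table. */
class VersionedRatesSketch {
    // primary key (currency) -> versions ordered by update_time
    private final Map<String, TreeMap<LocalTime, BigDecimal>> versions = new HashMap<>();

    /** Applies one changelog record: a new version of the row for this primary key. */
    void upsert(String currency, LocalTime updateTime, String rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>())
                .put(updateTime, new BigDecimal(rate));
    }

    /**
     * FOR SYSTEM_TIME AS OF orderTime: the newest version with update_time <= orderTime.
     * Returns null when no version exists yet, which a LEFT JOIN would surface as NULL.
     */
    BigDecimal rateAsOf(String currency, LocalTime orderTime) {
        TreeMap<LocalTime, BigDecimal> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<LocalTime, BigDecimal> version = history.floorEntry(orderTime);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        VersionedRatesSketch rates = new VersionedRatesSketch();
        // Hypothetical update times, chosen to reproduce the result table.
        rates.upsert("EUR", LocalTime.parse("11:00:00"), "1.14");
        rates.upsert("EUR", LocalTime.parse("12:05:00"), "1.10");
        System.out.println(rates.rateAsOf("EUR", LocalTime.parse("12:00:00"))); // order o_001
        System.out.println(rates.rateAsOf("EUR", LocalTime.parse("12:06:00"))); // order o_002
    }
}
```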
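With processing time, you can think of the lookup table (the right table) as a plain HashMap that holds the latest full data set. Flink can JOIN directly against a table in an external database system without having to keep the latest version in state. For example: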
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;
-- Or join a table function
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency
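Note: the "FOR SYSTEM_TIME AS OF" syntax is not supported for a VIEW or for the latest version of an arbitrary table, because Flink's implementation does not quite match those semantics: the JOIN processing of the left stream does not wait for the right-side versioned table (VIEW / table function) to finish its snapshot. My understanding is that, as a result, the right-side row a left record joins with is not guaranteed to be the latest data at that moment.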
Lookup Join
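Like the processing-time Temporal Join, it queries the right table as of the moment the JOIN operator processes each record. It is typically used for dimension-table enrichment and supports only equality JOINs. For example: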
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;
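A practical consequence of looking up "as of the moment the operator runs" is that results already emitted are never revised when the dimension table changes later. The Java sketch below illustrates that behavior with an in-memory map standing in for the external LatestRates table; the class ProcTimeLookupSketch and its methods are hypothetical, and a real connector would query an external system instead.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a processing-time lookup: the right side is just "whatever is stored right now". */
class ProcTimeLookupSketch {
    // Stands in for the external table LatestRates (an assumption for this sketch).
    private final Map<String, BigDecimal> latestRates = new HashMap<>();

    /** Simulates an update made directly in the external system. */
    void upsertRate(String currency, String rate) {
        latestRates.put(currency, new BigDecimal(rate));
    }

    /**
     * Joins one order at the moment it is processed.
     * Returns amount * rate, or null when an inner join would emit nothing.
     */
    BigDecimal join(String currency, String amount) {
        BigDecimal rate = latestRates.get(currency);
        return rate == null ? null : new BigDecimal(amount).multiply(rate);
    }

    public static void main(String[] args) {
        ProcTimeLookupSketch lookup = new ProcTimeLookupSketch();
        lookup.upsertRate("EUR", "1.14");
        BigDecimal first = lookup.join("EUR", "100");

        // The dimension table changes after the first order was joined.
        lookup.upsertRate("EUR", "1.10");
        BigDecimal second = lookup.join("EUR", "100");

        // The first result keeps the old rate; only later orders see the new one.
        System.out.println(first + " then " + second);
    }
}
```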
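Lookup Join Execution Flow
The walkthrough below uses a Flink unit test as its example; newcomers can use it as a starting point for developing their own custom Rules.
Preparation
Build the Flink Table module
Run in the flink-table directory: mvn clean package -Pfast,hive-2.1.1,scala-2.12 -DskipTests
Open the unit test files
The unit tests for Flink Rules include:
Logical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical
Physical plan tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql, XXX/batch/sql
Integration tests: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql, XXX/batch/sql
These are also the unit tests you need to complete when submitting a Rule-related PR to the community.
Raise the log level
Before the code you want to test, add: Configurator.setAllLevels("", Level.TRACE)
Trace SQL execution
The analysis below follows the execution of the file flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala.
Run the unit test testJoinTemporalTable:
SELECT * FROM MyTable AS T JOIN LookupTable
FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
SQL parsing
The parser (through Calcite's syntax support) parses the "FOR SYSTEM_TIME AS OF" clause of the SQL statement into a SqlSnapshot (a SqlNode); validate() and the subsequent conversion to RelNode turn it into a LogicalSnapshot (a RelNode). The resulting logical plan is: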
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    LogicalFilter(condition=[=($cor0.a, $0)])
      LogicalSnapshot(period=[$cor0.proctime])
        LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
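Optimizer optimization
FlinkStreamProgram/FlinkBatchProgram define a series of rules that transform and optimize the logical and physical plans.
In this case, the plan goes through the following important transformations: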
LogicalCorrelateToJoinFromLookupTableRuleWithFilter:
// From the class definition you can see that the logical plan above matches this rule
class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
  extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
    operand(classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter],
        operand(classOf[LogicalSnapshot],
          operand(classOf[RelNode], any())))),
    "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
  ) {
  override def matches(call: RelOptRuleCall): Boolean = {
    val snapshot: LogicalSnapshot = call.rel(3)
    val snapshotInput: RelNode = trimHep(call.rel(4))
    isLookupJoin(snapshot, snapshotInput)
  }
  ……
}
// Once the rule matches, check whether this is a lookup join
protected def isLookupJoin(snapshot: LogicalSnapshot, snapshotInput: RelNode): Boolean = {
  ……
  // True when the time attribute is processing time and the snapshotted table is a LookupTableSource
  isProcessingTime && snapshotOnLookupSource
}
Once the rule matches, the LogicalCorrelate is converted into a LogicalJoin:
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
+- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
   +- LogicalSnapshot(period=[$cor0.proctime])
      +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
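The operand tree passed to the rule's constructor is a pattern over the plan tree, and the matched nodes are exposed through call.rel(i) in pre-order, which is why matches() reads the snapshot from index 3 and its input from index 4. Below is a minimal Java sketch of that matching idea. It is not Calcite's matcher: the Node and Operand classes and the "*" wildcard are hypothetical, node types are compared by name rather than by class, and a pattern with child operands is assumed to match a node's inputs one-to-one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy operand-tree matcher; a simplification, not Calcite's RelOptRule machinery. */
class OperandMatchSketch {

    /** A plan node: an operator name plus its inputs. */
    static final class Node {
        final String type;
        final List<Node> inputs;

        Node(String type, Node... inputs) {
            this.type = type;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** A pattern node: "*" accepts any type; no child operands plays the role of any(). */
    static final class Operand {
        final String type;
        final List<Operand> children; // null means the node's inputs are not inspected

        Operand(String type, Operand... children) {
            this.type = type;
            this.children = children.length == 0 ? null : Arrays.asList(children);
        }
    }

    /** Returns the matched nodes in pre-order (the order behind call.rel(i)), or null. */
    static List<Node> match(Operand operand, Node node) {
        List<Node> bound = new ArrayList<>();
        return bind(operand, node, bound) ? bound : null;
    }

    private static boolean bind(Operand operand, Node node, List<Node> bound) {
        if (!operand.type.equals("*") && !operand.type.equals(node.type)) {
            return false;
        }
        bound.add(node);
        if (operand.children == null) {
            return true;
        }
        if (operand.children.size() != node.inputs.size()) {
            return false;
        }
        for (int i = 0; i < operand.children.size(); i++) {
            if (!bind(operand.children.get(i), node.inputs.get(i), bound)) {
                return false;
            }
        }
        return true;
    }

    /** The same shape as the operand tree of LogicalCorrelateToJoinFromLookupTableRuleWithFilter. */
    static Operand rulePattern() {
        return new Operand("LogicalCorrelate",
                new Operand("*"),
                new Operand("LogicalFilter",
                        new Operand("LogicalSnapshot",
                                new Operand("*"))));
    }

    public static void main(String[] args) {
        Node lookupScan = new Node("LogicalTableScan");
        Node correlate = new Node("LogicalCorrelate",
                new Node("LogicalTableScan"),
                new Node("LogicalFilter", new Node("LogicalSnapshot", lookupScan)));
        List<Node> rels = match(rulePattern(), correlate);
        System.out.println(rels.get(3).type + " -> " + rels.get(4).type);
    }
}
```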
FlinkProjectJoinTransposeRule + ProjectRemoveRule: push the Project operator down and prune it
// Swap the Project with the Join beneath it, which pushes the Project down
public FlinkProjectJoinTransposeRule(
        PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
    super(operand(Project.class, operand(Join.class, any())), relFactory, null);
    this.preserveExprCondition = preserveExprCondition;
}
After optimization:
LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[default_catalog, default_database, LookupTable]])
Next, the Volcano rules explore combinations of the logical plan and produce the optimal one. After they run, the best plan is:
12129 [main] DEBUG org.apache.calcite.plan.RelOptPlanner [] - Cheapest plan:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner]): rowcount = 3.0E7, cumulative cost = {4.0E8 rows, 5.0E8 cpu, 1.37E10 io, 0.0 network, 0.0 memory}, id = 403
FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 378
FlinkLogicalSnapshot(period=[$cor0.proctime]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 2.0E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}, id = 402
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}, id = 381
// Final result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
Rules attempted
Rules                                                          Attempts  Time (us)
FlinkJoinPushExpressionsRule                                   2         553
JoinConditionPushRule                                          2         152
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)      1         54,956
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)                 1         4,787
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)             1         3,162
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)  1         1,403
SimplifyJoinConditionRule                                      1         249
* Total                                                        9         65,262
The Converters here belong to LOGICAL_CONVERTERS, a collection of logical rules that convert Calcite nodes into Flink nodes.
For example, FlinkLogicalSnapshotConverter:
// Converts a LogicalSnapshot into a FlinkLogicalSnapshot
class FlinkLogicalSnapshotConverter
  extends ConverterRule(
    // Matches nodes of type LogicalSnapshot that have no Convention; the output uses FlinkConventions.LOGICAL
    classOf[LogicalSnapshot],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalSnapshotConverter") {
  def convert(rel: RelNode): RelNode = {
    val snapshot = rel.asInstanceOf[LogicalSnapshot]
    val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
    FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
  }
}
Add the operator that materializes processing time
// convert time indicators
chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)
// For processing time, and only where necessary, a SqlFunction is created here to do the materialization
rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr)
After conversion:
FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
Physical rule optimization
After the following physical Volcano rules have run:
FlinkJoinPushExpressionsRule
JoinConditionPushRule
StreamPhysicalTableSourceScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalTableSourceScanConverter(in:NONE,out:LOGICAL)
StreamPhysicalSnapshotOnTableScanRule
StreamPhysicalCalcRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)
StreamPhysicalDataStreamScanRule(in:LOGICAL,out:STREAM_PHYSICAL)
FlinkLogicalSnapshotConverter(in:NONE,out:LOGICAL)
FlinkLogicalDataStreamTableScanConverter(in:NONE,out:LOGICAL)
SimplifyJoinConditionRule
the optimal result is:
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
StreamPhysicalCalcRule: converts FlinkLogicalCalc into StreamPhysicalCalc.
SnapshotOnTableScanRule: converts
FlinkLogicalJoin
:- FlinkLogicalDataStreamTableScan
+- FlinkLogicalSnapshot
   +- FlinkLogicalTableSourceScan
into
StreamPhysicalLookupJoin
+- StreamPhysicalDataStreamScan
This is the key transformation logic for LookupJoin:
// This rule reuses the match condition defined in its parent class
class SnapshotOnTableScanRule
  extends BaseSnapshotOnTableScanRule("StreamPhysicalSnapshotOnTableScanRule") {
}
// As you can see, this matches exactly the logical plan before physical optimization
abstract class BaseSnapshotOnTableScanRule(description: String)
  extends RelOptRule(
    operand(classOf[FlinkLogicalJoin],
      operand(classOf[FlinkLogicalRel], any()),
      operand(classOf[FlinkLogicalSnapshot],
        operand(classOf[TableScan], any()))),
    description)
  with CommonLookupJoinRule
private def doTransform(
    join: FlinkLogicalJoin,
    input: FlinkLogicalRel,
    temporalTable: RelOptTable,
    calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
  val joinInfo = join.analyzeCondition
  val cluster = join.getCluster
  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  // Convert input from a logical node into a physical node. This triggers StreamPhysicalDataStreamScanRule,
  // which converts the FlinkLogicalDataStreamTableScan into a StreamPhysicalDataStreamScan
  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(
    cluster,
    providedTrait,
    convInput,
    temporalTable,
    calcProgram,
    joinInfo,
    join.getJoinType)
}
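This completes the conversion to the physical plan.
Translating the physical plan
planner.translate() includes:
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
val transformations = translateToPlan(execGraph)
In translateToExecNodeGraph, the translateToExecNode method of the final node of the generated physical plan is called. For example, StreamPhysicalLookupJoin is converted into StreamExecLookupJoin.
In translateToPlan, the translateToPlanInternal method of the ExecNode is called. Taking CommonExecLookupJoin as an example: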
public abstract class CommonExecLookupJoin /* …… */ {
    // Validation and the async LookupFunction logic are omitted here
    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        // ----------- Create the factory for the lookupFunction operator ---------------
        RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
        UserDefinedFunction userDefinedFunction =
                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
        UserDefinedFunctionHelper.prepareInstance(
                planner.getTableConfig().getConfiguration(), userDefinedFunction);
        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
        StreamOperatorFactory<RowData> operatorFactory;
        operatorFactory =
                createSyncLookupJoin(
                        temporalTable,
                        planner.getTableConfig(),
                        lookupKeys,
                        (TableFunction<Object>) userDefinedFunction,
                        planner.getRelBuilder(),
                        inputRowType,
                        tableSourceRowType,
                        resultRowType,
                        isLeftOuterJoin,
                        planner.getExecEnv().getConfig().isObjectReuseEnabled());
        //-------------------------------------------------------
        // Convert into a Transformation
        Transformation<RowData> inputTransformation =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        return new OneInputTransformation<>(
                inputTransformation,
                getDescription(),
                operatorFactory,
                InternalTypeInfo.of(resultRowType),
                inputTransformation.getParallelism());
    }
}
// Only the core logic is listed; it falls into three parts
private StreamOperatorFactory<RowData> createSyncLookupJoin() {
    // Use the code generator to generate the lookupFunction call, wrapped as a FlatMap function
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
            LookupJoinCodeGenerator.generateSyncLookupFunction();
    // Generate the Collector that receives the table function's output
    GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
            LookupJoinCodeGenerator.generateCollector();
    // Finally, a LookupJoinRunner ProcessFunction is created.
    // If there is a Calc on the lookup side (the right table), the Runner also carries the Calc logic.
    // For example: SELECT * FROM T JOIN DIM FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.b + 1
    // The fetcher reads the raw rows from the LookupFunction, the calc is applied to them,
    // and the result is then compared against the row from the main table (the left stream)
    GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
            LookupJoinCodeGenerator.generateCalcMapFunction(
                    config,
                    JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
                    filterOnTemporalTable,
                    temporalTableOutputType,
                    tableSourceRowType);
    ProcessFunction<RowData, RowData> processFunc =
            new LookupJoinWithCalcRunner(
                    generatedFetcher,
                    generatedCalc,
                    generatedCollector,
                    isLeftOuterJoin,
                    rightRowType.getFieldCount());
    // The rest (wrapping processFunc into the returned operator factory) is omitted
}
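To see how the three generated pieces cooperate at runtime, here is a small hand-written Java sketch of the fetcher, calc, and collector flow, including the null padding for a left outer join. It is a simplification under stated assumptions, not Flink's LookupJoinWithCalcRunner: rows are plain lists, the fetcher and calc are ordinary functions instead of generated code, and the dimension data and the "age > 20" filter in demo() are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy version of the lookup join runner: fetch, apply calc, collect, pad if needed. */
class LookupJoinRunnerSketch {
    private final Function<Object, List<List<Object>>> fetcher; // lookup key -> raw dimension rows
    private final Function<List<Object>, List<Object>> calc;    // returns null when the row is filtered out
    private final int leftKeyIndex;
    private final boolean isLeftOuterJoin;
    private final int rightArity;

    LookupJoinRunnerSketch(
            Function<Object, List<List<Object>>> fetcher,
            Function<List<Object>, List<Object>> calc,
            int leftKeyIndex,
            boolean isLeftOuterJoin,
            int rightArity) {
        this.fetcher = fetcher;
        this.calc = calc;
        this.leftKeyIndex = leftKeyIndex;
        this.isLeftOuterJoin = isLeftOuterJoin;
        this.rightArity = rightArity;
    }

    /** Processes one row of the left stream and returns the joined rows. */
    List<List<Object>> processElement(List<Object> leftRow) {
        List<List<Object>> collected = new ArrayList<>();
        for (List<Object> fetched : fetcher.apply(leftRow.get(leftKeyIndex))) {
            List<Object> rightRow = calc.apply(fetched);
            if (rightRow != null) {
                collected.add(concat(leftRow, rightRow));
            }
        }
        if (collected.isEmpty() && isLeftOuterJoin) {
            // Nothing was collected: a left outer join still emits the left row, padded with nulls.
            collected.add(concat(leftRow, Collections.<Object>nCopies(rightArity, null)));
        }
        return collected;
    }

    private static List<Object> concat(List<Object> left, List<Object> right) {
        List<Object> joined = new ArrayList<>(left);
        joined.addAll(right);
        return joined;
    }

    /** Builds a runner over a hypothetical in-memory dimension table (id, name, age). */
    static LookupJoinRunnerSketch demo(boolean isLeftOuterJoin) {
        Map<Object, List<List<Object>>> dim = new HashMap<>();
        dim.put(1, Collections.singletonList(Arrays.<Object>asList(1, "alice", 11)));
        dim.put(2, Collections.singletonList(Arrays.<Object>asList(2, "bob", 22)));
        Function<Object, List<List<Object>>> fetcher =
                key -> dim.getOrDefault(key, Collections.<List<Object>>emptyList());
        // Hypothetical calc on the lookup side: keep only rows with age > 20.
        Function<List<Object>, List<Object>> calc =
                row -> ((Integer) row.get(2)) > 20 ? row : null;
        return new LookupJoinRunnerSketch(fetcher, calc, 0, isLeftOuterJoin, 3);
    }

    public static void main(String[] args) {
        System.out.println(demo(false).processElement(Arrays.<Object>asList(2, "x")));
        System.out.println(demo(false).processElement(Arrays.<Object>asList(1, "y")));
        System.out.println(demo(true).processElement(Arrays.<Object>asList(1, "y")));
    }
}
```

The row with key 1 is found by the fetcher but rejected by the calc, so an inner join emits nothing for it while a left outer join emits the left row padded with three nulls.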
Finally, Transformations -> StreamGraph -> JobGraph; from this point on the flow is the same as for the DataStream API.