Flink 教程 | PyFlink 教程(二):PyFlink Table API - Python 自定义函数
背景:Python 自定义函数是 PyFlink Table API 中最重要的功能之一,其允许用户在 PyFlink Table API 中使用 Python 语言开发的自定义函数,极大地拓宽了 Python Table API 的使用范围。
目前 Python 自定义函数的功能已经非常完善,支持多种类型的自定义函数,比如 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 支持)、Panda UDF、Pandas UDAF 等。接下来,我们详细介绍一下如何在 PyFlink Table API 作业中使用 Python 自定义函数。
一、Python 自定义函数基础
1. Python UDF
from pyflink.table.udf import udf, FunctionContext, ScalarFunction
from pyflink.table import DataTypes
方式一:
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
方式二:
sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())
方式三:
class SubString(object):
def __call__(self, s: str, begin: int, end: int):
return s[begin:end]
sub_string = udf(SubString(), result_type=DataTypes.STRING())
方式四:
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())
方式五:
class SubString(ScalarFunction):
def open(self, function_context: FunctionContext):
pass
def eval(self, s: str, begin: int, end: int):
return s[begin:end]
sub_string = udf(SubString(), result_type=DataTypes.STRING())
需要通过名字为 “ udf ” 的装饰器,声明这是一个 scalar function; 需要通过装饰器中的 result_type 参数,声明 scalar function 的结果类型; 上述方式五,通过继承 ScalarFunction 的方式来定义 Python UDF 有以下用处: ScalarFunction 的基类 UserDefinedFunction 中定义了一个 open 方法,该方法只在作业初始化时执行一次,因此可以利用该方法,做一些初始化工作,比如加载机器学习模型、连接外部服务等。 此外,还可以通过 open 方法中的 function_context 参数,注册及使用 metrics。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
table.select(sub_string(table.a, 1, 3))
2. Python UDTF
from pyflink.table.udf import udtf
from pyflink.table import DataTypes
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str, sep: str):
splits = s.split(sep)
yield splits[0], splits[1]
需要通过名字为 “ udtf ” 的装饰器,声明这是一个 table function; 需要通过装饰器中的 result_types 参数,声明 table function 的结果类型。由于 table function 每条输出可以包含多个列,result_types 需要指定所有输出列的类型; Python UDTF 的定义,也支持 Python UDF 章节中所列出的多种定义方式,这里只展示了其中一种。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])
table.join_lateral(split(table.a, '|').alias("c1, c2"))
table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))
3. Python UDAF
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf
class WeightedAvg(AggregateFunction):
def create_accumulator(self):
# Row(sum, count)
return Row(0, 0)
def get_value(self, accumulator: Row) -> float:
if accumulator[1] == 0:
return 0
else:
return accumulator[0] / accumulator[1]
def accumulate(self, accumulator: Row, value, weight):
accumulator[0] += value * weight
accumulator[1] += weight
def retract(self, accumulator: Row, value, weight):
accumulator[0] -= value * weight
accumulator[1] -= weight
weighted_avg = udaf(f=WeightedAvg(),
result_type=DataTypes.DOUBLE(),
accumulator_type=DataTypes.ROW([
DataTypes.FIELD("f0", DataTypes.BIGINT()),
DataTypes.FIELD("f1", DataTypes.BIGINT())]))
需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function, 需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型; create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义,详细信息可以参见 Flink 官方文档 [1];需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义(Pandas UDAF 没有这方面的限制)。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],
["value", "count", "name"])
t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
4. Python UDTAF
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtaf, TableAggregateFunction
class Top2(TableAggregateFunction):
def create_accumulator(self):
# 存储当前最大的两个值
return [None, None]
def accumulate(self, accumulator, input_row):
if input_row[0] is not None:
# 新的输入值最大
if accumulator[0] is None or input_row[0] > accumulator[0]:
accumulator[1] = accumulator[0]
accumulator[0] = input_row[0]
# 新的输入值次大
elif accumulator[1] is None or input_row[0] > accumulator[1]:
accumulator[1] = input_row[0]
def emit_value(self, accumulator):
yield Row(accumulator[0])
if accumulator[1] is not None:
yield Row(accumulator[1])
top2 = udtaf(f=Top2(),
result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
Python UDTAF 功能是 Flink 1.13 之后支持的新功能; create_accumulator,accumulate 和 emit_value 这 3 个方法必须定义,此外 TableAggregateFunction 中支持 retract、merge 等方法,可以根据需要选择是否定义,详细信息可以参见 Flink 官方文档[2]。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(2, 'Hi', 'Hello'),
(7, 'Hi', 'Hello')],
['a', 'b', 'c'])
t_env.execute_sql("""
CREATE TABLE my_sink (
word VARCHAR,
`sum` BIGINT
) WITH (
'connector' = 'print'
)
""")
result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")
# 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
# 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
result.wait()
11> +I[Hi, 7]
10> +I[Hi2, 5]
11> +I[Hi, 3]
Python UDTAF 只能用于 Table API,不能用于 SQL 语句中; flat_aggregate 的结果包含了原始的 grouping 列以及 UDTAF(top 2)的输出,因此,可以在 select 中访问列 “ b ”。
二、Python 自定义函数进阶
1. 在纯 SQL 作业中使用 Python 自定义函数
CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON
CREATE TABLE source (
a VARCHAR
) WITH (
'connector' = 'datagen'
);
CREATE TABLE sink (
a VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT sub_string(a, 1, 3)
FROM source;
2. 在 Java 作业中使用 Python 自定义函数
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");
tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));
tEnv.executeSql("SELECT sub_string(a) FROM source").collect();
3. 依赖管理
第三方 Python 库如何被 Python 自定义函数访问。不同的作业,对于 Python 库的版本要求是不一样的,将第三方 Python 库预安装到集群的 Python 环境中,只适用于安装一些公共的依赖,不能解决不同作业对于 Python 依赖个性化的需求; 机器学习模型或者数据文件,如何分发到集群节点,并最终被 Python 自定义函数访问。
需要注意的是,Python UDF 的实现所在的文件,也需要在作业执行的时候,作为依赖文件上传; 可以通过合用 “存档文件” 与 “ Python 解释器路径”,指定作业使用上传的 Python 虚拟环境来执行,比如:
table_env.add_python_archive("/path/to/py_env.zip")
# 指定使用py_env.zip包中带的python来执行用户自定义函数,必须通过相对路径来指定
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
推荐用户使用 conda 来构建 Python 虚拟环境,conda 构建的 Python 虚拟环境包含了执行 Python 所需的绝大多数底层库,可以极大地避免当本地环境与集群环境不一样时,所构建的 Python 虚拟环境在集群执行时,缺少各种底层依赖库的问题。关于如何使用 conda 构建的 Python 虚拟环境,可以参考阿里云 VVP 文档中 “使用 Python 三方包” 章节的介绍 [4] 有些 Python 三方库需要安装才能使用,即并非 ”将其下载下来就可以直接放到 PYTHONPATH 中引用“,针对这种类型的 Python 三方库,有两种解决方案:
将其安装在 Python 虚拟环境之中,指定作业运行使用所构建的 Python 虚拟环境; 找一台与集群环境相同的机器(或 docker),安装所需的 Python 三方库,然后将安装文件打包。该方式相对于 Python 虚拟环境来说,打包文件比较小。详情可以参考阿里云 VVP 文档中 “使用自定义的 Python 虚拟环境” 章节的介绍 [5]。
4. 调试
5. 调优
checkpoint 时,会触发缓存数据的计算,因此当上述参数配置的值过大时,可能会导致checkpoint 时需要处理过多的数据,从而导致 checkpoint 时间过长,甚至会导致 checkpoint 失败。当遇到作业的 checkpoint 时间比较长的问题时,可以尝试减少上述参数的取值。
三、常见问题
1)Python 自定义函数的实际返回值类型与 result_type 中声明的类型不一致,该问题会导致 Java 算子在收到 Python 自定义函数的执行结果,进行反序列化时报错,错误堆栈类似:
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
四、总结
在这篇文章中,我们主要介绍了各种 Python 自定义函数的定义及使用方式,以及 Python 依赖管理、 Python 自定义函数调试及调优等方面的信息,希望可以帮助用户了解 Python 自定义函数。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景、最佳实践等。
实时机器学习:支持机器学习场景下实时特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的标准,推动例如搜索、推荐、广告、风控等场景的全面实时化; 大数据 + AI 一体化:包括编程语言一体化 (PyFlink 相关工作),执行引擎集成化 (TF on Flink),工作流及管理一体化(Flink AI Flow)。
引用链接:
评论