Flink实战篇|FlinkSQL窗口提前触发实战解析

程序源代码

共 9039字,需浏览 19分钟

 ·

2023-02-10 22:46

全网最全大数据面试提升手册!

简介

正常线上业务计算设置的window窗口比较大,比如1个小时,1天,甚至一周。那么是不是只能在窗口结束之后才能看到数据呢?对于实时的任务,每天或者一周结束的时候,才能输出计算的结果,没有任何意义,我们需要的是实时更新的中间结果数据。比如实时的PV,UV等指标计算。下面介绍一下Flink SQL如何提前触发窗口计算。

实现方式



不开启提前触发窗口

如下demo,从kafka读取数据,窗口聚合,输出一天的pv、uv

 
-- kafka source 
drop table if exists user_log;
CREATE TABLE user_log(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    proc_time  as PROCTIME(),
    ts          TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH ( 
       'connector' = 'kafka'
      ,'topic' = 'user_log'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
      );drop table if exists user_log_sink_1;CREATE TABLE user_log_sink_1
(
    wCurrent string
    ,wStart     STRING
    ,wEnd    STRING
    ,pv    bigint
    ,uv    bigint
    ,primary key(wCurrent,wStart,wEnd) not enforced
) WITH (--       'connector' = 'print'
      'connector' = 'upsert-kafka'
      ,'topic' = 'user_log_sink'
      ,'properties.bootstrap.servers' = 'localhost:9092'
      ,'properties.group.id' = 'user_log'
      ,'key.format' = 'json'
      ,'value.format' = 'json'
      );-- window aggregationinsert into user_log_sink_1select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
     ,date_format(TUMBLE_START(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wStart
     ,date_format(TUMBLE_END(proc_time, INTERVAL '1' minute), 'yyyy-MM-dd HH:mm:ss') AS wEnd
     ,count(1) coun
     ,count(distinct user_id)from user_loggroup by TUMBLE(proc_time, INTERVAL '1' minute)

注:为了方便测试,1 天的窗口改为 1 分钟的窗口

任务流图如下:

任务输出如下:

+I[2022-06-01 17:14:00, 2022-06-01 17:13:00, 2022-06-01 17:14:00, 29449, 9999]
+I[2022-06-01 17:15:00, 2022-06-01 17:14:00, 2022-06-01 17:15:00, 29787, 9999]
+I[2022-06-01 17:16:00, 2022-06-01 17:15:00, 2022-06-01 17:16:00, 29765, 9999]
+I[2022-06-01 17:17:00, 2022-06-01 17:16:00, 2022-06-01 17:17:00, 29148, 9999]
+I[2022-06-01 17:18:00, 2022-06-01 17:17:00, 2022-06-01 17:18:00, 30079, 9999]


.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5))

提前触发窗口

 TableConfig config = tenv.getConfig();
 config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",true);
 config.getConfiguration().setString("table.exec.emit.early-fire.delay","10s");  
        //每10秒钟触发一次输出

可以提前触发窗口的结果

任务同上,只是添加以上两个参数

任务输出结果如下:

2022-06-01 17:54:21,031 INFO  - add parameter to table config: table.exec.emit.early-fire.enabled = true2022-06-01 17:54:21,032 INFO  - add parameter to table config: table.exec.emit.early-fire.delay = 50002022-06-01 17:54:21,032 INFO  - add parameter to table config: pipeline.name = test_table_parameter


+I[2022-06-01 17:54:35, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
-U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
+U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 5027, 5000]
-U[2022-06-01 17:54:45, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 5027, 5000]
+U[2022-06-01 17:54:45, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 7527, 7500]
-U[2022-06-01 17:54:50, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 7527, 7500]
+U[2022-06-01 17:54:50, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 10079, 9999]
-U[2022-06-01 17:54:55, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 10079, 9999]
+U[2022-06-01 17:54:55, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 12579, 9999]
-U[2022-06-01 17:55:00, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 12579, 9999]
+U[2022-06-01 17:55:00, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 14579, 9999]


+I[2022-06-01 17:55:05, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 2500, 2500]
-U[2022-06-01 17:55:10, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 2500, 2500]
+U[2022-06-01 17:55:10, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 5299, 5000]
。。。忽略部分中间结果
-U[2022-06-01 17:55:55, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 25364, 9999]
+U[2022-06-01 17:55:55, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 27864, 9999]
-U[2022-06-01 17:56:00, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 27864, 9999]
+U[2022-06-01 17:56:00, 2022-06-01 17:55:00, 2022-06-01 17:56:00, 29867, 9999]


+I[2022-06-01 17:56:05, 2022-06-01 17:56:00, 2022-06-01 17:57:00, 2500, 2500]
-U[2022-06-01 17:56:10, 2022-06-01 17:56:00, 2022-06-01 17:57:00, 2500, 2500]

insert into user_log_sink_1select date_format(now(), 'yyyy-MM-dd HH:mm:ss')
     ,date_format(window_start, 'yyyy-MM-dd HH:mm:ss') AS wStart
     ,date_format(window_end, 'yyyy-MM-dd HH:mm:ss') AS wEnd
     ,count(user_id) pv
     ,count(distinct user_id) uvFROM TABLE(
      TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES )) t1gro

报错:

Exception in thread "main" org.apache.flink.table.api.TableException: Currently, window table function based aggregate doesn't support early-fire and late-fire configuration 'table.exec.emit.early-fire.enabled' and 'table.exec.emit.late-fire.enabled'.
  at org.apache.flink.table.planner.plan.utils.WindowUtil$.checkEmitConfiguration(WindowUtil.scala:262)
  at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLocalWindowAggregate.translateToExecNode(StreamPhysicalLocalWindowAggregate.scala:127)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
  at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
  at org.apache.flink.table.planner.plan.nod

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)
互联网最坏的时代可能真的来了
我在B站读大学,大数据专业
我们在学习Flink的时候,到底在学习什么?
193篇文章暴揍Flink,这个合集你需要关注一下
Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
我们在学习Spark的时候,到底在学习什么?
在所有Spark模块中,我愿称SparkSQL为最强!
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半,社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

浏览 84
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报