Implementing a CountWindow with a Timeout Using a Custom Flink Trigger

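Flink windows come in two basic flavors: TimeWindow and CountWindow.
A TimeWindow fires when its time is up; a CountWindow fires once it has collected a given number of elements.
Sometimes both conditions are needed at once: the window should fire early if enough elements have accumulated before the time is up, and it should still fire when the time limit is reached even if too few elements have arrived. Neither built-in window does this on its own, so a custom trigger is required.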
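Before turning to the Flink trigger itself, the intended behavior can be sketched in plain Java with no Flink dependency. This is only a model under stated assumptions: the class `CountOrTimeoutModel` and its methods are hypothetical names, time is passed in explicitly as non-negative milliseconds, and windows are tumbling with a zero offset. It shows the two firing paths: count reached, or window end reached.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink API) of a tumbling window that fires on count OR on timeout. */
class CountOrTimeoutModel {
    private final int maxCount;
    private final long windowSizeMs;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> fired = new ArrayList<>();
    private long windowEnd = -1L; // exclusive end of the window the buffer belongs to

    CountOrTimeoutModel(int maxCount, long windowSizeMs) {
        this.maxCount = maxCount;
        this.windowSizeMs = windowSizeMs;
    }

    /** Timeout path: the clock reached the end of the current window. */
    void advanceTo(long nowMs) {
        if (!buffer.isEmpty() && nowMs >= windowEnd) {
            flush();
        }
    }

    /** Count path: the buffer reached maxCount before the window ended. */
    void onElement(String value, long nowMs) {
        advanceTo(nowMs);
        if (buffer.isEmpty()) {
            // Align the window to a multiple of the window size.
            windowEnd = nowMs - (nowMs % windowSizeMs) + windowSizeMs;
        }
        buffer.add(value);
        if (buffer.size() >= maxCount) {
            flush();
        }
    }

    /** Emits the buffered elements as one batch and empties the buffer (fire and purge). */
    private void flush() {
        fired.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<String>> fired() {
        return fired;
    }

    public static void main(String[] args) {
        CountOrTimeoutModel w = new CountOrTimeoutModel(3, 10_000L);
        w.onElement("a", 1_000L);
        w.onElement("b", 2_000L);
        w.onElement("c", 3_000L); // count reached: fires [a, b, c]
        w.onElement("d", 4_000L);
        w.advanceTo(10_000L);     // window end reached: fires [d]
        System.out.println(w.fired()); // prints [[a, b, c], [d]]
    }
}
```

In Flink the clock is not advanced by hand: the timeout path is driven by timers, which is what the trigger's `onProcessingTime` and `onEventTime` callbacks handle.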
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Count-window trigger with a timeout.
 */
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);

    /** Maximum number of elements in a window before it fires. */
    private final int maxCount;

    /** Event time or processing time. */
    private final TimeCharacteristic timeType;

    /** Descriptor of the state that stores the window's current element count. */
    private final ReducingStateDescriptor<Long> countStateDescriptor =
            new ReducingStateDescriptor<>("counter", new Sum(), LongSerializer.INSTANCE);

    public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
        this.maxCount = maxCount;
        this.timeType = timeType;
    }

    /** Resets the counter, then emits the window contents and purges them. */
    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Register the timeout timer explicitly instead of relying on the
        // window operator's own timers; duplicate registrations are ignored.
        if (timeType == TimeCharacteristic.ProcessingTime) {
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
        } else if (timeType == TimeCharacteristic.EventTime) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            LOG.info("fire with count: " + countState.get());
            return fireAndPurge(window, ctx);
        }
        if (timestamp >= window.getEnd()) {
            LOG.info("fire with time: " + timestamp);
            return fireAndPurge(window, ctx);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.ProcessingTime) {
            return TriggerResult.CONTINUE;
        }
        // The timeout timer is set to window.maxTimestamp(), i.e. getEnd() - 1,
        // so only timers at or past the window end are ignored.
        if (time >= window.getEnd()) {
            return TriggerResult.CONTINUE;
        }
        LOG.info("fire with process time: " + time);
        return fireAndPurge(window, ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.EventTime) {
            return TriggerResult.CONTINUE;
        }
        if (time >= window.getEnd()) {
            return TriggerResult.CONTINUE;
        }
        LOG.info("fire with event time: " + time);
        return fireAndPurge(window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.clear();
    }

    /** Counting function: sums the per-element increments. */
    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
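The timer callbacks in the trigger compare the timer's timestamp with the window's end, so it helps to see how a window's bounds relate to each other. The following is a minimal sketch in plain Java (no Flink dependency; the class and method names are hypothetical), assuming a zero window offset and non-negative millisecond timestamps. A tumbling window covers the half-open range [start, end), and the last timestamp that still belongs to it is end - 1, which is the value Flink's `TimeWindow.maxTimestamp()` returns.

```java
/** Plain-Java sketch of tumbling-window boundary arithmetic (zero offset, non-negative timestamps). */
class TumblingWindowBounds {

    /** Inclusive start of the window containing the timestamp. */
    static long start(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window containing the timestamp. */
    static long end(long timestampMs, long sizeMs) {
        return start(timestampMs, sizeMs) + sizeMs;
    }

    /** Last timestamp that still belongs to the window, because the end is exclusive. */
    static long maxTimestamp(long timestampMs, long sizeMs) {
        return end(timestampMs, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long ts = 23_456L;
        long size = 10_000L;
        System.out.println(start(ts, size));        // prints 20000
        System.out.println(end(ts, size));          // prints 30000
        System.out.println(maxTimestamp(ts, size)); // prints 29999
    }
}
```

A timer set to the window's maximum timestamp is therefore strictly smaller than the window's end, which is why the `time >= window.getEnd()` branch in the callbacks returns CONTINUE and the other branch fires.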
Usage example (10-second timeout, at most 1000 elements per firing):
stream
    .timeWindowAll(Time.seconds(10))
    .trigger(new CountTriggerWithTimeout<>(1000, TimeCharacteristic.ProcessingTime))
    .process(new XxxxWindowProcessFunction())
    .addSink(new XxxSinkFunction())
    .name("Xxx");
That is all it takes.
