flink watermarks,flink水位线作用
墨初 知识笔记 78阅读
文章目录 1、水位线传递2、水位线设置空闲等待3、迟到数据处理窗口允许迟到4、迟到数据处理侧流输出5、问
1、水位线传递

上游task处理完水位线时钟改变后要把数据和当前水位线继续往下游算子的task发送。当一个任务接收到多个上游并行任务传递来的水位线时以最小的那个作为当前任务的事件时钟。如图上游算子并行度为4
- 第一波的2.4.3.6传递到下游task取2- 其中一个上游task的数据4到了传递到下游4.4.3.6此时水位线被更新为最小的3- 其中一个上游task的7到了下游task为4.7.3.6最小仍为3不更新- 上游task的6到下游下游为4.7.6.6最小为4水位线再更新
总结

使用上篇的乱序流来查看水位线的传递这次把并行度不能再是1设置为2
public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); SingleOutputStreamOperator<WaterSensor> sensorDS env .socketTextStream(node01, 9527) .map(new WaterSensorMapFunction()); // TODO 1.定义Watermark策略 WatermarkStrategy<WaterSensor> watermarkStrategy WatermarkStrategy // 1.1 指定watermark生成乱序的等待3s .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 1.2 指定 时间戳分配器从数据中提取 .withTimestampAssigner( (element, recordTimestamp) -> { // 返回的时间戳要 毫秒 System.out.println(数据 element ,recordTs recordTimestamp); return element.getTs() * 1000L; }); // TODO 2. 指定 watermark策略 SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) // TODO 3.使用 事件时间语义 的窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs context.window().getStart(); long endTs context.window().getEnd(); String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS); String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS); long count elements.spliterator().estimateSize(); out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据> elements.toString()); } } ) .print(); env.execute(); }}
执行
画个示意图
结合上图上面是并行度为2数据进来了会轮询到两个上游task如果此时一个上游task一直没有数据进来而当前Task是以最小的那个作为当前任务的事件时钟就会导致下游接收的Task时钟一直为起始值而无法推进进而导致窗口无法触发。
public class WatermarkIdlenessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); // MyPartitioner是自定义分区器数据%分区数只输入奇数都只会去往map的一个子任务余数总为10.1两个map的task总去1 SingleOutputStreamOperator<Integer> socketDS env .socketTextStream(hadoop102, 7777) .partitionCustom(new MyPartitioner(), r -> r) .map(r -> Integer.parseInt(r)) .assignTimestampsAndWatermarks( WatermarkStrategy .<Integer>forMonotonousTimestamps() .withTimestampAssigner((r, ts) -> r * 1000L) ); // 分成两组 奇数一组偶数一组 开10s的事件时间滚动窗口 socketDS .keyBy(r -> r % 2) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() { Override public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception { long startTs context.window().getStart(); long endTs context.window().getEnd(); String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS); String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS); long count elements.spliterator().estimateSize(); out.collect(key integer 的窗口[ windowStart , windowEnd )包含 count 条数据> elements.toString()); } }) .print(); env.execute(); }}
运行
分析以上demo中为了实现数据总流向一个子task用了自定义分区器
.partitionCustom(new MyPartitioner(), r -> r)
以输出数据为keykey除以并行度2区域为分区逻辑如果我一直输入奇数分区值就一直为1就可以实现数据只流向其中一个子task。流向下游算子时一个task始终没数据导致取小的时候一直取到了没数据的原始time时钟无法更新窗口无法触发。此时就需要设置最大空闲时间太久没数据来时就不让它参与比较。
.withIdleness(Duration.ofSeconds(5)) //空闲等待5s
此时输入到9时已到5s时间不再比较另一个没数据的task11一进来立马触发窗口
3、迟到数据处理窗口允许迟到前面为了解决乱序流提出了延迟的概念
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3));
以上即窗口延迟触发3秒即让水位线的推进值 当前值 - 3以便争取为乱序数据更多的时间进入窗口。但当延迟完成窗口触发计算和关闭后再来的属于已关闭窗口的数据就不会被统计在内了这些数据也成为迟到数据。本来8.30上课老师等等家远的学生说8.40开始讲课结果你却9.00才到那就门口站着取别听了类比数据不会再被对应窗口统计
Flink窗口允许迟到数据即触发窗口后会先计算当前结果但不关闭窗口触发计算和关窗是两个动作。 以后每来一条迟到数据就触发一次这条数据所在窗口的增量计算。直到水位线被推进到了窗口结束时间 推迟时间。
注意区分延迟和推迟延迟是老师等你到8.40上课触发计算时间延长了推迟则是8.40课开始上了触发计算了但教室门不关你在开始上课后开始上课类比触发计算10分钟的铃声没响之前类比推迟时间为10分钟能到的话你依旧可以进教室听课。如果过了推迟时间你仍没有到那就窗口关闭教室关门你去网吧游荡吧。总结就是
延迟时间操作的是触发计算的时间用来处理乱序问题推迟时间操作的是触发关窗的时间用来处理迟到数据.window(TumblingEventTimeWindows.of(Time.seconds(10))) //窗口10s.allowedLateness(Time.seconds(3)) //触发关窗延迟3秒
还是乱序流的例子多一个allowedLateness
public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS env .socketTextStream(node01, 9527) .map(new WaterSensorMapFunction()); // TODO 1.定义Watermark策略 WatermarkStrategy<WaterSensor> watermarkStrategy WatermarkStrategy // 1.1 指定watermark生成乱序的等待3s .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 1.2 指定 时间戳分配器从数据中提取 .withTimestampAssigner( (element, recordTimestamp) -> { // 返回的时间戳要 毫秒 return element.getTs() * 1000L; }); // TODO 2. 指定 watermark策略 SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) // TODO 3.使用 事件时间语义 的窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(3)) .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs context.window().getStart(); long endTs context.window().getEnd(); String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS); String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS); long count elements.spliterator().estimateSize(); out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据> elements.toString()); } } ) .print(); env.execute(); }}
此时窗口为10s延迟3s触发计算窗口结束时间 推迟时间才触发关闭即水位线到达10313s时才触发关窗。在水位线未被推到13前对于迟到的数据会再次触发计算且是来一条触发一次计算。关窗后再来迟到数据就在不管了不会触发计算。
这也和前面整理的窗口生命周期对上了计算和关窗实际是两个动作窗口销毁的时机关窗是在时间进展 > 窗口最大时间戳end-1ms) 允许迟到时间默认0
在上面的延迟关窗与允许迟到的基础上肯定还是不能囊括所有数据因为乱序程度理论上可以无限大如上的例子对于等了10分钟才开课且到了关教室门的时间还没到的学生让去网吧游荡也不合理类比流中直接丢弃这个数据可以考虑把严重迟到的学生领到保安室对应到Flink那就是把乱序极大的数据使用侧流输出。关键代码
OutputTag<WaterSensor> lateTag new OutputTag<>(late-data, Types.POJO(WaterSensor.class)); //侧流Tag对象
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateTag) //迟到数据侧流输出
//主流process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);
完整demo
public class WatermarkLateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS env .socketTextStream(node01, 9527) .map(new WaterSensorMapFunction()); WatermarkStrategy<WaterSensor> watermarkStrategy WatermarkStrategy .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L); SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy); OutputTag<WaterSensor> lateTag new OutputTag<>(late-data, Types.POJO(WaterSensor.class)); SingleOutputStreamOperator<String> process sensorDSwithWatermark.keyBy(sensor -> sensor.getId()) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 推迟2s关窗 .sideOutputLateData(lateTag) // 关窗后的迟到数据放入侧输出流 .process( new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() { Override public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception { long startTs context.window().getStart(); long endTs context.window().getEnd(); String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS); String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS); long count elements.spliterator().estimateSize(); out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据> elements.toString()); } } ); process.print(); // 从主流获取侧输出流打印 process.getSideOutput(lateTag).printToErr(关窗后的迟到数据); env.execute(); }}
执行
5、问如果watermark设置延时等待3s窗口允许迟到2s为什么不直接延时等待5s
答
首先延时时间不能设置太大因为这会导致计算延迟太大失去结果的实时性其次窗口允许迟到是对迟到数据的补偿处理尽量让结果准确修正结果的因此一般延时时间不设置一个较大的值常为秒级而允许迟到时间则可以用来处理大部分迟到数据极端迟到的数据可使用侧流输出获取后再做对应的处理