我们知道,Flink使用WaterMark来对"迟到"的数据进行处理。"迟到"是只有EventTime事件时间处理时才会出现的概念,它是指由于网络延迟或其它原因,导致 进入系统处理的事件的顺序并不是事件真正发生的顺序,而是发生了改变。但是这种"迟到"是很难提前预知的,为了能够准确地表达事件时间的处理进度,就必须用 到水印。在Flink中,水印是一种特殊的元素,每个水印都携带有一个时间戳,当时间戳为T的水印出现时,表示事件时间t<=T的数据都已经到达,即水印后面应该 只能流入事件时间t>T的数据。所以,水印再结合窗口,可以在一定程度上保证数据处理的顺序性(窗口之间有序,窗口内部不一定严格有序)。
Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,这个方法接受的参数类型有AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点水印。
周期性水印使用AssignerWithPeriodicWatermarks产生,默认周期为200ms,也能通过ExecutionConfig.setAutoWatermarkInterval()方法来指定新的 周期。正常的流程上来讲,我们需要先通过实现extractTimestamp()方法来抽取事件时间,然后实现getCurrentWatermark()方法产生水印,不过Flink已经 默认提供了3种内置的实现类,我们可以直接使用来抽取事件事件并产生水印:
-
AscendingTimestampExtractor:它产生的时间戳和水印必须是单调非递减的,我们通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产 生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有三种方式进行处理:IgnoringHandler直接忽略、FailingHandler抛出 RuntimeException、LoggingHandler打印警告日志(默认行为)。一般来说单调递增的事件时间并符合现实中的情况,所以用得不是不多;
-
BoundedOutOfOrdernessTimestampExtractor:它产生的时间戳和水印允许一定的乱序,构造它时传入的参数maxOutOfOrderness就是允许乱序区间的 长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印时间往前调一些。乱序区间的长度需要根据实际 情况谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,导致实时性变弱;
-
IngestionTimeExtractor:它基于系统当前时钟生成时间戳和水印,其实就是IngestionTime摄入时间;
另一种打点水印比周期性水印的应用要少很多,Flink内也没有默认实现,一般适用于需要依赖于事件本身的某些属性决定是否发射水印的情况,使用它需要实现 checkAndGetNextWatermark()方法来产生水印,并且产生的时机完全由用户控制。
不管是使用周期性水印,还是使用打点水印,都需要注意不能过于频繁。因为Watermark对象是会全部流向下游的,并在整个DAG图中流动,也会实打实地占用内存, 过多的水印会造成系统性能下降。另外,水印的生成要尽量早,一般建议在接入Source之后就产生,或者在Source经过简单的变换(如,map、filter等)之后产生。
此外,水印的乱序区间只能够保证一部分迟到数据不被丢弃,但是乱序区间往往不会太长(影响实时性),那么对于那些真正迟到了的数据该怎么办呢?Flink提供了 WindowedStream.allowedLateness()方法来设定窗口允许的最大延迟。正常情况下,窗口触发计算完成之后就会被销毁,但是如果设定了允许延迟后,窗口会 等待allowedLateness的时长后再销毁。在此期间迟到数据仍然可以进入窗口中,并触发新的计算。当然,由于窗口非常耗费内存和CPU资源,所以allowedLateness 的值也要进行仔细斟酌。那么如果在允许延迟后到来的数据该怎么办呢?此时窗口已经被销毁了,肯定也无法触发窗口的计算了,这个时候就要介绍侧输出这个概念了。 侧输出(side output)是Flink的分流机制,迟到数据本身也可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到 指定OutputTag的侧输出流里去,再进行下一步处理(比如,存到外部存储或消息队列等)。