正如在上一节 timestamp 与 watermark 处理中提到的,Flink 提供了抽象类来让开发者给自己的 timestamps(时间戳)赋值并发送他们自己的watermark(水印)。更确切地说,开发者需要依照不同的使用场景来实现接口 AssignerWithPeriodicWatermarks
或接口 AssignerWithPunctuatedWatermarks
。简而言之,前一个接口将会周期性发送 watermark,而第二个接口则根据到达数据的一些属性发送 watermark,例如:一旦在流中碰到一个特殊的元素, 便发送 watermark。
为了进一步简化开发者开发类似的任务,Flink自带了一些预先实现的时间戳分配器(timestamp assigners)。本节列举了关于这些时间戳分配器的相关内容。除了开箱即用的函数,这些预先实现的分配器还可以作为自定义分配器的示例。
周期性的 watermark 生成有一种最简单的特殊情况–给定源任务锁定的 timestamp 呈升序状态。在这种情况下,由于没有更早的 timestamp 出现,当前 timestamp 可以一直扮演着 watermark 的角色。
请注意:上述的情况只有在 timestamp 是递增的并行数据源任务时才是必要的。例如:在一个特定设置中,一个 Kafka 分区是由一个并行数据源实例读取的,那么 timestamp 只需要在每个 Kafka 分区中增加即可。每当并行流被关闭、统一、链接或是合并的时候,Flink的 watermark 合并机制都会产生正确的 watermark。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
周期性 watermark 产生的另外一种情况是在当 watermark 滞后于流中的一个固定时间段内观察到的最大(即 event-time)时间戳。该情况包括预先知道在流中将会遇到的最大 lateness (延迟)的情况,例如创建一个测试用的自定义 source 时,它的element的时间戳会分布在一个固定的时间段内。对于这些情况,FLink 提供了 BoundedOutOfOrdernessTimestampExtractor
作为 maxOutOfOrderness
的一个参数。即在一个 element(元素)被给定窗口,在计算最终结果忽略之前(即该element过期前),所允许该 element 迟到的最大 lateness(延迟)。lateness 与 t-t_w
(t 减 t_w,译者注)相对应,其中 t
指代元素的 timestamp (event-time) ,而 t_w
则指代先前的 watermark。如果 lateness>0
,则可认为此时该 element 延迟,而且在默认情况下,当计算相应窗口结果时,该 element 会被忽略掉。想了解更多关于使用延迟元素的知识,请参阅 allowed lateness的相关文档。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))