老夫在之前的大数据计算引擎之Flink概述,大数据计算引擎之Flink流处理基础,里面也有提到过这个Flink里面的 Watermark的 一些事,但是也没具体详细的说。这里补充下

时间概念类型

对于流式数据处理,最大的特点就是数据上具有时间的属性特征,Flink根据时间产生位置不同,将时间分为三种时间概念,分别为事件生成时间 (Event Time),事件接入时间(Ingesion Time)和事件处理事件(Processing Time).
时间概念与Watermark
数据从终端产生,或者从系统产生的过程中生成的时间为事件接入时间,当数据经过消息中间件传入到Flink系统中,在DataSource中接入的时候会生成事件接入时间,当数据在Flink系统中经过各个算子实例执行转换操作的过程中,算子实例所在系统的时间为数据处理时间。下面我们分别来看这三种时间。

事件事件(Event Time)

事件事件是每个独立时间在产生它的设备上发生的时间,这个时间通常在进入Flink之前就已经嵌入到事件中,时间顺序取决于事件产生的地方,和下游数据处理系统的时间无关。事件数据具有不变的事件时间属性,该时间自事件元素产生就不会改变。通常情况下可以再Flink系统中指定时间属性或者设定时间提取器来提取事件时间。

所有进入到Flink流式系统处理的事件,其时间都是在外部系统中产生,经过网络进入到Flink系统内处理的,在理论情况下(所有系统都具有相同的系统时钟),事件时间对应的时间戳一定会早于在Flink系统处理的时间戳,但在实际情况中往往会出现数据记录乱序
延迟到达等问题。基于 Event Time 的时间概念,数据处理过程依赖于数据本身产生时间,而不是Flink系统中Operator所在主机节点的系统时间,这样能够借助于事件产生时间信息来还原事件的先后关系。

接入时间(Ingestion Time)

接入时间是事件进入Flink的时间。在Source Operator 所在主机的系统时钟,每个记录都将 Source 的当前时间作为时间戳记,并且基于时间的操作(如时间窗口)引用该时间戳记。

Ingestion Time 从概念上讲介于Event Time 和 Processing Time 之间。与处理时间相比 ,它稍微贵一些,但结果却更可预测。由于 Ingestion Time 使用稳定的时间戳(在源处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在处理时间中,每个窗口操作员都可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。

与 Event Time 相比,提取时间程序无法处理任何乱序事件或迟到的数据,但是程序不必指定如何生成水印。

在内部,Ingestion Time与 Event Time 非常相似,但是具有自动时间戳分配和自动水印生成功能。

处理时间(Processing Time)

处理时间(Processing Time)是指正在执行相应操作的机器的系统时间。当用户选择使用 Processing Time 时,所有和时间相关的计算算子,例如:Windows计算,在当前的任务中所有的算子将直接使用其所在主机的系统时间。 Processing Time是Flink系统中最简单的一种时间概念,基于Processing Time 时间概念,Flink的程序性能相对比较高,延时也相对比较低,对接入到系统中的数据相关的计算完全交给算子内部决定,时间窗口计算依赖的时间都是在具体算子运行的过程中产生,不需要做任何时间上的对比和协调。但 Processing Time 时间概念虽然在性能和易用性的角度上具有优势,但考虑到对数据乱序处理的情况, Processing Time 就不是最优的选择。同时在分布式系统中,数据本身不乱序,但每台机器的时间如果不同步,也可能导致数据处理过程中数据乱序的问题,从而影响计算结果。总之 Processing Time 概念适用于时间计算精度要求不是特别高的计算场景,例如:延时非常高的日志数据等。

当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应操作员的计算机的系统时钟。每小时处理时间窗口将包括系统时钟指示整小时的时间之间到达特定操作员的所有记录。例如,如果应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。上。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到记录到达系统(例如从消息队列)到达系统的速度,记录在系统内部操作员之间流动的速度的影响。 ,以及中断(计划的或其他方式)

时间概念的指定

Java版本

1
2
3
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Scala版本

1
2
3
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

Python

1
2
3
env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

时间指定

EventTime 和 Watermark

通常情况下由于网络或者系统等外部因素影响下,书剑数据往往不能及时传输至 Flink 系统中,导致系统的不稳定而造成数据乱序到达或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的进度。具体的来讲,在创建一个基于事件时间的Window 后,需要确定属于该Window的数据元素是否已经全部到达,确定后才可以对Window中的所有数据做计算处理,如果数据并没有全部到达,则继续等待窗口中的数据全部到达后再开始处理。在这种情况下,就需要用到水位线(Watermark)机制,它能够衡量数据处理进度(表达数据到达的完整性),保证事件数据全部到达 Flink 系统,即使数据乱序或者延迟到达,也能够像预期一样计算出正确和连续的结果。Flink会使用最新的事件时间减去固定时间间隔作为 Watermaters ,该时间间隔为用户外部配置的的支持最大延迟到达时间长度,也就是说不会有事件超过该时间间隔到达,否则就认为是迟到事件或者异常事件。例如:设定时间间隔为 5s ,算子会根据接入算子中最新事件的时间减去 5s 来更新其水位线时间戳,当窗口结束时间大于 Operator 水位线时间戳,且窗口中含有事件数据,则会立即出发窗口进行计算。总的来说:水位线的作用就是告知Operator在后面不会小于等于水位线时间戳的事件接入,满足条件既可以出发相应的窗口计算
顺序事件中的Watermark
Flink中衡量事件时间进度的机制是 Watermark 。如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生成,此时 Watermark 的变化和事件时间保持一致,也就是理想状态下的水位线。当所在算子实例的 Watermark 时间戳大于窗口结束时间,同时窗口中含有数据元素,此时便会触发对当前窗口的数据计算。
顺序时间中的Watermark
乱序事件中的 Watermark
显示情况下数据元素往往不是按照其产生顺序接入到Flink系统中的进行处理的,而频繁出现乱序或迟到的情况,这种情况下就需要用到 Wartermaks 来应对。如图所示: Event11 和 Event17 进入到系统中,Flink会根据设定的延时值分别计算出 Watermark W(11) 和 Watermark W(17) ,这两个Watermark到达一个Operator后便立即调整算子基于事件时间的虚拟时间与当前的 Watermark 相匹配,然后再触发相应的计算以及输出操作。
乱序事件中的Watermark
并行数据流中的 Watermark
Watermark 在 Source Operator 中生成,并且在每个 Source Operator 的子 Task 中都会独立生成 Watermark .在Source Operator 的子任务中生成后就会更新该 Task 的Watermark ,且会逐步更新下游算子中的 Watermark 水位线,随后一直保持在该并发之中,知道下一次 Watermark 的生成,并对前面的 Watermark 进行覆盖。如图所示:W(17) 水位线已经将 Source 和 Map 算子中的子任务的时间全部更新到 17 , 并且会一直随着事件向后移动更新下游算子中的事件时间。如果多个 Watermark 同时更新一个算子Task的当前事件时间,Flink会选择最小的水位线来更新。当一个 Window 算子 Task 中水位线大于了 Window 结束时间,就会立即触发窗口计算。
Flink并行数据流中的Watermark

Generating Timestamps / Watermarks

代码层

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

时间戳的分配

为了使用 Event Time , Flink 需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过从元素的某个字段 访问/提取 时间戳来完成的。
时间戳分配与生成 Watermark 齐头并进,Watermark 告诉系统事件时间的进展。
有两种分配时间戳和生成 Watermark 的方法:

  • 在DataStream Source 算子接口的 Source Function 中定义
  • 自定义 Timestamp Assigner 和 Watermark Generator生成

在Source Function直接定义Timestamp和Watermark
在 DataStream Source 算子中指定 EventTime Timestamps, 也就是说在数据进入到 Flink 系统中就直接分配 EventTime 和 Watermark 。用户可需要复写SourceFunction接口中的run() 方法实现数据的生成逻辑,同时需要 SourceContext() 的 collectWithTimestamp() 方法生成 EventTime 时间戳,调用 emitWatermark() 方法生成 Watermarks.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input = List(("a", 1L, 1), ...... )
val source: DataStream[(String, Long, Int)] = environment.addSource(
new SourceFunction[(String, Long, Int)] {
// 复写 run 方法, 调用 SourceFunction 接口
override def run(ctx: SourceFunction.SourceContext[(String, Long, Int)]): Unit = {
input.foreach(
in => {
// 调用collectWithTimestamp , 增加 Event Time 抽取
ctx.collectWithTimestamp(in, in._2)
// 调用 emitWatermark ,创建 Watermark ,最大延迟设置为 1
ctx.emitWatermark(new Watermark(in._2 - 1))
}
)
// 设定默认的 Watermark
ctx.emitWatermark(new Watermark(Long.MaxValue))
}
override def cancel(): Unit = {}
}
)

通过Flink提供的 Timestamp Assigner 指定Timestamp 和生成 Watermark
若用户使用了Flink已经定义的外部数据源连接器,就不能实现 SourceFunction 接口来生成流式数据以及相应的 Event Time 和 Watermark , 这种时候,就需要借助 Timestamp Assigner 来管理数据流中的 Timastamp 元素和 Watermark.

1
2
3
4
// 使用系统默认的 Ascending 分配时间信息 和 Watermark
val res1: DataStream[(String, Long, Int)] = value
.assignAscendingTimestamps(t => t._2).keyBy(0).timeWindow(Time.seconds(10)).sum("_2")
res1.print()

自定义Timestamp Assigner 和 Watgermark Generator
前面使用的都是已经定义好了的 Timestamp Watermark, 用户也可以自定义实现 AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks 两个接口来实现和生成 Watermarks. 在定义和实现 AssignerWithPeriodicWatermarks 接口前,需要先在 ExecutionConfig 中调用 setAutoWatermarkInterval() 方法设定 Watermarks 产生的时间周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val value: DataStream[(String, Long, Int)] = environment.fromCollection(input)
value.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long, Int)] {
var currentTimestamp: Long = _
val maxOfOutTime = 1000L
override def getCurrentWatermark: Watermark = {
new Watermark(currentTimestamp - maxOfOutTime)
}

override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
val timestamp = element._2
currentTimestamp = Math.max(timestamp, currentTimestamp)
currentTimestamp
}
})

以上源程序可参考:FlinkWatermarkTest