Windows(窗口)是处理无限数据流的核心。窗口将流分解成有限大小的”桶”,在上面可以进行各种计算。本文将重点介绍 Flink 中的窗口,以及常见的窗口类型。
一个窗口处理的 Flink 程序一般结构如下:
(1) Keyed Windows:
stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional, else zero .reduce/fold/apply() <- required: "function"
(2) Non-Keyed Windows:
stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional, else zero .reduce/fold/apply() <- required: "function"
Keyed Windows 在 Keyed 数据流上使用,Non-Keyed Windows 在非 Keyed 数据流上使用。可以看出,唯一的区别是是否调用了 keyBy()
方法以及 Keyed Windows 使用 window()
方法,Non-Keyed Windows 使用 windowAll()
方法。
在上面,方括号 [...]
中的命令是可选的。这表明 Flink 允许你可以以多种不同的方式自定义你的窗口逻辑,以便更好的满足你的需求。
1. 窗口生命周期
一旦属于这个窗口的第一个元素到达,就会创建该窗口,当时间(事件时间或处理时间)到达规定结束时间加上用户指定的可允许延迟的时间后,窗口将会被删除。举个例子,使用基于事件时间的窗口策略,每隔5分钟创建一个滚动窗口,并且允许可以有1分钟的延迟时间。当第一个带有时间戳的元素位于 12:00 至 12:05 之间时,Flink 创建一个 12:00 至 12:05 的新窗口,当时间戳到达 12:06 时,窗口将被删除。Flink 仅保证对基于时间的窗口进行删除,并不适用于其他类型的窗口,例如,全局窗口。
除此之外,每个窗口都有一个触发器(Trigger)和一个函数(例如 WindowFunction
, ReduceFunction
或 FoldFunction
)。函数用于窗口的计算,而触发器决定了窗口什么时候调用该函数。触发策略可能类似于”当窗口中元素个数大于4时” 或 “当 watermark
到达窗口末尾时”。触发器还可以决定在什么时候清除窗口内容(创建窗口以及删除窗口之间的任何时间点)。在这里,清除仅指清除窗口中的元素,而不是窗口(窗口元数据)。这意味着新数据仍然可以添加到窗口中。
除此之外,你还可以指定一个 Evictor 来删除窗口中的元素(在触发器触发之后以及在使用该函数之前或之后)。
2. Keyed vs Non-Keyed Windows
使用窗口我们要做的第一件事就是为你的数据流指定 key,必须在定义窗口之前完成。需要调用 keyBy()
方法将无限数据流拆分成 Keyed 数据流。在 Keyed 数据流上,事件的任何属性都可以用作 key,如何指定 key 可以参阅 (Flink定义keys的几种方法)。我们还可以允许通过多个并发任务来执行窗口计算,因为每个逻辑 Keyed 数据流可以独立于其它进行。有相同 key 的所有元素将被发送到相同的并发任务上。在非 Keyed 数据流中,原始数据流不会被拆分成多个逻辑 Keyd 数据流,并且所有窗口逻辑将由单个任务执行,即并行度为1。
3. 窗口分配器
在确定数据流是否指定 key 之后,下一步就是定义窗口分配器(WindowAssigners)。窗口分配器定义了元素如何分配给窗口,即指定元素分配给哪个窗口。可以通过在 window()
(Keyed 数据流)或 windowAll()
(非 Keyed 数据流) 中指定你选择的窗口分配器来完成。
窗口分配器负责将每个传入的元素分配给一个或多个窗口。Flink 内置了一些用于解决常见问题的窗口分配器,例如,滚动窗口,滑动窗口,会话窗口以及全局窗口等。你还可以通过继承 WindowAssigner
类实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都会根据时间将元素分配给窗口,可以是处理时间,也可以是事件时间。
请参阅 Flink事件时间与处理时间 ,了解处理时间和事件时间之间的差异以及如何生成时间戳和 watermarks
。
基于时间的窗口会有开始时间戳(闭区间)和结束时间戳(开区间),它们共同描述了窗口的大小。在代码中,Flink 在使用基于时间的窗口时使用 TimeWindow,该窗口具有用于查询开始和结束时间戳的方法,以及用于返回给定窗口的最大允许时间戳的 maxTimestamp() 方法。
在下文中,我们将展示 Flink 的内置窗口分配器的工作原理以及它们在 DataStream 程序中的使用方式。下面分配器运行图中,紫色圆圈表示数据流中的元素,根据某些 key 进行分区(在我们这个例子中为 user1,user2 和 user3),x轴显示时间进度。
3.1 滚动窗口
滚动窗口分配器将每个元素分配给固定大小的窗口。滚动窗口有固定的大小且不重叠。例如,如果指定大小为5分钟的滚动窗口,每五分钟都会启动一个新窗口,如下图所示:
以下代码显示如何使用滚动窗口:
Java版本:
DataStream<T> input = ...; // 基于事件时间的滚动窗口 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 基于处理时间的滚动窗口 input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 基于事件时间的每日滚动窗口会-8小时的偏移。 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
Scala版本:
val input: DataStream[T] = ... // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
也可以通过使用 Time.milliseconds(x)
, Time.seconds(x)
, Time.minutes(x)
来指定时间间隔。
如上面例子中所示,滚动窗口分配器还可以使用一个可选的偏移量参数,用来改变窗口的对齐方式。例如,没有偏移量的情况下,窗口大小为1小时的滚动窗口与 epoch
(指的是一个特定的时间: 1970-01-01 00:00:00 UTC
)对齐,那么你将获得如 1:00:00.000 - 1:59:59.999
, 2:00:00.000 - 2:59:59.999
之类的窗口。如果你想改变,可以给一个偏移量。以15分钟的偏移量为例,那么你将获得 1:15:00.000 - 2:14:59.999
, 2:15:00.000 - 3:14:59.999
之类的窗口。偏移量的一个重要应用是将窗口调整为 timezones
而不是 UTC-0
。例如,在中国,你必须指定 Time.hours(-8)
的偏移量。
3.2 滑动窗口
滑动窗口分配器将每个元素分配给固定窗口大小的窗口。与滚动窗口分配器类似,窗口的大小由 window size
参数配置。还有一个 window slide
参数用来控制滑动窗口的滑动大小。因此,如果滑动大小小于窗口大小,则滑动窗口会重叠。在这种情况下,一个元素会被分配到多个窗口中。
例如,窗口大小为10分钟,滑动大小为5分钟的窗口。这样,每5分钟会生成一个窗口,每个窗口包含最后10分钟内到达的事件,如下图所示:
Java版本:
DataStream<T> input = ...; // 基于事件时间的滑动窗口 input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 基于处理时间的滑动窗口 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>); // 基于处理时间的滑动窗口 偏移量-8 input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>);
Scala版本:
val input: DataStream[T] = ... // sliding event-time windows input .keyBy(<key selector>) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // sliding processing-time windows input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .<windowed transformation>(<window function>) // sliding processing-time windows offset by -8 hours input .keyBy(<key selector>) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) .<windowed transformation>(<window function>)
3.3 会话窗口
会话窗口分配器通过活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时会关闭。会话窗口分配器需要配置一个会话间隙,定义了所需的不活动时长。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
Java版本:
DataStream<T> input = ...; // 基于事件时间的会话窗口 input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // 基于处理时间的会话窗口 input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>);
Scala版本:
val input: DataStream[T] = ... // event-time session windows input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>) // processing-time session windows input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>)
由于会话窗口没有固定的开始时间和结束时间,因此它们的执行与滚动窗口和滑动窗口不同。在内部,会话窗口算子为每个到达记录创建一个新窗口,如果它们之间的距离比定义的间隙要小,那么窗口会合并在一起。为了能合并,会话窗口算子需要一个合并触发器和合并窗口函数,例如,ReduceFunction 、AggregateFunction 或 ProcessWindowFunction。
3.4 全局窗口
全局窗口分配器将具有相同 key 的所有元素分配给同一个全局窗口。仅当我们指定自定义触发器时,窗口才起作用。否则,不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束的点(译者注:即本身自己不知道窗口的大小,计算多长时间的元素)。
Java版本:
DataStream<T> input = ...; input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>);
Scala版本:
val input: DataStream[T] = ... input .keyBy(<key selector>) .window(GlobalWindows.create()) .<windowed transformation>(<window function>)
欢迎关注我的公众号和博客:
原文:Windows