Flink窗口如何使用

Windows(窗口)是处理无限数据流的核心。窗口将流分解成有限大小的”桶”,在上面可以进行各种计算。本文将重点介绍 Flink 中的窗口,以及常见的窗口类型。

一个窗口处理的 Flink 程序一般结构如下:

(1) Keyed Windows:

stream
.keyBy(...)          <-  keyed versus non-keyed windows
.window(...)         <-  required: "assigner"
[.trigger(...)]       <-  optional: "trigger" (else default trigger)
[.evictor(...)]       <-  optional: "evictor" (else no evictor)
[.allowedLateness()]  <-  optional, else zero
.reduce/fold/apply() <-  required: "function"

(2) Non-Keyed Windows:

stream
.windowAll(...)      <-  required: "assigner"
[.trigger(...)]       <-  optional: "trigger" (else default trigger)
[.evictor(...)]       <-  optional: "evictor" (else no evictor)
[.allowedLateness()]  <-  optional, else zero
.reduce/fold/apply() <-  required: "function"

Keyed Windows 在 Keyed 数据流上使用,Non-Keyed Windows 在非 Keyed 数据流上使用。可以看出,唯一的区别是是否调用了 keyBy() 方法以及 Keyed Windows 使用  window() 方法,Non-Keyed Windows 使用  windowAll() 方法。

在上面,方括号 [...] 中的命令是可选的。这表明 Flink 允许你可以以多种不同的方式自定义你的窗口逻辑,以便更好的满足你的需求。

1. 窗口生命周期

一旦属于这个窗口的第一个元素到达,就会创建该窗口,当时间(事件时间或处理时间)到达规定结束时间加上用户指定的可允许延迟的时间后,窗口将会被删除。举个例子,使用基于事件时间的窗口策略,每隔5分钟创建一个滚动窗口,并且允许可以有1分钟的延迟时间。当第一个带有时间戳的元素位于 12:00 至 12:05 之间时,Flink 创建一个 12:00 至 12:05 的新窗口,当时间戳到达 12:06 时,窗口将被删除。Flink 仅保证对基于时间的窗口进行删除,并不适用于其他类型的窗口,例如,全局窗口。

除此之外,每个窗口都有一个触发器(Trigger)和一个函数(例如 WindowFunction ,  ReduceFunction 或  FoldFunction )。函数用于窗口的计算,而触发器决定了窗口什么时候调用该函数。触发策略可能类似于”当窗口中元素个数大于4时” 或 “当  watermark 到达窗口末尾时”。触发器还可以决定在什么时候清除窗口内容(创建窗口以及删除窗口之间的任何时间点)。在这里,清除仅指清除窗口中的元素,而不是窗口(窗口元数据)。这意味着新数据仍然可以添加到窗口中。

除此之外,你还可以指定一个 Evictor 来删除窗口中的元素(在触发器触发之后以及在使用该函数之前或之后)。

2. Keyed vs Non-Keyed Windows

使用窗口我们要做的第一件事就是为你的数据流指定 key,必须在定义窗口之前完成。需要调用 keyBy() 方法将无限数据流拆分成 Keyed 数据流。在 Keyed 数据流上,事件的任何属性都可以用作 key,如何指定 key 可以参阅 (Flink定义keys的几种方法)。我们还可以允许通过多个并发任务来执行窗口计算,因为每个逻辑 Keyed 数据流可以独立于其它进行。有相同 key 的所有元素将被发送到相同的并发任务上。在非 Keyed 数据流中,原始数据流不会被拆分成多个逻辑 Keyd 数据流,并且所有窗口逻辑将由单个任务执行,即并行度为1。

3. 窗口分配器

在确定数据流是否指定 key 之后,下一步就是定义窗口分配器(WindowAssigners)。窗口分配器定义了元素如何分配给窗口,即指定元素分配给哪个窗口。可以通过在 window() (Keyed 数据流)或  windowAll() (非 Keyed 数据流) 中指定你选择的窗口分配器来完成。

窗口分配器负责将每个传入的元素分配给一个或多个窗口。Flink 内置了一些用于解决常见问题的窗口分配器,例如,滚动窗口,滑动窗口,会话窗口以及全局窗口等。你还可以通过继承 WindowAssigner 类实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都会根据时间将元素分配给窗口,可以是处理时间,也可以是事件时间。

请参阅 Flink事件时间与处理时间 ,了解处理时间和事件时间之间的差异以及如何生成时间戳和 watermarks

基于时间的窗口会有开始时间戳(闭区间)和结束时间戳(开区间),它们共同描述了窗口的大小。在代码中,Flink 在使用基于时间的窗口时使用 TimeWindow,该窗口具有用于查询开始和结束时间戳的方法,以及用于返回给定窗口的最大允许时间戳的 maxTimestamp() 方法。

在下文中,我们将展示 Flink 的内置窗口分配器的工作原理以及它们在 DataStream 程序中的使用方式。下面分配器运行图中,紫色圆圈表示数据流中的元素,根据某些 key 进行分区(在我们这个例子中为 user1,user2 和 user3),x轴显示时间进度。

3.1 滚动窗口

滚动窗口分配器将每个元素分配给固定大小的窗口。滚动窗口有固定的大小且不重叠。例如,如果指定大小为5分钟的滚动窗口,每五分钟都会启动一个新窗口,如下图所示:

以下代码显示如何使用滚动窗口:

Java版本:

DataStream<T> input = ...;
// 基于事件时间的滚动窗口
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 基于处理时间的滚动窗口
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 基于事件时间的每日滚动窗口会-8小时的偏移。
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)

也可以通过使用 Time.milliseconds(x) ,  Time.seconds(x) ,  Time.minutes(x) 来指定时间间隔。

如上面例子中所示,滚动窗口分配器还可以使用一个可选的偏移量参数,用来改变窗口的对齐方式。例如,没有偏移量的情况下,窗口大小为1小时的滚动窗口与 epoch (指的是一个特定的时间: 1970-01-01 00:00:00 UTC )对齐,那么你将获得如 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999 之类的窗口。如果你想改变,可以给一个偏移量。以15分钟的偏移量为例,那么你将获得 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999 之类的窗口。偏移量的一个重要应用是将窗口调整为  timezones 而不是  UTC-0 。例如,在中国,你必须指定  Time.hours(-8) 的偏移量。

3.2 滑动窗口

滑动窗口分配器将每个元素分配给固定窗口大小的窗口。与滚动窗口分配器类似,窗口的大小由 window size 参数配置。还有一个 window slide 参数用来控制滑动窗口的滑动大小。因此,如果滑动大小小于窗口大小,则滑动窗口会重叠。在这种情况下,一个元素会被分配到多个窗口中。

例如,窗口大小为10分钟,滑动大小为5分钟的窗口。这样,每5分钟会生成一个窗口,每个窗口包含最后10分钟内到达的事件,如下图所示:

Java版本:

DataStream<T> input = ...;
// 基于事件时间的滑动窗口
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// 基于处理时间的滑动窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// 基于处理时间的滑动窗口 偏移量-8
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)

3.3 会话窗口

会话窗口分配器通过活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时会关闭。会话窗口分配器需要配置一个会话间隙,定义了所需的不活动时长。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

Java版本:

DataStream<T> input = ...;
// 基于事件时间的会话窗口
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 基于处理时间的会话窗口
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...
// event-time session windows
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)

由于会话窗口没有固定的开始时间和结束时间,因此它们的执行与滚动窗口和滑动窗口不同。在内部,会话窗口算子为每个到达记录创建一个新窗口,如果它们之间的距离比定义的间隙要小,那么窗口会合并在一起。为了能合并,会话窗口算子需要一个合并触发器和合并窗口函数,例如,ReduceFunction 、AggregateFunction 或 ProcessWindowFunction。

3.4 全局窗口

全局窗口分配器将具有相同 key 的所有元素分配给同一个全局窗口。仅当我们指定自定义触发器时,窗口才起作用。否则,不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束的点(译者注:即本身自己不知道窗口的大小,计算多长时间的元素)。

Java版本:

DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)

欢迎关注我的公众号和博客:

原文:Windows

大数据生态
我还没有学会写个人说明!
上一篇

javaweb练手项目jsp+servlet简易购物车系统

下一篇

攻击下泄漏隐私信息!GPT-2:当时我害怕极了

你也可能喜欢

评论已经被关闭。

插入图片