Flink Window

简介

Window 即窗口操作把一个数据集切分为有限的数据片以便于聚合处理。当面对无边界的数据时,有些操作需要窗口(以定义大多数聚合操作需要的边界:汇总,外链接,以时间区域定义的操作;如最近 5 分钟 xx等)。 另一些则不需要(如过滤,映射,内链接等)。对有边界的数据,窗口是可选的,不过很多情况下仍然是一种有效的语义概念(如回填一大批的更新数据到之前读取无边界数据源处理过的数据,译者注:类似于 Lambda 架构)。 窗口基本上都是基于时间的;不过也有些系统支持基于记录数的窗口。这种窗口可以认为是基于一个逻辑上的时间域,该时间域中的元素包含顺序递增的逻辑时间戳。 窗口可以是对齐的,也就是说窗口应用于所有落在窗口时间范围内的数据。也可以是非对齐的,也就是应用于部分特定的数据子集(如按某个键值筛选的数据子集)。窗口可以分为如下的几种类型,详情如图所示:

窗口类型划分

固定窗口(Fixed) (有时叫滚动窗口 Tumbling)是按固定窗口大小定义的,比如说小时窗口或天窗口。它们一般是对齐窗口,也就是说,每个窗口都包含了对应时间段范围内的所有数据。有时为了把窗口计算的负荷均匀分摊到整个时间范围内,有时固定窗口会做成把窗口的边界的时间加上一个随机数,这样的固定窗口则变成了不对齐窗口。

滑动窗口(Sliding) 按窗口大小和滑动周期大小来定义,比如说小时窗口,每一分钟滑动一次。这个滑动周期一般比窗口大小小,也就是说窗口有相互重合之处。滑动窗口一般也是对齐的;尽管上面的图为了画出滑动的效果窗口没有遮盖到所有的键,但其实五个滑动窗口其实是包含了所有的3个键,而不仅仅是窗口3包含了所有的3个键。固定窗口可以看做是滑动窗口的一个特例,即窗口大小和滑动周期大小相等。

会话窗口(Sessions) 是在数据的子集上捕捉一段时间内的活动。一般来说会话按超时时间来定义,任何发生在超时时间以内的事件认为属于同一个会话。会话是非对齐窗口。如上图,窗口 2 只包含 key1,窗口 3 则只包含 key2。而窗口 1 和 4 都包含了 key3。

而在 Flink 中的窗口可以进行如下的划分:

  • 键控窗口(Keyed Window)
  • 非键控窗口(Non-Keyed Window)

窗口分类

键控窗口

1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

非键控窗口

1
2
3
4
5
6
7
8
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

注:[] 代表此操作为可选,建议采用键控窗口否则并行度会降低。

例程

滚动窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<T> input = ...;

// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);

// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);

滑动窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);

// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);

会话窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
DataStream<T> input = ...;

// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);

全局窗口

1
2
3
4
5
6
DataStream<T> input = ...;

input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);

注:此窗口方案仅在您还指定自定义触发器时才有效。

Window Function

在定义了窗口分配器之后,我们需要指定我们想要在每个窗口上执行的计算被称为 Window Function 即窗口函数。

窗口函数分为以下几种:

  • ReduceFunction
  • AggregateFunction
  • ProcessWindowFunction

其中的前两种可以更有效地执行。

ReduceFunction

合并相同 key 的内容

1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});

AggregateFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}

@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}

@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}

@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}

DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());

ProcessWindowFunction

ProcessWindowFunction 获得一个包含窗口所有元素的可迭代对象,以及一个可以访问时间和状态信息的 Context 对象,这使得它能够提供比其他窗口函数更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口被认为准备好处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());

public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}

具有增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 组合

注:此处暂略,参见官方文档。

参考资料

官方文档


Flink Window
https://wangqian0306.github.io/2022/flink_window/
作者
WangQian
发布于
2022年2月14日
许可协议