Flink State

简介

State 是指流计算过程中计算节点的中间计算结果或元数据属性,比如在 aggregation 过程中要在 state 中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的 offset,这些 State 数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。

Flink 内部按照算子和数据分组角度将 State 划分为如下两类:

  • KeyedState 这里面的 key 是我们在 SQL 语句中对应的 GroupBy/PartitionBy 里面的字段,key 的值就是 GroupBy/PartitionBy 字段组成的 Row 的字节数组,每一个 key 都有一个属于自己的 State,key 与 key 之间的 State 是不可见的;
  • OperatorState Flink 内部的 Source Connector 的实现中就会用 OperatorState 来记录 source 数据读取的 offset。
    • OperatorState 的作用范围限定为单个任务,由同一并行任务所处理的所有数据都可以访问到相同的 State
    • OperatorState 对于同一子任务而言是共享的
    • OperatorState 不能由相同或不同算子的另一个子任务访问

自带数据存储后端有如下两种:

  • HashMapStateBackend
  • EmbeddedRocksDBStateBackend

如果没有其他配置,系统将使用 HashMapStateBackend。

通常情况下应该采用 HashMapStateBackend 仅在处理大量 State,超大窗口及大量键值对 State 时应当选择 HashMapStateBackend。

配置

使用 HashMapStateBackend 可以进行如下配置

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

如果使用 EmbeddedRocksDBStateBackend 则需要额外引入如下包:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.14.3</version>
<scope>provided</scope>
</dependency>
1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());

样例

KeyedState

在经过 keyBy 操作之后就可以获得 KeyedStream 对象,针对 KeyedStream 对象可以使用如下 State:

  • ValueState :储存单个值的 State
  • MapState<UK,UV> :储存 Map 形式的 State
  • ListState :储存 List 形式的 State
  • ReducingState :只存储一个元素,而不是一个列表。它的运行方式是将新元素通过 add(value: T) 加入后,与已有的状态元素使用 ReduceFunction 合并为一个元素,并更新至 State。
  • AggregatingState<IN,OUT> :运行方式与 ReducingState 类似,但是可以指定不同的输入输出类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

// access the state value
Tuple2<Long, Long> currentSum = sum.value();

// update the count
currentSum.f0 += 1;

// add the second field of the input value
currentSum.f1 += input.f1;

// update the state
sum.update(currentSum);

// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}

@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}) // type information
sum = getRuntimeContext().getState(descriptor);
if (sum == null){
sum = Tuple2.of(0L, 0L);
}
}
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();

此外还可以为 State 设置 TTL

1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

OperatorState

通常无须使用 OperatorState。它主要是一种特殊类型的状态,主要用在对接数据源或没有可以对状态进行分区的键的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {

private final int threshold;

private transient ListState<Tuple2<String, Integer>> checkpointedState;

private List<Tuple2<String, Integer>> bufferedElements;

public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}

@Override
public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() >= threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}

参考资料

state 官方文档

state_backends 官方文档

Apache Flink 漫谈系列(04) - State

Flink状态管理详解:Keyed State和Operator List State深度解析


Flink State
https://wangqian0306.github.io/2022/flink_state/
作者
WangQian
发布于
2022年2月17日
许可协议