再见2020!2020年最后一篇——Flink面向状态编程

1

有状态的Stream应用开发

流处理,以前我们的认知是来一条数据就处理一条,例如:解析某种编码的事件,将事件转换为更易读、已处理的编码格式。而当今天,我们要开发一个流式应用时,往往需要进行事件中间状态的存储。例如:我们需要每隔两秒计算出来最近1小时的访问流量。

Flink的特点就是有状态的流式处理。而有状态会让事情变得复杂起来。当流处理有状态时,一旦出现故障,就需要将出错之前的状态恢复回来,并且在对Flink集群进行扩展时,我们也需要将状态进行重新分配。还有一种需求,当一些计算指标存储在状态中时,我们希望通过HTTP这类方式,能够读取到状态中的数据,以用图形化的方式展示出来。所以,对于Flink状态的开发、管理,是尤其重要的。

在Flink中有三种不同类型的State,一种是Keyed State、一种是Operator State,还有一种是Broadcast state。

2

Keyed State

1

Keyed State介绍

State是一种存储,在Flink中拥有不同结构的State。这里要说的Keyed State,也就是按照key-value形式来组织的State。

Keyd State是专门针对KeyedStream能够使用的state。例如 :针对以下的KeyedStream。

KeyedStream itemByKeyDS = orderItemDS.keyBy(item -> item.getGoodsId());

换句话说,就是按照key进行分区之后的操作才能够使用Keyed State。Keyed state可以确保所有的状态更新都是本地操作,而且可以保证状态数据的一致性。因为一个key肯定属于一个分区,一个key的状态数据无需分布式存储。

2

使用Keyed State

每一个key对应有一个State,这个State可不简单。它的存储结构是非常丰富的。Flink官方把这些结构的操作称之为针对Keyed State的primitive(原语)。后边,我们也使用这个词来描述。

在Keyed State中,Flink支持的primitive有5种,我们分别来一下:

ValueState

ValueState也就是用于存储值的State,例如:我们可以将一个Integer或者是Double等这一类的类型存储在ValueState中。

ListState

表示可以将一个List数据存储在State中。它支持List的大部分操作,例如:添加一个元素到State中、获取元素、迭代列表或者更新整个列表。

ReducingState

Reuce表示用于保存聚合结果的状态。它的接口和ListState类似,但是ReducingState是用于做聚合计算的。

AggregatingState

类似于ReducingState,只不过计算的类型可以和元素的类型不一样。

MapState

可以将key-value存储。例如:可以通过put、get、keys、values等操作。

注意:所有的primitive都支持一个clear操作,可以将当前key的所有状态数据清除。

要获取状态,我们需要构建一个StateDescriptor,我们可以根据要创建的状态来选择不同的Descriptor。例如:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、MapStateDescriptor。通过context才可以获取到State,所以一般使用的是Rich Function中才能使用状态。

3

基于Keyed State的WordCount实现

接下来,我们使用Flink中的Keyed State来实现WordCount。因为WordCount需要将单词进行累加计数,所以此处,我们用ReducingState就比较方便一些,直接往State中添加单词,然后基于ReducingState实现一个Reduce Function。

<1> 把Flink的依赖导入进来

<repositories>
<repository>
<id>aliyunmaven</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>


<properties>
<flink-version>1.12.0</flink-version>
<scala-version>2.12</scala-version>
<mysql-version>5.1.47</mysql-version>
</properties>


<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala-version}</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala-version}</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>


<2> 导入一个log4j.properties配置文件,并将日志输出调整为DEBUG,方便我们观察Flink程序运行过程。

# This affects logging for both user code and Flink
log4j.rootLogger=DEBUG, console


# Uncomment this if you want to _only_ change Flink's logging
# log4j.logger.org.apache.flink=WARN


# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=WARN
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=WARN


# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console


<3> 实现一个自定义Source,用于发射单词。此处我们每秒发送一个单词。

/**
* 随机生成单词的DataSource
*/
public class WordDataSource extends RichSourceFunction<String> {
private Boolean isCancel;
private String[] words = {"hadoop", "spark", "linux", "flink", "flume", "oozie", "kylin"};
private Random r;


@Override
public void open(Configuration parameters) throws Exception {
this.isCancel = false;
this.r = new Random();
}


@Override
public void run(SourceContext<String> ctx) throws Exception {
while (!this.isCancel) {
ctx.collect(words[r.nextInt(words.length)]);
TimeUnit.SECONDS.sleep(1);
}
}


@Override
public void cancel() {
this.isCancel = true;
}
}


<4> 在Flink的main方法中规划整个应用的pipeline。先将单词转换为元组,然后按照单词进行分组。然后在map中进行计算,并输出结果。

public class WordCountKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> wordDS = env.addSource(new WordDataSource());


SingleOutputStreamOperator<Tuple2<String, Integer>> totalCntDS =
wordDS.map(w -> Tuple2.of(w, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.keyBy(t -> t.f0, TypeInformation.of(new TypeHint<String>() {}))
.map(new WordCountReducingFunction());


totalCntDS.print();


env.execute("Word Count By Reducing State!");
}
}

大家看到了,能用Lambda表达式的地方我尽量都用了,虽然因为Java编译时擦除编码。所以,我们在Lambda表达式后面使用TypeHint构建匿名内部类,以此来保存被擦除的类型信息。我想在main中展示一个完整的Pipeline,而不想被其他的代码过多干扰,所以,坚持使用lambda来进行简洁表达。

<5> 基于ReducingState来进行聚合计算

public class WordCountReducingFunction extends RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {


private ReducingState<Tuple2<String, Integer>> reducingState;


@Override
public void open(Configuration parameters) throws Exception {
ReducingStateDescriptor<Tuple2<String, Integer>> stateDes = new ReducingStateDescriptor<Tuple2<String, Integer>>("total_cnt", (t1, t2) -> {
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));


reducingState = this.getRuntimeContext().getReducingState(stateDes);
}


@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
if (reducingState != null) {
reducingState.add(value);
}


return reducingState.get();
}
}

简单解释以下代码,在open中,我们先构建了一个ReducingStateDescriptor,我们指定了State的名字,并且指定了作用在该state上的reduce function。然后在map方法中,我们直接将接受到的数据直接添加到reducing state中,然后获取reducing state的聚合数据。

我们一启动程序就可以看到,不断累加的单词了。

其他类型的Keyed State和Reducing state的使用方式类似。这里就不一一演示了,大家可以自己去试试。

4

State TTL

我们可以给State设置TTL(Time to live),以此来配置State的有效期,如果state已经过期,那么存储的值按会被自动清除。而且,如果state是集合类型,那么TTL是单独针对每个元素设置的,也就是说每一个List元素、或者是Map的entry都有独立的TTL。

接下来,给大家演示一下如何设置State的TTL。

还是基于之前的有状态WordCount案例,现在我们要求是:如果一个单词10秒钟没有更新,我们就将它的状态清除。

我们再WordCountReducingFunction中,设置以下状态的TTL。

@Override
public void open(Configuration parameters) throws Exception {
// 设置10秒的TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// 创建一个ReudcingStateDescriptor,每增加到State一个元素,都会进行Reudce计算
ReducingStateDescriptor<Tuple2<String, Integer>> stateDes = new ReducingStateDescriptor<Tuple2<String, Integer>>("total_cnt", (t1, t2) -> {
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));


// 设置ttl
stateDes.enableTimeToLive(ttlConfig);


reducingState = this.getRuntimeContext().getReducingState(stateDes);
}

来看一下设置State TTL的代码:

 StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();

<1> 此处newBuilder(Time.seconds(10))设置Reducing State的TTL为10秒钟

<2> StateTtlConfig.UpdateType.OnReadAndWrite表示在写入和读取的时候更新TTL

<3> StateTtlConfig.StateVisibility.NeverReturnExpired表示过期后就获取状态不返回值

大家可以来看一下输出数据:

2020-12-31 10:57:45 (oozie,1)
2020-12-31 10:57:46 (kylin,1)
2020-12-31 10:57:47 (kylin,2)
2020-12-31 10:57:48 (kylin,3)
2020-12-31 10:57:49 (oozie,2)
2020-12-31 10:57:50 (flink,1)
2020-12-31 10:57:51 (kylin,4)
2020-12-31 10:57:52 (linux,1)
2020-12-31 10:57:53 (oozie,3)
2020-12-31 10:57:54 (linux,2)
2020-12-31 10:57:55 (kylin,5)
2020-12-31 10:57:56 (oozie,4)
2020-12-31 10:57:57 (flume,1)
2020-12-31 10:57:58 (kylin,6)
2020-12-31 10:57:59 (linux,3)
2020-12-31 10:58:00 (oozie,5)
2020-12-31 10:58:01 (linux,4)
2020-12-31 10:58:02 (kylin,7)
2020-12-31 10:58:03 (hadoop,1)
2020-12-31 10:58:04 (hadoop,2)
2020-12-31 10:58:05 (linux,5)
2020-12-31 10:58:06 (hadoop,3)
2020-12-31 10:58:07 (kylin,8)
2020-12-31 10:58:08 (linux,6)
2020-12-31 10:58:09 (flink,1)
2020-12-31 10:58:10 (kylin,9)
2020-12-31 10:58:11 (hadoop,4)
2020-12-31 10:58:12 (flink,2)
2020-12-31 10:58:13 (kylin,10)
2020-12-31 10:58:14 (linux,7)
2020-12-31 10:58:15 (spark,1)
2020-12-31 10:58:16 (flume,1)
2020-12-31 10:58:17 (oozie,1)
2020-12-31 10:58:18 (spark,2)
2020-12-31 10:58:19 (hadoop,5)
2020-12-31 10:58:20 (linux,8)

kylin的单词计数一直没有过期,因为kylin连续在10秒内都有新的元素生成,所以计算后生成的元素TTL没有超过10秒。

2020-12-31 10:58:00 (oozie,5)
2020-12-31 10:58:17 (oozie,1)

oozie超过10秒没有新的元素进来,state ttl过期。

设置TTL后,默认会在读取时自动删除,如果状态配置了backend,则是后台进行垃圾回收。也可以配置禁用后台垃圾回收。

3

Operator State

1

Operator State介绍

前面我们所学习的Keyed State是常用的,但有一些Opterator可能不是Keyed Stream,但也需要用到State。那怎么办呢?例如:比较常用的Kafka Connector。

这时候,就该Operator State登场了。之前我们学习的Keyed State是state是绑定在key上,而Operator State是绑定在Operator的并行度实例上。简单说,就是一个并行度一个状态。Kafka Connector就需要用到Operator State。因为每一个并行度都是从Kafka中的一个topic的某个分区消费数据,而这个并行度为了保证数据不丢失且一致性,就不能将topic、partition、offset保存到默认的ZooKeepper中,而是将这些数据保存在状态中,自己来维护这些元数据 。

而当并行度发生调整时,需要在Operator的并行度上重新分配状态。在大多数的流处理程序开发中,我们大多用不到Operator State。它主要是针对实现一些Source、或者Sink,而Source、Sink都是没有key的,就需要用到Operator State了。

2

使用Operator State

要使用Operator State,有状态的Function可以实现一个CheckpointedFunction接口。

每当执行checkpoint时,会自动调用snapshotState方法,而每次初始化用户自定义的Function时,会自动调用initializeState()方法。不管是第一次初始化Function,还是从某个checkpoint中恢复,这个initializedState都会被调用。所以,我们在initializedState中初始化不同类型的状态,并且将状态恢复的逻辑也放在里面。

当前,Flink支持List的Operator State。可以将彼此独立并支持序列化的对象放在该列表中。也是基于此,支持并行度调整进行了redistribute。我们来看下Flink的redistribute operator状态的。

<1> 基于平均拆分的redistribution

在operator进行还原或者redistribute时,Operator的List State会被平均分为与Operator并行度一样数量的子列表。每个Operator并行度都有一个子列表,子列表中的对象可以为空,也可以包含一个或多个元素。

<2> 基于Union的redistribution

这种方式的redistribution,每个operator并行度都会读取完整的List State列表,checkpoint metadata会存储每个operator并行度的offset。根据offset来分配List State。这种方式,如果List State的基数较大时,不要使用这种方式的redistribution。容易在OOM。

3

有状态的Source实现

我们还是通过一个示例来演示如何使用Operator State。还基于之前的WordCount案例,之前我们的自定义的Source发送单词是没有状态的。现在,我们想实现这样一共功能,每当发送了10个单词后,发送一个primitive单词。基于此,我们需要在Source中使用状态,记录下来当前发送的单词数量。

<1> 设计有状态的source

public class WordDataSourceWithState extends RichSourceFunction<String> implements CheckpointedFunction {
private Boolean isCancel;
private String[] words = {"hadoop", "spark", "linux", "flink", "flume", "oozie", "kylin"};
private Random r;


private BigInteger totalCount;
private ListState<BigInteger> listState;


@Override
public void open(Configuration parameters) throws Exception {
this.isCancel = false;
this.r = new Random();
this.totalCount = BigInteger.valueOf(0);
}


@Override
public void run(SourceContext<String> ctx) throws Exception {
while (!this.isCancel) {
if(totalCount.intValue() % 10 == 0) {
ctx.collect("primitive");
}
else {
ctx.collect(words[r.nextInt(words.length)]);
}
totalCount = totalCount.add(BigInteger.valueOf(1));
TimeUnit.SECONDS.sleep(1);
}
}


@Override
public void cancel() {
this.isCancel = true;
}


@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 快照状态
listState.clear();
listState.add(totalCount);
}


@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 1. 构建StateDesccriptor
ListStateDescriptor<BigInteger> totalCountState =
new ListStateDescriptor<BigInteger>("total_count", TypeInformation.of(BigInteger.class));


// 2. 构建Operator State
listState = context.getOperatorStateStore().getListState(totalCountState);


// 3. 恢复totalCount
if(context.isRestored()) {
Iterable<BigInteger> iterTotalCnt = listState.get();
Iterator<BigInteger> iterator = iterTotalCnt.iterator();
if(iterator.hasNext()) {
totalCount = iterator.next();
}
}
}
}


这里,我们实现了CheckpointedFunction接口,该接口有两个方法。在initializeState方法中,我们进行了状态的初始化,并将状态的值赋值给成员变量。而在snapshotState方法中,我们将当前最新的单词数量放入到状态中。

<2> 在Flink的main中,设置checkpoint的相关配置。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///d:/code/flinkwork/chk", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

程序运行时,会在d:/code/flinkwork/chk目录下生成checkpoint。

4

Broadcast State

Broadcast State是Operator State的特殊类型。使用广播状态,可以将一个流中的数据广播到下游所有的任务中。通过Broadcast state,我们可以让下游的每一个任务,保持一模一样的状态,然后下次再来一个数据,就可以跟上一次的数据进行对比处理。Broadcast State比较适合这种场景,假设我们做一个规则匹配事件处理系统,规则是一个低吞吐的流、而事件是一个高吞吐的流。我们可以将规则以Broadcast State的方式发送到下游每一个任务中,然后当有事件需要处理时,可以从广播中读取之前的规则,进行处理。

broadcast state是一个Map结构,它比较适合于有一个broadcast stream、另外一个不是broadcast stream的operator。

1

带有规则的单词计数

我们现在有另外一个规则流,这个流中会发送不需要进行单词统计的单词。我们通过广播流将流里的数据发送到所有下游的Task。然后进行单词的有规则的计数。

<1> 先来实现一个用于单词计数匹配计算的规则流数据源(WordRuleSource):

public class WordRuleSource extends RichSourceFunction<String> {


private volatile Boolean isCancel;


@Override
public void open(Configuration parameters) throws Exception {
this.isCancel = false;
}


@Override
public void run(SourceContext<String> ctx) throws Exception {
String[] wordsIgnore = {"flume", "oozie", "kylin"};


for (String word : wordsIgnore) {
ctx.collect(word);
}


TimeUnit.SECONDS.sleep(1);
}


@Override
public void cancel() {
this.isCancel = true;
}
}

我们后续要基于这个WordRuleSource中的三个单词进行过滤,如果单词属于flume、oozie或者是kylin。我们就不进行单词统计。

<2> 构建Flink基于Rule的单词统计pipeline。

public class WordCountRuleKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///d:/code/flinkwork/chk", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


DataStreamSource<String> ruleDS = env.addSource(new WordRuleSource());
DataStreamSource<String> wordDS = env.addSource(new WordDataSourceWithState());


MapStateDescriptor<String, String> wordResultMapState =
new MapStateDescriptor<>("word_rule", TypeInformation.of(String.class), TypeInformation.of(String.class));
BroadcastStream<String> wordruleBroadcastStream = ruleDS.broadcast(wordResultMapState);


SingleOutputStreamOperator<Tuple2<String, Integer>> totalCntDS =
wordDS.map(w -> Tuple2.of(w, 1), TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
.keyBy(t -> t.f0, TypeInformation.of(new TypeHint<String>() {}))
.connect(wordruleBroadcastStream)
.process(new KeyedRuleBroadcastProcessFunction());


totalCntDS.addSink(new MyPrintSink());


env.execute("Word Count By Reducing State!");
}
}

码中有几处需要额外留意:

1、首先我们需要基于WordRule Stream创建一个MapState,然后将MapState广播出去。

2、我们将单词流使用connect和广播规则流关联在一起

3、最后使用process来进行自定义处理.

<3> 我们向process方法传递了一个KeyedBroadcastProcessFunction,在该类中分别处理规则元素(广播)和单词流(KeyedStream)元素。

public class KeyedRuleBroadcastProcessFunction extends KeyedBroadcastProcessFunction<String,
Tuple2<String, Integer>,
String,
Tuple2<String, Integer>> {


private ReducingStateDescriptor<Tuple2<String, Integer>> reducingStateDescriptor;
private MapStateDescriptor<String, String> mapStateDescriptor;


private ReducingState<Tuple2<String, Integer>> reducingState;
private MapStateDescriptor<String, String> broadcastMapState;


@Override
public void open(Configuration parameters) throws Exception {
reducingStateDescriptor =
new ReducingStateDescriptor<Tuple2<String, Integer>>("total_cnt", (t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1)
, TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
mapStateDescriptor =
new MapStateDescriptor<>("word_rule", TypeInformation.of(String.class), TypeInformation.of(String.class));


reducingState = this.getRuntimeContext().getReducingState(reducingStateDescriptor);
}


@Override
public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Iterable<Map.Entry<String, String>> wordRule = ctx.getBroadcastState(mapStateDescriptor).immutableEntries();
Iterator<Map.Entry<String, String>> iterator = wordRule.iterator();


Boolean hasTheWordInRule = false;
while(iterator.hasNext()) {
if(iterator.next().getKey().equals(value.f0)) {
hasTheWordInRule = true;
continue;
}
}


if(!hasTheWordInRule) {
if(reducingState != null) {
reducingState.add(value);
}


out.collect(reducingState.get());
}
}


@Override
public void processBroadcastElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
ctx.getBroadcastState(mapStateDescriptor).put(value, value);
}
}

在processBroadcastElement方法中,我们将广播流中的元素,一个一个地添加到MapState中。processElement中处理每一个单词流中的元素,并根据状态中的单词进行过滤。然后用Reducing State获取计算结果。

Flink中的状态使用起来是没有什么开发成本的,所以当我们要进行有状态地计算时,首先应该考虑的就是State,面向State来进行Flink编程。

参考文献:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html

THE

END

Flink
我还没有学会写个人说明!
上一篇

[git管理]使用shell脚本和nodejs实现快速commit和打tag

下一篇

研究人员提出有关净化滤水膜工作原理的新见解

你也可能喜欢

评论已经被关闭。

插入图片