大数据测试场景科普 — 流计算篇 (上)

前言

3年前的时候写过关于一些大数据入门基础的文章, 当时学习的是spark。文章链接如下:

大数据介绍:https://testerhome.com/topics/7988

spark基础操作:https://testerhome.com/topics/8040

shuffle和性能测试:https://testerhome.com/topics/8120

离线大数据作业的测试方法:https://testerhome.com/topics/17092

这一篇算是弥补了之前对于流计算的缺失吧。由于我们产品在今年加入了流计算的能力, 并且Flink貌似也有要在流计算领域中一统江山的架势,所以我前段时间借着调研混沌工程方案的契机,也开始学习了Flink并了解我们产品对于流计算的应用场景(PS:混沌工程是流计算中一个比较重要的测试手段)。今天把学习和实践的一些总结分享出来。

什么场景需要流计算

流计算一般都是在一些数据计算的实时性要求很高的场景中出现, 之前在讲spark的时候都是基于离线的批处理计算的, 这种计算方式无法满足产品对实时性的要求。比如如果我们要在大看板上计算PV和UV的数据, 一般都是希望能够实时的观看到这些数据的变化。而计算PV和UV的操作又不好嵌入到业务系统中, 因为对业务侵入性太强并且会影响性能。所以一般的架构可能是如下的样子:

业务系统会将用户行为数据push到消息中间件(kafka)中, 这样达到了解耦和降低性能开销的目的。而flink streaming 服务会订阅kafka的topic进行流处理, 也就是一旦有数据从kafka中发送过来,满足一定条件后就会触发flink的一系列算子进行计算, 数据在flink中的这些算子中进行传递,聚合,计算等操作后, 将经过处理的数据推送给外部的存储系统或者业务系统, 这些系统会将数据做进一步保存和处理后展示在大屏上–这就是一种计算PV和UV的简单的场景了。整个过程之所以叫 ,就是因为数据并不是像传统的方式保存到文件系统中,在保存到了一定的量或者利用定时任务触发批处理计算的方式执行。而是数据就像一条pipeline(流水线)一样,来一个(或者一小批,可以规定时间窗口,可以规定数据个数)就处理一个,并把处理结果传到pipeline的下一个算子上继续处理,这种方式是不是有点像jenkins的piepline~ 实际上很多AI系统,比如推荐系统,反欺诈系统这些对实时性要求比较高的场景都要利用流计算来实时的进行处理。如果要展开它的内部处理过程大概是下面这个图:

重点说明一下上图中的task, 这个task就可以理解为Flink中的算子了, 也叫operator。在Flink中可以定义当数据到来的时候, 都经过哪些算子,按照什么顺序进行计算。比如可以先使用filter算子把没用的数据进行过滤, 再使用map算子对原始数据做一些转换, 后面再使用sum算子进行累加计算出PV。当然这些task是可以并行计算的,Flink可以合并计算结果。要是写代码的话,大概是下面这个样子:

上面是大数据领域经典demo workcount, 计算文件中的词频。上面我用红色框起来的部分就是算子, 先是flatmap做一些处理, 再使用keyBy算子把数据分类, 把有相同的key的数据分到一个组里,然后进行sum的累加计算, 这样就能计算出每个key(单词)的词频了(这个单词出现多少次)。PS:代码里的keyBy(0)中的0是数据的第几列, 意思是按第几列进行分组。

OK, 上面就是简单讲讲什么是流计算以及什么场景需要流计算。flink的算子和运行模式跟spark是很像的,对flink的使用还有疑问的同学可以看看我之前写的spark基础。下面要开始讲测试点了。

从消息中间件说起

好像现在业界主流的能支持流计算的消息中间件也就只有kafka了,所以我下面都用kafka来举例(实际上我也只用过kafka,请原谅我知识上的匮乏),这里我要讲一下kafka的精准一次性语义, 之所以讲这个是要开始讲述流计算中最重要最难以验证的一个场景(对,我就是想先讲难的,重要的) —- 数据一致性。什么是数据一致性呢,就是不论在任何情况下数据被处理的结果都是一致的。这里说的任何情况包括但不限于:

  • 计算任务异常重启后导致之前已经计算过的数据丢失

  • 计算任务异常重启后导致之前已经计算过的数据在本次任务进行重试的时候造成的数据重复计算

  • 计算任务不能因为网络延迟,异常等因素导致数据传递给下游系统失败后导致的数据丢失

那么我们看kafka是怎么处理这种情况的, 当我们使用kafka的producer向broker推送消息的时候,怎么能保证本次推送的消息不会因为各种异常导致数据丢失呢?很多小伙伴可能已经想到了重试, 如果因为网络异常等原因导致push请求异常的话,那么我们重试几次就好了,毕竟kafka开了高可用模式,集群上会有其他的broker提供服务,就算当前的broker彻底跪了数据也不会丢失的。但是我们是否想过一个问题,重试请求是可以随便执行的么?或者说程序怎么能确定本次失败的推送请求就是真的失败了,也就是数据没有保存到kafka上。在kafka中确认消息是否推送成功是需要producer和broker互相交换ACK的, 也就是producer在把消息推送给broker后,broker在保存成功后要给producer回一个ACK让 客户端知道消息已经保存成功了。那么如果我们的异常是发生在broker已经保存好数据和把ACK发送到客户端之间呢?也就是数据已经保存好了, 只是没有给客户端返回ACK,所以客户端认为这个推送消息的请求是失败的。那么这个时候如果我们执行了retry的逻辑,实际上数据就出现了重复的场景。

这么解释大家是不是就明白了retry的逻辑不是能随随便便加的,它有一个前提条件, 就是它要retry的那个接口必须是幂等的。这个我再当初讲混沌工程的时候也提过, 一个高可用的系统,它的接口必须是幂等的, 因为高可用的模式说白了就是上游系统retry,下游系统多副本负载均衡+幂等。只有下游系统有幂等的能力,上游系统才敢执行重试操作, 否则的话就是数据重复写。那么幂等是什么意思呢, 大白话就是接口自己能判断出当前的请求是不是之前已经发送过的重复数据了,如果是重复数据它是不处理的。行话就是同样的数据不论计算多少次都不对结果造成影响,此为幂等。

而kafka的精准一次性语义中定义了几个级别的模式, 其中有一个叫exactly once(精准一次性语义,意思是我保证针对一个数据不管你重复发送多少次,服务端都只计算一次)这种模式就可以解决这个问题。在producer中可以设置幂等和分布式事务相关的参数和代码, 一旦这样设置了,那么就拥有了幂等属性, kafka内部会根据算法计算出消息的唯一id,broker只要查询消息的id在之前是否有保存过就可以判断出当前消息是否是重复数据了(大概是这样,细节没研究过)。这样客户端就可以肆无忌惮的进行重试而不必担心数据重复计算。

再谈Flink的exactly once

通过讲述Kafka的精准一次性语义也就是exactly once 是为了跟大家讲述什么是数据一致性以及保证数据一致的方法和重要性。由于kafka本身提供了这种特性所以要保证消息传送到kafka的数据一致性是比较容易的, 正因为很容易一般不容易出错所以很多团队都忘了去测试这个场景(有时候研发会忘了设置这个参数导致出现bug,所以最好还是需要测一下)。当时光保证kafka的精准一次性是不行, 我们是一个业务场景, 我们需要的是端到端的一致性, 得是全链路的一致性。所以现在我们来看看Flink这一层怎么做的exactly once。

Checkpoint

讲到这里就必须要说明一下大名鼎鼎的checkpoint了, 基本上checkpoint是所有分布式框架都要有的机制,spark如此flink亦如此。checkpoint就是一种保存我们在计算过程中的数据的方式, 它会根据设置周期性的触发checkpoint来保存我们计算的中间结果。我们还是用PV的案例说明:

我们从Kafka读取到一条条的消息,从消息中解析出app_id,然后将统计的结果放到内存中一个Map集合,app_id做为key,对应的pv做为value,每次只需要将相应app_id 的pv值+1后put到Map中即可。

这里简要说明一下kafka的offset, 这个是消费消息的客户端也就是consumer要使用offset来记录我已经读取到了消息队列中的哪一条数据, 根据这个offset我可以知道下一次我要读取的消息的位置。即便是程序崩溃了, 只要offset能够保存下来就知道恢复后应该从哪个消息开始读取了。所以在这个机制下,flink的Source task记录了当前消费到kafka test topic的所有partition的offset。所以flink会根据策略周期性的触发checkpoint事件以流的方式传递给所有的算子, 算子收到checkpoint命令后就会把中间状态保存起来, 比如在我们的案例里保存的就是kafka的offset, 比如我们设置每30s触发一次checkpoint, 那么30s后checkpoint触发,保存的数据为:

chk-100

offset:(0,1000)

pv:(app1,50000)(app2,10000)

该状态信息表示第100次CheckPoint的时候, partition 0 offset消费到了1000,pv统计结果为(app1,50000)(app2,10000)。那么如果任务挂了,这时候怎么办?比如:

  • 假如我们设置了三分钟进行一次CheckPoint,保存了上述所说的 chk-100 的CheckPoint状态后,过了十秒钟,offset已经消费到 (0,1100),pv统计结果变成了(app1,50080)(app2,10020),但是突然任务挂了,怎么办?

  • flink只需要从最近一次成功的CheckPoint保存的offset(0,1000)处接着消费即可,当然pv值也要按照状态里的pv值(app1,50000)(app2,10000)进行累加,不能从(app1,50080)(app2,10020)处进行累加,因为 partition 0 offset消费到 1000时,pv统计结果为(app1,50000)(app2,10000)。

上面讲的并行度为1的情况, 那么如果并行度是N的情况,checkpoint会在并行的算子里触发,这个时候Flink会选择是保持多个checkpoint一起执行完后在统一往后运算(exactly once), 还是选择不去协调,任意一个算子运行完checkpoint后就当前线程就继续往下运算(at least once),因为at least once模式会造成并行的算子的checkpoint不是同时触发和结束, 所以他们保存的中间态数据有偏差,也就是数据是会不一致。所以如果业务场景有数据强一致性的需求,那么需要将checkpoint模式设置为exactly once。这里大家能明白了么?我们通过把kafka的offset和我们已经计算好的结果都通过checkpoint进行保存来防止数据丢失或重复计算的情况。代码差不多如下:

当然上面是checkpoint策略, 在实际开发算子任务的时候,要把什么数据通过checkpoint保存到flink的state backend是需要先调用对应的state 方法来执行的。

贴一个checkpoint的图:

说回数据一致性

好了上面说了那么多东西, 但是好像kafka和Flink 都已经把数据一致性保证好了, 那还需要我们测试什么一致性么?那不是变成了在测试kafka或者flink么?我想一定会有同学这么问, 那么我在这里解释下:

  • 即便kafka和flink有exactly once 语义, 但是开启这些语义需要对应的参数调整, 并且需要编码的时候进行处理, 比如kafka里在开启了exactly once 语义后, 也需要研发在代码里显示调用分布式事务进行数据计算, flink里对于kafka的offset和计算结果的保存也需要显示在代码里调用类似valueState来进行保存和处理。也就是你们的产品研发同学是否编码正确决定了数据一致性。

  • 在我们的流计算里, flink上下游都会对接不同的系统, 上游可以是kafka,也可以是业务系统暴露出来的socket服务,也可以其他的源。所以你在使用非kafka也就是没有exactly once语义支持的系统的时候,就需要研发去开发相应的方案来解决这个问题。同理输出方, 流是有数据的源,也有在经过flink计算之后输出的系统,这个系统可以是另外一个kafka,也可以是mysql, 也可以是业务系统的接口。那么输出方是否有exactly once语义支持呢?非kafka的场景下,基本上也是没有的, 也需要研发来开发对应的方案。也就是说我程序中Flink的CheckPoint语义设置了 Exactly Once,但是我在计算的过程中需要实时的把计算结果保存到mysql里,那异常出现的时候根据checkpoint机制,我们从上一个checkpoint记录中保存的offset去重新读取并计算消息, 这时候我的mysql中看到岂不是看到了数据重复了?比如程序中设置了1分钟1次CheckPoint,但是5秒向mysql写一次数据,并commit。所以我们要求的是Flink的end to end的精确一次都必须实现。如果你的chk-100成功了,过了30秒,由于5秒commit一次数据库,所以实际上已经写入了6批数据进入mysql,但是突然程序挂了,从chk100处恢复,这样的话,之前提交的6批数据就会重复写入,所以出现了重复消费。Flink的精确一次有两种情况,一个是Flink内部的精确一次,一个是端对端的精确一次。这里面有点绕,我解释的有点啰嗦。

所以根据上面说的,虽然flink提供了exactly once 语义, 但是它的exactly once 语义只保证flink自己的数据计算过程,而不是端到端的。想要保证数据一致性,还是需要研发同学针对业务场景进行特殊的设计。也就是开发自己产品的exactly once 语义。所以我们还是要针对端到端的场景进行测试。

测试的注意事项

  • 首先弄清楚产品中流计算的架构,都有哪些数据源,数据又发送到哪些地方。这一步至关重要, 因为端到端的数据一致性场景,在这一条流式链条里,任何一个点没有做到精准一次性语义都会导致数据不一致,所以我们要测试所有的点。

  • 完成第一步后在每一个点进行故障注入,故意让任务失败,让服务挂掉, 属于混沌工程式的测试方法, 就是想尽办法让这个流式的链条中的服务出故障来验证数据一致性。没有测试工具的同学可以去看一下阿里开源的chaos blade。注意:一个场景的故障注入要反复进行,比如30分钟内每隔3分钟都随机找到一个flink task manager进行kill来注入故障, 有些时候只注入一次故障是发现不了bug的,因为我们是有状态计算,有状态计算的场景很多是在特殊的状态下发生故障才会出错。所以要反复注入故障来最大概率的触发bug。

  • 自动化测试中case要验证数据一致性的点,比如在kafka->flink-mysql 的这个场景里,你往数据源kafka里灌入了1000个消息,如果正确的逻辑是经过计算后要往mysql存入10条记录, 那么你要去验证这10条记录的正确性。是否有数据丢失或者重复的结算结果出现。

注意:做这个测试前, 先确定你们是否有数据一致性的强需求。有些场景真的会觉得数据丢了就丢了。。。。。

结尾

好了写这么多, 今天罗里吧嗦的写了一大堆好像就说了一个数据一致性的测试。之前在社区跟人讨论的时候,有很多同学其实不赞同这种深入研发架构的测试方式。而我前两天刷脉脉的时候也在匿名区看到有人发消息, 讨论区里对qa是否要测试这种场景有很大的争议。所以我花了很大的篇幅解释一下做这种测试的必要性。下次我们将其他的测试方法。

↙↙↙阅读原文可查看相关链接,并与作者交流,讨论区更精彩

打下广告,社区线上沙龙,12月27日,就是本周日下午,WeTest,微信,网易和大家一起探讨 devops下的质量保障!由于社区小助手失联了。请大家加下面两位同学的微信,给大家拉到讨论群中去~

xiaozhao129540

oscarx1982

TesterHome
我还没有学会写个人说明!
上一篇

防火墙 Keepalived 异常双活恢复后部分外网访问中断问题分析

下一篇

SpaceX庆祝首次回收助推器5周年 今年连续成功20次

你也可能喜欢

评论已经被关闭。

插入图片