探索RocketMQ的重复消费和乱序问题

微信扫一扫,分享到朋友圈

探索RocketMQ的重复消费和乱序问题

前言

在之前的MQ专题中,我们已经解决了消息中间件的一大难题,消息丢失问题。

但MQ在实际应用中不是说保证消息不丢失就万无一失了,它还有两个令人头疼的问题:重复消费和乱序。

今天我们就来聊一聊这两个常见的问题,看看RocketMQ是如何解决这两个问题的。

为什么会重复消费

首先我们来聊一聊重复消费的问题,要解决一个问题最开始的一步当然是去查找问题发生的原因了。

那出现重复消费的原因到底是什么呢?

我们先来思考一下生产者发送消息这一过程中是不是有可能重复发送消息到MQ呢?

答案是肯定的,比如生产者发送消息的时候使用了重试机制,发送消息后由于网络原因没有收到MQ的响应信息,报了个超时异常,然后又去重新发送了一次消息。

但其实MQ已经接到了消息,并返回了响应,只是因为网络原因超时了。

这种情况下,一条消息就会被发送两次。

当然,这只是列举了一种情况,实际有很多情况会造成消息的重新发送。

那么假如生产者没有重复发送消息,消费者就能保证不重复消费了吗?

当然不能保证,我们知道,在消费者处理了一条消息后会返回一个offset给MQ,证明这条消息被处理过了。

但是,假如这条消息已经处理过了,在返回offset给MQ的时候服务宕机了,MQ就没有接收到这条offset,那么服务重启后会再次消费这条消息。

如何解决重复消费

解决重复消费的关键就是引入 幂等性机制 ,什么是幂等性机制呢?我们可以把它理解成,假如一个接口被重复调用,依然可以保证数据的准确性。

对于生产者重复发送消息到MQ这一过程,其实我们没有必要去保证幂等性,只要在消费者处理消息时保证幂等性就可以了。

这块其实就比较简单了,只要处理消息之前先根据业务判断一下本次操作是否已经执行过了,如果已经执行过了,那就不再执行了,这样就可以保证消费者的幂等性。

举个例子,比如每条消息都会有一条唯一的消息ID,消费者接收到消息会存储消息日志,如果日志中存在相同ID的消息,就证明这条消息已经被处理过了。

消息重试、延时消息、死信队列

解决完重复消费问题,我们来思考一种极端情况,比如某一时刻,消费者操作的数据库宕机了,这个时候消费者会发生异常,当然不能返回给MQ一个CONSUME_SUCCESS了,我们可以返回RECONSUME_LATER,他的意思是我现在没法处理这些消息,一会再来试试能不能处理。

简单来说,RocketMQ会有一个针对当前Consumer Group的重试队列,如果你返回了RECONSUME_LATER,MQ会把你的这批消费放到当前消费组的重试队列中,然后过一段时间重试队列中的消息会再次发送给消费者,默认可以重试16次,每次重试的间隔是不同的,这个时间间隔是可以配置的,默认配置如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

细心的小伙伴会发现,这个配置一共有18个时间,为什么最多重试16次,配置中却有18个时间呢,这里就要说到 延时消息 了。

上边的配置其实不是针对重试队列的,而是针对 延时消息 的,18个时间分别代表延迟level1-level18,延时消息大概流程如下:

1 所有的延迟消息到达broker后,会存放到SCHEDULE_TOPIC_XXX的topic下(这个topic比较特殊,对客户端是不可见的,包括使用rocketmq-console,也查不到这个topic)

2 SCHEDULE_TOPIC_XXX这个topic下存在18个队列,每个队列中存放的消息都是同一个延迟级别消息

3 broker端启动了一个timer和timerTask的任务,定时从此topic下拉取数据,如果延迟时间到了,就会把此消息发送到指定的topic下,完成延迟消息的发送

刚才我们说如果你返回了RECONSUME_LATER,消息就会进入重试队列,其实不完全准确。

当MQ接收到RECONSUME_LATER后,首先会完成消息的转换,把消息存到延时队列中,然后再根据消息的延时时间保存到重试队列中。

如果重试了16次之后依然无法处理,就会把这些消费放入 死信队列 。死信队列中的消息RocketMQ不会再做处理,这部分数据要怎么处理就要看我们的业务场景了,我们可以做一个后台线程去订阅这个死信队列,完成后续消息的处理。

消息乱序

接下来我们聊一聊消息乱序问题,为什么会出现这个问题呢,这个其实不难理解。

我们都学过,每个Topic可以有多个MessageQueue,写入消息的时候实际上会平均分配给不同的MessageQueue。

然后假如我们有一个Consume Group,这个消费组中的每台机器都会负责一部分MessageQueue,那么就会导致消息的顺序乱序问题。

举个例子,生产者发送了两条顺序消息,先是insert,后是update,分别分配到两个MessageQueue中,消费者组中的两台机器分别处理两个队列的消息,这个时候是无法保证顺序性的,有可能会先执行update,后执行insert,导致数据发生错误。

那么如何解决消息乱序问题呢?

其实道理也很简单,把需要保持顺序的消息都放入到同一个MessageQueue中,让同一台机器处理不就可以了吗。

我们完全可以根据唯一ID与队列的数量进行hash运算,保证这些消息进入到同一个队列中,最简单的算法就是取余运算了。

现在我们能保证这批消息进入到同一个队列中了,似乎这样就能保证消息不会乱序了,但真的是这样吗?

上文我们说到如果消费者数据库出现问题,使用重试队列重试消息,那么对于需要保证顺序的消息也可以使用这套方案吗?

肯定是不能的,如果使用重试机制是无法保证顺序性的。

RocketMQ提供了另一个状态,SUSPEND_CURRENT_QUEUE_A_MOMENT,意思是先等一会,再接着处理这批消息,而不是把这批消息放入重试队列里去处理其他消息。

所以我们只要返回这个状态就可以了。

总结

好了,到这里关于RocketMQ重复消费和乱序问题的产生原因和解决方案我们就介绍完了,同时也介绍了RocketMQ的重试机制、延时消息和死信队列。

有些地方可能比较复杂,可能需要小伙伴们重复阅读几次才能理解,如果哪里有想不清楚的,或者有疑问的可以联系王子共同探讨。

微信扫一扫,分享到朋友圈

探索RocketMQ的重复消费和乱序问题

python+flask编写一个简单的登录接口例子

上一篇

iPhone 12 Pro Max拆解电池规格曝光 容量为3687mAh

下一篇

你也可能喜欢

探索RocketMQ的重复消费和乱序问题

长按储存图像,分享给朋友