RocketMQ 消费失败消息处理

存储架构 2017-06-12

闲着研究了下RocketMQ消费失败消息的处理逻辑这里记录下~,更细化说这里只讨论Push模式(其实实现还是Pull的模式)非顺序消费的情况~Pull和顺序消息这里暂时不做讨论哈~(还没研究- -)

消费失败处理逻辑

  • 消费成功的情况RockeMQ会通过移动消费offset位点向前来标示消息已被处理
  • 而对于业务处里失败的消息采用的策略是将消息回发回Broker(并存放到一个 %RETRY%XX 的topic中), 大家一听可以想到的是回发broker这时候broker挂了怎么办?
  • 回发失败的时候会在本机启动个task来重试..恩 然后这时候consumer机器挂了了怎么办重试没了,难道丢消息?
  • 所以为了保证consumer掉电不丢消费失败且回发失败的消息,代码里保证offsetManage(local or remote)中offset不会前移超过重发失败消息的offset,这样可以保证在下次需要,如果下次consumer活过来时(这时一定会从offsetManage中取offset, 恩其实正常运行中不会每次都取, 顺序消息除外...),一定可以重拉到消费失败的消息(后面会提到这个的代价是会重复拉到很多上次已经消费的消息,不过业务同学的代码都是幂等的,所以逃~)
  • 对于消费失败但回发成功的消息,会直接更新offset假装认为那几条消息已经被消费成功,因为他们已经转生在 %RETRY%XX topic里作为新消息等待消费了~当前消费者可以专注于干其他事情.(补充: RERTRY topic实际会带上delay所以实际是先 SCHEDULE_TOPIC 然后再 %RETRY%XX , 这个具体见其他同学关于delay消息的解析)

可以看到,重发这种模式是不会丢消息的,即使broker挂了,consumer挂了,一定会消费到,虽然可能获得很多不想要的重复消息- -

为啥这么搞

写本文的原因就是我组几个小伙伴都觉得这个很奇怪,为啥这么弄呢~?个人研究了下理解是这样的....(其实自己刚开始研究RocketMQ很多理解可能有问题,欢迎大家一块讨论学习 哈哈哈)

可以冷静看下当前消息消费场景特点:

  • 给了我们一个Queue,访问的时候需要通过一层网络
  • 为了希望消费者能尽快的获得大量消息,结合上条希望consumer更好是一次获取一批而不是一条消息
  • 因为顺序并不重要,consumer本身应该可以并发消费这批消息
  • 因为并发消费消息,不是等上条ok才能消费下一条,就会有ack先后顺序问题
  • 为了server端ack应该是高效的,每条记录一个状态 vs 已成功offset?
  • 更进一步,获取获取第二批消息能否需要等待上一批消费完成? 其实没必要只要消费者有空闲线程可以先抓过来消费第二批,虽然第一批里某几条处理比较慢,但多数情况下应该能不会一会就恢复,其他线程先干第二批的活, 所以拉取和处理应该分离
  • 其实从上条可以看出对于抓取的速度应该根据消费者处理能力来控制~如果消费还有闲的可以疯狂的从Queue中先抓过来,只要不把没处理成功的给ack掉;如果消费者已经严重delay无力处理则需要降低抓取速度

完整处理

感觉RocketMQ处理这部分的代码挺巧妙...几个核心参与类:

  • ConsumeQueue : Consumer角度消费的一个Queue(有些类似kafka里partition的概念, 一个Queue只会被一个consumer消费,虽然一个consumer可以消费多个Q)
  • PullRequest : 当前consumer下已分配的每个ConsumeQueue消费者端都会新建一个PullRequest,里面记录 nextOffset 即从Server 拉取offset拉取offset消费offset 是两个offset才能第一批没消费完就拉第二批; 这个Request会在rebalanceService中创建,并被多次更新nextOffset多次进入PullRequestQueue来达到持续拉取的循环效果- -(也会被延迟丢Q来控制速率)
  • PullRequestQueue : 一个内存队列,充当 PullMessageService 的入参
  • PullMessageService : 负责拉取消息的 拉取线程 ,不停的读取 PullRequestQueue 根据request拉取消息,然后将消息丢到 ProcessQueue 中并新建 ConsumeRequest 提交到 ConsumeService 处理, 然后生成下一批的PullRequest丢到 PullRequestQueue :继续消费下一批,达到持续循环拉取的作用
  • ConsumeRequest : 虽然叫request但除了要consume的消息数据外,还有具体的消费逻辑(是个Runnable- -); 关键元素就是这批要处理msg列表对这批消息的处理逻辑,run里会调用用户注册的listener,并根据处理情况,对失败消息回发,并根据失败和回发结果, 更新ProcessQueue以及OffsetStore
  • ProcessQueue : 又一个内存队列保存实现是TreeMap,在处理中的消息,处理处理成功或处理失败但回发成功都会从这个Queue中移除, 消费offset 的上报基于ProcessQueue中最小的offset来完成(所以失败未回发成功的不会被移除);另外在ProcessQueue里最大offset和最小offset过大(MaxSpan)时,前面的PullMessageService会减速等一会在基于运行抓取(等一会儿再往PullRequestQueue里扔消息).
  • ConsumeService : 一个ConsumeRequest的Executor可以理解为一个线程池
  • OffsetStore : 维护 消费offset (即offset之前都处理完成)
  • RebalanceService : 负责给consume分配queue,而对于目前讨论过程他的作用是初始化了了对应的PullRequestQueue和首次的PullRequest, offset从offsetStore获取

简单画了个图说明上面几个类的关系~(手指在ipad上画得没有笔所以特别难看- -先将就吧)

Notes - Page 1.png

失败重试的细节好像没画出来,= = 画图不好画。。结合上面描述看代码哈~- -

最终达到的效果

  • 从Server是批量拉取的
  • 拉取线程不需要等待上一批被处理就能开始拉取下一批,只要ProcessQueue没超 MaxSpan (也就是消费某几条卡主太久), 就可以一直拉取
  • 消费listener可以并发消费,并各自返回完成状态, 部分消费者卡一段时间不影响其他消费者消费
  • Consumer保证实际消费端offset保证offset之前必须是已处理成功或处理失败但已回发成功
  • 回发不成功会本地重试且远端offset不会前移
  • 如果重启或被新分配队列会从offsetStrore获取初始offset,所以可能会有不必要的重复消息,所以消息处理需要做好幂等

总结

  • 可以批量,且拉取和处理分离,同时保证不丢数据,提升效率同时代价是重复消息
  • 通过只记录offset提升ack效率(可以一次ack一批,并且不用每条记录记录状态)
  • 通过分离拉取offset(in PullRequest)和消费offset(in OffsetStore)分离拉取和处理进度,提升拉取效率,并根据消费者处理卡主情况做拉取阀值控制
  • 通过回发消息加速批次中其他已处理成功消息的ack消费offset,如果不回发那消费offset不能迁移,重启之类会导致更多的消息重发, 而有回发后只要回发成功了就可以前移offset(反正不关心顺序只要求效率)
  • 对于回发失败,只能不前移消费offset。。。然后通过本地重试做非consumer宕机情况的优化

您可能感兴趣的

RocketMQ 源码学习 3 :Remoting 模块 rocketmq-remoting 模块是 RocketMQ 中负责网络通信的模块,被其他所有需要网络通信的模块依赖。它是基于 Netty 实现的,避免了网络编程很多 tricky 的问题。 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,一个 Reac...
RocketMQ(四)——消息重试 对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。 一、 Producer端重试 生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导...
RocketMQ大数据畅想 刚刚过去的双十一,阿里自主研发的消息中间件RocketMQ,充分展现了它的低延迟特性,大部分消息请求落在2ms内,慢请求也都落在20ms内,这无疑给追求快速响应的在线交易系统(OLTP)带去了福音。 也是在今年11月份,RocketMQ进入Apache孵化。这款最初设计来为淘宝交易系统异步解耦、...
rocketmq定时清理commitlog文件源码分析 rocketmq的配置参数 // 何时触发删除文件, 默认凌晨4点删除文件 @ImportantField private String deleteWhen = "04";猜想rocketmq会起一个一天执行一次的定时任务。 但看了代码发现并不是这样。 在存储服务启...
消息中间件架构面面观 21CTO社区导读: 本文在可用性和可靠性的基础上,讨论各种架构的优缺点,最后给出自己关于消息中间件的架构思考。 Kafka 首先还是来看Kafka的系统架构(做消息中间件逃不开要去了解Kafka)。 Kafka ecosystem包含以下几块内容: P...
简书

责编内容来自:简书 (本文源链)
我还没有学会写个人说明!

阅读提示:酷辣虫无法对本内容的真实性提供任何保证,请自行验证并承担相关的风险与后果!本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。