深入理解RabbitMQ中的prefetch_count参数

微信扫一扫,分享到朋友圈

深入理解RabbitMQ中的prefetch_count参数

前提

在某一次用户标签服务中大量用到异步流程,使用了 RabbitMQ 进行解耦。其中,为了提高消费者的处理效率针对了不同节点任务的消费者线程数和 prefetch_count 参数都做了调整和测试,得到一个相对合理的组合。这里深入分析一下 prefetch_count 参数在 RabbitMQ 中的作用。

prefetch_count参数的含义

先从 AMQPAdvanced Message Queuing Protocol ,即高级消息队列协议, RabbitMQ 实现了此协议的 0-9-1 版本的大部分内容)和 RabbitMQ 的具体实现去理解 prefetch_count 参数的含义,可以查阅对应的文档(见文末参考资料)。 AMQP 0-9-1 定义了 basic.qos 方法去限制消费者基于某一个 Channel 或者 Connection 上未进行 ack 的最大消息数量上限。 basic.qos 方法支持两个参数:

global
prefetch_count

这两个参数在 AMQP 0-9-1 定义中的含义和 RabbitMQ 具体实现时有所不同,见下表:

global 参数值 AMQP 0-9-1prefetch_count 参数的含义 RabbitMQprefetch_count 参数的含义
false prefetch_count 值在当前 Channel 的所有消费者共享 prefetch_count 对于基于当前 Channel 创建的消费者生效
true prefetch_count 值在当前 Connection 的所有消费者共享 prefetch_count 值在当前 Channel 的所有消费者共享

或者用简洁的英文表格理解:

global prefetch_count in AMQP 0-9-1 prefetch_count in RabbitMQ
false Per channel limit Per customer limit
true Per connection limit Per channel limit

这里画一个图理解一下:

上图仅仅为了区分协议本身和 RabbitMQ 中实现的不同,接着说说 prefetch_count 对于消费者(线程)和待消费消息的作用。假定一个前提: RabbitMQ 客户端从 RabbitMQ 服务端获取到队列消息的速度比消费者线程消费速度快,目前有两个消费者线程共用一个 Channel 实例。当 global 参数为 false 时候,效果如下:

而当 global 参数为 true 时候,效果如下:

在消费者线程处理速度远低于 RabbitMQ 客户端从 RabbitMQ 服务端获取到队列消息的速度的场景下, prefetch_count 条未进行 ack 的消息会暂时存放在一个队列(准确来说是阻塞队列,然后阻塞队列中的消息任务会流转到一个列表中遍历回调消费者句柄,见下一节的源码分析)中等待被消费者处理。这部分消息会占据 JVM 的堆内存,所以在性能调优或者设定应用程序的初始化和最大堆内存的时候,如果刚好用到 RabbitMQ 的消费者,必须要考虑这些”预取消息”的内存占用量。不过值得注意的是: prefetch_countRabbitMQ 服务端的参数,它的设置值或者快照都不会存放在 RabbitMQ 客户端 。同时需要注意 prefetch_count 生效的条件和特性(从参数设置的一些 demo 和源码上感知):

  • prefetch_count 参数仅仅在 basic.consumeautoAck 参数设置为 false 的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
  • basic.consume 如果在非自动确认模式下忘记了手动调用 basic.ack ,那么 prefetch_count 正是未 ack 消息数量的最大上限。
  • prefetch_count 是由 RabbitMQ 服务端控制,一般情况下能保证各个消费者线程中的未 ack 消息分发是均衡的,这点笔者猜测是 consumerTag 起到了关键作用。

RabbitMQ客户端中prefetch_count源码跟踪

编写本文的时候引入的RabbitMQ客户端版本为:com.rabbitmq:amqp-client:5.9.0

上面说了这么多都只是根据官方的文档或者博客中的理论依据进行分析,其实更加根本的分析方法是直接阅读 RabbitMQJava 客户端源码,主要是针对 basic.qosbasic.consume 两个方法,对应的是 com.rabbitmq.client.impl.ChannelN#basicQos()com.rabbitmq.client.impl.ChannelN#basicConsume() 两个方法。先看 ChannelN#basicQos()

这里的 basicQos() 方法多了一个 prefetchSize 参数,用于限制分发内容的大小上限,默认值 0 代表无限制,而 prefetchCount 的取值范围是 [0,65535] ,取值为 0 也是代表无限制。这里的 ChannelN#basicQos() 实现中直接封装 basic.qos 方法参数进行一次 RPC 调用,意味着直接更变 RabbitMQ 服务端的配置,即时生效,同时参数值完全没有保存在客户端代码中,印证了前面一节的结论。接着看 ChannelN#basicConsume() 方法:

上图已经把关键部分用红圈圈出,因为整个消息消费过程是异步的,涉及太多的类和方法,这里不全量贴出,整理了一个流程图:

整个消息消费过程, prefetch_count 参数并未出现在客户端代码中,又再次印证了前面一节的结论,即 prefetch_count 参数的行为和作用完全由 RabbitMQ 服务端控制。而最终 Customer 或者常用的 DefaultCustomer 句柄是在 WorkPoolRunnable 中回调的,这类任务的执行线程来自于 ConsumerWorkService 内部的线程池,而这个线程池又使用了 Executors.newFixedThreadPool() 去构建,使用了默认的线程工厂类,因此在 Customer#handleDelivery() 方法内部打印的线程名称的样子是 pool-1-thread-*

这里VariableLinkedBlockingQueue就是前一节中的message queue的原型

prefetch_count参数使用

设置 prefetch_count 参数比较简单,就是调用 Channel#basicQos() 方法:

public class RabbitQos {
static String QUEUE = "qos.test";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, true, false, false, null);
channel.basicQos(2);
channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("1------" + Thread.currentThread().getName());
sleep();
}
});
channel.basicConsume("qos.test", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("2------" + Thread.currentThread().getName());
sleep();
}
});
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE, MessageProperties.TEXT_PLAIN, String.valueOf(i).getBytes());
}
sleep();
}
private static void sleep() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (Exception ignore) {
}
}
}

上面是原生的 amqp-client 的写法,如果使用了 spring-amqpspring-boot-starter-amqp ),可以通过配置文件中的 spring.rabbitmq.listener.direct.prefetch 属性指定所有消费者线程的 prefetch_count ,如果要针对部分消费者线程进行该属性的设置,则需要针对 RabbitListenerContainerFactory 进行改造。

prefetch_count参数最佳实践

关于 prefetch_count 参数的设置, RabbitMQ 官方有一篇文章进行了分析: 《Finding bottlenecks with RabbitMQ 3.3》 。该文章分析了消息流控的整个流程,其中提到了 prefetch_count 参数的一些指标:

这里指出了,如果 prefetch_count 的值超过了 30 ,那么网络带宽限制开始占主导地位,此时进一步增加 prefetch_count 的值就会变得收效甚微。也就是说, 官方是建议把 prefetch_count 设置为 30 。这里再参看一下 spring-boot-starter-amqp 中对此参数定义的默认值,具体是 AbstractMessageListenerContainer 中的 DEFAULT_PREFETCH_COUNT

如果没有通过 spring.rabbitmq.listener.direct.prefetch 进行覆盖,那么使用 spring-boot-starter-amqp 中的注解定义的消费者线程中设置的 prefetch_count 就是 250

笔者认为,应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑 prefetch_count 的设置。总结如下(个人经验仅供参考):

  • 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把 prefetch_count 设置为 1
  • 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未 ack 总消息量的内存极限,从而设计一个合理的 prefetch_count 值。
  • 当消费者线程的处理速度十分快,远远大于 RabbitMQ 服务端的消息分发,在网络带宽充足的前提下,设置可以把 prefetch_count 值设置为 0 ,不做任何的消息流控。
  • 一般场景下,建议使用 RabbitMQ 官方的建议值 30 或者 spring-boot-starter-amqp 中的默认值 250

小结

小结一下:

  • prefetch_countRabbitMQ 服务端的参数,设置后即时生效。
  • prefetch_count 对于 AMQP-0-9-1 中的定义与 RabbitMQ 中的实现不完全相同。
  • prefetch_count 值设置建议使用框架提供的默认值或者通过分组实验结合数据报大小进行计算和评估出一个合理值。

彩蛋

笔者把文章发布到公众号和朋友圈后,笔者的师傅作了点评,指出其中的一点不足:

确实如此, prefetch_count 的本质作用就是消费者的流控,官方的那篇文章也提到了网络和带宽的重要性,所以要考虑 RTTRound-Trip Time ,往返时延),这里的 RTT 概念来源于《计算机网络原理》:

The RTT includes packet-propagation delays, packet-queuing delays and packet -processing delay.

也就是说 RTT = 数据包传播时延(往返)+ 数据包排队时延(路由器和交换机的)+ 数据处理时延(应用程序处理耗时,用在本文的场景就是消费者处理消息的耗时)。假设 RTT 中只计算网络的时延,不包含数据处理的时延,那么数据包往返需要 2RTT ,也就是一条消费消息处理的数据包的往返, RTT 越大,那么数据传输成本越高,应该允许客户端”预取”更多的未 ack 消息避免消费者线程等待。这样就可以计算出单个消费者线程处理达到最饱和状态下的”预取”消息量: prefetch_count = 2RTT / 消费者线程处理单条消息的耗时 。依照此概念举例:

  • RTT30ms ,而消费者线程处理单条消息的耗时为 10ms ,此时,消费速率占优势,可以考虑把 prefetch_count 设置为 6 或者更大的值(考虑堆内存极限的限制)。
  • RTT30ms ,而消费者线程处理单条消息的耗时为 200msRTT 占优势,消费速率滞后,此时考虑把 prefetch_count 设置为 1 即可。

思考:为什么spring-boot-starter-amqp把prefetch_count默认值设置为250这么高的值,很少开发者改动它却没有出现明显问题?

(本文完 c-4-d e-a-20201017)

微信扫一扫,分享到朋友圈

深入理解RabbitMQ中的prefetch_count参数

binlog server伪装master恢复增量数据

上一篇

糜子栽培技术

下一篇

你也可能喜欢

深入理解RabbitMQ中的prefetch_count参数

长按储存图像,分享给朋友