RabbitMQ死信队列DLX应用

微信扫一扫,分享到朋友圈

RabbitMQ死信队列DLX应用

进入死信队列的场景:

  1. 消息被拒绝 (basic.reject / basic.nack) 并且 requeue = false
  2. 消息 TTL 过期(在 RabbitMQ 3.5.8版本之前,实现消息的延迟发送就是依靠消息过期进入死信队列然后进行消费来完成的);
  3. 队列达到最大长度;

1、使用原生API实现死信队列DLX的应用

1.1、生产者

@Slf4j
public class ProducerDLX {
public static final String HOST = "148.70.153.63";
public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(System.getProperty("password"));
connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
// 创建连接和通道
@Cleanup Connection connection = connectionFactory.newConnection();
@Cleanup Channel channel = connection.createChannel();
// 创建死信队列DLX
String dlxExchangeName = "DLXExchange", dlxQueueName = "DLXQueue", dlxRoutingKey = "DLX";
channel.exchangeDeclare(dlxExchangeName, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
// 创建消息会自动过期的队列,并和指定的死信交换机绑定
String exchangeName = "amq.direct", queueName = "TestDLXQueue", routingKey = "DLX";
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-message-ttl", 30 * 1000); // 设置队列里消息的ttl的时间30s
argMap.put("x-dead-letter-exchange", dlxExchangeName); // 给队列设置死信交换机
channel.queueDeclare(queueName, true, false, false, argMap);
channel.queueBind(queueName, exchangeName, routingKey);
// 发送消息
String msg = "测试死信队列";
// 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
log.info("now:[{}],发送消息:[{}]", DateUtil.now(), msg);
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
复制代码
  1. 创建死信队列 DLXQueue 并和指定交换机 DLXExchange 进行绑定(其实也是普通的队列、普通的交换机)。
  2. 创建另外一个正常的消息队列 TestDLXQueue ,设置队列的 TTL 过期时间,同时通过 x-dead-letter-exchange 属性指定死信交换机 DLXExchange

1.2、测试消息过期进入死信队列

运行 main 函数,推送消息给 TestDLXQueue 队列。可以先看到消息先在 TestDLXQueue 队列中。

等到30秒后没有被消费,则会把消息推送到 DLXQueue 死信队列中。

1.3、死信队列的消费者

@Slf4j
public class ConsumerDLX {
public static final String HOST = "148.70.153.63";
public static final String USER_NAME = "libai";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(AMQP.PROTOCOL.PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(System.getProperty("password"));
connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
// 创建连接和通道
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("now:[{}],消费消息:[{}]", DateUtil.now(), new String(body));
}
};
channel.basicConsume("DLXQueue", true, consumer);
}
}
复制代码

1.4、运行测试

now:[2020-09-28 20:00:10],发送消息:[测试死信队列]
now:[2020-09-28 20:00:40],消费消息:[测试死信队列]
复制代码

主要过程: 生产者 —> 原交换机 amq.direct —> 原队列 TestDLXQueue (超过 TTL 之后) —> 死信交换机 DLXExchange —> 死信队列 DLXQueue —> 最终消费者。

2、Springboot整合RabbitMQ实现死信队列DLX的应用

2.1、配置死信队列

@Bean
public Queue DLXQueue() {
return new Queue("DLX_QUEUE", true, false, false);
}
@Bean
public DirectExchange DLXExchange() {
return new DirectExchange("DLX_EXCHANGE", true, false);
}
@Bean
public Binding bindingDLX() {
return BindingBuilder.bind(DLXQueue()).to(DLXExchange()).with("DLX");
}
复制代码

创建死信队列 DLX_QUEUE 并和指定交换机 DLX_EXCHANGE 进行绑定(其实也是普通的队列、普通的交换机)。

2.2、配置消息队列

@Bean
public Queue testDLXQueue() {
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 30000); // 队列中的消息未被消费则30秒后过期
map.put("x-dead-letter-exchange", "DLX_EXCHANGE"); // 给队列设置死信交换机
return new Queue("TEST_DLX_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange testDLXExchange() {
return new DirectExchange("TEST_DLX_EXCHANGE", true, false);
}
@Bean
public Binding bindingTestDLX() {
return BindingBuilder.bind(testDLXQueue()).to(testDLXExchange()).with("DLX");
}
复制代码

创建另外一个正常的消息队列 TEST_DLX_QUEUE ,设置队列的 TTL 过期时间,同时通过 x-dead-letter-exchange 属性指定死信队列对应的交换机。

2.3、生产者

@RestController
public class DLXController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/testDLX")
public String testDLX() {
rabbitTemplate.convertAndSend("TEST_DLX_EXCHANGE", "DLX", "测试死信队列");
return "ok";
}
}
复制代码

等到30秒后没有被消费,则会把消息推送到 DLX_QUEUE 死信队列中。

3、死信队列实现消息延迟发送的缺点

  1. 如果统一用队列来设置消息的 TTL ,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟……需要创建很多队列来路由消息。
  2. 如果单独设置消息的 TTL ,则可能会造成队列中的消息阻塞,即前一条消息没有出队(没有被消费),后面的消息无法投递。比如第一条消息过期 TTL 是30min,第二条消息 TTL 是10min。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递。
  3. 可能存在一定时间误差。

所以在 RabbitMQ 3.5.8版本之后,可以利用官方的 rabbitmq-delayed-message-exchange 插件来实现消息的延迟发送,可以避免上面所说的问题。 RabbitMQ实现消息延迟推送

参考资料

代码地址

  • github:https://github.com/senlinmu1008/spring-boot/tree/master/rabbitmq-ttl-dlx
  • gitee:https://gitee.com/ppbin/spring-boot/tree/master/rabbitmq-ttl-dlx

个人网站

微信扫一扫,分享到朋友圈

RabbitMQ死信队列DLX应用

RabbitMQ实现RPC调用

上一篇

新理论认为宇宙气泡可能制造了暗物质

下一篇

你也可能喜欢

RabbitMQ死信队列DLX应用

长按储存图像,分享给朋友