简单模式队列
在这部分的使用指南中,我们要用 Java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 “Hello World” 消息。
在下面的图中,”P” 代表生产者,而 “C” 代表消费者。中间的就是一个 Queue,一个消息缓存区。
创建项目
添加依赖
<!-- rabbitmq依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
Sending
我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。
Send.java
package com.xxxx.simple.send; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 简单模式队列-消息发送者 */ public class Send { // 队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); /** * 声明队列 * 第一个参数queue:队列名称 * 第二个参数durable:是否持久化 * 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。 * 这里需要注意三点: * 1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。 * 2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。 * 3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。 * 这种队列适用于只限于一个客户端发送读取消息的应用场景。 * 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。 * 这种队列适用于临时队列。 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 创建消息 String message = "Hello World!"; // 将产生的消息放入队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
消息发送成功以后,通过RabbitMQ管理界面可以看到队列的相关信息
Receiving
消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。
Recv.java
package com.xxxx.simple.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 简单模式队列-消息接收者 */ public class Recv { // 队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 Connection connection = factory.newConnection(); // 获取通道 Channel channel = connection.createChannel(); // 指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // ---------------------之前旧版本的写法-------begin----------- /* // 获取消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 获取消息并在控制台打印 String message = new String(body, "utf-8"); System.out.println(" [x] Received '" + message + "'"); } }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); */ // ---------------------之前旧版本的写法--------end------------ // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
消息接收成功以后,通过RabbitMQ管理界面可以看到队列的相关信息
测试
运行Send.java
运行Recv.java
总结
问题:如果任务量很大,消息得不到及时的消费会造成队列积压,问题非常严重,比如内存溢出,消息丢失等。
解决:配置多个消费者消费消息。
总结: 简单队列-处理消息效率不高,吞吐量较低,不适合生成环境
Work queues-工作模式队列
工作模式队列-消息轮询分发(Round-robin)
通过Helloworld工程我们已经能够构建一个简单的消息队列的基本项目,项目中存在几个角色:生产者、消费者、队列,而对于我们真实的开发中,对于消息的消费者通过是有多个的,比如在实现用户注册功能时,用户注册成功,会给响对应用户发送邮件,同时给用户发送手机短信,告诉用户已成功注册网站或者app 应用,这种功能在大部分项目开发中都比较常见,而对于helloworld 的应用中虽然能够对消息进行消费,但是有很大问题: 消息消费者只有一个,当消息量非常大时,单个消费者处理消息就会变得很慢,同时给节点页带来很大压力,导致消息堆积越来越多。对于这种情况,RabbitMQ 提供了工作队列模式,通过工作队列提供做个消费者,对MQ产生的消息进行消费,提高MQ消息的吞吐率,降低消息的处理时间 。处理模型图如下
Sending
Send.java
package com.xxxx.work.roundRobin.send; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作模式队列-轮询分发-消息发送者 */ public class Send { // 队列名称 private final static String QUEUE_NAME = "work_roundRobin"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 创建消息 for (int i = 1; i <= 20; i++) { String message = "Hello World! ----- " + i; // 将产生的消息放入队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
Receiving
Recv01.java
package com.xxxx.work.roundRobin.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作模式队列-轮询分发-消息接收者 */ public class Recv01 { // 队列名称 private final static String QUEUE_NAME = "work_roundRobin"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 Connection connection = factory.newConnection(); // 获取通道 Channel channel = connection.createChannel(); // 指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 模拟程序执行所耗时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Recv02.java
package com.xxxx.work.roundRobin.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作模式队列-轮询分发-消息接收者 */ public class Recv02 { // 队列名称 private final static String QUEUE_NAME = "work_roundRobin"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 Connection connection = factory.newConnection(); // 获取通道 Channel channel = connection.createChannel(); // 指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received02 '" + message + "'"); // 模拟程序执行所耗时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }; // 监听队列 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
测试
运行Send
运行Recv
总结
从结果可以看出消息被平均分配到两个消费方,来对消息进行处理,提高了消息处理效率,创建多个消费者来对消息进行处理。这里RabitMQ采用轮询来对消息进行分发时保证了消息被平均分配到每个消费方,但是引入新的问题:真正的生产环境下,对于消息的处理基本不会像我们现在看到的这样,每个消费方处理的消息数量是平均分配的,比如因为网络原因,机器cpu,内存等硬件问题,消费方处理消息时同类消息不同机器进行处理时消耗时间也是不一样的,比如1号消费者消费1条消息时1秒,2号消费者消费1条消息是5秒,对于1号消费者比2号消费者处理消息快,那么在分配消息时就应该让1号消费者多收到消息进行处理,也即是我们通常所说的”能者多劳”,同样Rabbitmq对于这种消息分配模式提供了支持。
问题:任务量很大,消息虽然得到了及时的消费,单位时间内消息处理速度加快,提高了吞吐量,可是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。
解决:采用消息公平分发。
总结:工作队列-消息轮询分发-消费者收到的消息数量平均分配,单位时间内消息处理速度加快,提高了吞吐量。
工作模式队列-消息公平分发(fair dispatch)
在案例01中对于 消息分发采用的是默认轮询分发,消息应答采用的自动应答模式 ,这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。
为了解决这个问题, 我们使用 basicQos(prefetchCount = 1)
方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送 。执行模型图如下:
Sending
Send.java
package com.xxxx.work.fair.send; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作模式队列-公平分发-消息发送者 */ public class Send { // 队列名称 private final static String QUEUE_NAME = "work_fair"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 创建消息 for (int i = 1; i <= 20; i++) { String message = "Hello World! ----- " + i; // 将产生的消息放入队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
Receiving
Recv01.java
package com.xxxx.work.fair.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作模式队列-公平分发-消息接收者 */ public class Recv01 { // 队列名称 private final static String QUEUE_NAME = "work_fair"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 模拟程序执行所耗时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Recv02.java
package com.xxxx.work.fair.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 工作队列-公平分发-消息接收者 */ public class Recv02 { // 队列名称 private final static String QUEUE_NAME = "work_fair"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 指定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received02 '" + message + "'"); // 模拟程序执行所耗时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
测试
运行Send
运行Recv
总结
从结果可以看出1号消费者消费消息数量明显高于2号,即消息通过fair 机制被公平分发到每个消费者。
问题:生产者产生的消息只可以被一个消费者消费,可不可以被多个消费者消费呢?
解决:采用发布与订阅模式。
总结:工作队列-公平轮询分发-根据不同消费者机器硬件配置,消息处理速度不同,收到的消息数量也不同,通常速度快的处理的消息数量比较多,最大化使用计算机资源。适用于生成环境。
Publish/Subscribe-消息的发布与订阅模式队列
对于微信公众号,相信每个人都订阅过,当公众号发送新的消息后,对于订阅过该公众号的所有用户均可以收到消息,这个场景大家都能明白,同样对于RabbitMQ消息的处理也支持这种消息处理,当生产者把消息投送出去后,不同的消费者均可以对该消息进行消费,而不是消息被一个消费者消费后就立即从队列中删除,对于这种消息处理,我们通常称之为消息的发布与订阅模式,凡是消费者订阅了该消息,均能够收到对应消息进行处理,比较常见的如用户注册操作。模型图如下:
从图中看到:
- 消息产生后不是直接投送到队列中,而是将消息先投送给Exchange交换机,然后消息经过Exchange 交换机投递到相关队列
- 多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。
具体实现核心代码如下:
Sending
Send.java
package com.xxxx.publish.subscribe.fanout.send; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发布与订阅模式队列-fanout广播模式-消息发送者 */ public class Send { // 队列名称 // 如果不声明队列,会使用默认值,RabbitMQ会创建一个排他队列,连接断开后自动删除 //private final static String QUEUE_NAME = "ps_fanout"; // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 声明队列 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定交换机 fanout:广播模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 创建消息,模拟发送手机号码和邮件地址 String message = "18600002222|12345@qq.com"; // 将产生的消息发送至交换机 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
Receiving
这里对于消费者,消费消息时,消息通过交换机Exchange被路由到指定队列,绑定队列到指定交换机来实现,一个消费者接到消息后用于邮件发送模拟,另一消费者收到消息,用于短信发送模拟。
Recv01.java
package com.xxxx.publish.subscribe.fanout.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发布与订阅模式队列-fanout广播模式-消息接收者 */ public class Recv01 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 fanout:广播模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, ""); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息,按|分割以后一个消费者发短信,一个消费者发邮件 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Recv02.java
package com.xxxx.publish.subscribe.fanout.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 发布与订阅模式队列-fanout广播模式-消息接收者 */ public class Recv02 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 fanout:广播模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, ""); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息,按|分割以后一个消费者发短信,一个消费者发邮件 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
测试
运行Send
运行Recv
总结
从结果可以看出生产者发送了一条消息,用于邮件发送和短信发送的消费者均可以收到消息进行后续处理。
问题:生产者产生的消息所有消费者都可以消费,可不可以指定某些消费者消费呢?
解决:采用direct路由模式。
Routing-路由模式队列
通过案例03,可以看到,生产者将消息投送给交换机后,消息经交换机分发到不同的队列即:交换机收到消息,默认对于绑定到每个交换机的队列均会接收到交换机分发的消息,对于案例03的交换机的消息分发Exchange Types为 fanout
类型,通常在真正项目开发时会遇到这种情况:在对项目信息输出日志进行收集时,会把日志(error warning,info)分类进行输出,这时通过Exchange Types中的 direct
类型就可以实现,针对不同的消息,在对消息进行消费时,通过 Exchange types
以及 Routing key
设置的规则 ,便可以将不同消息路由到不同的队列中然后交给不同消费者进行消费操作。模型图如下:
从图中可以看出:
- 生产者产生的消息投给交换机
- 交换机投送消息时的Exchange Types为direct类型
- 消息通过定义的Routing Key被路由到指定的队列进行后续消费
具体实现核心代码如下:
Sending
Send.java
package com.xxxx.publish.subscribe.direct.send; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * direct路由模式队列-消息发送者 */ public class Send { // 队列名称 // 如果不声明队列,会使用默认值,RabbitMQ会创建一个排他队列,连接断开后自动删除 // 交换机名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 绑定交换机 direct:路由模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 创建消息,模拟收集不同级别日志 String message = "INFO消息"; //String message = "WARNING消息"; //String message = "ERROR消息"; // 设置路由routingKey String routingKey = "info"; //String routingKey = "error"; // 将产生的消息发送至交换机 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
Receiving
消费者对消息进行后续消费时,对于接收消息的队列在对消息进行接收时,绑定到每一个交换机上的队列均会指定其Routing Key规则,通过路由规则将消息路由到执行队列中。
消费者01 routingKey=info和warning,对应级别日志消息均会路由到该队列中。
Recv01.java
package com.xxxx.publish.subscribe.direct.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * direct路由模式队列-消息接收者 */ public class Recv01 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 direct:路由模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 设置路由routingKey String routingKeyInfo = "info"; String routingKeyWarning = "warning"; // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKeyInfo); channel.queueBind(queueName, EXCHANGE_NAME, routingKeyWarning); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息,按|分割以后一个消费者发短信,一个消费者发邮件 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
消费者02 routingKey=error,对应级别日志消息均会路由到该队列中。
Recv02.java
package com.xxxx.publish.subscribe.direct.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * direct路由模式队列-消息接收者 */ public class Recv02 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 direct:路由模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 设置路由routingKey String routingKey = "error"; // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received02 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
测试
运行Send
运行Recv
总结
从结果可以看出生产者发送了多条设置了路由规则的消息,消费者可以根据具体的路由规则消费对应队列中的消息,而不是所有消费者都可以消费所有消息了。
问题:生产者产生的消息如果场景需求过多需要设置很多路由规则,可不可以减少?
解决:采用topic主题模式。
Topics-主题模式队列
通过案例04看到消息通过交换机Exchange Type以及Routing Key规则,可以将消息路由到指定的队列,也符合在工作中的场景去使用的一种方式,对于RabbitMq 除了 direct
模式外,Mq 同样还提供了 topics
主题模式来对消息进行匹配路由,比如在项目开发中,拿商品模块来说,对于商品的查询功能在对商品进行查询时我们将查询消息路由到查询对应队列,而对于商品的添加、更新、删除等操作我们统一路由到另外一个队列来进行处理,此时采用direct 模式可以实现,但对于维护的队列可能就不太容易进行维护,如果涉及模块很多,此时对应队列数量就很多,此时我们就可以通过 topic
主题模式来对消息路由时进行匹配,通过指定的匹配模式将消息路由到匹配到的队列中进行后续处理。对于routing key匹配模式定义规则举例如下:
- routing key为一个句点号
.
分隔的字符串(我们将被句点号.
分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” - routing key中可以存在两种特殊字符
*
与#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是零个)
例如:
以上图中的配置为例:
- routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,
- routingKey=”lazy.orange.fox”的消息会路由到Q1,Q2,
- routingKey=”lazy.brown.fox”的消息会路由到Q2,
- routingKey=”lazy.pink.rabbit”的消息会路由到Q2;
- routingKey=”quick.brown.fox”;
- routingKey=”orange”;
- routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。
具体实现核心代码:
Sending
这里以商品模块为例,商品查询路由到商品查询队列,商品更新路由到商品更新(添加,更新,删除操作)队列中。
Send.java
package com.xxxx.publish.subscribe.topic.send; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * topic主题模式队列-消息发送者 */ public class Send { // 队列名称 // 如果不声明队列,会使用默认值,RabbitMQ会创建一个排他队列,连接断开后自动删除 // 交换机名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = null; Channel channel = null; try { // 通过工厂创建连接 connection = factory.newConnection(); // 获取通道 channel = connection.createChannel(); // 绑定交换机 topic:主题模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 创建消息,模拟商品模块 String message = "商品查询操作"; //String message = "商品更新操作"; // 设置路由routingKey String routingKey = "select.goods.byId"; //String routingKey = "update.goods.byId.andName"; // 将产生的消息发送至交换机 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { try { // 关闭通道 if (null != channel && channel.isOpen()) channel.close(); // 关闭连接 if (null != connection && connection.isOpen()) connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }
Receiving
Recv01.java
package com.xxxx.publish.subscribe.topic.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * topic主题模式队列-消息接收者 */ public class Recv01 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 topic:主题模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 设置路由routingKey String routingKey = "select.goods.*"; // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息,按|分割以后一个消费者发短信,一个消费者发邮件 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received01 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Recv02.java
package com.xxxx.publish.subscribe.topic.recv; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * topic主题模式队列-消息接收者 */ public class Recv02 { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 绑定交换机 topic:主题模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 获取队列名称 String queueName = channel.queueDeclare().getQueue(); // 设置路由routingKey String routingKey = "update.goods.#"; // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received02 '" + message + "'"); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
测试
运行Send
运行Recv
总结
从结果可以看出生产者发送了多条设置了路由匹配规则(主题)的消息,根据不同的路由匹配规则(主题),可以将消息根据指定的routing key路由到匹配到的队列中,也是在生产中比较常见的一种消息处理方式。
问题:RabbitMQ本身是基于异步的消息处理,是否可以同步实现?
解决:采用RPC模式。
RPC-远程过程调用模式队列
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14种properties,这些属性会随着消息一起发送)中设置两个值
replyTo
(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId
(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败) - 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到
replyTo
指定的Queue,同时携带correlationId
属性
客户端之前已订阅 replyTo
指定的Queue,从中收到服务器的应答消息后,根据其中的 correlationId
属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
具体实现核心代码:
Server
Server.java
package com.xxxx.rpc.server; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * RPC模式队列-服务端 */ public class RPCServer { // 队列名称 private static final String RPC_QUEUE_NAME = "rpc_queue"; /** * 计算斐波那契数列 * * @param n * @return */ private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] args) { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); try { // 通过工厂创建连接 final Connection connection = factory.newConnection(); // 获取通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); /* 限制RabbitMQ只发不超过1条的消息给同一个消费者。 当消息处理完毕后,有了反馈,才会进行第二次发送。 */ int prefetchCount = 1; channel.basicQos(prefetchCount); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); // 获取消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 获取replyTo队列和correlationId请求标识 AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { // 接收客户端消息 String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); // 服务端根据业务需求处理 response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { // 将处理结果发送至replyTo队列同时携带correlationId属性 channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); // 手动回执消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread // RabbitMq消费者工作线程通知RPC服务器其他所有线程运行 synchronized (monitor) { monitor.notify(); } } }; // 监听队列 /* autoAck = true代表自动确认消息 autoAck = false代表手动确认消息 */ boolean autoAck = false; channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); // Wait and be prepared to consume the message from RPC client. // 线程等待并准备接收来自RPC客户端的消息 while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Client
Client.java
package com.xxxx.rpc.client; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; /** * RPC模式队列-客户端 */ public class RPCClient implements AutoCloseable { private Connection connection; private Channel channel; // 队列名称 private String requestQueueName = "rpc_queue"; // 初始化连接 public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] args) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 10; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); // 请求服务端 String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } // 请求服务端 public String call(String message) throws IOException, InterruptedException { // correlationId请求标识ID final String corrId = UUID.randomUUID().toString(); // 获取队列名称 String replyQueueName = channel.queueDeclare().getQueue(); // 设置replyTo队列和correlationId请求标识 AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); // 发送消息至队列 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); // 设置线程等待,每次只接收一个响应结果 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); // 接受服务器返回结果 String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { // 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回false response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); // 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值 String result = response.take(); // 从消息队列中丢弃该值 channel.basicCancel(ctag); return result; } // 关闭连接 public void close() throws IOException { connection.close(); } }
测试
运行Server
运行Client
查看Server