RabbitMQ 基本使用

消息队列–Message Queue

应用场景

  • 异步处理:将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理,降低响应时间。

  • 解耦合:MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合,不会因为消费者出现问题而导致整个应用不可用。

  • 削峰填谷:根据二八法则,80% 的请求出现在 20% 的时间里,假如请求数 10000,时间 10 秒,那么 2 秒 8000 个请求(峰)、后面 8 秒 2000 个请求(谷),峰期 4000 QPS 数据库可能撑不住,使用消息队列后,按照数据库承受能力慢慢消费,将 2 秒的峰值数据平摊到 8 秒里面处理,数据库就不会出现问题(2 秒 8000 个请求处理不完就会积压在 MQ 里面,称为削峰;8 秒本来只需要处理 2000 个请求,但是有积压数据待处理,即为填谷)。

AMQP 和 JMS

MQ 是消息通信的模型;实现 MQ 的大致有两种主流方式:AMQP、JMS。AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。RabbitMQ是AMQP协议的Erlang的实现。

  • AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。
  1. 连接Connection:一个网络连接,比如TCP/IP套接字连接。
  2. 会话Session:端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
  3. 信道Channel:多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
  4. 客户端Client:AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
  5. 服务节点Broker:消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
  6. 端点:AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
  7. 消费者Consumer:一个从消息队列里请求消息的客户端程序。
  8. 生产者Producer:一个向交换机发布消息的客户端应用程序。
  • JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

  • 两者区别

  1. JMS是定义了统一的接口,来对消息操作进行统一;

  2. AMQP是通过规定协议来统一数据交互的格式,JMS限定了必须使用Java语言;

  3. AMQP只是协议,不规定实现方式,因此是跨语言的。JMS规定了两种消息模式;而AMQP的消息模式更加丰富。

消息队列产品

  • ActiveMQ:基于JMS

  • ZeroMQ:基于C语言开发

  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

  • RocketMQ:基于JMS,阿里巴巴产品

  • Kafka:类似MQ的产品;分布式消息系统,高吞吐量

安装

详见官方网站: RabbitMQ 官网下载页面

部署单机版,建议使用 docker,一条命令就搞定~

RabbitMQ

7种模式

www.rabbitmq.com/getstarted.…

简单模式,工作队列模式,发布订阅模式

路由模式,主题模式,远程调用模式

发布确认模式

简单模式

P:生产者;Queue:消息队列,图中红色部分;C:消费者。直接发给消息队列,底层走默认的交换机。

注意:消息队列如果不存在,生产者和消费者都会尝试创建。

# pom 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
<scope>compile</scope>
</dependency>
package com.simple.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//获取连接的工具类
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机地址;默认为 localhost
connectionFactory.setHost("192.168.2.100");
//连接端口;默认为 5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认为 /
connectionFactory.setVirtualHost("/dev");
//连接用户名;默认为guest
connectionFactory.setUsername("dev");
//连接密码;默认为guest
connectionFactory.setPassword("123456");
//创建连接
return connectionFactory.newConnection();
}
}
package com.simple.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//生产者
public class Producer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 要发送的信息
String message = "Hello World!";
/*
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:路由key,简单模式可以传递队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
package com.simple.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
//消费者
public class Consumer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
/*
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
//不关闭资源,应该一直监听消息
//channel.close();
//connection.close();
}
}
复制代码

生产者流程

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
  2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息至RabbitMQ Broker;
  5. 关闭信道;
  6. 关闭连接。

消费者流程

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
  2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接。

工作队列模式

与简单模式相比,多了一个或一些消费者,多个消费者共同消费同一个队列中的消息。

应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

代码与简单模式基本一致,改动两个地方:

  1. 生产者循环发送 100 万条消息;

  2. 消费者打印日志更改。

    //生产者更改部分 … // 要发送的信息 String message = “Hello World!”; /* * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ for (int i = 0; i < 1000000; i++) { channel.basicPublish(“”, QUEUE_NAME, null, (message + i).getBytes()); } System.out.println(“发送完毕”); …

    //消费者更改部分 … public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); } …

Ready:可消费数;Total:总数 Ready+Unacked;incoming:生产速率;deliver / get:消费速率;

直接开启三个消费者,然后开启一个生产者。

发布订阅模式

X:Exchange,由交换机决定消息应该发往哪个队列。发送给 Fanout 类型的交换机。

交换机常见三种类型:

  1. Fanout:广播,将消息交给所有绑定到交换机的队列(绑定队列时无需指定routing key,不会生效)
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

package com.simple.rabbitmq.ps;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.simple.rabbitmq.ConnectionUtil;
/**
* 发布与订阅使用的交换机类型为:fanout
*/
public class Producer {
//交换机名称
static final String FANOUT_EXCHAGE = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
* 声明交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
for (int i = 1; i <= 1000; i++) {
// 发送信息
String message = "Publish/Subscribe--Fanout: " + i;
/*
* 参数1:交换机名称,如果没有指定则使用默认Default Exchage
* 参数2:Fanout交换机指定路由规则也不会生效,所以不需要指定
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
// 关闭资源
channel.close();
connection.close();
}
}
复制代码

注意:生产者只需要往交换机发消息,不需要管队列。队列的创建以及与交换机的绑定关系、路由规则都是消费者要去关心的。

package com.simple.rabbitmq.ps;
import com.rabbitmq.client.*;
import com.simple.rabbitmq.ConnectionUtil;
import java.io.IOException;
public class ConsumberA {
static String FANOUT_QUEUE_A = "fanout_queue_a";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 声明(创建)队列
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
channel.queueDeclare(FANOUT_QUEUE_A, true, false, false, null);
//队列绑定交换机,Fanout交换机指定路由规则也不会生效,所以不需要指定
channel.queueBind(FANOUT_QUEUE_A, Producer.FANOUT_EXCHAGE, "");
//创建消费者;并设置消息处理
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
/*
* consumerTag 消息者标签,在channel.basicConsume时候可以指定
* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* properties 属性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.printf("Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n",
envelope.getRoutingKey(),
envelope.getExchange(),
envelope.getDeliveryTag(),
new String(body, "utf-8"));
}
};
//监听消息
/*
* 参数1:队列名称
* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
* 参数3:消息接收到后回调
*/
channel.basicConsume(FANOUT_QUEUE_A, true, consumer);
}
}
package com.simple.rabbitmq.ps;
import com.rabbitmq.client.*;
import com.simple.rabbitmq.ConnectionUtil;
import java.io.IOException;
public class ConsumberB {
static String FANOUT_QUEUE_B = "fanout_queue_b";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
channel.queueDeclare(FANOUT_QUEUE_B, true, false, false, null);
channel.queueBind(FANOUT_QUEUE_B, Producer.FANOUT_EXCHAGE, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.printf("Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n",
envelope.getRoutingKey(),
envelope.getExchange(),
envelope.getDeliveryTag(),
new String(body, "utf-8"));
}
};
channel.basicConsume(FANOUT_QUEUE_B, true, consumer);
}
}
复制代码

ConsumerB 只是修改了变量名以及队列名。

ConsumerA 的消费记录

ConsumerB 的消费记录

可见交换机向两个队列都发送了数据。发布订阅模式与工作队列模式的区:

  1. 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机 。

路由模式

路由模式指的是使用 Direct 类型的交换机;C1:消费者,指定了需要 routing key 为 orange 的消息;C2:消费者,指定了需要 routing key 为 black、green 的消息。

特点:

  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。

  2. 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey。

  3. Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列的 Routingkey 与消息的 Routingkey 完全一致,才会接收到消息。

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.simple.rabbitmq.ConnectionUtil;

    /**

    • 路由模式的交换机类型为:direct

    */ public class Producer { //交换机名称 static final String DIRECT_EXCHAGE = “direct_exchange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); String message = “橙色~”; channel.basicPublish(DIRECT_EXCHAGE, “orange”, null, message.getBytes()); System.out.println(“已发送消息:” + message); message = “绿色~”; channel.basicPublish(DIRECT_EXCHAGE, “green”, null, message.getBytes()); System.out.println(“已发送消息:” + message); message = “黑色~”; channel.basicPublish(DIRECT_EXCHAGE, “black”, null, message.getBytes()); System.out.println(“已发送消息:” + message); channel.close(); connection.close(); } }

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerOrange { static String DIRECT_QUEUE_ORANGE = “direct_queue_o”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DIRECT_QUEUE_ORANGE, true, false, false, null); //绑定路由 orange channel.queueBind(DIRECT_QUEUE_ORANGE, Producer.DIRECT_EXCHAGE, “orange”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); } }; channel.basicConsume(DIRECT_QUEUE_ORANGE, true, consumer); } }

    package com.simple.rabbitmq.routing;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerBlackGreen { static String DIRECT_QUEUE_BLACK_GREEN = “direct_queue_bg”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DIRECT_QUEUE_BLACK_GREEN, true, false, false, null); //绑定路由 black channel.queueBind(DIRECT_QUEUE_BLACK_GREEN, Producer.DIRECT_EXCHAGE, “black”); //绑定路由 green channel.queueBind(DIRECT_QUEUE_BLACK_GREEN, Producer.DIRECT_EXCHAGE, “green”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); } }; channel.basicConsume(DIRECT_QUEUE_BLACK_GREEN, true, consumer); } }

交换机与队列的绑定可以写多个。

主题模式

主题模式指的是使用类型为 Topic 的交换机。与Direct相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routingkey 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割。Routingkey 长度上限为 255 个字节。

通配符规则:

  • #:匹配 0 个或多个词。如果是 0 个词,后面的英文句号会自动省略而匹配上
  • *:匹配不多不少恰好 1 个词

绑定关系如上图:代码中都体现出来了。

  • RoutingKey 为 lazy.orange.rabbit 时,Q1、Q2 都能收到,并且 Q2 只会收到一次!

  • RoutingKey 为 lazy  时,Q2 能收到。注意:没有英文句号也可以接收到消息!

  • RoutingKey 为 lazy.black.puppy.dog  时,Q2 能收到

  • RoutingKey 为 puppy.gray.rabbit  时,Q2 能收到

  • RoutingKey 为 puppy.orange.dog  时,Q1 能收到

  • RoutingKey 为 kidding.orange.dog.haha 时,都收不到

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.simple.rabbitmq.ConnectionUtil;

    /**

    • 通配符Topic的交换机类型为:topic

    */ public class Producer { static final String TOPIC_EXCHAGE = “topic_exchange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); String message = “lazy orange rabbit”; channel.basicPublish(TOPIC_EXCHAGE, “lazy.orange.rabbit”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //两个都能收到,并且 ConsumerLazyRabbit 只收到一次。 message = “lazy”; channel.basicPublish(TOPIC_EXCHAGE, “lazy”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //只有 ConsumerLazyRabbit 能收到。注意:没有英文句号也可以接收到消息! message = “lazy black puppy dog”; channel.basicPublish(TOPIC_EXCHAGE, “lazy.black.puppy.dog”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //只有 ConsumerLazyRabbit 能收到。 message = “puppy gray rabbit”; channel.basicPublish(TOPIC_EXCHAGE, “puppy.gray.rabbit”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //只有 ConsumerLazyRabbit 能收到。 message = “puppy orange dog”; channel.basicPublish(TOPIC_EXCHAGE, “puppy.orange.dog”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //只有 ConsumerOrange 能收到。 message = “kidding orange dog haha”; channel.basicPublish(TOPIC_EXCHAGE, “kidding.orange.dog.haha”, null, message.getBytes()); System.out.println(“已发送消息:” + message); //都收不到!!! channel.close(); connection.close(); } }

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerOrange { static String TOPIC_QUEUE_ORANGE = “topic_queue_orange”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TOPIC_QUEUE_ORANGE, true, false, false, null); channel.queueBind(TOPIC_QUEUE_ORANGE, Producer.TOPIC_EXCHAGE, ” .orange. “); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); } }; channel.basicConsume(TOPIC_QUEUE_ORANGE, true, consumer); } }

    package com.simple.rabbitmq.topic;

    import com.rabbitmq.client.*; import com.simple.rabbitmq.ConnectionUtil;

    import java.io.IOException;

    public class ConsumerLazyRabbit { static String TOPIC_QUEUE_LAZY_RABBIT = “topic_queue_lazy_rabbit”;

    public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC); channel.queueDeclare(TOPIC_QUEUE_LAZY_RABBIT, true, false, false, null); channel.queueBind(TOPIC_QUEUE_LAZY_RABBIT, Producer.TOPIC_EXCHAGE, “lazy.#”); channel.queueBind(TOPIC_QUEUE_LAZY_RABBIT, Producer.TOPIC_EXCHAGE, ” . .rabbit”); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.printf(“Routing Key: %s, Exchange: %s, Message Id: %d, Message: %s\n”, envelope.getRoutingKey(), envelope.getExchange(), envelope.getDeliveryTag(), new String(body, “utf-8”)); } }; channel.basicConsume(TOPIC_QUEUE_LAZY_RABBIT, true, consumer); } }

Topic 非常强大,可以模拟其他模式

  • 如果队列绑定的 Routingkey 是 “#”,那么该队列可以接受到所有消息,类似 Fanout 模式;
  • 如果队列绑定的 Routingkey 不使用通配符,那么该队列只能接受到与 Routingkey 完全一致的消息,类似 Direct 模式。

远程调用模式

通过 RPC 的方式生产、消费消息。直接上 官网总结

发布确认模式

AMQP 0.9.1 协议的扩展。生产者发布消息,可以开启发布确认模式,在发布消息后可以调用方法确认消息已被成功接收,默认不开启。注意:发布确认模式与 Channel 关联,如果有 3 个 Channel 都想开启发布确认模式,需要三个 Channel 都开启一次,且只需要开启一次。

Channel channel = connection.createChannel();
channel.confirmSelect();
复制代码

上图为 官网总结

总共有三种策略:

  1. 发布一条消息,确认一次,同步方法,效率低下;
  2. 发布批量消息,确认一次,同步方法,效率较高,但是当消息发生丢失等情况时,无法追溯源头;
  3. 发送消息,异步确认,性能高,资源不浪费,能有效控制错误。

模式总结

  1. 简单模式 HelloWorld:一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
  2. 工作队列模式 Work Queue:一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
  3. 发布订阅模式 Publish/Subscribe:需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
  4. 路由模式 Routing:需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
  5. 通配符模式 Topic:需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
  6. 远程过程调用模式 RPC:使用 RPC 的方式与 RabbitMQ 通信
  7. 发布确认模式 Publisher Confirms:生产者发送消息后等待 RabbitMQ 返回确认
稀土掘金
我还没有学会写个人说明!
下一篇

在2021年,可能会是前景比较好的五门编程语言

你也可能喜欢

评论已经被关闭。

插入图片