echo编辑整理,欢迎转载,转载请声明文章来源。欢迎添加echo微信(微信号:t2421499075) 交流学习。
消费模式
RabbitMQ消费模式有两种,一种是队列的push,另外一个是从队列pull。对应API如下。
- pull: com.rabbitmq.client.Channel#basicGet。消息中间件主动将消息推送给消费者
- push: com.rabbitmq.client.Channel#basicConsume。消费者主动从消息中间件拉取消息
两种模式的区别
- push:推模式接收消息是最有效的一种消息处理方式。当我们使用该模式的时候,我们的消费端,只需要启动了之后,就会相当于使用了订阅模式,只要生产端不断推送消息,消费端就会持续接收到消息。
- pull:拉模式接收消息和push是完全不同的,它每次接收消息都需要去拉取一下队列的信息。由于是拉取的,实时性较差。不能及时有效的获取最新的消息,所以相对来说使用并不是很多。但是该模式对于部分业务量很小的功能来说确实一个不错的选择,能有效的降低对内存的消耗。
push模式的实例
import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * @author echo * @date 2021-01-14 15:05 */ public class TopicConsumerTest { private static final String EXCHANGE_NAME = "exchange_topic"; private static final String QUEUE_NAME = "queue_topic1"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ的链接参数 factory.setUsername("echo"); factory.setPassword("123456"); factory.setPort(PORT); factory.setHost(IP_ADDRESS); // 和RabbitMQ建立一个链接 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //声明交换机 Fanout模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null); //进行绑定,指定消费那个队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null); Consumer consumer = new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("recv message: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); //等待回调函数执行完毕之后 关闭资源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } } 复制代码
pull模式实例
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * @author echo * @date 2021-01-14 15:05 */ public class TopicConsumerTest { private static final String EXCHANGE_NAME = "exchange_topic"; private static final String QUEUE_NAME = "queue_topic1"; private static final String IP_ADDRESS = "192.168.230.131"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ的链接参数 factory.setUsername("echo"); factory.setPassword("123456"); factory.setPort(PORT); factory.setHost(IP_ADDRESS); // 和RabbitMQ建立一个链接 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //声明交换机 Fanout模式 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null); //进行绑定,指定消费那个队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null); GetResponse getResponse = channel.basicGet(QUEUE_NAME, false); System.out.println(new String(getResponse.getBody())); //等待回调函数执行完毕之后 关闭资源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } } 复制代码
注意:本文来自网友投稿。本站无法对本文内容的真实性、完整性、及时性、原创性提供任何保证,请您自行验证核实并承担相关的风险与后果!
CoLaBug.com遵循[CC BY-SA 4.0]分享并保持客观立场,本站不承担此类作品侵权行为的直接责任及连带责任。您有版权、意见、投诉等问题,请通过[eMail]联系我们处理,如需商业授权请联系原作者/原网站。