拼多商城整合 rabbitmq
当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统
我们需要应对流量峰值,让流量曲线变得平缓,如下图
1.订单存储的解耦
为了进行流量削峰,我们引入 rabbitmq 消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;而订单消费者应用从消息队列接收订单消息,并把订单保存到数据库
这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存.
2. 创建pdweb项目
从课前资料中导入项目即可
3. 定义生产者-发送订单
3.1 pom.xml 添加依赖
spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.2 修改 application.yml 文件
添加RabbitMQ的连接信息
spring: rabbitmq: host: 192.168.64.140 port: 5672 virtualHost: /pd username: admin password: admin
3.3 修改主程序 RunPdAPP
在主程序中添加下面的方法创建Queue实例
当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,代码在 RabbitAdmin.declareQueues()
方法中
@Bean public Queue getQueue() { Queue q = new Queue("orderQueue", true); return q; }
3.4 修改 OrderServiceImpl
//RabbitAutoConfiguration中创建了AmpqTemplate实例 @Autowired AmqpTemplate amqpTemplate; //saveOrder原来的数据库访问代码全部注释,添加rabbitmq消息发送代码 public String saveOrder(PdOrder pdOrder) throws Exception { String orderId = generateId(); pdOrder.setOrderId(orderId); amqpTemplate.convertAndSend("orderQueue", pdOrder); return orderId; // String orderId = generateId(); // pdOrder.setOrderId(orderId); // // // PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId()); // pdOrder.setShippingName(pdShipping.getReceiverName()); // pdOrder.setShippingCode(pdShipping.getReceiverAddress()); // pdOrder.setStatus(1);// // pdOrder.setPaymentType(1); // pdOrder.setPostFee(10D); // pdOrder.setCreateTime(new Date()); // // double payment = 0; // List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList()); // for (ItemVO itemVO : itemVOs) { // PdOrderItem pdOrderItem = new PdOrderItem(); // String id = generateId(); // //String id="2"; // pdOrderItem.setId(id); // pdOrderItem.setOrderId(orderId); // pdOrderItem.setItemId("" + itemVO.getPdItem().getId()); // pdOrderItem.setTitle(itemVO.getPdItem().getTitle()); // pdOrderItem.setPrice(itemVO.getPdItem().getPrice()); // pdOrderItem.setNum(itemVO.getPdCartItem().getNum()); // // payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice(); // pdOrderItemMapper.insert(pdOrderItem); // } // pdOrder.setPayment(payment); // pdOrderMapper.insert(pdOrder); // return orderId; }
4. 消费者-接收订单,并保存到数据库
4.1 pd-web项目复制为pd-order-consumer
4.2 修改 application.yml 文件
把端口修改成 81
server: port: 81 spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8 username: root password: root rabbitmq: host: 192.168.64.140 port: 5672 virtualHost: /pd username: admin password: admin mybatis: #typeAliasesPackage: cn.tedu.ssm.pojo mapperLocations: classpath:com.pd.mapper/*.xml logging: level: cn.tedu.ssm.mapper: debug
4.3 新建 OrderConsumer
package com.pd; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.pd.pojo.PdOrder; import com.pd.service.OrderService; @Component public class OrderConsumer { //收到订单数据后,会调用订单的业务代码,把订单保存到数据库 @Autowired private OrderService orderService; //添加该注解后,会从指定的orderQueue接收消息, //并把数据转为 PdOrder 实例传递到此方法 @RabbitListener(queues="orderQueue") public void save(PdOrder pdOrder) { System.out.println("消费者"); System.out.println(pdOrder.toString()); try { orderService.saveOrder(pdOrder); } catch (Exception e) { e.printStackTrace(); } } }
4.4 修改 OrderServiceImpl 的 saveOrder() 方法
public String saveOrder(PdOrder pdOrder) throws Exception { // String orderId = generateId(); // pdOrder.setOrderId(orderId); // // amqpTemplate.convertAndSend("orderQueue", pdOrder); // return orderId; // // // // String orderId = generateId(); // pdOrder.setOrderId(orderId); //从RabbitMQ接收的订单数据, //已经在上游订单业务中生成过id,这里不再重新生成id //直接获取该订单的id String orderId = pdOrder.getOrderId(); PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId()); pdOrder.setShippingName(pdShipping.getReceiverName()); pdOrder.setShippingCode(pdShipping.getReceiverAddress()); pdOrder.setStatus(1);// pdOrder.setPaymentType(1); pdOrder.setPostFee(10D); pdOrder.setCreateTime(new Date()); double payment = 0; List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList()); for (ItemVO itemVO : itemVOs) { PdOrderItem pdOrderItem = new PdOrderItem(); String id = generateId(); //String id="2"; pdOrderItem.setId(id); pdOrderItem.setOrderId(orderId); pdOrderItem.setItemId("" + itemVO.getPdItem().getId()); pdOrderItem.setTitle(itemVO.getPdItem().getTitle()); pdOrderItem.setPrice(itemVO.getPdItem().getPrice()); pdOrderItem.setNum(itemVO.getPdCartItem().getNum()); payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice(); pdOrderItemMapper.insert(pdOrderItem); } pdOrder.setPayment(payment); pdOrderMapper.insert(pdOrder); return orderId; }
5. 手动确认
5.1 修改 application.yml 文件
spring: rabbitmq: listener: simple: acknowledge-mode: manual`
5.2 OrderConsumer
package com.pd; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.pd.pojo.PdOrder; import com.pd.service.OrderService; import com.rabbitmq.client.Channel; @Component public class OrderConsumer { //收到订单数据后,会调用订单的业务代码,把订单保存到数据库 @Autowired private OrderService orderService; //添加该注解后,会从指定的orderQueue接收消息, //并把数据转为 PdOrder 实例传递到此方法 @RabbitListener(queues="orderQueue") public void save(PdOrder pdOrder, Channel channel, Message message) { System.out.println("消费者"); System.out.println(pdOrder.toString()); try { orderService.saveOrder(pdOrder); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }