SpringClub09-RabbitMQ 拼多商城整合

拼多商城整合 rabbitmq

当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统

我们需要应对流量峰值,让流量曲线变得平缓,如下图

1.订单存储的解耦

为了进行流量削峰,我们引入 rabbitmq 消息队列,当购物系统产生订单后,可以把订单数据发送到消息队列;而订单消费者应用从消息队列接收订单消息,并把订单保存到数据库

这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存.

2. 创建pdweb项目

从课前资料中导入项目即可

3. 定义生产者-发送订单

3.1 pom.xml 添加依赖

spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 修改 application.yml 文件

添加RabbitMQ的连接信息

spring:
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin

3.3 修改主程序 RunPdAPP

在主程序中添加下面的方法创建Queue实例

当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,代码在 RabbitAdmin.declareQueues() 方法中

@Bean
public Queue getQueue() {
Queue q = new Queue("orderQueue", true);
return q;
}

3.4 修改 OrderServiceImpl

//RabbitAutoConfiguration中创建了AmpqTemplate实例
@Autowired
AmqpTemplate amqpTemplate;
//saveOrder原来的数据库访问代码全部注释,添加rabbitmq消息发送代码
public String saveOrder(PdOrder pdOrder) throws Exception {
String orderId = generateId();
pdOrder.setOrderId(orderId);
amqpTemplate.convertAndSend("orderQueue", pdOrder);
return orderId;
//        String orderId = generateId();
//        pdOrder.setOrderId(orderId);
//
//
//        PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
//        pdOrder.setShippingName(pdShipping.getReceiverName());
//        pdOrder.setShippingCode(pdShipping.getReceiverAddress());
//        pdOrder.setStatus(1);//
//        pdOrder.setPaymentType(1);
//        pdOrder.setPostFee(10D);
//        pdOrder.setCreateTime(new Date());
//
//        double payment = 0;
//        List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
//        for (ItemVO itemVO : itemVOs) {
//            PdOrderItem pdOrderItem = new PdOrderItem();
//            String id = generateId();
//            //String id="2";
//            pdOrderItem.setId(id);
//            pdOrderItem.setOrderId(orderId);
//            pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
//            pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
//            pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
//            pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
//
//            payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
//            pdOrderItemMapper.insert(pdOrderItem);
//        }
//        pdOrder.setPayment(payment);
//        pdOrderMapper.insert(pdOrder);
//        return orderId;
}

4. 消费者-接收订单,并保存到数据库

4.1 pd-web项目复制为pd-order-consumer

4.2 修改 application.yml 文件

把端口修改成 81

server:
port: 81
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/pd_store?useUnicode=true&characterEncoding=utf8
username: root
password: root
rabbitmq:
host: 192.168.64.140
port: 5672
virtualHost: /pd
username: admin
password: admin
mybatis:
#typeAliasesPackage: cn.tedu.ssm.pojo
mapperLocations: classpath:com.pd.mapper/*.xml
logging:
level:
cn.tedu.ssm.mapper: debug

4.3 新建 OrderConsumer

package com.pd;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
@Component
public class OrderConsumer {
//收到订单数据后,会调用订单的业务代码,把订单保存到数据库
@Autowired
private OrderService orderService;
//添加该注解后,会从指定的orderQueue接收消息,
//并把数据转为 PdOrder 实例传递到此方法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder)
{
System.out.println("消费者");
System.out.println(pdOrder.toString());
try {
orderService.saveOrder(pdOrder);
} catch (Exception e) {
e.printStackTrace();
}
}
}

4.4 修改 OrderServiceImpl 的 saveOrder() 方法

public String saveOrder(PdOrder pdOrder) throws Exception {
//        String orderId = generateId();
//        pdOrder.setOrderId(orderId);
//
//        amqpTemplate.convertAndSend("orderQueue", pdOrder);
//        return orderId;
//
//
//
//        String orderId = generateId();
//        pdOrder.setOrderId(orderId);
//从RabbitMQ接收的订单数据,
//已经在上游订单业务中生成过id,这里不再重新生成id
//直接获取该订单的id
String orderId = pdOrder.getOrderId();
PdShipping pdShipping = pdShippingMapper.selectByPrimaryKey(pdOrder.getAddId());
pdOrder.setShippingName(pdShipping.getReceiverName());
pdOrder.setShippingCode(pdShipping.getReceiverAddress());
pdOrder.setStatus(1);//
pdOrder.setPaymentType(1);
pdOrder.setPostFee(10D);
pdOrder.setCreateTime(new Date());
double payment = 0;
List<ItemVO> itemVOs = selectCartItemByUseridAndItemIds(pdOrder.getUserId(), pdOrder.getItemIdList());
for (ItemVO itemVO : itemVOs) {
PdOrderItem pdOrderItem = new PdOrderItem();
String id = generateId();
//String id="2";
pdOrderItem.setId(id);
pdOrderItem.setOrderId(orderId);
pdOrderItem.setItemId("" + itemVO.getPdItem().getId());
pdOrderItem.setTitle(itemVO.getPdItem().getTitle());
pdOrderItem.setPrice(itemVO.getPdItem().getPrice());
pdOrderItem.setNum(itemVO.getPdCartItem().getNum());
payment = payment + itemVO.getPdCartItem().getNum() * itemVO.getPdItem().getPrice();
pdOrderItemMapper.insert(pdOrderItem);
}
pdOrder.setPayment(payment);
pdOrderMapper.insert(pdOrder);
return orderId;
}

5. 手动确认

5.1 修改 application.yml 文件

spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual`

5.2 OrderConsumer

package com.pd;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import com.rabbitmq.client.Channel;
@Component
public class OrderConsumer {
//收到订单数据后,会调用订单的业务代码,把订单保存到数据库
@Autowired
private OrderService orderService;
//添加该注解后,会从指定的orderQueue接收消息,
//并把数据转为 PdOrder 实例传递到此方法
@RabbitListener(queues="orderQueue")
public void save(PdOrder pdOrder, Channel channel, Message message)
{
System.out.println("消费者");
System.out.println(pdOrder.toString());
try {
orderService.saveOrder(pdOrder);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
SegmentFault博客
我还没有学会写个人说明!
上一篇

因为一次 Kafka 宕机,我明白了 Kafka 高可用原理!

你也可能喜欢

评论已经被关闭。

插入图片