微服务模式:Spring Boot + Kafka的业务流程Saga模式 – vinsguru

微信扫一扫,分享到朋友圈

微服务模式:Spring Boot + Kafka的业务流程Saga模式 – vinsguru

多年来,微服务已变得非常流行。微服务是分布式系统。它们更小,模块化,易于部署和扩展等。开发单个微服务应用程序可能会很有趣!但是处理跨越多个微服务的业务交易并不好玩!MicroService体系结构具有特定的职责。为了完成应用程序工作流程/任务,可能需要多个MicroServices一起工作。

让我们看看本文中在分布式系统中处理事务/数据一致性有多困难。

假设我们的业务规则说,当用户下订单时,如果产品的价格在用户的信用限额/余额之内并且该产品的库存可用,则订单将得到满足。否则将无法实现。看起来真的很简单。这在整体应用中非常容易实现。整个工作流程可以视为1个单事务。当所有内容都在单个数据库中时,提交/回滚很容易。对于具有多个数据库的分布式系统,这将非常复杂!首先让我们看一下我们的架构,看看如何实现它。

我们在下面的微服务中拥有自己的数据库。

  • 订购服务
  • 付款服务
  • 库存服务

当订单服务收到新订单的请求时,它必须与付款服务和库存服务进行核对。我们扣除付款,库存并最终完成订单!如果我们扣除付款但没有库存,会发生什么?如何回滚?涉及多个数据库很难。

传奇Saga模式

通常,在所有微服务之间处理事务和维护数据一致性很困难。当涉及多种服务时,例如付款,库存,欺诈检查,运输检查…..etc等,如果没有协调员,将很难通过多个步骤来管理如此复杂的工作流程。通过为协调员引入单独的服务,订单服务摆脱了这些多余责任。我们也没有引入任何循环依赖。

此处 检查项目源代码。

跨越多个微服务的每个业务交易都被分成特定于微服务的本地交易,并按顺序执行它们以完成业务工作流程。它被称为佐贺。它可以通过两种方式实现。

  • 编舞Choreography 方法
  • 编排Orchestration 方法

在本文中,我们将讨论基于Orchestration的传奇Saga。

在这种模式下,我们将有一个协调器,一个单独的服务,它将协调所有微服务之间的所有事务。如果一切正常,它将使订单请求完成,否则将其标记为已取消。

让我们看看如何实现这一点。我们的示例架构将或多或少像这样!

  • 在此演示中,协调器与其他服务之间的通信将是一个简单的HTTP,以一种非阻塞的异步方式来使其无状态。
  • 我们也可以使用Kafka主题进行交流。为此,我们必须使用分散/聚集模式,该模式更像是有状态的样式。

Order协调器

这是一个微服务,负责协调所有事务。它侦听订单创建的主题。当创建新订单时,它会立即为每个服务(如付款服务/库存服务等)建立单独的请求,并验证响应。如果可以,请执行订单。如果其中之一不是,则取消定单。它还尝试重置任何微服务中发生的任何本地事务。

我们将所有本地交易视为1个单一工作流程。一个工作流程将包含多个工作流程步骤。

  • 工作流程步骤
<b>public</b> <b>interface</b> WorkflowStep {
WorkflowStepStatus getStatus();
Mono<Boolean> process();
Mono<Boolean> revert();
}
  • 工作流程
<b>public</b> <b>interface</b> Workflow {
List<WorkflowStep> getSteps();
}
  • 在本例中,对于“订购”工作流,我们有2个步骤。每个实现都应该知道如何进行本地事务以及如何重置。
  • 库存步骤需要继承实现WorkflowStep接口
<b>public</b> <b>class</b> InventoryStep implements WorkflowStep {
<b>private</b> <b>final</b> WebClient webClient;
<b>private</b> <b>final</b> InventoryRequestDTO requestDTO;
<b>private</b> WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
<b>public</b> InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) {
<b>this</b>.webClient = webClient;
<b>this</b>.requestDTO = requestDTO;
}
@Override
<b>public</b> WorkflowStepStatus getStatus() {
<b>return</b> <b>this</b>.stepStatus;
}
@Override
<b>public</b> Mono<Boolean> process() {
<b>return</b> <b>this</b>.webClient
.post()
.uri(<font>"/inventory/deduct"</font><font>)
.body(BodyInserters.fromValue(<b>this</b>.requestDTO))
.retrieve()
.bodyToMono(InventoryResponseDTO.<b>class</b>)
.map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE))
.doOnNext(b -> <b>this</b>.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
}
@Override
<b>public</b> Mono<Boolean> revert() {
<b>return</b> <b>this</b>.webClient
.post()
.uri(</font><font>"/inventory/add"</font><font>)
.body(BodyInserters.fromValue(<b>this</b>.requestDTO))
.retrieve()
.bodyToMono(Void.<b>class</b>)
.map(r -><b>true</b>)
.onErrorReturn(false);
}
}
</font>
  • 付款步骤也是要实现接口中处理和回退两个步骤
<b>public</b> <b>class</b> PaymentStep implements WorkflowStep {
<b>private</b> <b>final</b> WebClient webClient;
<b>private</b> <b>final</b> PaymentRequestDTO requestDTO;
<b>private</b> WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
<b>public</b> PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) {
<b>this</b>.webClient = webClient;
<b>this</b>.requestDTO = requestDTO;
}
@Override
<b>public</b> WorkflowStepStatus getStatus() {
<b>return</b> <b>this</b>.stepStatus;
}
@Override
<b>public</b> Mono<Boolean> process() {
<b>return</b> <b>this</b>.webClient
.post()
.uri(<font>"/payment/debit"</font><font>)
.body(BodyInserters.fromValue(<b>this</b>.requestDTO))
.retrieve()
.bodyToMono(PaymentResponseDTO.<b>class</b>)
.map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED))
.doOnNext(b -> <b>this</b>.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
}
@Override
<b>public</b> Mono<Boolean> revert() {
<b>return</b> <b>this</b>.webClient
.post()
.uri(</font><font>"/payment/credit"</font><font>)
.body(BodyInserters.fromValue(<b>this</b>.requestDTO))
.retrieve()
.bodyToMono(Void.<b>class</b>)
.map(r -> <b>true</b>)
.onErrorReturn(false);
}
}
</font>
  • 服务/协调员
@Service
<b>public</b> <b>class</b> OrchestratorService {
@Autowired
@Qualifier(<font>"payment"</font><font>)
<b>private</b> WebClient paymentClient;
@Autowired
@Qualifier(</font><font>"inventory"</font><font>)
<b>private</b> WebClient inventoryClient;
<b>public</b> Mono<OrchestratorResponseDTO> orderProduct(<b>final</b> OrchestratorRequestDTO requestDTO){
Workflow orderWorkflow = <b>this</b>.getOrderWorkflow(requestDTO);
<b>return</b> Flux.fromStream(() -> orderWorkflow.getSteps().stream())
.flatMap(WorkflowStep::process)
.handle(((aBoolean, synchronousSink) -> {
<b>if</b>(aBoolean)
synchronousSink.next(<b>true</b>);
<b>else</b>
synchronousSink.error(<b>new</b> WorkflowException(</font><font>"create order failed!"</font><font>));
}))
.then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED)))
.onErrorResume(ex -> <b>this</b>.revertOrder(orderWorkflow, requestDTO));
}
<b>private</b> Mono<OrchestratorResponseDTO> revertOrder(<b>final</b> Workflow workflow, <b>final</b> OrchestratorRequestDTO requestDTO){
<b>return</b> Flux.fromStream(() -> workflow.getSteps().stream())
.filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE))
.flatMap(WorkflowStep::revert)
.retry(3)
.then(Mono.just(<b>this</b>.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED)));
}
<b>private</b> Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){
WorkflowStep paymentStep = <b>new</b> PaymentStep(<b>this</b>.paymentClient, <b>this</b>.getPaymentRequestDTO(requestDTO));
WorkflowStep inventoryStep = <b>new</b> InventoryStep(<b>this</b>.inventoryClient, <b>this</b>.getInventoryRequestDTO(requestDTO));
<b>return</b> <b>new</b> OrderWorkflow(List.of(paymentStep, inventoryStep));
}
<b>private</b> OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){
OrchestratorResponseDTO responseDTO = <b>new</b> OrchestratorResponseDTO();
responseDTO.setOrderId(requestDTO.getOrderId());
responseDTO.setAmount(requestDTO.getAmount());
responseDTO.setProductId(requestDTO.getProductId());
responseDTO.setUserId(requestDTO.getUserId());
responseDTO.setStatus(status);
<b>return</b> responseDTO;
}
<b>private</b> PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){
PaymentRequestDTO paymentRequestDTO = <b>new</b> PaymentRequestDTO();
paymentRequestDTO.setUserId(requestDTO.getUserId());
paymentRequestDTO.setAmount(requestDTO.getAmount());
paymentRequestDTO.setOrderId(requestDTO.getOrderId());
<b>return</b> paymentRequestDTO;
}
<b>private</b> InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){
InventoryRequestDTO inventoryRequestDTO = <b>new</b> InventoryRequestDTO();
inventoryRequestDTO.setUserId(requestDTO.getUserId());
inventoryRequestDTO.setProductId(requestDTO.getProductId());
inventoryRequestDTO.setOrderId(requestDTO.getOrderId());
<b>return</b> inventoryRequestDTO;
}
}
</font>

有关完整的源码,请 在此处 下载。

华为今年底或将推新平板:麒麟9000 12.9寸 高刷屏

上一篇

Linux定时器例子

下一篇

你也可能喜欢

微服务模式:Spring Boot + Kafka的业务流程Saga模式 – vinsguru

长按储存图像,分享给朋友