跳到主要内容
版本:latest

聚合根通信

简介

默认情况下,聚合根的命令通过订阅发送给聚合根,然后由聚合根处理。但有时候,我们需要直接向聚合根发送命令,而不是通过订阅,这种情况下 我们支持一个比较特殊的功能:聚合根通信。

聚合根通信是一种特殊的命令发送方式,它不会经过订阅,而是直接发送给聚合根,该类有两个 API:

  • tell:异步发送命令到聚合根,不关心命令是否发送成功
  • ask:同步发送命令聚合根,并期待返回结果
注意

需要注意的是,使用聚合根通信发送的命令,不会将事件发送到其他 Topic,除非用户开启了 EventPublish 功能。

使用方式

在 Phoenix 中,我们提供了一个 PhoenixAkkaSender 类,用于直接向聚合根发送命令,该类可以通过 Phoenix 容器(PhoenixBootstrap)获取。

@Service
public class ActorInteractionService {

private final PhoenixBootstrap bootstrap;

public ActorInteractionService(PhoenixBootstrap bootstrap) {
// 这里不可以直接将 PhoenixAkkaSender 作为成员,因为在构造函数中,PhoenixAkkaSender 还未初始化
this.bootstrap = bootstrap;
}

public PhoenixAkkaSender getAkkaSender() {
return bootstrap.getAkkaSender();
}
}

除了通过 Phoenix 容器外获取全局单例外,用户也可以手动创建实例:

@Service
public class ActorInteractionService {

private final PhoenixAkkaSender sender

public ActorInteractionService(ActorSystem<?> system ) {
this.sender = new PhoenixAkkaSender(system);
}

public PhoenixAkkaSender getAkkaSender() {
return this.sender;
}
}

示例代码

下面是一个在 Spring 中直接编写向聚合根发送命令的 API 实现示例:

@RestController
@RequestMapping("/actor")
public class ActorInteractionController {

private final ActorInteractionService service;

public ActorInteractionController(ActorInteractionService service) {
this.service = service;
}

@GetMapping
public String allocate() {
Account.AccountAllocateCmd cmd = Account.AccountAllocateCmd.newBuilder()
.setAccountCode("A00000000")
.setAllocateNumber(UUID.randomUUID().toString())
.setAmt(ThreadLocalRandom.current().nextDouble(1000.0))
.build();
try {
// 同步请求 AccountAllocateCmd 到 A00000000 的聚合根,并期待在 1000ms 内返回结果
Message response = service.getAkkaSender().ask(cmd, 1000);
Account.AccountAllocateOkEvent replyEvent = response.getPayload();
return replyEvent.toString();
} catch (Exception e) {
log.error("Actor not reply or exception: {}", e.getMessage(), e);
return "Actor not reply or exception";
}
}

@PostMapping
private String post(){
Account.AccountAllocateCmd cmd = Account.AccountAllocateCmd.newBuilder()
.setAccountCode("A00000000")
.setAllocateNumber(UUID.randomUUID().toString())
.setAmt(ThreadLocalRandom.current().nextDouble(1000.0))
.build();
Message cmdMsg = MessageFactory.getCmdMsg("", "reply", cmd);
try {
// 异步发送 AccountAllocateCmd 到 A00000000 的聚合根,支持直接发送 cmd 和 Message(Phoenix 内部协议)
// 注意:这里即使用 Message 指定了回复的 Topic 为 reply,但是由于使用聚合根通信,因此不会发送到 reply Topic
service.getAkkaSender().tell(cmdMsg);
return "ok";
} catch (Exception e) {
log.error("Actor not reply or exception: {}", e.getMessage(), e);
return "Actor not reply or exception";
}
}
}