实体聚合根
在使用 Phoenix 框架开发项目时,通常需要借助领域驱动设计(DDD),提取领域实体,其中一个特殊的实体为聚合根( 领域外对领域内其他实体的访问都需要通过该聚合根),而Phoenix中的实体聚合根概念就对应DDD中的聚合根。
实体聚合根利用 EventSourcing
思想把状态直接存在对象当中,参考Event Souring
。在Phoenix当中,单个实体聚合根(同一个聚合根ID)处理请求是单线程的,所以不用担心线程安全问题。当然,如果你希望你的应用是可以横向扩容的,就需要设计好聚合根。
Maven 依赖
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter</artifactId>
<version>2.6.1</version>
</dependency>
定义实体聚合根
聚合根类成员变量支持 WildcardType
,TypeVariable
,GenericArrayType
, 在聚合根类合法性校验中,
会跳过这些类型的校验。但开发者必须实现这些类型的序列化接口.
如对于 WildcardType
设置类型的上限的约束: Map<String, ? extends String>
实体聚合根使用注解@EntityAggregateAnnotation
来定义,服务启动后 Phoenix 会扫描和校验符合规范的实体聚合根对象,并在接收命令时创建实体聚合根类对象。实体聚合根类需要遵循如下规范:
- 聚合根类需要使用
@EntityAggregateAnnotation
注解进行标记。 - 聚合根类以及聚合根类中的成员均需实现
Serializable
接口,并定义serialVersionUID
。 - 聚合根类需要提供无参构造函数。
在聚合根上添加 @EntityAggregateAnnotation
注解时,需要通过 aggregateRootType
指定一个聚合根的类别。用来区分不同的聚合根类,该聚合根类别是全局唯一的。
示例代码
@EntityAggregateAnnotation(aggregateRootType = "BankAccount", idempotentSize = 100, bloomSize = 1000000, snapshotInterval = 100000)
public class BankAccountAggregate implements Serializable {
private static final long serialVersionUID = 6073238164083701075L;
// ... act and on method
}
注解配置
配置项 | 描述 | 类型 | 默认值 |
---|---|---|---|
aggregateRootType | 聚合根类型 | String | 必填项 |
surviveTime | 聚合根生存时间,超过该时间聚合根将被从JVM中淘汰,下一次使用时再重建,减少内存使用 | long | Long.MAX_VALUE |
snapshotMode | 快照模式,可用于关闭快照或使用 Lazy 的快照模式 | enum | EAGER, 可选: DISABLE, LAZY |
snapshotInterval | 快照间隔,每隔snapshotInterval条消息打印一次快照,加速聚合根重建 | long | 1000 |
numberOfRetainSnapshots | 需要保留的快照数量, Long.MAX_VALUE 为关闭 | long | Long.MAX_VALUE |
idempotentSize | 聚合根幂等集合大小,取值应大于零,否则可能会导致启动失败 | long | 1000 |
bloomSize | 布隆过滤器大小,内存中识别幂等,减少读库判断 | long | 100000L |
dispatcher | 选择聚合根的调度者,缓解阻塞问题,支持自定义线程池 | String | "phoenix-dispatcher" |
runningMode | 聚合根的运行模式, 支持同步和异步两种 | enum | SYNC |
allowPassivation | 允许聚合根钝化(仅从内存上清除聚合根,仍会在 EventStore 中保留聚合根历史状态) | boolean | true |
batchWeight | 聚合根最大攒批大小 (尽力而为的攒批模式, 当下游可用时总是会优先交付) | int | 200 |
bufferSize | 异步模式下,持久化时允许缓存的消息批次(package)数量 | Int | 100 |
heartbeatInterval | 聚合根心跳频率(ms), 心跳负责淘汰和自身信息上报 | long | 60,000 |
命令处理
在 EventSouring 中,聚合根的生命周期是:接收命令 -> 处理 -> 产生事件 -> 更改状态
,因此本小节的内容是说明 Phoenix 如何定义一个聚合根命令处理的声明:
实体聚合根中定义处理命令的方法是:提供返回值是 ActReturn 的 act() 方法,签名是:public ActReturn act(Command cmd)
,并使用注解 @CommandHandler
声明其配置。
命令处理会产生该领域接收命令后产生的事件,然后通过事件来修改聚合根状态,也可以直接修改(使用CommandSouring
模式, 避免在 EventSouring 模式下在 act 方法中修改状态,这样会污染内存状态)。
对于Command命令和Event事件,Phoenix支持三种协议,详情请参见序列化
。
- protobuf
- protostuff
- Java serializable:默认,但并不推荐,Java 序列化的性能和安全性都偏差
示例代码
@CommandHandler(aggregateRootId = "accountCode", isCommandSourcing = true)
public ActReturn act(AccountCreateCmd createCmd) {
this.account = createCmd.getAccountCode();
this.balanceAmt = createCmd.getBalanceAmt();
String message = String.format("初始化账户代码<%s>, 初始化余额<%s>. ", createCmd.getAccountCode(),createCmd.getBalanceAmt());
return ActReturn.builder()
.retCode(RetCode.SUCCESS)
.retMessage(message)
.event(new AccountCreateEvent(createCmd.getAccountCode(),createCmd.getBalanceAmt()))
.build();
}
聚合根 ID aggregateRootId
是该聚合根的唯一标识, 其大小限制随着 Event-Store 中的 aggregateRootIdSize 配置改变, 但不能超过 256 的字节长度, 因为过长的聚合根 ID 会导致索引性能下降.
如果您的聚合根 ID 长度需要转义成更长的字符串,请自行转换成更精简的表达.
注解配置
配置项 | 描述 | 类型 | 默认值 |
---|---|---|---|
aggregateRootId | 聚合根id,允许多字段组成并且成员嵌套字段访问aggregateRootId = {event.accountCode, event.name} | String[] | 必填项 |
enableCreateAggregate | 是否允许自动创建聚合根 | boolean | true |
idempotentIds | 幂等id(参考幂等操作 段落详解) | String[] | 采用phoenix默认的幂等id |
isCommandSourcing | 是否是command sourcing,默认是event sourcing | boolean | false |
聚合根 ID 支持同时使用组合和嵌套的使用方式
class CommandA {
String name;
CommandB commandb;
}
class CommandB {
String age;
}
@CommandHandler(aggregateRootId = {name, commandb.age})
public ActReturn act(A cmd) {
/*
business logic
*/
}
ActReturn
Reply 可用于返回 RPC 结果,快速发送到指定 Topic(参考 Phoenix Client API)来缩短 EventPublish 路径,当需要特别注意的是,Reply 的交付语义是至多一次的。 也就是说 Reply 仅会事件处理结果后发送一次(一次性)。在幂等时、主键冲突等情况下,Reply 不会再次交付,而是返回 Event。
ActReturn 定义了命令处理的结果,包含处理状态、事件等,其 Java 定义如下:
public class ActReturn {
// 处理状态码,必填参数
private final RetCode retCode;
// 返回消息,必填参数
private final String retMessage = "";
// 持久化事件,必填参数
private final Object event;
// 回复事件,可选参数,至多一次交付语义
private final Object reply;
// 元数据注册
private final List<RegistryCollectData> registryCollectDataList;
}
事件处理
在 EventSouring 中,聚合根的生命周期是:接收命令 -> 处理 -> 产生事件 -> 更改状态
,因此本小节的内容是说明 Phoenix 如何定义一个聚合根事件处理的声明:
实体聚合根中定义处理命令的方法是:提供没有返回值的 on() 方法,签名是:public void on(Event cmd)
。
实体聚合根接收会接收 act()
方法产生的事件,并调用自身的命令处理逻辑来修改状态。事件仅会在持久化成功之后才会发送到聚合根,因此用户可以认为在 on()
方法中
修改的状态总是可靠的,而 act()
方法修改的状态总是不可靠的。用户在这个语义下可以在 act()
中执行外部交互方法(如 IO),并将结果存入事件,这样每当聚合根溯源时,
不会重复于外部交互。
事件会进行持久化处理(EventStore),当需要重建聚合根内存状态时,通过 EventSourcing 进行状态重塑。
在重塑时可以通过快照进行加速。快照相关配置请参考:配置详情
on() 方法需要遵循如下规范:
- on() 方法中不能有IO操作,例如:调用DB操作,调用外部接口
- on() 方法中不能有随机函数,例如:获取系统当前时间,获取随机数
示例代码
public void on(Account.AccountAllocateOkEvent event) {
account = event.getAccountCode();
balanceAmt += event.getAmt();
if (event.getAmt() < 0) {
successTransferOut++;
} else {
successTransferIn++;
}
}
查询操作
Phoenix提供了查询聚合根状态的能力。通过在**act()方法上添加 @QueryHandler
注解用来标志该act()**方法将会执行一个查询操作。
在该**act()**方法中将需要查询的数据封装进Event事件中,通过ActReturn进行返回。
因查询操作不涉及内存状态的修改,只是对数据进行查询,所以查询操作所产生的的Event事件不会进行持久化操作。
示例代码
@QueryHandler(aggregateRootId = "accountCode")
public ActReturn act(AccountQueryCmd cmd) {
// business logic
return ActReturn.builder()
.retCode(RetCode.SUCCESS)
.retMessage("查询成功")
.event(new AccountQueryEvent(account, balanceAmt, successTransferOut, failTransferOut, successTransferIn))
.build();
}
实体聚合根处理扫描支持对象关系. 如支持在父类中定义 act
, on
方法来帮助整理一个聚合根类的代码,但暂不支持用泛型的方式复用这些方法.