事件管理
设计思想
Phoenix维护内存状态,核心能力是使用EventSouring技术,Phoenix会定时打快照用来加速EventSouring的恢复。
EventStore-JDBC
Phoenix 默认使用的是 JDBC 版本的 EventStore, 请按下面步骤进行配置
- 引用 phoenix-server-starter 的依赖
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter</artifactId>
<version>2.4.4</version>
</dependency>
- 在
application.yaml
中增加EventStore配置
其中可以给一个DataSources
增加label
隔离业务使用和系统使用,一般用于性能测试
quantex:
phoenix:
event-store:
driver-class-name: org.h2.Driver
data-sources:
- url: jdbc:h2:file:./data/test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=FALSE;INIT=CREATE SCHEMA IF NOT EXISTS PUBLIC
username: sa
password:
- url: jdbc:h2:file:./data2/test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=FALSE;INIT=CREATE SCHEMA IF NOT EXISTS PUBLIC
username: sa
password:
label: reliablity # 配置Label让资源隔离(性能测试用)
initial-size: 0
min-idle: 0
max-active: 8
- 如果在测试过程中不想使用 EventStore 功能,可以进行如下配置:
quantex:
phoenix:
event-store:
no-eventstore: true
事件归档
在选择JDBC支持的数据库时,由于有些数据库不具备动态扩容的功能,事件的存储并不能一直存储下去,需要定期归档。目前调研下来的数据库有
- 不支持动态扩容的: Mysql,Oracle
- 支持动态扩容的: TIDB
数据库归档的主要逻辑是:可以归档掉最新快照之前的事件和快照(如果没有其他事后分析需求,也可以删除)。
目前Phoenix提供归档SQL来方便用户进行数据的归档操作。
oracle版本
-- 归档快照表
-- 创建快照备份表
create table snapshot_store_his as
select *
from snapshot_store
where 0 = 1;
-- 归档快照, 条件snapshot.version < max(snapshot.version)
insert into snapshot_store_his
select *
from snapshot_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) s1
where snapshot_store.aggregate_id = s1.aggregate_id
and snapshot_store.version < s1.max_version);
-- 删除原表中已经归档的快照
delete
from snapshot_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) s1
where snapshot_store.aggregate_id = s1.aggregate_id
and snapshot_store.version < s1.max_version);
-- 归档事件表
-- 创建事件备份表
create table event_store_his as
select *
from event_store
where 0 = 1;
-- 归档事件, 条件event.version < max(snapshot.version)
insert into event_store_his
select *
from event_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) s1
where event_store.aggregate_id = s1.aggregate_id
and event_store.version < s1.max_version);
-- 删除原表中已归档的事件
delete
from event_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) s1
where event_store.aggregate_id = s1.aggregate_id and event_store.version < s1.max_version);
mysql版本
-- 归档快照表
-- 创建快照备份表
create table if not exists snapshot_store_his as select * from snapshot_store where 0=1;
-- 归档快照, 条件:snapshot.version < max(snapshot.version)
insert into snapshot_store_his
select *
from snapshot_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) as s1
where snapshot_store.aggregate_id = s1.aggregate_id and snapshot_store.version < s1.max_version);
-- 删除原表中已归档的快照
delete
from snapshot_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) as s1
where snapshot_store.aggregate_id = s1.aggregate_id and snapshot_store.version < s1.max_version);
-- 归档事件表
-- # 创建事件备份表
create table if not exists event_store_his as select * from event_store where 0=1;
-- # 归档事件,条件:event.version < max(snapshot.version)
insert into event_store_his
select *
from event_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) as s1
where event_store.aggregate_id = s1.aggregate_id and event_store.version < s1.max_version);
-- # 删除原表中已归档的事件
delete
from event_store
where exists(select 1
from (select aggregate_id, max(version) as max_version from snapshot_store group by aggregate_id) as s1
where event_store.aggregate_id = s1.aggregate_id and event_store.version < s1.max_version);