跳到主要内容
版本:2.5.5

使用说明

Phoenix中提供了可集成于Phoenix Server服务中的EventPublish功能模块,启用该模块可直接在Phoenix服务中运行Event Publish任务,服务部署时可通过配置定义Event Publish任务。

引用依赖

<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-event-publish-starter</artifactId>
<version>2.5.5</version>
</dependency>

功能说明

  • 可在Phoenix服务中开启Event-Publish能力,扫描event-store中的领域事件,发布到特定的Kafka Topic。
  • 可在Phoenix服务配置中定义Event-Publish模块的运行参数和定义event-task与monitor-task,event-task用于将Phoenix处理事件转发至指定的Topic,monitor-task可以采集Phoenix处理的消息信息,推送到Elasticsearch,通过Grafana进行监控。
  • Event-Publish任务保证读取的Event-Store领域事件完整性,不出现事件遗漏、丢失。
  • Event-Publish任务保证相同的聚合根ID的领域事件按照Version严格递增的顺序投递到Kafka。
  • Event-Publish任务支持多节点负载均衡和高可用,需要依赖Phoenix服务本身的Akka集群能力。

使用说明

Phoenix-Event-Publish功能模块对Phoenix服务本身没有代码入侵,要打开Event-Publish功能模块,仅需要使用quantex.phoenix.event-publish配置。

要开启Event-Publish功能模块,需要将Phoenix服务的quantex.phoenix.event-publsh.enabled配置置为true

配置详情

Event-Publish模块完全使用配置定义和声明服务中需要运行的Event-Publish任务,完全配置参见:

配置项描述类型默认值
quantex.phoenix.event-publish.batch-size批量大小int128
quantex.phoenix.event-publish.buffer-queue-size缓存队列大小int64
quantex.phoenix.event-publish.state-table-nameEventPublish状态表名称Stringevent_publish_state
quantex.phoenix.event-publish.from-begin新建状态或状态不存在时,是否重置读取位置到EventStore最早时间戳booleanfalse
quantex.phoenix.event-publish.event-task.enabled是否开启领域事件发布booleantrue
quantex.phoenix.event-publish.event-task.topic领域事件发布的目标topicString
quantex.phoenix.event-publish.monitor-task.enabled开关
quantex.phoenix.event-publish.monitor-task.es-server消息监控上报的elasticsearch服务地址String

以上配置为Event-Publish功能模块的通用配置,应用于服务内所有Event-Publish任务。

使用示例:

quantex:
phoenix:
event-publish:
batch-size: 64
buffer-queue-size: 64
from-begin: true
state-table-name: event_publish_state
event-task:
enabled: true
topic: bank-account-event-pub
monitor-task:
es-server: 127.0.0.1:9200

事件过滤器

从数据库中读取到的事件会放到阻塞队列中,由事件处理线程读取并发送到Kafka中,当业务系统依赖了phoenix-stater时,可以通过依赖注入的方式自定义Event Handler做事件拦截。

Event Handler提供了getOrder()接口,定义了处理器的执行顺序(Integer.MIN_VALUE > Integer.MAX_VALUE)。当handleBatch()接口返回null,则不继续执行下面的处理器。


public class MarketChangePublishHandler implements EventHandler<Phoenix.Message, Phoenix.Message> {
@Override
public CommittableEventBatchWrapper<Phoenix.Message> handleBatch(CommittableEventBatchWrapper<Phoenix.Message> batchWrapper) {
List<EventStoreRecord<Phoenix.Message>> events = batchWrapper.getEvents();

Iterator<EventStoreRecord<Phoenix.Message>> iterator = events.iterator();
// 只发布行情变更事件
while (iterator.hasNext()) {
EventStoreRecord<Phoenix.Message> event = iterator.next();
Phoenix.Message content = event.getContent();
String payloadClassName = content.getPayloadClassName();
if (!payloadClassName.equals(MarketChangeEvent.class.getName())) {
iterator.remove();
}
}
return batchWrapper;
}

@Override
public int getOrder() {
return 0;
}
}


自定义投递 Key

默认情况下, EventPublish 会以用户聚合根 ID 作为 Key, 消息作为内容投递到 Kafka.

对于特定需求, 如用户需要将同一类型的聚合根投递到某个 Partition. Phoenix 支持自定义 EventPublish 投递到 Kafka 的 Key. 其主要的核心机制建立在 Message 中的元数据(MetaData).

支持如下两种场景的自定义。

  • 从订阅中生成 Phoenix 消息,在处理订阅的消息转换的 SourceCollect 中使用 MessageFactory 附加 EventPublish Key.
  • 客户端主动发送 Phoenix 消息, 使用 PhoenixClient 新增 API 附加 EventPublish Key 到 Phoenix 消息.
信息

注意:目前事务场景下, 事务聚合根发出的 Ti,Ci 等消息暂不支持自定义投递 Key.

PhoenixClient 的新增 API 请参考客户端介绍

MessageFactory 的自定义 Key API 如下:


/**
* 获取聚合根处理命令(自定义 EventPublish Key)
*
* @param dst
* @param src
* @param payload
* @param requestId
* @param eventPublishKey 投影(EventPublish)发送到 Kafka 时的 Key
* @return
*/
public static Message getCmdMsgWithEventPublishKey(String dst, String src, Object payload, String requestId, String eventPublishKey);
/**
* 获取聚合根处理命令(自定义 EventPublish Key)
*
* @param dst
* @param src
* @param payload
* @param eventPublishKey 投影(EventPublish)发送到 Kafka 时的 Key
* @return
*/
public static Message getCmdMsgWithEventPublishKey(String dst, String src, Object payload, String eventPublishKey);