Phoenix 2.6.0 有一些重大变化,其中许多变化可能导致用户在升级后出现无法启动的问题,遵循下面的升级指南能够帮助 用户平滑升级。
官方公开的 Github 仓库中,也有升级到 2.6.0 的 Merge Request 可以参考:
- 不升级 Scala 版本:https://github.com/PhoenixIQ/phoenix-samples/pull/38
- 升级 Scala 版本 + 增加片段聚合根案例:https://github.com/PhoenixIQ/phoenix-samples/pull/37
一. API 变动
1. PhoenixClient
PhoenixClient 现在需要手动指定 RPC 返回的结果类型, 示例变动代码如下:
public String allocate(String account, double amt, String allocateNumber) {
AccountAllocateCmd cmd = new AccountAllocateCmd(account, amt, allocateNumber);
- Future<RpcResult> future = phoenixClient.send(cmd, accountTopic, "");
+ Future<RpcResult<String>> future = phoenixClient.send(cmd, accountTopic, "");
try {
RpcResult result = future.get(10, TimeUnit.SECONDS);
return result.getMessage();
- 2.6.0
- 2.5.x
public interface IPhoenixClient {
/**
* Client 端异步发送接口. 回复到指定 topic
*
* @param msg rpc调用消息
* @param targetTopic 接收命令的服务 Topic
* @param sourceTopic 接收回复的响应服务 Topic
* @param requestId 请求ID
*/
MessageMetaData sendNoReply(
Object msg, String targetTopic, String sourceTopic, String requestId);
/**
* Client 端异步发送接口. 不需要回复
*
* @param msg rpc调用消息
* @param targetTopic 目的地 Topic
* @param requestId 请求ID
*/
MessageMetaData sendNoReply(Object msg, String targetTopic, String requestId);
}
public interface IPhoenixClient {
/**
* Client 端异步发送接口. 回复到指定 topic
*
* @param msg rpc调用消息
* @param targetTopic 接收命令的服务 Topic
* @param sourceTopic 接收回复的响应服务 Topic
* @param requestId 请求ID
*/
void sendNoReply(
Object msg, String targetTopic, String sourceTopic, String requestId);
/**
* Client 端异步发送接口. 不需要回复
*
* @param msg rpc调用消息
* @param targetTopic 目的地 Topic
* @param requestId 请求ID
*/
void sendNoReply(Object msg, String targetTopic, String requestId);
}
2. SourceCollect
一个示例的修改如下:
class SelfSerializeSchema implements SourceCollect {
@Override
- public List<CollectResult> collect(Records records, CollectMetaData collectMetaData) {
+ public List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaData) {
- 2.6.0
- 2.5.x
public interface SourceCollect {
/**
* 转换消息,实现中可以自定义反序列化,广播等各种操作
*
* <p>{@link CollectResult}是为了用户可以自定义一些消息处理路径.
*
* @param records 源数据
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaDataQuery);
/**
* 是否支持批量转换消息.
*
* @return
*/
default boolean supportBatchCollect() {
return false;
}
/**
* 批量转换消息. 使用批量消息转换时, 必须严格按照源数据集合的顺序处理.
*
* @param recordsList 源数据集合
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
default List<CollectResult> batchCollect(
List<Records> recordsList, CollectMetaDataQuery collectMetaDataQuery) {
return Collections.emptyList();
}
}
public interface SourceCollect {
/**
* 转换消息,实现中可以自定义反序列化,广播等各种操作
*
* <p>{@link CollectResult}是为了用户可以自定义一些消息处理路径.
*
* @param records 源数据
* @param collectMetaData 转换注册源数据
* @return 消息实体列表
*/
List<CollectResult> collect(Records records, CollectMetaData collectMetaData);
}
二. 数据库表结构变动
- 2.6.0
- 2.5.x
create table if not exists event_store
(
aggregate_root_type varchar(255) not null,
aggregate_id varchar(255) not null,
version bigint(19) not null,
idempotent_id varchar(128) not null,
event_content longblob null,
command_class_name varchar(256) default '' null,
create_time timestamp(3) default CURRENT_TIMESTAMP(3) null,
primary key (aggregate_id, version),
constraint event_store_cmd_id_idx
unique (idempotent_id)
);
create index idx_create_time
on event_store (create_time);
create table if not exists event_store
(
aggregate_root_type varchar(255) not null,
aggregate_id varchar(255) not null,
version bigint(19) not null,
idempotent_id varchar(128) not null,
event_content longblob null,
create_time timestamp(3) default CURRENT_TIMESTAMP(3) null,
primary key (aggregate_id, version),
constraint event_store_cmd_id_idx
unique (idempotent_id)
);
create index idx_create_time
on event_store (create_time);
三. 监控方式变动
在 2.6.0 版本抽象 Phoenix APM 埋点模块后,用户需要选择并主动引入埋点模块的依赖:
phoenix-telemetry-jmx
:基于 JMX 的代码埋点(如同2.5.X 及之前)phoenix-telemetry-otel
:基于 OpenTelemetry 的代码埋点,导出指标的方式不同,请参考监控文档:配置指标导出器
+ <dependency>
+ <groupId>com.iquantex</groupId>
+ <artifactId>phoenix-telemetry-jmx</artifactId>
+ <version>${phoenix.version}</version>
+ </dependency>
四. Scala 版本升级
在 2.6.0 版本中, Phoenix 升级了部分依赖的同时升级了 Scala 版本(2.13.11), 并支持发布多 Scala 版本发布, 用户可通过引入依赖中附加后缀的方式选择 scala 版本,目前依赖 scala 并支持 2.12 版本的模块如下:(使用 2.13 版本则无需显式指定 scala 后缀)
其余未指定模块无需指定 scala 版本.
[INFO] phoenix-client-starter_2.12 [jar]
[INFO] phoenix-cluster-core_2.12 [jar]
[INFO] phoenix-cluster-nacos_2.12 [jar]
[INFO] phoenix-cluster-consul_2.12 [jar]
[INFO] phoenix-cluster-eureka_2.12 [jar]
[INFO] phoenix-console_2.12 [jar]
[INFO] phoenix-dgc_2.12 [jar]
[INFO] phoenix-distributed-data_2.12 [jar]
[INFO] phoenix-distributed-data-starter_2.12 [jar]
[INFO] phoenix-event-publish_2.12 [jar]
[INFO] phoenix-event-publish-starter_2.12 [jar]
[INFO] phoenix-eventstore-jdbc_2.12 [jar]
[INFO] phoenix-kafka-extend-starter_2.12 [jar]
[INFO] phoenix-server_2.12 [jar]
[INFO] phoenix-server-starter_2.12 [jar]
[INFO] phoenix-starter-autoconfigure_2.12 [jar]
[INFO] phoenix-stream-kafka_2.12 [jar]
[INFO] phoenix-stream-kafka-starter_2.12 [jar]
[INFO] phoenix-telemetry-jmx_2.12 [jar]
[INFO] phoenix-telemetry-otel_2.12 [jar]
[INFO] phoenix-transaction_2.12 [jar]
当用户不希望升级到新版的 Scala 时,需要变更一些依赖的工件 ID:
<dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-server-starter</artifactId>
+ <artifactId>phoenix-server-starter_2.12</artifactId>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-transaction</artifactId>
+ <artifactId>phoenix-transaction_2.12</artifactId>
</dependency>