Phoenix 2.6.0 迁移指南
Phoenix 2.6.0 有一些重大变化,其中许多变化可能导致用户在升级后出现无法启动的问题,遵循下面的升级指南能够帮助 用户平滑升级。
官方公开的 Github 仓库中,也有升级到 2.6.0 的 Merge Request 可以参考:
- 不升级 Scala 版本:https://github.com/PhoenixIQ/phoenix-samples/pull/38
- 升级 Scala 版本 + 增加片段聚合根案例:https://github.com/PhoenixIQ/phoenix-samples/pull/37
一. API 变动
1. PhoenixClient
PhoenixClient 现在需要手动指定 RPC 返回的结果类型, 示例变动代码如下:
public String allocate(String account, double amt, String allocateNumber) {
AccountAllocateCmd cmd = new AccountAllocateCmd(account, amt, allocateNumber);
- Future<RpcResult> future = phoenixClient.send(cmd, accountTopic, "");
+ Future<RpcResult<String>> future = phoenixClient.send(cmd, accountTopic, "");
try {
RpcResult result = future.get(10, TimeUnit.SECONDS);
return result.getMessage();
- 2.6.0
- 2.5.x
public interface IPhoenixClient {
/**
* Client 端异步发送接口. 回复到指定 topic
*
* @param msg rpc调用消息
* @param targetTopic 接收命令的服务 Topic
* @param sourceTopic 接收回复的响应服务 Topic
* @param requestId 请求ID
*/
MessageMetaData sendNoReply(
Object msg, String targetTopic, String sourceTopic, String requestId);
/**
* Client 端异步发送接口. 不需要回复
*
* @param msg rpc调用消息
* @param targetTopic 目的地 Topic
* @param requestId 请 求ID
*/
MessageMetaData sendNoReply(Object msg, String targetTopic, String requestId);
}
public interface IPhoenixClient {
/**
* Client 端异步发送接口. 回复到指定 topic
*
* @param msg rpc调用消息
* @param targetTopic 接收命令的服务 Topic
* @param sourceTopic 接收回复的响应服务 Topic
* @param requestId 请求ID
*/
void sendNoReply(
Object msg, String targetTopic, String sourceTopic, String requestId);
/**
* Client 端异步发送接口. 不需要回复
*
* @param msg rpc调用消息
* @param targetTopic 目的地 Topic
* @param requestId 请求ID
*/
void sendNoReply(Object msg, String targetTopic, String requestId);
}
2. SourceCollect
一个示例的修改如下:
class SelfSerializeSchema implements SourceCollect {
@Override
- public List<CollectResult> collect(Records records, CollectMetaData collectMetaData) {
+ public List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaData) {
- 2.6.0
- 2.5.x
public interface SourceCollect {
/**
* 转换消息,实现中可以自定义反序列化,广播等各种操作
*
* <p>{@link CollectResult}是为了用户可以自定义一些消息处理路径.
*
* @param records 源数据
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaDataQuery);
/**
* 是否支持批量转换消息.
*
* @return
*/
default boolean supportBatchCollect() {
return false;
}
/**
* 批量转换消息. 使用批量消息转换时, 必须严格按照源数据集合的顺序处理.
*
* @param recordsList 源数据集合
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
default List<CollectResult> batchCollect(
List<Records> recordsList, CollectMetaDataQuery collectMetaDataQuery) {
return Collections.emptyList();
}
}
public interface SourceCollect {
/**
* 转换消息,实现中可以自定义反序列化,广播等各种操作
*
* <p>{@link CollectResult}是为了用户可以自定义一些消息处理路径.
*
* @param records 源数据
* @param collectMetaData 转换注册源数据
* @return 消息实体列表
*/
List<CollectResult> collect(Records records, CollectMetaData collectMetaData);
}
二. 数据库表结构变动
- 2.6.0
- 2.5.x
create table if not exists event_store
(
aggregate_root_type varchar(255) not null,
aggregate_id varchar(255) not null,
version bigint(19) not null,
idempotent_id varchar(128) not null,
event_content longblob null,
command_class_name varchar(256) default '' null,
create_time timestamp(3) default CURRENT_TIMESTAMP(3) null,
primary key (aggregate_id, version),
constraint event_store_cmd_id_idx
unique (idempotent_id)
);
create index idx_create_time
on event_store (create_time);
create table if not exists event_store
(
aggregate_root_type varchar(255) not null,
aggregate_id varchar(255) not null,
version bigint(19) not null,
idempotent_id varchar(128) not null,
event_content longblob null,
create_time timestamp(3) default CURRENT_TIMESTAMP(3) null,
primary key (aggregate_id, version),
constraint event_store_cmd_id_idx
unique (idempotent_id)
);
create index idx_create_time
on event_store (create_time);
三. 监控方式变动
在 2.6.0 版本抽象 Phoenix APM 埋点模块后,用户需要选择并主动引入埋点模块的依赖:
phoenix-telemetry-jmx
:基于 JMX 的代码埋点(如同2.5.X 及之前)phoenix-telemetry-otel
:基于 OpenTelemetry 的代码埋点,导出指标的方式不同,请参考监控文档:配置指标导出器
+ <dependency>
+ <groupId>com.iquantex</groupId>
+ <artifactId>phoenix-telemetry-jmx</artifactId>
+ <version>${phoenix.version}</version>
+ </dependency>
四. Scala 版本升级
在 2.6.0 版本中, Phoenix 升级了部分依赖的同时升级了 Scala 版本(2.13.11), 并支持发布多 Scala 版本发布, 用户可通过引入依赖中附加后缀的方式选择 scala 版本,目前依赖 scala 并支持 2.12 版本的模块如下:(使用 2.13 版本则无需显式指定 scala 后 缀)
其余未指定模块无需指定 scala 版本.
[INFO] phoenix-client-starter_2.12 [jar]
[INFO] phoenix-cluster-core_2.12 [jar]
[INFO] phoenix-cluster-nacos_2.12 [jar]
[INFO] phoenix-cluster-consul_2.12 [jar]
[INFO] phoenix-cluster-eureka_2.12 [jar]
[INFO] phoenix-console_2.12 [jar]
[INFO] phoenix-dgc_2.12 [jar]
[INFO] phoenix-distributed-data_2.12 [jar]
[INFO] phoenix-distributed-data-starter_2.12 [jar]
[INFO] phoenix-event-publish_2.12 [jar]
[INFO] phoenix-event-publish-starter_2.12 [jar]
[INFO] phoenix-eventstore-jdbc_2.12 [jar]
[INFO] phoenix-kafka-extend-starter_2.12 [jar]
[INFO] phoenix-server_2.12 [jar]
[INFO] phoenix-server-starter_2.12 [jar]
[INFO] phoenix-starter-autoconfigure_2.12 [jar]
[INFO] phoenix-stream-kafka_2.12 [jar]
[INFO] phoenix-stream-kafka-starter_2.12 [jar]
[INFO] phoenix-telemetry-jmx_2.12 [jar]
[INFO] phoenix-telemetry-otel_2.12 [jar]
[INFO] phoenix-transaction_2.12 [jar]
当用户不希望升级到新版的 Scala 时,需要变更一些依赖的工件 ID:
<dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-server-starter</artifactId>
+ <artifactId>phoenix-server-starter_2.12</artifactId>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-transaction</artifactId>
+ <artifactId>phoenix-transaction_2.12</artifactId>
</dependency>
为了避免同时引入两个相同依赖的不同版本的 NoClassFoundError
(Scala 二进制敏感) 问题, 可在父工程下引入下面的插件来帮助检查 Scala 版本冲突
通过命令:
mvn clean compile validate
执行校验,下面的案例用于排查不符合 scala 2.13 升级的依赖
<build>
<plugins>
<!-- make sure we don't have any _2.10, _2.11 or _2.12 dependencies when building
for Scala 2.13 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version>
<executions>
<execution>
<id>enforce-versions</id>
<phase>validate</phase>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<excludes combine.children="append">
<exclude>*:*_2.10</exclude>
<exclude>*:*_2.11</exclude>
<exclude>*:*_2.12</exclude>
</excludes>
<message>Scala 2.10/2.11/2.12 dependencies are not allowed for Scala 2.13
builds.
This can be caused by hard-coded scala versions, where the
'scala.binary.version' property should be used instead.
</message>
</bannedDependencies>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
五. Kafka 版本升级
在 2.6.0 中,我们将依赖的 Kafka-Clients 版本升级到了 2.4.1,并且在默认模式下启用了新版本的 API,如用户不希望升级到最新版本的 Kafka,则可以遵循以下步骤回退 KafkaClient 版本:
5.1 排除 Phoenix 依赖中的 Kafka 版本
根据项目按需排除,只有下面三个依赖需要排除:
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-client-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
5.2 在依赖 Phoenix 上述包的项目中手动引入指定版本 Kafka-Client
<dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
5.3 更换 MQ 消费模式
quantex:
phoenix:
server:
mq:
+ consumer-type: poll # 增加此配置,并修改为 POLL,默认下为 PUSH
type: kafka
address: 127.0.0.1:9092
subscribe:
六. 实体聚合根注解变化
在 2.6.0 中,实体聚合根新增了一些特性,因此在注解上新增了一些内容
@EntityAggregateAnnotation(
aggregateRootType = "BankAccount",
idempotentSize = 100,
snapshotInterval = 100000,
numberOfRetainSnapshots = 2,
+ allowPassivation = true,
+ batchWeight = 200,
+ runningMode= Mode.SYNC,
+ bufferSize = 200,
+ snapshotMode = SnapshotMode.EAGER
)
public class BankAccountAggregate implements SelectiveSnapshot<SnapshotData> {
对于原有用快照周期来关闭快照功能的用户,则需要以使用以下的改变:
@EntityAggregateAnnotation(
aggregateRootType = "BankAccount",
- snapshotInterval = 0,
+ snapshotMode = SnapshotMode.DISABLE
)
public class BankAccountAggregate implements SelectiveSnapshot<SnapshotData> {
此外,实体聚合根攒批大小(性能调优配置)也从独立的 Spring 配置中迁移到聚合根注解配置中:
quantex:
phoenix:
server:
- performance:
- batch-process: 200
+ entityAggregate:
+ BankAccount: # {aggregateRootType}
+ batchWeight: 200
七. 事务聚合根注解变化
在 2.6.0 中,对事务聚合根的配置进行重构,增加一些调优配置,并将大部分调优配置放在注解之上.
重试策略配置从 Spring 配置中迁移到注解中(仍然是退避策略=0, 固定频率=1):
quantex:
phoenix:
server:
- performance:
- transaction-retry-strategy: 0
+ transactionAggregate:
+ BankTransferSaga: # {aggregateRootType}
+ retryStrategy: 0