跳到主要内容

Phoenix 2.6.0 迁移指南

· 阅读需 12 分钟
jingzhang.chen

Phoenix 2.6.0 有一些重大变化,其中许多变化可能导致用户在升级后出现无法启动的问题,遵循下面的升级指南能够帮助 用户平滑升级。

官方公开的 Github 仓库中,也有升级到 2.6.0 的 Merge Request 可以参考:

一. API 变动

1. PhoenixClient

PhoenixClient 现在需要手动指定 RPC 返回的结果类型, 示例变动代码如下:

        public String allocate(String account, double amt, String allocateNumber) {
AccountAllocateCmd cmd = new AccountAllocateCmd(account, amt, allocateNumber);
- Future<RpcResult> future = phoenixClient.send(cmd, accountTopic, "");
+ Future<RpcResult<String>> future = phoenixClient.send(cmd, accountTopic, "");
try {
RpcResult result = future.get(10, TimeUnit.SECONDS);
return result.getMessage();
public interface IPhoenixClient {
/**
* Client 端异步发送接口. 回复到指定 topic
*
* @param msg rpc调用消息
* @param targetTopic 接收命令的服务 Topic
* @param sourceTopic 接收回复的响应服务 Topic
* @param requestId 请求ID
*/
MessageMetaData sendNoReply(
Object msg, String targetTopic, String sourceTopic, String requestId);

/**
* Client 端异步发送接口. 不需要回复
*
* @param msg rpc调用消息
* @param targetTopic 目的地 Topic
* @param requestId 请求ID
*/
MessageMetaData sendNoReply(Object msg, String targetTopic, String requestId);
}

2. SourceCollect

一个示例的修改如下:

 class SelfSerializeSchema implements SourceCollect {

@Override
- public List<CollectResult> collect(Records records, CollectMetaData collectMetaData) {
+ public List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaData) {
public interface SourceCollect {

/**
* 转换消息,实现中可以自定义反序列化,广播等各种操作
*
* <p>{@link CollectResult}是为了用户可以自定义一些消息处理路径.
*
* @param records 源数据
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaDataQuery);

/**
* 是否支持批量转换消息.
*
* @return
*/
default boolean supportBatchCollect() {
return false;
}

/**
* 批量转换消息. 使用批量消息转换时, 必须严格按照源数据集合的顺序处理.
*
* @param recordsList 源数据集合
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
default List<CollectResult> batchCollect(
List<Records> recordsList, CollectMetaDataQuery collectMetaDataQuery) {
return Collections.emptyList();
}
}

二. 数据库表结构变动

create table if not exists event_store
(
aggregate_root_type varchar(255) not null,
aggregate_id varchar(255) not null,
version bigint(19) not null,
idempotent_id varchar(128) not null,
event_content longblob null,
command_class_name varchar(256) default '' null,
create_time timestamp(3) default CURRENT_TIMESTAMP(3) null,
primary key (aggregate_id, version),
constraint event_store_cmd_id_idx
unique (idempotent_id)
);

create index idx_create_time
on event_store (create_time);

三. 监控方式变动

在 2.6.0 版本抽象 Phoenix APM 埋点模块后,用户需要选择并主动引入埋点模块的依赖:

  • phoenix-telemetry-jmx:基于 JMX 的代码埋点(如同2.5.X 及之前)
  • phoenix-telemetry-otel:基于 OpenTelemetry 的代码埋点,导出指标的方式不同,请参考监控文档:配置指标导出器
+        <dependency>
+ <groupId>com.iquantex</groupId>
+ <artifactId>phoenix-telemetry-jmx</artifactId>
+ <version>${phoenix.version}</version>
+ </dependency>

四. Scala 版本升级

在 2.6.0 版本中, Phoenix 升级了部分依赖的同时升级了 Scala 版本(2.13.11), 并支持发布多 Scala 版本发布, 用户可通过引入依赖中附加后缀的方式选择 scala 版本,目前依赖 scala 并支持 2.12 版本的模块如下:(使用 2.13 版本则无需显式指定 scala 后缀)

其余未指定模块无需指定 scala 版本.

[INFO] phoenix-client-starter_2.12                                        [jar]
[INFO] phoenix-cluster-core_2.12 [jar]
[INFO] phoenix-cluster-nacos_2.12 [jar]
[INFO] phoenix-cluster-consul_2.12 [jar]
[INFO] phoenix-cluster-eureka_2.12 [jar]
[INFO] phoenix-console_2.12 [jar]
[INFO] phoenix-dgc_2.12 [jar]
[INFO] phoenix-distributed-data_2.12 [jar]
[INFO] phoenix-distributed-data-starter_2.12 [jar]
[INFO] phoenix-event-publish_2.12 [jar]
[INFO] phoenix-event-publish-starter_2.12 [jar]
[INFO] phoenix-eventstore-jdbc_2.12 [jar]
[INFO] phoenix-kafka-extend-starter_2.12 [jar]
[INFO] phoenix-server_2.12 [jar]
[INFO] phoenix-server-starter_2.12 [jar]
[INFO] phoenix-starter-autoconfigure_2.12 [jar]
[INFO] phoenix-stream-kafka_2.12 [jar]
[INFO] phoenix-stream-kafka-starter_2.12 [jar]
[INFO] phoenix-telemetry-jmx_2.12 [jar]
[INFO] phoenix-telemetry-otel_2.12 [jar]
[INFO] phoenix-transaction_2.12 [jar]

当用户不希望升级到新版的 Scala 时,需要变更一些依赖的工件 ID:

         <dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-server-starter</artifactId>
+ <artifactId>phoenix-server-starter_2.12</artifactId>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
- <artifactId>phoenix-transaction</artifactId>
+ <artifactId>phoenix-transaction_2.12</artifactId>
</dependency>

为了避免同时引入两个相同依赖的不同版本的 NoClassFoundError(Scala 二进制敏感) 问题, 可在父工程下引入下面的插件来帮助检查 Scala 版本冲突

通过命令:mvn clean compile validate 执行校验,下面的案例用于排查不符合 scala 2.13 升级的依赖

<build>
<plugins>
<!-- make sure we don't have any _2.10, _2.11 or _2.12 dependencies when building
for Scala 2.13 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version>
<executions>
<execution>
<id>enforce-versions</id>
<phase>validate</phase>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<bannedDependencies>
<excludes combine.children="append">
<exclude>*:*_2.10</exclude>
<exclude>*:*_2.11</exclude>
<exclude>*:*_2.12</exclude>
</excludes>
<message>Scala 2.10/2.11/2.12 dependencies are not allowed for Scala 2.13
builds.
This can be caused by hard-coded scala versions, where the
'scala.binary.version' property should be used instead.
</message>
</bannedDependencies>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

五. Kafka 版本升级

在 2.6.0 中,我们将依赖的 Kafka-Clients 版本升级到了 2.4.1,并且在默认模式下启用了新版本的 API,如用户不希望升级到最新版本的 Kafka,则可以遵循以下步骤回退 KafkaClient 版本:

5.1 排除 Phoenix 依赖中的 Kafka 版本

根据项目按需排除,只有下面三个依赖需要排除:

             <dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-client-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>

5.2 在依赖 Phoenix 上述包的项目中手动引入指定版本 Kafka-Client

     <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>1.0.0</version>
+ </dependency>
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-server-starter_2.12</artifactId>
<version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>

5.3 更换 MQ 消费模式

   quantex:
phoenix:
server:
mq:
+ consumer-type: poll # 增加此配置,并修改为 POLL,默认下为 PUSH
type: kafka
address: 127.0.0.1:9092
subscribe:

六. 实体聚合根注解变化

在 2.6.0 中,实体聚合根新增了一些特性,因此在注解上新增了一些内容

@EntityAggregateAnnotation(
aggregateRootType = "BankAccount",
idempotentSize = 100,
snapshotInterval = 100000,
numberOfRetainSnapshots = 2,
+ allowPassivation = true,
+ batchWeight = 200,
+ runningMode= Mode.SYNC,
+ bufferSize = 200,
+ snapshotMode = SnapshotMode.EAGER
)
public class BankAccountAggregate implements SelectiveSnapshot<SnapshotData> {

对于原有用快照周期来关闭快照功能的用户,则需要以使用以下的改变:

@EntityAggregateAnnotation(
aggregateRootType = "BankAccount",
- snapshotInterval = 0,
+ snapshotMode = SnapshotMode.DISABLE
)
public class BankAccountAggregate implements SelectiveSnapshot<SnapshotData> {

此外,实体聚合根攒批大小(性能调优配置)也从独立的 Spring 配置中迁移到聚合根注解配置中:

quantex:
phoenix:
server:
- performance:
- batch-process: 200
+ entityAggregate:
+ BankAccount: # {aggregateRootType}
+ batchWeight: 200

七. 事务聚合根注解变化

在 2.6.0 中,对事务聚合根的配置进行重构,增加一些调优配置,并将大部分调优配置放在注解之上.

重试策略配置从 Spring 配置中迁移到注解中(仍然是退避策略=0, 固定频率=1):

quantex:
phoenix:
server:
- performance:
- transaction-retry-strategy: 0
+ transactionAggregate:
+ BankTransferSaga: # {aggregateRootType}
+ retryStrategy: 0

心跳配置新增多种类型, 增加攒批配置:

@TransactionAggregateAnnotation(
aggregateRootType = "BankTransferSaga",
- heartbeatInterval = 10
+ heartbeatTickInterval = 3,
+ heartbeatCheckInterval = 2,
+ retryStrategy = RetryStrategy.EXPONENTIAL_BACKOFF,
+ maxRetryNum = Integer.MAX_VALUE,
+ batchWeight = 200
)
public class BankTransferSaga implements Serializable {

八. EventStore 表结构变化

在 2.6.0 中,为了支持 Console 按命令查询,因此修改了 DDL 增加了命令名字段用于查询。

因此从 2.5.x 升级的用户可能会遇到字段缺失的异常,有两种解决方案:

  • 表重建
    1. 彻底停止应用
    2. 从数据库中删除表
    3. 重启 Phoenix 应用,通过 Phoenix 的启动建表重建
  • 通过数据库 DDL 修改表结构

以下是经过验证的各数据库 DDL:

-- DDL
CREATE TABLE event_store
(
aggregate_root_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
version BIGINT(19) NOT NULL,
idempotent_id VARCHAR(128) NOT NULL,
event_content longblob,
+ command_class_name VARCHAR(256) DEFAULT '',
create_time TIMESTAMP(3) DEFAULT current_timestamp(3),
constraint event_store_pk
PRIMARY KEY (aggregate_id, version)
)
-- 迁移 SQL
ALTER TABLE event_store ADD command_class_name VARCHAR(256) DEFAULT ''

九. 事务聚合根增加校验

原有的事务聚合根校验并不完善,为了规范化代码,事务聚合根增加了多个校验:

  • 事务聚合根入口(@TransactionStart) 方法必须以 act 命名
  • 事务聚合根只允许有一个无参数,名为 onFinish 方法,并且以 TransactionFinishReturn 返回
    @TransactionStart
- public TransactionReturn on(Account.AccountTransferCmd cmd) {
+ public TransactionReturn act(Account.AccountTransferCmd cmd) {
return TransactionReturn.builder().build();
}

public TransactionFinishReturn onFinsish() {
return TransactionFinishReturn.builder()
.retMessage("not allowed null here")
.data(new ResultEvent())
.build();
}