跳到主要内容
版本:latest

基于 PhoenixClient 的订阅

Phoenix Client基于 Phoenix SDK 实现了更简洁的接口,通常该类用于抽象 Producer 的能力,但是该模块也支持实现 Consumer 的能力。

本文只介绍订阅模式,并以例子的方式展示如何使用 PhoenixClient 实现订阅。

介绍

整个实现的背景基于 PhoenixClient 的 reply() 方法,详情看相应的文档:PhoenixClient 回复机制

在这个场景的基础上,又补充了非回复机制的订阅功能,让 PhoenixClient 变为更通用的 SDK 能力,因此整个 PhoenixClient 订阅分为两个接口:

  • MessageListener:用于实现订阅功能,不需要回复
  • ReplyMessageListener:用于实现订阅功能,支持回复机制

MessageListener 接口定义

PhoenixClient 提供了一个 MessageListener 接口,用户只需要继承该类,然后实现 subscribeTopic()onMessage() 方法即可。

由接口定义不难看出,PhoenixClient 会负责将启动一个 Spring Kafka Listener Container(也就是 KafkaConsumer),然后监听对应的 Topic,当有消息到来时,Phoenix 会自动调用 onMessage() 方法,并将消息体传入,用户只需要处理消息即可。

除此之外还有一些其他信息:

  • BatchAcknowledgingMessageListener:消息会批量拉取和由 SDK 手动 Ack,除此之外大部分遵循 Spring Kafka 的默认配置,用于可通过 consumerProperties() 方法自定义 Kafka Consumer 配置,或者通过 assembleContainer() 方法自定义 Spring Kafka Consumer 配置。
  • errorHandle:异常处理,Phoenix 默认会打印异常日志,用户可重写该方法自定义异常处理。
public abstract class MessageListener implements BatchAcknowledgingMessageListener<String, byte[]> {

/** 订阅主题 */
public abstract String subscribeTopic();

/** 处理消息, 如果重写了 batchHandle 方法,则该方法实现为空即可 */
public abstract void onMessage(Message message);

/** Kafka 消费者配置 */
public Properties consumerProperties() {
return new Properties();
}

/** 自定义 Spring Kafka Consumer 配置, 此接口配置优先级高于 {@link #consumerProperties()} */
public ConcurrentMessageListenerContainer<String, byte[]> assembleContainer(
ConcurrentMessageListenerContainer<String, byte[]> container) {
return container;
}

/** 批量处理接口, 可以手动 ack, 但是需要自己实现: 1. reply 2. error handle 3. acknowledgment 并重载此方法 */
public void batchHandle(List<Message> messageList, Acknowledgment acknowledgment) {
// 内部实现了:
// 1. 调用 handle,判断是否为空,不为空则 reply,在 reply 后调用 callback
// 2. error handle(catch 了 Throwable)
// 3. 批量 commit
}

/** 异常处理 */
public void errorHandle(Message message, Exception e) {
// Phoenix 默认会打印异常日志
}
}

ReplyMessageListener 接口定义

在 MessageListener 的基础上,我们基于 Spring 实现了一个 ReplyMessageListener,用户只需要继承该类,然后实现 subscribeTopic()handle() 方法即可(如果需要处理回复之后的回调和异常处理,可以重写 callback()errorHandle() 方法)。

将该类的子类注册到 Spring Bean 之后,Phoenix 会负责将启动一个 Spring Kafka Listener Container(也就是 KafkaConsumer),然后监听对应的 Topic,当有消息到来时,Phoenix 会自动调用 handle() 方法,并将消息体传入,用户只需要处理消息并返回对应的对象即可, 如果用户不需要问题处理,可以直接返回 null,Phoenix 则会忽略响应。

异常处理与 MessageListener 稍微不同,ReplyMessageListener 不仅会打印日志,也会往回复队列(如果消息中存在)中发送异常事件的消息(Phoenix.ExceptionEvent)。

public abstract class ReplyMessageListener<T> extends MessageListener implements PhoenixReplyClientAware {

/**
* 处理消息并回复.
* 如果返回 null, 则不会回复
* 如果重写了 batchHandle 方法,则该方法实现为空即可
*/
public abstract T handle(Message message);

public void callback(Message message, MessageMetaData metaData) {
log.debug("handle msg<{}> success, reply with : {}", message.identify(), metaData);
}

/** 批量处理接口, 可以手动 ack, 但是需要自己实现: 1. reply 2. error handle 3. acknowledgment 并重载此方法 */
public void batchHandle(List<Message> messageList, Acknowledgment acknowledgment) {
// 内部实现了:
// 1. 调用 handle,判断是否为空,不为空则 reply,在 reply 后调用 callback
// 2. error handle(catch 了 Throwable)
// 3. 批量 commit
}

/** 手动回复方法 */
public MessageMetaData reply(T result, Message parent, RetCode retCode, String retMsg) {
// 无须用户实现
}

/** ReplyClient 设置接口,由 SDK 传入,用户可重写后自定义 */
public void setPhoenixReplyClient(PhoenixReplyClient client) {
this.client = client;
}
}

Maven依赖

<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-client-starter</artifactId>
<version>2.6.1</version>
</dependency>

示例说明

下面是一个简单的例子,我们定义一个 CreateListener,监听 EventPublish 发布的账户创建事件,并且在收到创建事件后,发送一个划拨增加余额的命令,给每个新创建的用户增加 10 元的新人礼包余额。

// 注册为 Spring Bean
@Component
public class CreateListener extends ReplyMessageListener<Object> {

// 多 Topic 订阅写法
@Value("#{'${phoenix.server.topic.account-event-pub}'.split(',')}")
private String[] topics;

@Override
public String[] subscribeTopic() {
return topics;
}

// 单 Topic 订阅写法
@Value("${phoenix.server.topic.account-event-pub}")
private String topic;

@Override
public String[] subscribeTopic() {
return new String[] {topic};
}

// 消费逻辑
@Override
public Object handle(Message message) {
String payloadClassName = message.getPayloadClassName();
// 如果是创建命令,则发送一个划拨增加余额
if (AccountCreateEvent.class.getName().equals(payloadClassName)) {
AccountCreateEvent createEvent = message.getPayload();
return Account.AccountAllocateCmd.newBuilder()
.setAccountCode(createEvent.getAccountCode())
.setAmt(10)
.setAllocateNumber(UUIDUtils.genUUID())
.build();
}
// 非创建命令,不产生回复(直接返回 null, phx 会自动过滤)
return null;
}
}

相应的 Spring 配置如下:

# spring 相关, consumer 的主要配置由此配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: "phoenix-client-subscriber"
# 其他配置,无须配置序列化/反序列化
# phoenix 相关配置
quantex.phoenix.client:
name: ${spring.application.name}-client
# PhoenixClient 是必备的,因为 ReplyMessageListener 会使用 PhoenixClient 来回复事件
mq:
type: kafka
address: 127.0.0.1:9092
topic: ${spring.application.name}-client
# 启动 Subscribe 功能,开启后会自动识别 ReplyMessageListener
subscribe:
enabled: true

案例验证

创建一个账户,然后观察 Phoenix Console 中的事件列表,可以看到账户创建后,会有一个划拨增加余额的命令。

event-list.png

观察划拨命令的事件信息,可以观察到划拨命令中的 parentMsgId 存在,这是因为 PhoenixClient 会自动将“账户创建事件”的 msgId 作为划拨命令的 parentMsgId,形成关联关系。

command-parent-id.png

验证“账户创建事件”的 msgId 和“划拨命令”的 parentMsgId 是一致的。

event-msg-id.png

总结

由上面的案例看出,Phoenix Client 的 ReplyMessageListener 帮用户实现了序列化/反序列化,消息关联等能力,又无须引入 Phoenix Framework。