基于 PhoenixClient 的订阅
Phoenix Client基于 Phoenix SDK 实现了更简洁的接口,通常该类用于抽象 Producer 的能力,但是该模块也支持实现 Consumer 的能力。
本文只介绍订阅模式,并以例子的方式展示如何使用 PhoenixClient 实现订阅。
介绍
整个实现的背景基于 PhoenixClient 的 reply()
方法,详情看相应的文档:PhoenixClient 回复机制。
在这个场景的基础上,又补充了非回复机制的订阅功能,让 PhoenixClient 变为更通用的 SDK 能力,因此整个 PhoenixClient 订阅分为两个接口:
MessageListener
:用于实现订阅功能,不需要回复ReplyMessageListener
:用于实现订阅功能,支持回复机制
MessageListener 接口定义
PhoenixClient 提供了一个 MessageListener
接口,用户只需要继承该类,然后实现 subscribeTopic()
和 onMessage()
方法即可。
由接口定义不难看出,PhoenixClient 会负责将启动一个 Spring Kafka Listener Container(也就是 KafkaConsumer),然后监听对应的 Topic,当有消息到来时,Phoenix 会自动调用 onMessage()
方法,并将消息体传入,用户只需要处理消息即可。
除此之外还有一些其他信息:
- BatchAcknowledgingMessageListener:消息会批量拉取和由 SDK 手动 Ack,除此之外大部分遵循 Spring Kafka 的默认配置,用于可通过
consumerProperties()
方法自定义 Kafka Consumer 配置,或者通过assembleContainer()
方法自定义 Spring Kafka Consumer 配置。 - errorHandle:异常处理,Phoenix 默认会打印异常日志,用户可重写该方法自定义异常处理。
public abstract class MessageListener implements BatchAcknowledgingMessageListener<String, byte[]> {
/** 订阅主题 */
public abstract String subscribeTopic();
/** 处理消息, 如果重写了 batchHandle 方法,则该方法实现为空即可 */
public abstract void onMessage(Message message);
/** Kafka 消费者配置 */
public Properties consumerProperties() {
return new Properties();
}
/** 自定义 Spring Kafka Consumer 配置, 此接口配置优先级高于 {@link #consumerProperties()} */
public ConcurrentMessageListenerContainer<String, byte[]> assembleContainer(
ConcurrentMessageListenerContainer<String, byte[]> container) {
return container;
}
/** 批量处理接口, 可以手动 ack, 但是需要自己实现: 1. reply 2. error handle 3. acknowledgment 并重载此方法 */
public void batchHandle(List<Message> messageList, Acknowledgment acknowledgment) {
// 内部实现了:
// 1. 调用 handle,判断是否为空,不为空则 reply,在 reply 后调用 callback
// 2. error handle(catch 了 Throwable)
// 3. 批量 commit
}
/** 异常处理 */
public void errorHandle(Message message, Exception e) {
// Phoenix 默认会打印异常日志
}
}
ReplyMessageListener 接口定义
在 MessageListener 的基础上,我们基于 Spring 实现了一个 ReplyMessageListener
,用户只需要继承该类,然后实现 subscribeTopic()
和 handle()
方法即可(如果需要处理回复之后的回调和异常处理,可以重写 callback()
和 errorHandle()
方法)。
将该类的子类注册到 Spring Bean 之后,Phoenix 会负责将启动一个 Spring Kafka Listener Container(也就是 KafkaConsumer),然后监听对应的 Topic,当有消息到来时,Phoenix 会自动调用 handle()
方法,并将消息体传入,用户只需要处理消息并返回对应的对象即可,
如果用户不需要问题处理,可以直接返回 null
,Phoenix 则会忽略响应。
异常处理与 MessageListener 稍微不同,ReplyMessageListener 不仅会打印日志,也会往回复队列(如果消息中存在)中发送异常事件的消息(Phoenix.ExceptionEvent)。
public abstract class ReplyMessageListener<T> extends MessageListener implements PhoenixReplyClientAware {
/**
* 处理消息并回复.
* 如果返回 null, 则不会回复
* 如果重写了 batchHandle 方法,则该方法实现为空即可
*/
public abstract T handle(Message message);
public void callback(Message message, MessageMetaData metaData) {
log.debug("handle msg<{}> success, reply with : {}", message.identify(), metaData);
}
/** 批量处理接口, 可以手动 ack, 但是需要自己实现: 1. reply 2. error handle 3. acknowledgment 并重载此方法 */
public void batchHandle(List<Message> messageList, Acknowledgment acknowledgment) {
// 内部实现了:
// 1. 调用 handle,判断是否为空,不为空则 reply,在 reply 后调用 callback
// 2. error handle(catch 了 Throwable)
// 3. 批量 commit
}
/** 手动回复方法 */
public MessageMetaData reply(T result, Message parent, RetCode retCode, String retMsg) {
// 无须用户实现
}
/** ReplyClient 设置接口,由 SDK 传入,用户可重写后自定义 */
public void setPhoenixReplyClient(PhoenixReplyClient client) {
this.client = client;
}
}
Maven依赖
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-client-starter</artifactId>
<version>2.6.1</version>
</dependency>
示例说明
下面是一个简单的例子,我们定义一个 CreateListener
,监听 EventPublish 发布的账户创建事件,并且在收到创建事件后,发送一个划拨增加余额的命令,给每个新创建的用户增加 10 元的新人礼包余额。
// 注册为 Spring Bean
@Component
public class CreateListener extends ReplyMessageListener<Object> {
// 多 Topic 订阅写法
@Value("#{'${phoenix.server.topic.account-event-pub}'.split(',')}")
private String[] topics;
@Override
public String[] subscribeTopic() {
return topics;
}
// 单 Topic 订阅写法
@Value("${phoenix.server.topic.account-event-pub}")
private String topic;
@Override
public String[] subscribeTopic() {
return new String[] {topic};
}
// 消费逻辑
@Override
public Object handle(Message message) {
String payloadClassName = message.getPayloadClassName();
// 如果是创建命令,则发送一个划拨增加余额
if (AccountCreateEvent.class.getName().equals(payloadClassName)) {
AccountCreateEvent createEvent = message.getPayload();
return Account.AccountAllocateCmd.newBuilder()
.setAccountCode(createEvent.getAccountCode())
.setAmt(10)
.setAllocateNumber(UUIDUtils.genUUID())
.build();
}
// 非创建命令,不产生回复(直接返回 null, phx 会自动过滤)
return null;
}
}
相应的 Spring 配置如下:
# spring 相关, consumer 的主要配置由此配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: "phoenix-client-subscriber"
# 其 他配置,无须配置序列化/反序列化
# phoenix 相关配置
quantex.phoenix.client:
name: ${spring.application.name}-client
# PhoenixClient 是必备的,因为 ReplyMessageListener 会使用 PhoenixClient 来回复事件
mq:
type: kafka
address: 127.0.0.1:9092
topic: ${spring.application.name}-client
# 启动 Subscribe 功能,开启后会自动识别 ReplyMessageListener
subscribe:
enabled: true
案例验证
创建一个账户,然后观察 Phoenix Console 中的事件列表,可以看到账户创建后,会有一个划拨增加余额的命令。
观察划拨命令的事件信息,可以观察到划拨命令中的 parentMsgId
存在,这是因为 PhoenixClient 会自动将“账户创建事件”的 msgId
作为划拨命令的 parentMsgId
,形成关联关系。
验证“账户创建事件”的 msgId
和“划拨命令”的 parentMsgId
是一致的。
总结
由上面的案例看出,Phoenix Client 的 ReplyMessageListener 帮用户实现了序列化/反序列化,消息关联等能力,又无须引入 Phoenix Framework。