Phoenix SDK 介绍
Phoenix SDK主要用于定义 Phoenix 的通信层协议(ProtoBuffers)和传输方式(Kafka),并提供了一个抽象的 Pub/Sub 接口。
介绍
在 2.7.x 版本中,我们对遗留的多个 Kafka 实现(KafkaServerConsumer、MQProducer、ClientProducer/Consumer)进行了整合,统一重构为 Producer<T>
和 Consumer<T>
,提供了更加统一的接口,
所有基于 Pub/Sub 的通信都会使用该接口,这让我们对其他 MQ 的支持更加便捷.
在 Pub/Sub 核心接口的定义中,我们参考 Kafka 的设计,Producer 总是异步回调式的,而 Consumer 总是同步阻塞拉取式的,这样的设 计可以让我们更高的吞吐量进行消息的发布,以背压的方式进行消息的消费, 因此 MQ 的实现选择需要支持缓冲能力的。
通常用户不需要使用这一层接口,而是使用 PhoenixClient 提供的接口,以及 PhoenixServer 提供的消费和 EventPublish 提供的发布能力。
Maven依赖
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-core</artifactId>
<version>2.6.1</version>
</dependency>
消息协议定义
Phoenix 内部的消息协议是基于 ProtoBuffers 定义的,我们提供了工厂类用于快速构建消息协议。
package com.iquantex.phoenix.core.message;
public abstract class MessageFactory {
/**
* 构建命令型消息
*
* @param dst 目的 Topic
* @param src 源 Topic(如果不希望被回复,可以设置为空)
* @param payload 消息体
* @param requestId 请求 ID(链路追踪使用)
* @return
*/
public static Message getCmdMsg(String dst, String src, Object payload, String requestId);
/**
* 构建响应/事件型消息(用于实体聚合根和事务聚合根回复请求或向下游推送)
*
* @param retCode 返回码
* @param retMessage 返回消息
* @param payload 消息体
* @param parent 命令消息
* @return
*/
public static Message getEventMsg(
RetCode retCode, String retMessage, Object payload, Message parent);
/**
* 构建异常的事件消息
*
* @param e 处理异常
* @param parent 命令消息
* @return
*/
public static Message getExceptionEventMsg(Exception e, Message parent);
/**
* 反序列化消息
*/
public static Message parse(byte[] bytes);
}
通信接口定义
/**
* 消费者接口,在 Phoenix 内部的所有消费者都基于此接口实现
*/
public interface Consumer<T> {
/** 初始化方法 */
void init();
/**
* 拉取消息,可指定超时时间(如果传入-1,当没有拉取到消息时直接返回),如果超过指定时间依旧没有拉取到消息,直接返回。
*
* @param timeout 超时时间,如果缓冲区中没有数据,则等待轮询所花费的时间(以毫秒为单位)。 如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。 不得为负。
* @return if not find, sleep 1ms return null
*/
default List<T> pull(int timeout) {
final List<T> result = new ArrayList<>();
pull(timeout, (result::addAll));
return result;
}
/**
* 拉取消息,可指定超时时间(如果传入-1,当没有拉取到消息时直接返回),如果超过指定时间依旧没有拉取到消息,直接返回。
* 支持传入回调函数,对该批次拉取到的消息进行处理,根据回调函数的处理情况手动提交offset(如果处理过程中遇到异常,设置offset为上一次提交的offset,否则提交最新的offset)
*
* @param timeout 超时时间,如果缓冲区中没有数据,则等待轮询所花费的时间(以毫秒为单位)。 如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。 不得为负。
* @param callback 拉到消息的回调函数
*/
void pull(int timeout, ConsumerCallback<T> callback);
/** 关闭消费者 */
void close();
/** current position */
default long getCurrentPosition() {
return Long.MIN_VALUE;
}
}
/**
* 生产者接口,在 Phoenix 内部的所有生产者都基于此接口实现
*/
public interface Producer<T> {
/** 初始化 */
void init();
/**
* 异步批量发送,内部处理掉发送失败的异常,并打印出错误详情。
*
* @param record
*/
default void send(T record) {
this.send(record, Callback.DEFAULT_CALLBACK);
}
/** 异步发送,支持回调. */
void send(T msg, Callback callback);
/** 关闭生产者 */
void close();
}
通信接口说明
从构造函数和功能上看,Phoenix 并不强烈推荐用户使用这一层接口,大部分情况下,用户应该使用 PhoenixClient 和 PhoenixFramework 提供的能力。
/**
* PhoenixCore 提供的基于 Kafka 协议实现的 Producer,在上层 PHX 封装了:
* - 预定义配置(基于二进制传输、retries、max.block.ms 等参数)
* - 序列化器同 Kafka 类似,PHX 提供了序列化器向调用方屏蔽序列化细节
* - 分区选择器,PHX 提供自定义的分区选择器
* 总体而言,PHX 没有在 KafkaProducer 上做太多的封装,而是提供了一些满足 PHX 使用需求的基本配置
*/
public KafkaProducer(
String serverAddr, // MQ 地址
Properties properties, // MQ 配置
Serializer<T> serializer, // 序列化器,PHX 提供了传输二进制的:RecordsSerializer 以及传输 Phoenix 内部消息协议格式的 MessageSerializer
KafkaProducerFactory factory, // KafkaProducer 工厂,PHX 提供了:ApacheKafkaProducerFactory 和支持 Chunked 传输的 LinkedKafkaProducerFactory 以供选择
PartitionSelector partitionSelector // 分区选择器,PHX 提供了默认的 Kafka 原生的 OriginPartitionSelector 以及支持根据 Key 哈希的 KeyHashSelector
);
/**
* PhoenixCore 提供的基于 Kafka 协议实现的主题级别订阅的 Consumer(非线程安全),在上层 PHX 封装了:
* - 初始化获取 Topic 的分区信息
* - 预定义配置(基于二进制传输、auto.reset.offset、手动提交等参数)
* - 反序列化器同 Kafka 类似,PHX 提供了反序列化器向调用方屏蔽反序列化细节
* - 回调重试机制,PHX 提供了重试一次的机制(注意,如果希望幂等,则需要在业务捕获异常,并手动重试)
* - 消费进度指标
*/
public KafkaTopicConsumer(
String serverAddress, // MQ 地址
String topic, // 订阅 Topic
String group, // 订阅组
Properties properties, // MQ 配置
Deserializer<T> deserializer, // 反序列化器
KafkaConsumerFactory factory // KafkaConsumer 工厂,PHX 提供了:ApacheKafkaConsumerFactory 和支持 Chunked 传输的 LinkedKafkaConsumerFactory 以供选择
);
/**
* PhoenixCore 提供的基于 Kafka 协议实现的分区级别订阅的 Consumer(非线程安全),在上层 PHX 封装了:
* - 初始化获取分区的 Offset 信息
* - 预定义配置(基于二进制传输、auto.reset.offset、手动提交等参数)
* - 反序列化器同 Kafka 类似,PHX 提供了反序列化器向调用方屏蔽反序列化细节
* - 回调重试机制,PHX 提供了重试一次的机制(注意,如果希望幂等,则需要在业务捕获异常,并手动重试)
* - 消费进度指标
* 与主题级别订阅基本类似,但是分区级别订阅可以更加精细的控制顺序等
*/
public KafkaPartitionConsumer(
String serverAddress, // MQ 地址
String topic, // 订阅 Topic
int partitionId, // 分区 ID
String group, // 订阅组
Properties properties, // MQ 配置
Deserializer<T> deserializer, // 反序列化器
KafkaConsumerFactory factory // KafkaConsumer 工厂,PHX 提供了:ApacheKafkaConsumerFactory 和支持 Chunked 传输的 LinkedKafkaConsumerFactory 以供选择
);
Kafka 序列化器
如果用户想要直接使用 Apache Kafka Clients 或其他等同的 Kafka Clients,而不是 Phoenix 提供的通信接口封装,那么用户需要使用 Phoenix 提供的序列化器。
- 序列化器:
com.iquantex.phoenix.core.connect.kafka.serialization.PhoenixKafkaSerializer
- 反序列化器:
com.iquantex.phoenix.core.connect.kafka.serialization.PhoenixKafkaDeserializer
:::tips 建议 我们推荐用户使用 PhoenixClient,因为除了基础的 Pub/Sub 能力外,Message 的众多元数据的规范定义是一个比较繁琐的事情. :::