订阅与广播
订阅
功能介绍
在流/批计算场景,上游业务系统会把数据发送至消息队列。但并不是按聚合根用户定义的命令的协议,而是上游系统自己的协议。这时可以使用Phoenix的Subscribe功能,扩展订阅的功能,做协议转换,分发,广播等操作。
如上图所示,Subscribe是用户自定声明注入到Phoenix框架的(Phoenix目前提供默认的KafkaSubscribe),其功能如下:
- Subscribe可以控制Phoenix接收消息的并发粒度,以KafkaSubscribe为例,每个Partition则会生成一个ReceiverActor来复杂消息拉取。
- Subscribe提供SourceCollect接口,当框架拉下数据之后,会调用SourceCollect接口进行数据的反序列化以及转换操作(后面会讲解该接口)。
有了Subscribe,用户即可灵活的订阅各种Topic,并且自己完成协议的转换(上游协议转聚合根的命令),以及广播等操作。
Subscribe使用
用户不需要自己编写Subscribe,可以直接使用Phoenix提供的KafkaSubscribe
(将来也会支持更多的消息队列),使用时只需要构造该对象,启动时给到Phoenix即。
在phoenix-spring-boot-starter中,可以直接把KafkaSubscribe
丢给Spring容器即可。
@Configuration
public class PhoenixSubscribeConfig {
@Bean("customSubscribe")
public Subscribe customSubscribe() {
Properties properties = new Properties();
properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaSubscribe(
mqAddress, subscribeTopic, appName, properties, new SourceCollect());
}
}
KafkaSubscribe
可以通过自定义Properties
来设置Topic消费的额外配置,Properties是Map类型,Key和Value都是String类型,并且Key是ConsumerConfig类中的常量值,Value对应Kafka相关配置。
可以注意到,除了Kafka基本的配置之外,用户还需要提供一个SourceCollect
的实现。
KafkaSubscribe
默认会订阅 Topic
下的所有 Partition
, 但是用户也可以通过以下方式自定义订阅 Partition
@Configuration
public class CustomPartitionSubscribeConfig {
@Bean("customPartitionSubscribe")
public Subscribe customPartitionSubscribe() {
Properties properties = new Properties();
properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaPartitionSubscribe(
mqAddress, subscribeTopic, partitionNum, appName, properties, new SourceCollect());
}
/**
* 继承的 KafkaSubscribe 实现,重写 split()
*/
class KafkaPartitionSubscribe extends KafkaSubscribe {
// topic下partition的数量
private int partitionNum;
// 分隔符;注意有些字符不能当做分隔符使用,如"|",http请求无法从url中解析"|"
private static final String SEPARATOR = "_";
public KafkaPartitionSubscribe(String mqAddress, String topic, int partitionNum, String group,
Properties properties, SourceCollect sourceCollect) {
super(mqAddress, topic, group, properties, sourceCollect);
this.partitionNum = partitionNum;
}
@Override
public String getSplitRangeId(){
return super.getSplitRangeId().concat(SEPARATOR).concat(String.valueOf(partitionNum));
}
@Override
public List<SplitId> split() {
// 父类中则是通过 KafkaProducer 获取全部 PartitionNum, 然后生成同数量的 SplitID
return Arrays.asList(new SplitId(getSplitRangeId(), partitionNum));
}
}
}
SourceCollect使用
SourceCollect 是消息转换器,当 Phoenix 从上游拉取到消息之后,会调用SourceCollect来实现数据反序列化和数据转换操作。
用户可以自定义实现SourceCollect接口来实现上游数据到本集群命令的转换。如下案例所示:
- 根据records.getKey()获取上游放入的className。
- 如果匹配事件一致则进行反序列化(这里模拟了事件是JSON序列化的,实际应用当中应根据上游发送的协议进行反序列化)
- 根据上游事件内容构造本业务系统聚合根可以处理的命令并返回。
public class SelfSourceCollect implements SourceCollect {
@Override
public List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaDataQuery) {
List<CollectResult> collectResults = new ArrayList<>();
if (UpperAccountAllocateEvent.class.getName().equals(records.getKey())) {
// 反序列化上游事件
UpperAccountAllocateEvent upperAccountAllocateEvent = JsonUtil.decode(new String(records.getValue()), records.getKey());
// 根据上游事件要素构造出聚合根的cmd
Account.AccountAllocateCmd accountAllocateCmd =
Account.AccountAllocateCmd.newBuilder()
.setAccountCode(upperAccountAllocateEvent.getaccountCode())
.setAmt(upperAccountAllocateEvent.getAmt())
.setAllocateNumber(UUIDUtils.genUUID())
.build();
collectResults.add(new CollectResult(accountAllocateCmd, true));
}
return collectResults;
}
}
可以看到,Collect的返回体是List,如果上游事件内要素可以做到构造出多个不同聚合根的命令(聚合根id)不同,则可以返回多个命令,来让多个聚合根实例处理。
批量处理
对于一些复杂场景,必须通过 I/O 调用才能获取信息,Phoenix 在 SourceCollect 中提供了批量处理接口(默认关闭)。
假设在默认接口方法中执行 I/O 调用,会导致经典的 N + 1 的问题:一批 Kafka 消息(一次 I/O)的序列化,发生了 n 消息数量的 I/O 调用。 这会极大的影响 Phoenix 消息摄入的性能。
Phoenix Subscribe 的 MetaData 功能则无此问题,元数据会一批消息中一次拉取。
如要使用 SourceCollect 的批量处理能力, 则需要重写下面的两个方法。
在使用 batchCollect()
时,请根据业务决定是否按原始的消息顺序排序(原始消息 Records 为此新增了原始 Kafka 消息的 Offset 信息)
public interface SourceCollect {
/**
* 是否支持批量转换消息.
*
* @return
*/
default boolean supportBatchCollect() {
return false;
}
/**
* 批量转换消息. 使用批量消息转换时, 必须严格按照源数据集合的顺序处理. 为了帮助排序, {@link Records} 提供了 Kafka 消息的原始 Offset 信息.
*
* @param recordsList 源数据集合
* @param collectMetaDataQuery 转换注册源数据查询接口
* @return 消息实体列表
*/
default List<CollectResult> batchCollect(
List<Records> recordsList, CollectMetaDataQuery collectMetaDataQuery) {
return Collections.emptyList();
}
}
性能、死锁问题
Distributed-Data(分布式数据)的响应会经过 SourceCollect, 避免在 SourceCollect 中对 DData 发起同步调用,以避免发生业务上的死锁。
SourceCollect 是消息入口, 应避免/禁止在此发起 I/O 调用, 否则将会导致性能问题,如应用需要在流量入口对命令加工并该信息能通过聚合根提供, 请使用 Phoenix 的 MetaData 能力。
如:维护已处理聚合根的 ID 集合,在流量入口处过滤无效流量。
为消息附加元数据
SourceCollect 除了支持返回 Command
之外, 也支持返回 Message
对象,可以基于 MessageFactory
增加元数据信息,如更改 EventPublish 投递 Key,增加消息优先级信息等。
public class SelfSourceCollect implements SourceCollect {
@Override
public List<CollectResult> collect(Records records, CollectMetaDataQuery collectMetaDataQuery) {
List<CollectResult> collectResults = new ArrayList<>();
if (UpperAccountAllocateEvent.class.getName().equals(records.getKey())) {
// 反序列化上游事件
UpperAccountAllocateEvent upperAccountAllocateEvent = JsonUtil.decode(new String(records.getValue()), records.getKey());
// 根据上游事件要素构造出聚合根的cmd
Account.AccountAllocateCmd accountAllocateCmd =
Account.AccountAllocateCmd.newBuilder()
.setAccountCode(upperAccountAllocateEvent.getaccountCode())
.setAmt(upperAccountAllocateEvent.getAmt())
.setAllocateNumber(UUIDUtils.genUUID())
.build();
Message message = MessageFactory.getCmdMsg("send_topic", "reply_topic",
accountAllocateCmd);
message = message.withMessagePriority(MessagePriority.HIGH_PRIORITY);
collectResults.add(new CollectResult(message, true));
}
return collectResults;
}
}