Kafka 扩展
本文档分为两个部分(都是对 Phoenix-Kafka 能力做的一些扩展):
- Kafka 增强扩展:让 Phoenix Kafka Client 支持 LargeMessage 的消息发送
- Kafka 分区选择器:为 Phoenix 内部模块消息发送(Producer)定义发送的分区选择策略
一. Kafka 增强扩展
这里的 Kafka 增强扩展只是 Phoenix 对 KafkaClient 的预留了扩展接口, 内置了 Linkedin 的 KafkaClient 作为 Apache KafkaClient 的增强扩展
简介
Phoenix 框架为了解决 Kafka 消息传输中的 LargeMessage 问题,提供/预留了可配置 KafkaClient 增强扩展,其中一种解决方案是 Chunk Message,在探索之后本着不重复造轮子的原则,Phoenix 内置集成了支持 LargeMessage 的 Linkedin Kafka Client
当然,通过更改配置后,Kafka 也支持超过默认大小的消息,Linkedin(原 Kafka 开发者)做了相关的 Benchmark.
但是不优化的情况下,Large messages 可能会对 Broker 和 JVM 造成影响,然后拖慢 JVM 速度
当然,除了 Phoenix 的方案外,Kafka Large Message 也有其他解决方案,本文档不具体展开:
- Kafka 和外部存储中是用基于引用的消息传递(Reference-based)
- 在 KafkaClient 中使用无需外部存储的内置 Large Message 支持(Phoenix 所用)
- 内置 Large Message 支持 + 分层存储(tiered storage)
使用场景
Kafka 扩展(主要是 Large Message 的支持)需要双端使用统一客户端(Consumer、Producer), 目前识别到的场景有:
关于 CLIENT_CONSUMER 等 Phoenix 模块配置在 com.iquantex.phoenix.core.connect.kafka.PhoenixKafkaClientModule 中定义
- 当聚合根发出/回复的事件大小超出 Kafka 限制时:
CLIENT_CONSUMER <-> SERVER_PRODUCER
(用于PhoenixClient RPC 模式下响应的接收) - 当 EventPublish 发出的事件大小超出 Kafka 限制时:
EVENT_PUBLISH_PRODUCER <-> spring.consumer
(用 spring kafka 外部订阅) - 当 PhoenixClient 发出的命令大于 Kafka 限制时:
CLIENT_PRODUCER <-> SERVER_CONSUMER
(用于PhoenixClient RPC、异步模式) - 当 KafkaTemplate(spring kafka 客户端)发出的命令大于 Kafka 限制时:
spring.producer <-> SERVER_CONSUMER
(外部发送给 Phoenix 订阅、也可以是用户的自定义订阅) - 不使用 Phoenix,仅使用 Kafka 扩展:
spring.producer <-> spring.consumer
使用说明
Phoenix Kafka Extend 目前仅支持 K/V 为 <String,byte[]>
的序列化和反序列化. 如果需要支持 <String,String>
,请传入时使用 String.getBytes()
或提前将 Value 序列化
0. 引入 Kafka Extend 依赖
请注意,phoenix-kafka-extend
依赖 Phoenix 核心 API 包 phoenix-core
<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-kafka-extend-starter</artifactId>
<version>2.6.0</version>
</dependency>
1. Phoenix Client、内部订阅等
当您在 Kubernetes 环境中通过环境变量配置 spring-kafka-properties
时,可以用下面这种格式:
QUANTEX_PHOENIX_KAFKA_EXTEND_SPRING-KAFKA-PROPERTIES_BOOTSTRAP_SERVERS
也可以使用 QUANTEX_PHOENIX_KAFKA_EXTEND_SPRING_KAFKA_PROPERTIES_BOOTSTRAP_SERVERS
默认开启配置后,Phoenix 会自动将 Apache Kafka Client 替换成 Linkedin Kafka Clien
quantex:
phoenix:
kafka:
extend:
# 是否开启 Phoenix Kafka Extend
enabled: true
# 是否让 Server 端对 Client 的回复使用 Kafka 增强 Client (RPC 场景,需要同时开启 client-consumer)
server-producer-enabled: true
# 是否让 Client 端接收 Server 的响应使用 Kafka 增强 Client(RPC 场景,需要同时开启 server-producer)
client-consumer-enabled: true
# 是否让 Client 端发送给 Server 的命令使用 Kafka 增强 Client(默认消息入口场景,需要同时开启 server-consumer)
client-producer-enabled: true
# 是否让 Server 端接收 MQ 订阅使用 Kafka 增强 Client(默认订阅场景,需要同时开启 client-producer 或接收的外部 Client 支持)
server-consumer-enabled: true
# 是否让 EventPublish 端发送给外部的消息使用 Kafka 增强 Client(需要外部的 KafkaConsumer 支持增强 Client,可以用 Phoenix-Spring 的集成)
event-publish-producer-enabled: true
# 是否开启 Spring Consumer 的 Kafka 增强 Client,Phoenix 会注册一个 ConsumerFactory Bean
spring-consumer-enabled: true
# 是否开启 Spring Producer 的 Kafka 增强 Client,Phoenix 会注册一个 ProducerFactory Bean
spring-producer-enabled: true
# Spring 的 Kafka 增强 Client 的配置
spring-kafka-properties:
"bootstrap.servers": 127.0.0.1:9092,127.0.0.1:9092,127.0.0.1:9092
# Linkedin Kafka 配置
# Producer: 普通消息的最大大小,超出则开始切分. 默认 50kb
max-message-bytes: 51200
# Consumer: 用于缓冲不完整大消息段的内存总大小上限(默认是 30mb)
message-buffer-capacity: 32000000
# Consumer: LargeMessage 起始 Offset 和当前 Offset 最大差距值,超出则认为 Large Message 已不完整。默认 1000
message-expiration-offset-gap: 1000
# Consumer: 每个分区要跟踪 Offset 的最大消息数,默认 500
max-offset-track-pre-partition: 500
2. 自定义订阅
当 KafkaExtend 用于扩展时,从 Kafka 接收的消息体积可能会大于 256KB,此时会超出 Phoenix 内部 akka 的限制,如果需要让大于 256KB 的消息发能够
发送到聚合根,则需要更改配置, 如更改为 4MB 的相关配置为:(使用 quantex.phoenix.akka.akka-conf
自定义 akka 配置)
akka.remote.artery.advanced.maximum-frame-size= 4096 KiB
当开启 Kafka Extend 后,Phoenix 会向 Spring 注册一个 com.iquantex.phoenix.core.connect.kafka.KafkaClientProvider
的 SpringBean.
用户可以拿到该 Bean 并注册到 KafkaSubscribe 中:
@Bean
public Subscribe customSubscribe(KafkaClientProvider clientProvider) {
Properties properties = new Properties();
properties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new KafkaSubscribe(
mqAddress,
subscribeTopic,
appName,
properties,
new SelfDeseriSchema(),
clientProvider.getConsumer(PhoenixKafkaClientModule.SERVER_CONSUMER)); // 是用和 SERVER_CONSUMER 一样的配置
// 假设用户不希望 Server 订阅使用 Kafka Extend,仅对自己的某个自定义订阅需要,则可以是用默认的增强 Consumer
ConsumerFactory factory = clientProvider.getConsumer(PhoenixKafkaClientModule.ENHANCE_PRODUCER);
}
3. Spring Kafka Producer 集成
当用户想要在 Phoenix 外部(其他服务)使用 Phoenix Kafka Extend 时,通过开关,Phoenix 会向 Spring 注册 KafkaClient Factory 的 SpringBean,用户只需要修改 Spring Kafka Client 的配置即可。
/**
* 在注册 KafkaTemplate Bean 时,从 Spring IOC 中取出名为:phoenix-kafka-producer-extend 的 Spring 接口 ProducerFactory 的实现实例. 然后作为 KafkaTemplate 的参数传入 KafkaTemplate
*/
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate(
@Qualifier(value = "phoenix-kafka-producer-extend")
ProducerFactory<String, byte[]> producerFactory) {
return new KafkaTemplate<String, byte[]>(producerFactory);
}
4. Spring Kafka Consumer 集成
当用户想要在 Phoenix 外部(其他服务)使用 Phoenix Kafka Extend 时,通过开关,Phoenix 会向 Spring 注册 KafkaClient Factory 的 SpringBean,用户只需要修改 Spring Kafka Client 的配置即可。
/**
* 同理,只需要更改 ConcurrentKafkaListenerContainerFactory 的 ConsumerFactory 即可.
* Phoenix 会注入名为 phoenix-kafka-consumer-extend 的 Spring 接口 ConsumerFactory 的实现实例
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
@Qualifier(value = "phoenix-kafka-consumer-extend")
ConsumerFactory<String, byte[]> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
内部处理
本文档只做简单介绍,详细请参考:https://github.com/linkedin/li-apache-kafka-clients
1. Offset
Kafka Extend 在处理 Large Message 时,会以最后一个 Segment 的 Offset 作为消息的 Offset
2. Offset Tracking
Kafka Extend 会在 Records 的 Header 中添加 Large Message 的信息,并在 Client 内部维护,当 Msg1 的消息传输发生问题时,用户可以从 Msg1 的第一个 Segment 开始消费(seek)
3. Rebalanced
当 Msg1 未被交付给 User 时,此时重平衡后从 offset=0 开始消费
当 Msg1 成功交付给 User 之后,Commit 中附加 Header 带上 delivered=2 的信息,并在重平衡后从 offset=3 开始消费并交付给 User
4. 内存开销
- 生产者:除了消息切分(splitter)和消息拷贝外,没有额外的内存开销
- 消费者
buffer.capacity
:消费者会缓存固定长度的消息expiration.offset.gap
:当 Offset 和 Large Message 的起始 Offset 超出一个差值,则认为该消息不完全,KafkaExtend 会丢弃该消息