跳到主要内容
版本:latest

Phoenix SDK 快速入门

Phoenix Client 基于 Phoenix SDK(消息通用协议定义)和 Spring Boot Starter 实现了简化的发布、订阅的功能,本文以案例的方式介绍 发布订阅 的使用方式

引入 Maven 依赖

<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-client-starter</artifactId>
<version>2.6.1</version>
</dependency>

发布-订阅通信

发布订阅通信案例中,PhoenixClient 发送的事件不期待同步获取响应结果,因此服务端不需要使用 ReplyMessageListener 来处理事件,而是使用 MessageListener 来处理事件。

在此案例中,使用 JSON 字符串作为事件内容,因此也不需要定义事件类。

应用配置

# Spring 相关, phoenix 的订阅基于 Spring Kafka 之上实现,因此需要读取 Spring Kafka 的配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: "phoenix-client-subscriber"
# 其他配置,无须配置序列化/反序列化
# Phoenix 相关配置, PhoenixClient 对于订阅和发布都是必备的,因为 ReplyMessageListener 会使用 PhoenixClient 来回复事件
quantex.phoenix.client:
name: ${spring.application.name}-client
mq:
type: kafka
address: ${spring.kafka.bootstrap-servers}
subscribe:
enabled: true

Producer 案例

@Autowired
private PhoenixClient client;

public void publish() {
// 1. 序列化事件为 JSON
String jsonPayload = JSON.toJSONString(xxxx);
// 2. 发送
client.sendNoReply(jsonPayload, "target_topic", UUID.randomUUID());
}

Consumer 案例

// 注册为 Spring Bean
@Component
public class CommandListener extends MessageListener {

// 单 Topic 订阅写法 ---- start
@Value("${phoenix.server.topic.account-event-pub}")
private String topic;
@Override
public String[] subscribeTopic() {
return new String[] {topic};
}
// 单 Topic 订阅写法 ---- end

// 消费逻辑
@Override
public void onMessage(Message message) {
String payload = message.getPayload();
log.info("received message: {}", payload);
}
}

请求-响应式通信

请求响应式通信案例中,PhoenixClient 发送的事件期待同步获取响应结果,因此服务端需要使用 ReplyMessageListener 来处理事件,并且订阅多个 Topic 的事件(通过 Spring Kafka 的能力)

请求响应案例中,规范化定义了事件类,每个事件类都需要使用注解 @CustomSerializer(serializerType = Serializers.Type.JSON) 来指定序列化方式。

除此之外,该案例也演示了,通过重写 consumerProperties() 方法,来覆盖 Spring Kafka 读取的配置,也可以重写 bootstrap.servers 来使用不同的 BrokerServers

应用配置

# Spring 相关, phoenix 的订阅基于 Spring Kafka 之上实现,因此需要读取 Spring Kafka 的配置
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: "phoenix-client-subscriber"
# 其他配置,无须配置序列化/反序列化
# Phoenix 相关配置, PhoenixClient 对于订阅和发布都是必备的,因为 ReplyMessageListener 会使用 PhoenixClient 来回复事件
quantex.phoenix.client:
name: ${spring.application.name}-client
mq:
type: kafka
address: ${spring.kafka.bootstrap-servers}
# 接收回复的 Topic
topic: ${spring.application.name}-client
subscribe:
enabled: true

事件定义


import java.io.Serializable;
import com.iquantex.phoenix.core.serialization.CustomSerializer;
import com.iquantex.phoenix.core.serialization.Serializers;

// 自定义为 JSON 序列化,默认为 Java
@CustomSerializer(serializerType = Serializers.Type.JSON)
public class Command implements Serializable {

private static final long serialVersionUID = -8951705463515914994L;

private final String id;

public Command(String id) {
this.id = id;
}
}

@CustomSerializer(serializerType = Serializers.Type.JSON)
public class Event implements Serializable {

private static final long serialVersionUID = -8951705463515914995L;

}

Producer 案例

@Autowired
private PhoenixClient client;

public void publish() {
// 1. 通过调用 **send()** 方法返回的 **Future** 对象的 **get()** 方法,同步接收请求结果。
Future<RpcResult<Event>> future = client.send(new Command("id"), "target_topic", UUID.randomUUID());
RpcResult<Event> rpcResult = future.get();
Event event = rpcResult.getData();

// 2. 通过调用 **sendSync()** 方法,可以直接返回 `RpcResult` 结果,这里是阻塞的, 因此需要提供超时时间.
RpcResult<Event> result = client.sendSync(new Command("id"), "target_topic", UUID.randomUUID(), Duration.ofMillis(10_000));
Event event = result.getData();

}

Consumer 案例

// 注册为 Spring Bean
@Component
public class CommandListener extends ReplyMessageListener<Event> {

// 多 Topic 订阅写法
@Value("#{'${phoenix.server.topic.account-event-pub}'.split(',')}")
private String[] topics;

@Override
public String[] subscribeTopic() {
return topics;
}

@Override
public Event handle(Message message) {
String payloadClassName = message.getPayloadClassName();
if (Command.class.getName().equals(payloadClassName)) {
Event eventObj = handleCmd(cmd);
return eventObj;
}
// 不匹配的事件,不产生结果,不会进行回复
return null;
}

@Override
public Properties consumerProperties() {
// 不使用 Kafka Consumer 配置,从 spring 配置读取其他配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return properties;
}
}

不使用 Spring Boot

Phoenix Client 也可以在非 Spring Boot 环境下使用,只需要手动初始化 PhoenixClient 即可。

生产者案例


public PhoenixClient createPhoenixClient(
Properties producerProperties, Properties consumerProperties) {

String bootstrapServers = "localhost:9092";
String replyTopic = "reply-topic";
String replyGroup = "reply-group";

Properties properties = new Properties();
properties.putAll(producerProperties);
properties.putAll(consumerProperties);
PhoenixKafkaClient kafkaClient =
new PhoenixKafkaClientImpl(
bootstrapServers,
replyTopic,
replyGroup,
properties,
new MatchingConfig());
PhoenixClient phoenixClient = new PhoenixClient(kafkaClient);
// 启动方法是必须的
phoenixClient.start();
return phoenixClient;
}

消费者案例


public class Subscribe implements Runnable {

private static final Logger log = LoggerFactory.getLogger(SdkSub.class);

@Override
public void run() throws Exception {
log.info("start init subscribe.");

String brokerAddr = "localhost:9092";
String subscribeTopic = "subscribeTopic"

Consumer<Message> consumer =
new KafkaTopicConsumer<>(
brokerAddr,
subscribeTopic,
"subscribe-group",
new Properties(),
new MessageDeserializer());
consumer.init();

log.info("subscriber init, start consuming.");

while (!Thread.interrupted()) {

List<Message> pull = consumer.pull(100);
if (pull != null && !pull.isEmpty()) {
for (Message message : pull) {
String payload = message.getPayload();
log.info(
"received message from topic: {} , content: {}",
subscribeTopic,
payload);
}
} else {
Thread.sleep(100);
}
}
}
}