跳到主要内容
版本:latest

流行度分析

了解更多Phoenix应用场景,我们可以单独实现命令查询职责分离(CQRS)的Q端,通过Q端能够更迅速的查询数据。

image

基于酒店预订服务的示例,我们将增加实现酒店房型关注度排行,被预订最多次的商品被标记为最流行的商品,对流行商品进行排序、分析。

在此页面上,您将学习如何:

Source downloads

我们将示例源码分成了四个分支,您能够前往仓库构建并运行当前功能:流行度分析

maven依赖

本案例实现酒店房间的流行度分析功能,基于预订服务工程中的依赖,还需要添加以下依赖:

<dependency>
<groupId>com.iquantex</groupId>
<artifactId>phoenix-event-publish-starter</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

应用配置

本案例实现酒店房间的流行度分析功能,基于预订服务工程中的配置,还需要添加以下配置:

spring:
datasource:
url: jdbc:h2:file:./data/test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=FALSE;INIT=CREATE SCHEMA IF NOT EXISTS PUBLIC
username: sa
password:
driver-class-name: org.h2.Driver
jpa:
hibernate:
ddl-auto: update
naming:
physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
show-sql: false

quantex:
phoenix:
event-publish:
event-task:
enabled: true
topic: hotel-event-pub

Spring Data JPA的使用

Spring Data JPA是一款对象关系映射(ORM)框架,我们使用 JPA 创建数据库存储流行度排行信息。以上依赖和配置已经给出,下面就是JPA的使用:

  • 定义 JPA 接口。
  • 定义 model 类。

接口:

public interface BookingsStoreRepository extends CrudRepository<BookingStore, String> {}

model:

@Entity
@Data
@Builder
@Table(name = "BOOKING_STORE")
@AllArgsConstructor
@NoArgsConstructor
public class BookingStore implements Serializable {

@Id
private String roomType;

private int bookingsCount;

}

这个接口会实现自动配置,我们要使用它存储或者查询时,只需要调用它的 API 即可。

EventPublish的Handle

如果您不了解事件发布Event-Publish 配置,请先阅读文档。

开启 eventPublish 后,事件将发布到指定消息队列中,调用此队列的可以是其他服务、Elasticsearch等。

我们使用eventPublish提供的拦截功能,对预订房间的事件进行重新handle,并使用数据库进行持久化,实现酒店房间类型的流行度分析功能。

@Component
public class PopPublishHandler implements EventHandler<Phoenix.Message, Phoenix.Message> {

@Autowired
private BookingsStoreRepository repository;

/** 使用提供的默认反序列化器,反序列化MQ中的字节数组,得到以Message封装的领域事件 */
private EventDeserializer<byte[], Message> deserializer = new DefaultMessageDeserializer();

@Override
public String getInfo() {
return null;
}

@Override
public CommittableEventBatchWrapper handleBatch(CommittableEventBatchWrapper batchWrapper) {
List<EventStoreRecord<Phoenix.Message>> events = batchWrapper.getEvents();
Iterator<EventStoreRecord<Phoenix.Message>> iterator = events.iterator();
while (iterator.hasNext()) {
Message message = deserializer.deserialize(iterator.next().getContent().toByteArray());
if (message.getPayload() instanceof HotelCreateEvent) {
String roomType = ((HotelCreateEvent) message.getPayload()).getRestType();
try {
BookingStore bookingStore = repository.findById(roomType).get();
repository.save(BookingStore.builder().roomType(roomType)
.bookingsCount(bookingStore.getBookingsCount() + 1).build());
}
catch (NoSuchElementException e) {
// 获取不到数据时,get()抛出异常
repository.save(BookingStore.builder().roomType(roomType).bookingsCount(1).build());
}
}
else if (message.getPayload() instanceof HotelCancelEvent) {
String roomType = ((HotelCancelEvent) message.getPayload()).getSubNumber().split("@")[0];
BookingStore bookingStore = repository.findById(roomType).get();
if (bookingStore.getBookingsCount() == 1) {
repository.delete(bookingStore);
}
else {
repository.save(BookingStore.builder().roomType(roomType)
.bookingsCount(bookingStore.getBookingsCount() - 1).build());
}
}
}
return batchWrapper;
}

@Override
public int getOrder() {
return 0;
}

}

本案例中我们只使用了事件的拦截功能,如果您想了解更多,请阅读订阅事件

服务调用

这里的服务调用只实现一个查询的逻辑,我们能直接从数据存储端快速获取数据,实现事件写入和读取的分离。

@Slf4j
@RestController
@RequestMapping("hotel")
public class ShoppingController {

// other

@Autowired
private BookingsStoreRepository repository;

// other

@GetMapping("/queryPop")
public String queryRestRoom() {
try {
Map<String, Integer> map = new HashMap<>();
repository.findAll()
.forEach(bookingStore -> map.put(bookingStore.getRoomType(), bookingStore.getBookingsCount()));
return new ObjectMapper().writeValueAsString(ConvertUtil.Map2Map(ConvertUtil.sortMap(map)));
}
catch (JsonProcessingException e) {
return "query fail: " + e.getMessage();
}
}
}

数据监控

通过Phoenix的事件发布功能,框架本身还支持了对消息的监控,介绍说明:Phoenix ElasticSearch

集成测试

  1. 启动服务
# 启动根目录下的 boot.sh 脚本, 指定服务名, Web 端口
sh boot.sh hotel-server 8080
  1. 查询流行度排行榜
curl http://127.0.0.1:8080/hotel/queryPop

返回:

{"情侣套房":3,"大床房":2,"总统套房":1,"标准间":1}