流行度分析
了解更多Phoenix应用场景,我们可以单独实现命令查询职责分离(CQRS)的Q端,通过Q端能够更迅速的查询数据。

基于酒店预订服务的示例,我们将增加实现酒店房型关注度排行,被预订最多次的商品被标记为最流行的商品,对流行商品进行排序、分析。
在此页面上,您将学习如何:
Source downloads
我们将示例源码分成了四个分支,您能够前往仓库构建并运行当前功能:流行度分析
maven依赖
本案例实现酒店房间的流行度分析功能,基于预订服务工程中的依赖,还需要添加以下依赖:
<dependency>
    <groupId>com.iquantex</groupId>
    <artifactId>phoenix-event-publish-starter</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
应用配置
本案例实现酒店房间的流行度分析功能,基于预订服务工程中的配置,还需要添加以下配置:
spring:
  datasource:
    url: jdbc:h2:file:./data/test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=FALSE;INIT=CREATE SCHEMA IF NOT EXISTS PUBLIC
    username: sa
    password:
    driver-class-name: org.h2.Driver
  jpa:
    hibernate:
      ddl-auto: update
      naming:
        physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
    database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
    show-sql: false
quantex:
  phoenix:
    event-publish:
      event-task:
        enabled: true
        topic: hotel-event-pub
Spring Data JPA的使用
Spring Data JPA是一款对象关系映射(ORM)框架,我们使用 JPA 创建数据库存储流行度排行信息。以上依赖和配置已经给出,下面就是JPA的使用:
- 定义 JPA 接口。
 - 定义 model 类。
 
接口:
public interface BookingsStoreRepository extends CrudRepository<BookingStore, String> {}
model:
@Entity
@Data
@Builder
@Table(name = "BOOKING_STORE")
@AllArgsConstructor
@NoArgsConstructor
public class BookingStore implements Serializable {
	@Id
	private String roomType;
	private int bookingsCount;
}
这个接口会实现自动配置,我们要使用它存储或者查询时,只需要调用它的 API 即可。
EventPublish的Handle
如果您不了解事件发布和 Event-Publish 配置,请先阅读文档。
开启 eventPublish 后,事件将发布到指定消息队列中,调用此队列的可以是其他服务、Elasticsearch等。
我们使用eventPublish提供的拦截功能,对预订房间的事件进行重新handle,并使用数据库进行持久化,实现酒店房间类型的流行度分析功能。
@Component
public class PopPublishHandler implements EventHandler<Phoenix.Message, Phoenix.Message> {
	@Autowired
	private BookingsStoreRepository repository;
	/** 使用提供的默认反序列化器,反序列化MQ中的字节数组,得到以Message封装的领域事件 */
	private EventDeserializer<byte[], Message> deserializer = new DefaultMessageDeserializer();
	@Override
	public String getInfo() {
		return null;
	}
	@Override
	public CommittableEventBatchWrapper handleBatch(CommittableEventBatchWrapper batchWrapper) {
		List<EventStoreRecord<Phoenix.Message>> events = batchWrapper.getEvents();
		Iterator<EventStoreRecord<Phoenix.Message>> iterator = events.iterator();
		while (iterator.hasNext()) {
			Message message = deserializer.deserialize(iterator.next().getContent().toByteArray());
			if (message.getPayload() instanceof HotelCreateEvent) {
				String roomType = ((HotelCreateEvent) message.getPayload()).getRestType();
				try {
					BookingStore bookingStore = repository.findById(roomType).get();
					repository.save(BookingStore.builder().roomType(roomType)
							.bookingsCount(bookingStore.getBookingsCount() + 1).build());
				}
				catch (NoSuchElementException e) {
					// 获取不到数据时,get()抛出异常
					repository.save(BookingStore.builder().roomType(roomType).bookingsCount(1).build());
				}
			}
			else if (message.getPayload() instanceof HotelCancelEvent) {
				String roomType = ((HotelCancelEvent) message.getPayload()).getSubNumber().split("@")[0];
				BookingStore bookingStore = repository.findById(roomType).get();
				if (bookingStore.getBookingsCount() == 1) {
					repository.delete(bookingStore);
				}
				else {
					repository.save(BookingStore.builder().roomType(roomType)
							.bookingsCount(bookingStore.getBookingsCount() - 1).build());
				}
			}
		}
		return batchWrapper;
	}
	@Override
	public int getOrder() {
		return 0;
	}
}
本案例中我们只使用了事件的拦截功能,如果您想了解更多,请阅读订阅事件
服务调用
这里的服务调用只实现一个查询的逻辑,我们能直接从数据存储端快速获取数据,实现事件写入和读取的分离。
@Slf4j
@RestController
@RequestMapping("hotel")
public class ShoppingController {
    
    // other
	@Autowired
    private BookingsStoreRepository repository;
	
	// other
	@GetMapping("/queryPop")
    public String queryRestRoom() {
        try {
            Map<String, Integer> map = new HashMap<>();
            repository.findAll()
                    .forEach(bookingStore -> map.put(bookingStore.getRoomType(), bookingStore.getBookingsCount()));
            return new ObjectMapper().writeValueAsString(ConvertUtil.Map2Map(ConvertUtil.sortMap(map)));
        }
        catch (JsonProcessingException e) {
            return "query fail: " + e.getMessage();
        }
    }
}
数据监控
通过Phoenix的事件发布功能,框架本身还支持了对消息的监控,介绍说明:Phoenix ElasticSearch。
集成测试
- 启动服务
 
# 启动根目录下的 boot.sh 脚本, 指定服务名, Web 端口
sh  boot.sh hotel-server 8080
- 查询流行度排行榜
 
curl http://127.0.0.1:8080/hotel/queryPop
返回:
{"情侣套房":3,"大床房":2,"总统套房":1,"标准间":1}