分布式数据
API May change
此部分的 API 还处于试验阶段, 可能在后续的版本中会发生变更!
功能介绍
当需要在 Phoenix 集群中的多个节点间使用共享数据时,可以使用 Phoenix 提供的分布式数据功能。
- 该功能提供了一类特殊的聚合根用来专门维护所有的分布式数据。默认 API 基于 Multi 模式。
- Multi 模式下, 具体的分布式数据类的
Class.SimpleName
作为聚合根 ID, 单个聚合根维护该类下的所有分布式数据. - Single 模式下,
Class.SimpleName
+ddataCode
为聚合根 ID, 单独一个聚合根维护分布式数据.
- Multi 模式下, 具体的分布式数据类的
- 该功能提供了一系列的 API 供 Phoenix 服务对分布式数据进行 新增/更新/查询/订阅/取消订阅 等操作,并支持值过期功能。
- 分布式数据提供了
ExtendDDataSource
接口, 在值过期或初始化时, 通过该接口获取外部数据更新自身. - 该功能提供默认的 Event-Publish 任务 ddata-task (该任务默认开启, 可通过
quantex.phoenix.event-publish.enable-ddata-task=false
进行关闭), 当更新分布式数据时, ddata-task 会将分布式数据更新事件转换为分布式数据变更命令, 同时根据该分布式数据与业务聚合根之间的订阅关系, 将分布式数据变更命令进行多播。
API 介绍
分布式数据主要分为客户端,和基于 Event-Publish 实现数据更新的服务端两个部分。
客户端 API
客户端层面的 API 分为两个部分:
- Phoenix-Client:轻量客户端 API,没有服务端组件,用于外部向 Phoenix 服务端发起请求
- Phoenix-Distributed-Data: 完整的客户端 API,用于聚合根向分布式数据发起请求
两者的共同点是都实现了 DistributedDataRemote
接口. 该接口是与分布式数据服务端通信的工具, 可以利用该接口 API 方便的进 行分布式数据的维护.该对象不支持序列化.
提示
删除分布式数据可以通过在 addDData 方法中传入值为 null
的方式实现.
public interface DistributedDataRemote {
/**
* 新增单个分布式数据, 由一个聚合根维护
*
* @param ddataCls
* @param ddataCode
* @param value
* @param targetTopic
*/
<T> boolean addSingleDData(
Class<T> ddataCls, String ddataCode, Object value, String targetTopic);
/**
* 新增带过期时间的单个分布式数据, 由一个聚合根维护
*
* @param ddataCls
* @param ddataCode
* @param value
* @param targetTopic
* @param expireTime 过期时间
*/
<T> boolean addSingleDData(
Class<T> ddataCls, String ddataCode, Object value, String targetTopic, Duration expireTime);
/**
* 新增分布式数据
*
* @param ddataCls
* @param ddataCode
* @param value
* @param targetTopic
*/
<T> boolean addDData(Class<T> ddataCls, String ddataCode, Object value, String targetTopic);
/**
* 新增带过期时间的分布式数据
*
* @param ddataCls
* @param ddataCode
* @param value
* @param targetTopic
* @param expireTime 过期时间
*/
<T> boolean addDData(
Class<T> ddataCls, String ddataCode, Object value, String targetTopic,
Duration expireTime);
/**
* 单查询单个聚合根的分布式对象,不维护订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param <T>
* @return
*/
<T> DData<T> querySingleDData(Class<T> ddataCls, String ddataCode, String targetTopic);
/**
* 单查询分布式对象,不维护订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param <T>
* @return
*/
<T> DData<T> queryDData(Class<T> ddataCls, String ddataCode, String targetTopic);
/**
* 调用远程方式获取单个聚合根的分布式数据并建立订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param selfTopic
* @param <T>
* @return
*/
<T> DData<T> subscribeSingle(
Class<T> ddataCls, String ddataCode, String targetTopic, String selfTopic);
/**
* 解绑某一订阅者和某一具体单个聚合根的分布式数据之间的订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param selfTopic
*/
<T> boolean unsubscribeSingle(
Class<T> ddataCls, String ddataCode, String targetTopic, String selfTopic);
/**
* 调用远程方式获取分布式数据并建立订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param selfTopic
* @param <T>
* @return
*/
<T> DData<T> subscribe(
Class<T> ddataCls, String ddataCode, String targetTopic, String selfTopic);
/**
* 解绑某一订阅者和某一具体的分布式数据之间的订阅关系
*
* @param ddataCls
* @param ddataCode
* @param targetTopic
* @param selfTopic
*/
<T> boolean unsubscribe(
Class<T> ddataCls, String ddataCode, String targetTopic, String selfTopic);
/**
* 清除该类下所有的分布式数据, 仅作用于 MultiMode
*
* @param ddataCls
* @param targetTopic
*/
<T> boolean clear(Class<T> ddataCls, String targetTopic);
}
Phoenix-Client API
Phoenix-Client 包下的分布式数据客户端 API 为 ClientDistributedDataRemote, 与服务端不同的是不包含订阅功能(聚合根订阅分布式数据的更新)。
DistributedDataRemote clientDData = new ClientDistributedDataRemote(phoenixClient, javaTimeClock);
// 支持的方法
clientDData.addDData(DData.class, ddataCode, value, targetTopic);
clientDData.addDData(DData.class, ddataCode, value, targetTopic, expireTime);
clientDData.addSingleDData(DData.class, ddataCode, value, targetTopic);
clientDData.addSingleDData(DData.class, ddataCode, value, targetTopic, expireTime);
clientDData.queryDData(DData.class, ddataCode, targetTopic);
clientDData.querySingleDData(DData.class, ddataCode, targetTopic);
clientDData.clear();
// 不支持的方法 UnsupportedOperationException
clientDData.subscribe();
clientDData.unsubscribe();
clientDData.subscribeSingle();
clientDData.unsubscribeSingle();
Phoenix-Distributed-Data
分布书数据服务端则提供两种 API:DDataMap
和 DistributedDataRemote
- DistributedDataRemote: 与分布式数据服务端通信的工具,可以利用该API方便的进行分布式数据的维护.该对象不支持序列化.
- DDataMap: 屏蔽与通信细节,提供一个Map结构的使用方式,支持序列化,可以再聚合根当中当做普通对象使用.
public interface DistributeDataManager {
String NAME = "distributeDataManager";
/**
* 获取分布式远程通信工具
*
* @return
*/
DistributedDataRemote getDistributedDataRemote();
/**
* 创建一个map结构的分布式数据
*
* @param cls
* @param targetTopic
* @param selfTopic
* @param <VALUE>
* @return
*/
<VALUE> DDataMap<VALUE> create(Class<VALUE> cls, String targetTopic, String selfTopic);
}
更新分布式数据
服务端的更新功能,可以通过参数控制。
quantex:
phoenix:
event-publish:
enable-ddata-task: true