目录
1 生产者 数据源
1.1. match-server 一启动 初始化数据 自动查询数据库 查询level2要展示的数据
1.2 match-server接收 前端发给Exchange-server的数据
2. 将查询/接受的数据EntrustOrder 转成 Order 解耦 过滤掉不要的属性
3.Order转成 OrderEvent
4. 分配序号发布数据到ringbuffer,管理序号和栅栏(前提是ringbuffer构建好了)
5 分配序号 发布数据到ringbuffer
5.1. 初始化数据源实现的代码 循环每一条数据转换 分配序号 发布数据到ringbuffer
5.2 前端数据源
1 生产者 数据源
1.1 match-server 一启动 初始化数据 自动查询数据库 查询level2要展示的数据
List<EntrustOrder> entrustOrders = entrustOrderMapper.selectList(
new LambdaQueryWrapper<EntrustOrder>()
.eq(EntrustOrder::getStatus, 0)
.orderByAsc(EntrustOrder::getCreated)
1.2 match-server接收 前端发给Exchange-server的数据
@StreamListener("order_in") // "order_in" 在 Sink中
public void handleMessage(EntrustOrder entrustOrder) { // 消息监听}
2. 将查询/接受的数据EntrustOrder 转成 Order 解耦 过滤掉不要的属性
public static Order entrustOrder2Order(EntrustOrder entrustOrder) {
Order order = new Order();
order.setOrderId(entrustOrder.getId().toString());
order.setPrice(entrustOrder.getPrice());
order.setAmount(entrustOrder.getVolume().subtract(entrustOrder.getDeal())); // 交易的数量= 总数量- 已经成交的数量
order.setSymbol(entrustOrder.getSymbol());
order.setOrderDirection(OrderDirection.getOrderDirection(entrustOrder.getType().intValue())); // 交易side
order.setTime(entrustOrder.getCreated().getTime());
return order ;
}
3.Order转成 OrderEvent
// 使用事件转换器的好处,1环形队列获取序号, 2拿到事件填充数据, 3再发布序号 省了从2给 事件填充数据 private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, Order>() { // Order 转化成 OrderEvent
// 现在上面实例化 然后 在下面 实现
/**
* Translate a data representation into fields set in given event 将数据表示转换为给定事件中设置的字段
* Params:参数
* event
* into which the data should be translated. 要转成什么data
* sequence 序列
* that is assigned to event. 分配给事件的序列。
* arg0
* The first user specified argument to the translator 转换器的第一个用户指定参数
*/
public void translateTo(OrderEvent event, long sequence, Order input) {
event.setSource(input);
} // setSource 是 source的 set方法
};
4. 分配序号发布数据到ringbuffer,管理序号和栅栏(前提是ringbuffer构建好了)
/**
* 我们使用DisruptorTemplate 时,就使用它的onData方法
* @param input
*
* public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
* {
* final long sequence = sequencer.next();
* translateAndPublish(translator, sequence, arg0);
* }
* private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
* {
* try
* {
* translator.translateTo(get(sequence), sequence, arg0);
* }
* finally
* {
* sequencer.publish(sequence);
* }
* }
*
* void publish(long sequence);
*
*/
public void onData(Order input) {
ringBuffer.publishEvent(TRANSLATOR, input); // 这个就是分配序号 然后发布数据到ringbuffer
}
5 分配序号 发布数据到ringbuffer
5.1. 初始化数据源 从数据查询数据 循环每一条数据转换 分配序号 发布数据到ringbuffer
for (EntrustOrder entrustOrder : entrustOrders) {
disruptorTemplate.onData(BeanUtils.entrustOrder2Order(entrustOrder)); // 往ringbuffer 中放 // BeanUtils.entrustOrder2Order 数据转换
} // BeanUtils.entrustOrder2Order(entrustOrder) 是数据转换 就是从entrustOrder 筛选出 Order 需要的属性数据
5.2 前端数据源 高频多生产者 每次一条数据
exchange-service 发送消息到match
disruptorTemplate.onData(BeanUtils.entrustOrder2Order(entrustOrder));
6 disruptorTemplate.onData 源码
onData
public void onData(Order input) {
ringBuffer.publishEvent(TRANSLATOR, input); // 这个就是分配序号 然后发布数据到ringbuffer
}
TRANSLATOR
// 先用 EventTranslatorOneArg方法做了个 TRANSLATOR 再调用publishEvent方法
private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, Order>() { // Order 转化成 OrderEvent
public void translateTo(OrderEvent event, long sequence, Order input) {
event.setSource(input);
} // setSource 是 source的 set方法
};
publishEvent
@Override
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
final long sequence = sequencer.next(); // 单生成者就是 但生产者中得next算法 1:43:28 // https://www.bilibili.com/video/BV1zM4y1L7Q9/?spm_id_from=333.337.search-card.all.click&vd_source=ff8b7f852278821525f11666b36f180a
translateAndPublish(translator, sequence, arg0);
}
translateAndPublish
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
{
try
{
translator.translateTo(get(sequence), sequence, arg0);
}
finally
{
sequencer.publish(sequence);
}
}
EventTranslatorOneArg
public interface EventTranslatorOneArg<T, A>
{
/**
* Translate a data representation into fields set in given event
*
* @param event into which the data should be translated.
* @param sequence that is assigned to event.
* @param arg0 The first user specified argument to the translator
*/
void translateTo(T event, long sequence, A arg0);
}
translateTo 自定义 还是调用?
private static final EventTranslatorOneArg<OrderEvent, Order> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, Order>() { // Order 转化成 OrderEvent
public void translateTo(OrderEvent event, long sequence, Order input) {
event.setSource(input);
} // setSource 是 source的 set方法
};
Sequencer
public interface Sequencer extends Cursored, Sequenced
{
}
Sequenced ->publish
public interface Sequenced
{
/**
* Publishes a sequence. Call when the event has been filled.
*
* @param sequence the sequence to be published.
*/
void publish(long sequence); // 操作: publish 序号
}