案例驱动学disruptor框架
❝disruptor框架:通过填充缓冲行,消除了CPU的伪共享。它的模型 类似于生产者-消费者模型,我们可以类比JUC中的ArrayBlockingQueue。
❞
对于disruptor而言,业务逻辑是完全运行在内存中的,它使用事件源驱动的方式,为事件预分配内存,避免了运行时因垃圾回收以及内存分配产生增加额外的开销,从而影响性能。
使用disruptor框架有以下几个步骤:
首先需要定义一个event类,用于创建Event类实例对象 还需要有个监听事件类,用于处理数据event类,可以理解成是事件的消费者 编写生产者组件向disruptor容器中去投递数据(这里的数据就是事件) 最终需要实例化一个disruptor实例,配置参数,编写disruptor核心组件,运行代码。
我们根据这些步骤,结合代码详细看一下如何使用disruptor框架。
定义事件
首先需要定义事件,这里简单的给一个OrderEvent
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private long value;
}
二、提供一个事件工厂。disruptor需要调用该事件工厂,实例化出对应的空对象(无属性)
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
// new 一个空的orderEvent对象即可
// 就是为了返回空的event对象
return new OrderEvent();
}
}
可以看出,自定义的事件工厂需要实现EventFactory接口,这样disruptor框架就可以回调该接口以实例化对应的事件对象
三、编写EventHandler实现类,它本质上可以理解为是对应业务的消费者,即:对事件的具体执行逻辑
public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消费者: receive->" + event);
}
}
❝在onEvent回调方法中,实现具体的消费逻辑,这里的Event对象已经是具体的业务对象了,是赋值过属性的,那么属性是如何被赋值的呢?
❞
四、既然已经有了消费者,那么肯定会有对应的生产者
❝编写一个生产者类,持有一个RingBuffer引用,RingBuffer是disruptor中的一个核心数据结构,它事实上保存了实践对象,可以把它看做线程池中的阻塞队列(先简单这么理解,后面会深入解释)
❞
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
System.out.println("生产者: sendData->" + data.getLong(0));
// 1. 在生产者发送消息的时候 首先需要从我们的RingBuffer中获取一个可用的序号
long sequence = this.ringBuffer.next();
try {
// 2. 根据这个序号找到具体的元素 此时获取到的event是一个空对象(属性未赋值)
OrderEvent orderEvent = this.ringBuffer.get(sequence);
// 3. 进行实际赋值处理即可
orderEvent.setValue(data.getLong(0));
} finally {
// 4. 提交发布操作
this.ringBuffer.publish(sequence);
}
}
}
通过构造方法(实际上别的方式也可以,比如setter传参或者SpringBean的注入等方式)将RingBuffer引入传递给成员变量 sendData方法接收一个ByteBuffer对象,实际上可以是任意的对象 调用 this.ringBuffer.next(); 从RingBuffer中取出一个可用的序号 根据取出的序号找到一个具体的元素,注意这时候从RingBuffer中取出的是一个空对象,(并不是NULL)是通过事件工厂new出来的一个普通POJO 此时就可以为事件进行属性赋值 最终通过RingBuffer的publish方法提交序号,最终实际上影响的是对应序号上的事件对象实例的引用
五、编写测试用例,运行并观察结果
public class DisruptorDemo {
public static void main(String[] args) {
// 1. 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024 * 1024;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(10000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName("threadpool-");
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 2. 实例化disruptor对象
* eventFactory: 消息event工厂对象
* ringBufferSize: 容器长度
* executor: 线程池 建议使用自定有线程池
* ProduceType: 生产者是单还是多
* waitStrategy: 等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor<>(
orderEventFactory,
ringBufferSize,
threadPoolExecutor,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
// 3. 添加消费者监听 构建disruptor与消费者的关联关系
disruptor.handleEventsWith(new OrderEventHandler());
// 4. 启动disruptor
disruptor.start();
// 5. 获取实际存储数据的容器:RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer orderEventProducer = new OrderEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
for (long i = 0; i < 100; i++) {
byteBuffer.putLong(0, i);
orderEventProducer.sendData(byteBuffer);
}
disruptor.shutdown();
threadPoolExecutor.shutdown();
}
}
首先进行了一些简单的参数准备工作,实例化了事件工厂、初始化了ringBufferSize、自定义了一个线程池 实例化disruptor对象,暂时指定生产者类型为单生产者,后续再介绍多生产者如何使用 为disruptor添加消费者监听,实际上可以添加多个,构建出disruptor与消费者的关联关系 启动disruptor 获取ringBuffer实例,将引用传递给生产者 通过生产者为ringBuffer中的事件进行赋值,并发布 消费者消费事件对象 关闭disruptor
运行案例,日志打印如下
省略部分内容.....
生产者: sendData->96
消费者: receive->OrderEvent(value=95)
生产者: sendData->97
消费者: receive->OrderEvent(value=96)
生产者: sendData->98
消费者: receive->OrderEvent(value=97)
生产者: sendData->99
消费者: receive->OrderEvent(value=98)
可以看到,生产顺序与消费顺序都是有序的,并且实现了生产者逻辑与消费者逻辑的解耦。
本文主要从案例出发,对disruptor先入为主的进行了体验,后续我们会深入各个组件详细认识这个高性能线程间消息传递库。
评论