解读Disruptor源码系列文章将从一个demo入手,逐步探究Disruptor中的源码实现。
对原理不熟悉的同学建议先看我之前的两个翻译和导读文章。
对Disruptor源码感兴趣的同学,可以下载我注释的Disruptor代码。
完整版Demo
|
|
构建Disruptor类
可以发现整个例子都是围绕Disruptor这个类实现的,相关内容可参见官方文档Disruptor Wizard。
其实不使用Disruptor类也是完全可以的,直接操作RingBuffer更加灵活也更麻烦。Disruptor类提供了操作RingBuffer和设置消费依赖的便捷API,如构建Ringbuffer、设置消费链、启动关闭Disruptor、暂停消费者、发布事件等。
接下来,我们把示例拆开看。
这里调用构造方法创建了一个Disruptor对象,实际上创建了一个RingBuffer对象和一个Executor,并将引入传入私有化的构造方法创建了Disruptor对象。
这里注意下,在构造RingBuffer时,需要传入用于创建事件对象的工厂eventFactory和记录生产者序号的sequencer。根据生产者是否是多线程生产,Sequencer又分为单、多生产者模式,后续还会讲到。
构建Disruptor实例后,需要设置Disruptor的消费者。
设置消费者
|
|
这里使用了两组消费者,第一组包含三个消费者,第二组包含一个消费者。当事件可消费后,只有当第一组全部消费者都处理完毕后,事件才能被第二组消费者处理。
启动Disruptor
消费者设置成功后,即可启动Disruptor。
|
|
ConsumerRepository这个类实现了Iterable接口,iterator()方法返回ConsumerInfo集合的迭代器。ConsumerInfo是一个封装类,对应EventBatchProcessor和WorkProcessor有两种实现。EventProcessorInfo对应BatchEventProcessor,保存了与一个事件处理过程相关的EventProcessor、EventHandler、SequenceBarrier的引用。WorkerPoolInfo对应WorkProcessor,保存了WorkerPool、SequenceBarrier的引用以及代表消费者组是否为消费者链尾的标志endOfChain。
如果看不懂,不要着急哈,后续讲到消费者的时候就会明白了。
调用ConsumerInfo.start()方法,其实就是启动了消费者线程:
至此,Disruptor的初始化和启动就完成了。主要是完成了RingBuffer数据结构的初始化、设置消费者以及启动。
后续将继续分享消费者代码。