之前我们已经熟悉了Disruptor的启动和事件生产操作,接下来我们一同探究Disruptor如何消费事件。
0x00 概念回顾
我们先回顾下Disruptor消费相关的名词概念:
Event: Disruptor中传输的事件。
RingBuffer: 存储和更新事件的容器。
EventHandler: 用户实现接口,包含消费处理逻辑,代表Disruptor一个消费者。
EventProcessor: EventProcessor继承了Runnable接口,包含处理Disruptor事件的主循环。
多播事件: 队列和Disruptor在表现行为上最大的区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的事件会发布给所有消费者。特别适合同一数据的独立并行处理操作。
消费者依赖图(消费链):同一事件需要被多个消费者消费时,消费者之间可能有依赖关系,如消费者A,B,C,B和C依赖A先执行,但是B和C可以并行消费。
0x01 EventProcessor接口概览
OK,咱们正式开始对Disruptor消费者的源码解读。
Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。
|
|
EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor。
在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。
而使用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。
0x02 消费技术实现
我们先回顾下Disruptor消费者的两个特点:消费者依赖图(即下文所谓的“消费链”)和事件多播。
假设现在有A,B,C,D四个消费者,它们都能组成什么样的形式呢?从众多的排列组合中,我挑了4组比较有代表性的消费链形式。
- 第1组中,消费者A消费按成后,B、C、D可同时消费;
- 第2组中,消费者A、B、C、D顺序消费;
- 第3组中,消费者A、B顺序消费后,C、D同时消费;
- 第4组中,消费者A在消费完成后,B和C可以同时消费,但是必须在都消费完成后,D才能消费。
标号为1、3、4的消费链都使用了事件多播,可见事件多播属于消费链的一种组合形式。注意,在上面4种组合中,每个组合的每一水平行,都属于一个消费者组。
这些还只是较为简单的消费链组成,实际中消费链可能会更复杂。
那么在Disruptor内部是怎么实现消费链的呢?
我们可以先思考下。如果想把独立的消费者组成消费链,那么后方的消费者(组)必然要知道在它前方的消费者(组)的处理情况,否则就做不到顺序消费。同时,消费者也要了解生产者的位置,来判断是否有可用事件。之前我们分析生产者代码的时候,已经讲过,生产者为了不覆盖没有消费完全的事件,必须知道最慢消费者的处理情况。
做到了这些才会有能力去控制消费者组成消费链。下面让我们具体看Disruptor中的实现。
0x02.1 使用BatchEventProcessor单线程批处理事件
在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。
|
|
可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors这个方法中。
先简单说下ConsumerRepository,这个类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMap。IdentityHashMap
和HashMap最大的不同,就是使用==而不是equals比较key。
这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier。
BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。
0x02.2 消费者可用序列屏障-SequenceBarrier
我们重点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的主要作用是协调获取消费者可处理到的最大序号,内部持有着生产者和其依赖的消费者序列。它的接口定义如下。
SequenceBarrier实例引用被EventProcessor持有,用于等待并获取可用的消费事件,主要体现在waitFor这个方法。
要实现这个功能,需要3点条件:
- 知道生产者的位置。
- 因为Disruptor支持消费者链,在不同的消费者组之间,要保证后边的消 费者组只有在前消费者组中的消费者都处理完毕后,才能进行处理。
- 暂时没有事件可消费,在等待可用消费时,还需要使用某种等待策略进行等待。
看下SequenceBarrier实现类ProcessingSequenceBarrier的代码是如何实现waitFor方法。
0x02.3 该用什么姿势等待可用事件-WaitStrategy
看来实际的等待操作还是在WaitStrategy#waitFor完成的。
阻塞等待策略使用Lock+Condition的方式等待生产者生产可用事件,而使用Busy Spin的方式等待可能出现的上一个消费者组未消费完成的情况。
这里给我们一个提示,在构建低延迟系统时,因为锁的性能消耗,尽量不要使用锁。如果必须要用锁,也要把锁粒度调到最小。
另外,消费者在等待可用消费事件时,会循环调用barrier.checkAlert(),再去调用锁的条件等待,等待可用消费事件。
有三个地方可以唤醒等待中的消费线程。两种是在Sequencer实现类中,一是有可用事件发布,通知消费线程继续消费;二是在调用next()获取可用的RingBuffer槽位时,发现RingBuffer满了(生产者速度大于消费者,导致生产者没有可用位置发布事件),将唤醒消费者线程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )。开始我百思不得,为什么要在buffer满了的时候不断唤醒消费者线程,直到看到这个issue才明白。大意是在log4j2中使用Disruptor时发生了死锁,为了避免在发布事件时,由于某种原因导致没有通知到消费者,在生产者尝试往一个已满的buffer发布数据时,就会再通知消费者进行消费。而这个bug最终也被Log4j认领,与Disruptor无关。Disruptor这里的再次通知也是为了更加保险。
还有一种唤醒就是关闭Disruptor时,消费者关闭前将会处理完当前批次数据(并非RingBuffer的所有数据,而是此次循环取出的最大可用序号以下的所有未处理数据),如果消费者线程当前在等待状态,将被唤醒并终结。
BatchEventProcessor就讲到这。
0x02.4 使用WorkProcessor多线程处理事件
下面说一说WorkHandler+WorkProcessor。
上面讲过,使用EventHandler+BatchEventProcessor这种方式类似JMS的发布订阅,同一个事件会被不同线程的EventHandler并行消费。那么,如果单线程处理能力不足,想多线程处理同一主题下的不同事件该怎么办呢?这种方式就类似JMS的点到点模式,多个消费者可以监听同一个队列,谁先拿到就归谁处理。
在Disruptor中使用WorkHandler+WorkProcessor实现以上功能。当需要使用这种模式,可在设置Disruptor消费者时,通过使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool设置消费链。
先看下相关的源码。
在使用线程池处理事件时,与单线程处理相比,最大的不同在于新增了一个WorkerPool。WorkerPool用于管理一组WorkProcessor,它的属性、方法如下。
WorkProcessor的原理和BatchEventProcessor类似,只是多了workSequence用来保存同组共用的处理序列。在更新workSequence时,涉及多线程操作,所以使用CAS进行更新。
WorkProcessor的run()方法如下。
代码逻辑和BatchEventProcessor类似,就不再赘述啦。
还有一点需要留意,Disruptor通过EventHandlerGroup代表一个消费者组,就表示之前那四张图中一个水平线上的消费者组。这样不同的消费者组之间不必关心各自的实现,从而可以实现更加复杂和灵活的消费链,即依赖图表。
0x03 消费者小结
从小语文老师就教育我们写作文要总结,好习惯不能忘~
本文主要探讨了Disruptor消费者内部概要实现,重点阐述了BatchEventProcessor、WorkProcess的消费代码原理。同时省略了超时通知、开始和结束通知、异常控制等内容,并非不重要,而只是尽量言简意赅,达到抛砖引玉的目的。
BatchEventProcessor主要用于处理单线程并行任务,同一消费者组的不同消费者会接收相同的事件,并在所有事件处理完毕后进入下一消费者组进行处理(是不是类似JUC里的Phaser、CyclicBarrier或CountDownLatch呢)。WorkProcessor通过WorkerPool管理多个WorkProcessor,达到多线程处理事件的目的,同一消费者组的多个WorkProcessor不会处理同一个事件。通过选择不同的WaitStragegy实现,可以控制消费者在没有可用事件处理时的等待策略。
好啦,有关Disruptor消费者的分享就到这。
欢迎大家留言讨论,一同探讨,一同进步。