解读Disruptor系列--解读源码(3)之消费者

之前我们已经熟悉了Disruptor的启动和事件生产操作,接下来我们一同探究Disruptor如何消费事件。

0x00 概念回顾

我们先回顾下Disruptor消费相关的名词概念:
Event: Disruptor中传输的事件。
RingBuffer: 存储和更新事件的容器。
EventHandler: 用户实现接口,包含消费处理逻辑,代表Disruptor一个消费者。
EventProcessor: EventProcessor继承了Runnable接口,包含处理Disruptor事件的主循环。

多播事件: 队列和Disruptor在表现行为上最大的区别。队列中的一个事件只能被一个消费者消费,而Disruptor中的事件会发布给所有消费者。特别适合同一数据的独立并行处理操作。
消费者依赖图(消费链):同一事件需要被多个消费者消费时,消费者之间可能有依赖关系,如消费者A,B,C,B和C依赖A先执行,但是B和C可以并行消费。

0x01 EventProcessor接口概览

OK,咱们正式开始对Disruptor消费者的源码解读。
Disruptor的消费者依赖EventProcessor循环处理可用事件。EventProcessor顾名思义,就是事件处理器(handle和process都可以翻译为“处理”,但是process侧重于机器的处理,而handle侧重于有人工的处理,所以使用handle表示用户逻辑的处理,使用process表示机器的处理),这个接口有两个实现类,分别是WorkProcessor和BatchEventProcessor,它们对应的逻辑处理消费者分别是EventHandler和WorkHandler。下面是EventProcessor的UML类图及EventHandler和EventProcessor的接口定义。

image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
* @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
* 处理事件的回调接口
*/
public interface EventHandler<T>
{
    /**
    * Called when a publisher has published an event to the {@link RingBuffer}
    *
    * @param event      published to the {@link RingBuffer}
    * @param sequence  of the event being processed
    * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
    * @throws Exception if the EventHandler would like the exception handled further up the chain.
    */
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
* 事件执行器,等待RingBuffer有可用消费事件。一个事件处理器关联一个执行线程
*/
public interface EventProcessor extends Runnable
{
    /**
    * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
    *
    * @return reference to the {@link Sequence} for this {@link EventProcessor}
    */
    Sequence getSequence();
    /**
    * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
    * It will call {@link SequenceBarrier#alert()} to notify the thread to check status.
    */
    void halt();
    boolean isRunning();
}

EventProcessor接口继承了Runnable接口,主要有两种实现:单线程批量处理BatchEventProcessor和多线程处理WorkProcessor
在使用Disruptor帮助类构建消费者时,使用handleEventsWith方法传入多个EventHandler,内部使用多个BatchEventProcessor关联多个线程执行。这种情况类似JMS中的发布订阅模式,同一事件会被多个消费者并行消费。适用于同一事件触发多种操作。
而使用Disruptor的handleEventsWithWorkerPool传入多个WorkHandler时,内部使用多个WorkProcessor关联多个线程执行。这种情况类似JMS的点对点模式,同一事件会被一组消费者其中之一消费。适用于提升消费者并行处理能力。

0x02 消费技术实现

我们先回顾下Disruptor消费者的两个特点:消费者依赖图(即下文所谓的“消费链”)和事件多播。
假设现在有A,B,C,D四个消费者,它们都能组成什么样的形式呢?从众多的排列组合中,我挑了4组比较有代表性的消费链形式。

image.png

  • 第1组中,消费者A消费按成后,B、C、D可同时消费;
  • 第2组中,消费者A、B、C、D顺序消费;
  • 第3组中,消费者A、B顺序消费后,C、D同时消费;
  • 第4组中,消费者A在消费完成后,B和C可以同时消费,但是必须在都消费完成后,D才能消费。

标号为1、3、4的消费链都使用了事件多播,可见事件多播属于消费链的一种组合形式。注意,在上面4种组合中,每个组合的每一水平行,都属于一个消费者组。
这些还只是较为简单的消费链组成,实际中消费链可能会更复杂。
那么在Disruptor内部是怎么实现消费链的呢?
我们可以先思考下。如果想把独立的消费者组成消费链,那么后方的消费者(组)必然要知道在它前方的消费者(组)的处理情况,否则就做不到顺序消费。同时,消费者也要了解生产者的位置,来判断是否有可用事件。之前我们分析生产者代码的时候,已经讲过,生产者为了不覆盖没有消费完全的事件,必须知道最慢消费者的处理情况
做到了这些才会有能力去控制消费者组成消费链。下面让我们具体看Disruptor中的实现。

0x02.1 使用BatchEventProcessor单线程批处理事件

在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现,其实可以算作是Disruptor这个辅助类的一部分。

1
2
3
4
5
6
7
8
9
10
// EventHandlerGroup.java
public EventHandlerGroup<T> then(final EventHandler<? super T>... handlers)
{
    return handleEventsWith(handlers);
}
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return disruptor.createEventProcessors(sequences, handlers);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Disruptor.java
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
    return createEventProcessors(new Sequence[0], handlers);
}
// 由EventHandlerGroup调用时,barrierSequences是EventHandlerGroup实例的序列,也就是上一个事件处理者组的序列,作为当前事件处理的门控,防止后边的消费链超前
// 如果是第一次调用handleEventsWith,则barrierSequences是一个空数组
EventHandlerGroup<T> **createEventProcessors**(
    final Sequence[] barrierSequences,
    final EventHandler<? super T>[] eventHandlers)
{
    checkNotStarted();
    // 对应此事件处理器组的序列组
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
    {
        final EventHandler<? super T> eventHandler = eventHandlers[i];
        // 批量处理事件的循环
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
        if (exceptionHandler != null)
        {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }
    // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加。(所谓门控,是指后续消费链的消费,不能超过前边。)
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
// 为消费链下一组消费者,更新门控序列
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值
// processorSequences是本次要设置的事件处理器组的序列
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    if (processorSequences.length > 0)
    {
        ringBuffer.addGatingSequences(processorSequences); // 将本组序列添加到Sequencer中的gatingSequences中
        for (final Sequence barrierSequence : barrierSequences) // 将上组序列从Sequencer中的gatingSequences中,gatingSequences一直保存消费链末端消费者的序列组
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); // 取消标记上一组消费者为消费链末端
    }
}

可以看到,使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors这个方法中。
先简单说下ConsumerRepository,这个类主要保存消费者的各种关系,如通过EventHandler引用获取EventProcessorInfo信息,通过Sequence获取ConsumerInfo信息等。因为要使用引用做key,所以数据结构使用IdentityHashMapIdentityHashMap
和HashMap最大的不同,就是使用==而不是equals比较key
这个createEventProcessors方法接收两个参数,barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。createEventProcessors方法的另一个参数eventHandlers,这个参数是代表事件消费逻辑的EventHandler数组。
Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
在构建BatchEventProcessor时需要以下传入三个构造参数:dataProvider是数据存储结构如RingBuffer;sequenceBarrier用于跟踪生产者游标,协调数据处理;eventHandler是用户实现的事件处理器,也就是实际的消费者。
注意,Disruptor并非为每个BatchEventProcessor都创建一个新的SequenceBarrier,而是每个消费者组共用一个SequenceBarrier
BatchEventProcessor定义如下。至于为什么要叫做BatchEventProcessor,可以看看在run()方法里每次waitFor获取的availableSequence是当前能够使用的最大值,然后再循环处理这些数据。这样当消费者有瞬时抖动,导致暂时落后生产者时,可在下一次循环中,批量处理所有落后的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
* Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
* and delegating the available events to an {@link EventHandler}.
* <p>
* If the {@link EventHandler} also implements {@link LifecycleAware} it will be notified just after the thread
* is started and just before the thread is shutdown.
*
* @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
*
* 每个EventHandler对应一个EventProcessor执行者,BatchEventProcessor每次大循环可以获取最高可用序号,并循环调用EventHandler
*/
public final class BatchEventProcessor<T>
    implements EventProcessor
{
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider; // 数据提供者,默认是RingBuffer,也可替换为自己的数据结构
    private final SequenceBarrier sequenceBarrier; // 默认为ProcessingSequenceBarrier
    private final EventHandler<? super T> eventHandler; // 此EventProcessor对应的用户自定义的EventHandler实现
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); // 当前执行位置
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware; // 每次循环取得一批可用事件后,在实际处理前调用
    /**
    * Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
    * the {@link EventHandler#onEvent(Object, long, boolean)} method returns.
    *
    * @param dataProvider    to which events are published.
    * @param sequenceBarrier on which it is waiting.
    * @param eventHandler    is the delegate to which events are dispatched.
    */
    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;
        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }
        batchStartAware =
                (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler =
                (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }
    // ... 省略部分代码
    /**
    * It is ok to have another thread rerun this method after a halt().
    *
    * @throws IllegalStateException if this object instance is already running in a thread
    */
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
        notifyStart();
        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {  // availableSequence返回的是可用的最大值
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 使用给定的等待策略去等待下一个序列可用
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    // 批处理在此处得以体现
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    // eventHandler处理完毕后,更新当前序号
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }
}

0x02.2 消费者可用序列屏障-SequenceBarrier

我们重点看一下SequenceBarrier,可直译为“序列屏障”。SequenceBarrier的主要作用是协调获取消费者可处理到的最大序号,内部持有着生产者和其依赖的消费者序列。它的接口定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public interface SequenceBarrier
{
    /**
    * Wait for the given sequence to be available for consumption.<br>
    * 等待指定序列可用
    * @param sequence to wait for
    * @return the sequence up to which is available
    * @throws AlertException      if a status change has occurred for the Disruptor
    * @throws InterruptedException if the thread needs awaking on a condition variable.
    * @throws TimeoutException
    *
    */
    long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
    /**
    * Get the current cursor value that can be read.<br>
    * 获取当前可读游标值
    *
    * @return value of the cursor for entries that have been published.
    *
    */
    long getCursor();
    /**
    * The current alert status for the barrier.<br>
    * 当前的alert状态
    *
    * @return true if in alert otherwise false.
    */
    boolean isAlerted();
    /**
    * Alert the {@link EventProcessor}s of a status change and stay in this status until cleared.<br>
    *
    * 通知消费者状态变化。当调用EventProcessor#halt()将调用此方法。
    */
    void alert();
    /**
    * Clear the current alert status.<br>
    * 清楚alert状态
    */
    void clearAlert();
    /**
    * Check if an alert has been raised and throw an {@link AlertException} if it has.
    * 检查是否发生alert,发生将抛出异常
    * @throws AlertException if alert has been raised.
    */
    void checkAlert() throws AlertException;
}

SequenceBarrier实例引用被EventProcessor持有,用于等待并获取可用的消费事件,主要体现在waitFor这个方法。
要实现这个功能,需要3点条件:

  1. 知道生产者的位置。
  2. 因为Disruptor支持消费者链,在不同的消费者组之间,要保证后边的消 费者组只有在前消费者组中的消费者都处理完毕后,才能进行处理。
  3. 暂时没有事件可消费,在等待可用消费时,还需要使用某种等待策略进行等待。

看下SequenceBarrier实现类ProcessingSequenceBarrier的代码是如何实现waitFor方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final class ProcessingSequenceBarrier implements SequenceBarrier
{
    private final WaitStrategy waitStrategy; // 等待可用消费时,指定的等待策略
    private final Sequence dependentSequence; // 依赖的上组消费者的序号,如果当前为第一组则为cursorSequence(即生产者发布游标序列),否则使用FixedSequenceGroup封装上组消费者序列
    private volatile boolean alerted = false; // 当触发halt时,将标记alerted为true
    private final Sequence cursorSequence; // AbstractSequencer中的cursor引用,记录当前发布者发布的最新位置
    private final Sequencer sequencer; // MultiProducerSequencer 或 SingleProducerSequencer
    public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final WaitStrategy waitStrategy,
        final Sequence cursorSequence,
        final Sequence[] dependentSequences)
    {
        this.sequencer = sequencer;
        this.waitStrategy = waitStrategy;
        this.cursorSequence = cursorSequence;
        if (0 == dependentSequences.length) // 依赖的上一组序列长度,第一次是0
        {
            dependentSequence = cursorSequence;
        }
        else // 将上一组序列数组复制成新数组保存,引用不变
        {
            dependentSequence = new FixedSequenceGroup(dependentSequences);
        }
    }
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        // 检查是否停止服务
        checkAlert();
        // 获取最大可用序号 sequence为给定序号,一般为当前序号+1,cursorSequence记录生产者最新位置,
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        // 返回已发布最高的序列值,将对每个序号进行校验
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
    // ... 
}

0x02.3 该用什么姿势等待可用事件-WaitStrategy

看来实际的等待操作还是在WaitStrategy#waitFor完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// WaitStrategy.java
/**
* Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}. <br>
* 消费者等待可用事件的策略
*/
public interface WaitStrategy
{
    /**
    * Wait for the given sequence to be available.  It is possible for this method to return a value
    * less than the sequence number supplied depending on the implementation of the WaitStrategy.  A common
    * use for this is to signal a timeout.  Any EventProcessor that is using a WaitStrategy to get notifications
    * about message becoming available should remember to handle this case.  The {@link BatchEventProcessor} explicitly
    * handles this case and will signal a timeout if required.
    *
    * @param sequence          to be waited on. 给定序号
    * @param cursor            the main sequence from ringbuffer. Wait/notify strategies will
    *                          need this as it's the only sequence that is also notified upon update. 生产者游标
    * @param dependentSequence on which to wait. 依赖的序列,一般是上一个消费者组序列的FixedSequenceGroup封装。如果消费者是第一组,则为cursor。
    * @param barrier          the processor is waiting on. 在等待时需要判断是否对消费者有alert操作
    * @return the sequence that is available which may be greater than the requested sequence.
    * @throws AlertException      if the status of the Disruptor has changed.
    * @throws InterruptedException if the thread is interrupted.
    * @throws TimeoutException
    */
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    /**
    * Implementations should signal the waiting {@link EventProcessor}s that the cursor has advanced. <br>
    * 当生产者发布新事件后,将通知等待的EventProcessor。当用锁机制时才会包含相应逻辑。
    */
    void signalAllWhenBlocking();
}
在各种等待策略中,我们选取阻塞策略研究。
public final class BlockingWaitStrategy implements WaitStrategy
{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence) // 当前游标小于给定序号,也就是无可用事件
        {
            lock.lock();
            try
            {
                while (cursorSequence.get() < sequence) // 当给定的序号大于生产者游标序号时,进行等待
                {
                    barrier.checkAlert();
                    // 循环等待,在Sequencer中publish进行唤醒;等待消费时也会在循环中定时唤醒。
                    // 循环等待的原因,是要检查alert状态。如果不检查将导致不能关闭Disruptor。
                    processorNotifyCondition.await();
                }
            }
            finally
            {
                lock.unlock();
            }
        }
// 给定序号大于上一个消费者组最慢消费者(如当前消费者为第一组则和生产者游标序号比较)序号时,需要等待。不能超前消费上一个消费者组未消费完毕的事件。
// 那么为什么这里没有锁呢?可以想一下此时的场景,代码运行至此,已能保证生产者有新事件,如果进入循环,说明上一组消费者还未消费完毕。
// 而通常我们的消费者都是较快完成任务的,所以这里才会考虑使用Busy Spin的方式等待上一组消费者完成消费。
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            barrier.checkAlert();
        }
        return availableSequence;
    }
    @Override
    public void signalAllWhenBlocking()
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }
    @Override
    public String toString()
    {
        return "BlockingWaitStrategy{" +
            "processorNotifyCondition=" + processorNotifyCondition +
            '}';
    }
}

阻塞等待策略使用Lock+Condition的方式等待生产者生产可用事件,而使用Busy Spin的方式等待可能出现的上一个消费者组未消费完成的情况。
这里给我们一个提示,在构建低延迟系统时,因为锁的性能消耗,尽量不要使用锁。如果必须要用锁,也要把锁粒度调到最小。
另外,消费者在等待可用消费事件时,会循环调用barrier.checkAlert(),再去调用锁的条件等待,等待可用消费事件。
有三个地方可以唤醒等待中的消费线程。两种是在Sequencer实现类中,一是有可用事件发布,通知消费线程继续消费;二是在调用next()获取可用的RingBuffer槽位时,发现RingBuffer满了(生产者速度大于消费者,导致生产者没有可用位置发布事件),将唤醒消费者线程,此功能在3.3.5版本新增(Resignal any waiting threads when trying to publish to a full ring buffer )。开始我百思不得,为什么要在buffer满了的时候不断唤醒消费者线程,直到看到这个issue才明白。大意是在log4j2中使用Disruptor时发生了死锁,为了避免在发布事件时,由于某种原因导致没有通知到消费者,在生产者尝试往一个已满的buffer发布数据时,就会再通知消费者进行消费。而这个bug最终也被Log4j认领,与Disruptor无关。Disruptor这里的再次通知也是为了更加保险。

1
2
3
4
5
6
7
8
9
//*ProducerSequencer.java
// next(n)中的代码
// 由于慢消费者,无可用坑位,只有当消费者消费,向前移动后,才能跳出循环
// 由于外层判断使用的是缓存的消费者序列最小值,这里使用真实的消费者序列进行判断,并将最新结果在跳出while循环之后进行缓存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
// 唤醒等待的消费者,正常情况下并无意义,只是为了避免极少数情况下未知原因导致的发布时锁机制出现异常,未通知到消费者
    waitStrategy.signalAllWhenBlocking();
    LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}

还有一种唤醒就是关闭Disruptor时,消费者关闭前将会处理完当前批次数据(并非RingBuffer的所有数据,而是此次循环取出的最大可用序号以下的所有未处理数据),如果消费者线程当前在等待状态,将被唤醒并终结。
BatchEventProcessor就讲到这。

0x02.4 使用WorkProcessor多线程处理事件

下面说一说WorkHandler+WorkProcessor。
上面讲过,使用EventHandler+BatchEventProcessor这种方式类似JMS的发布订阅,同一个事件会被不同线程的EventHandler并行消费。那么,如果单线程处理能力不足,想多线程处理同一主题下的不同事件该怎么办呢?这种方式就类似JMS的点到点模式,多个消费者可以监听同一个队列,谁先拿到就归谁处理。
在Disruptor中使用WorkHandler+WorkProcessor实现以上功能。当需要使用这种模式,可在设置Disruptor消费者时,通过使用handleEventsWithWorkerPool和thenHandleEventsWithWorkerPool设置消费链。

1
2
3
4
5
6
7
8
9
disruptor
    .handleEventsWithWorkerPool(
      new WorkHandler[]{
          journalHandler,
          journalHandler,
          journalHandler
      }
    )
    .thenHandleEventsWithWorkerPool(resultHandler);

先看下相关的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Disruptor
public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
    return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
    final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
    final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
    final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
    consumerRepository.add(workerPool, sequenceBarrier);
    final Sequence[] workerSequences = workerPool.getWorkerSequences();
    updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
    return new EventHandlerGroup<T>(this, consumerRepository, workerSequences);
}
// WorkerPool.java WorkerPool构造方法
public WorkerPool(
    final RingBuffer<T> ringBuffer,
    final SequenceBarrier sequenceBarrier,
    final ExceptionHandler<? super T> exceptionHandler,
    final WorkHandler<? super T>... workHandlers)
{
    this.ringBuffer = ringBuffer;
    final int numWorkers = workHandlers.length;
    workProcessors = new WorkProcessor[numWorkers];
    for (int i = 0; i < numWorkers; i++)
    {
        workProcessors[i] = new WorkProcessor<T>( // 为每个WorkHandler新建一个WorkProcessor
            ringBuffer,
            sequenceBarrier,
            workHandlers[i],
            exceptionHandler,
            workSequence);
    }
}

在使用线程池处理事件时,与单线程处理相比,最大的不同在于新增了一个WorkerPool。WorkerPool用于管理一组WorkProcessor,它的属性、方法如下。

image.png

WorkProcessor的原理和BatchEventProcessor类似,只是多了workSequence用来保存同组共用的处理序列。在更新workSequence时,涉及多线程操作,所以使用CAS进行更新。
WorkProcessor的run()方法如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Override
public void run()
{
    if (!running.compareAndSet(false, true))
    {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    notifyStart();
    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true)
    {
        try
        {
            // if previous sequence was processed - fetch the next sequence and set
            // that we have successfully processed the previous sequence
            // typically, this will be true
            // this prevents the sequence getting too far forward if an exception
            // is thrown from the WorkHandler
            if (processedSequence) // 表示nextSequence序号的处理情况(不区分正常或是异常处理)。只有处理过,才能申请下一个序号。
            {
                processedSequence = false;
                do
                {
                    // 同组中多个消费线程有可能会争抢一个序号,使用CAS避免使用锁。
                    // 同一组使用一个workSequence,WorkProcessor不断申请下一个可用序号,对workSequence设置成功才会实际消费。
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                }
                while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }
            // 缓存的可用序号比要处理的序号大,才能进行处理
            if (cachedAvailableSequence >= nextSequence)
            {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            }
            else // 更新缓存的可用序列。这个cachedAvailableSequence只用在WorkProcessor实例内,不同实例的缓存可能是不一样的
            {     // 和单线程模式类似,返回的也是最大可用序号
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        }
        catch (final TimeoutException e)
        {
            notifyTimeout(sequence.get());
        }
        catch (final AlertException ex)
        {
            if (!running.get())
            {
                break;
            }
        }
        catch (final Throwable ex)
        {
            // handle, mark as processed, unless the exception handler threw an exception
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }
    notifyShutdown();
    running.set(false);
}

代码逻辑和BatchEventProcessor类似,就不再赘述啦。
还有一点需要留意,Disruptor通过EventHandlerGroup代表一个消费者组,就表示之前那四张图中一个水平线上的消费者组。这样不同的消费者组之间不必关心各自的实现,从而可以实现更加复杂和灵活的消费链,即依赖图表。

0x03 消费者小结

从小语文老师就教育我们写作文要总结,好习惯不能忘~
本文主要探讨了Disruptor消费者内部概要实现,重点阐述了BatchEventProcessor、WorkProcess的消费代码原理。同时省略了超时通知、开始和结束通知、异常控制等内容,并非不重要,而只是尽量言简意赅,达到抛砖引玉的目的。
BatchEventProcessor主要用于处理单线程并行任务,同一消费者组的不同消费者会接收相同的事件,并在所有事件处理完毕后进入下一消费者组进行处理(是不是类似JUC里的Phaser、CyclicBarrier或CountDownLatch呢)。WorkProcessor通过WorkerPool管理多个WorkProcessor,达到多线程处理事件的目的,同一消费者组的多个WorkProcessor不会处理同一个事件。通过选择不同的WaitStragegy实现,可以控制消费者在没有可用事件处理时的等待策略。
好啦,有关Disruptor消费者的分享就到这。
欢迎大家留言讨论,一同探讨,一同进步。