解读Disruptor系列--解读源码(1)之初始化

解读Disruptor源码系列文章将从一个demo入手,逐步探究Disruptor中的源码实现。
对原理不熟悉的同学建议先看我之前的两个翻译和导读文章。
对Disruptor源码感兴趣的同学,可以下载我注释的Disruptor代码

完整版Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package com.coderjerry.disruptor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
/**
* Disruptor例子
* jerry li
*/
public class DisruptorDSLExample {
  /**
  * 用户自定义事件
  */
  class ExampleEvent{
    Object data ;
    Object ext;
    @Override
    public String toString() {
      return "DisruptorDSLExample[data:"+this.data+",ext:"+ext+"]";
    }
  }
  /**
  * 用户事件工厂,实现EventFactory接口,用于初始化事件对象
  */
  class ExampleEventFactory implements EventFactory<ExampleEvent>{
    @Override
    public ExampleEvent newInstance() {
      return new ExampleEvent();
    }
  }
  /**
  * 生产者在发布事件时,使用翻译器将原始对象设置到RingBuffer的对象中
  */
  static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
    static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
    @Override
    public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
      event.data = arg0 ;
      System.err.println("put data "+sequence+", "+event+", "+arg0);
    }
  }
  // 用于事件处理(EventProcessor)的线程工厂
  ThreadFactory threadFactory =
      new ThreadFactoryBuilder()
          .setNameFormat("disruptor-executor-%d")
          .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
              System.out.println("Thread " + t + "throw " + e);
              e.printStackTrace();
            }
          })
          .build();
  Disruptor disruptor = null;
  // 初始化Disruptor
  public void createDisruptor(final CountDownLatch latch){
    disruptor = new Disruptor<ExampleEvent>(
        new ExampleEventFactory(),  // 用于创建环形缓冲中对象的工厂
        8// 环形缓冲的大小
        threadFactory,  // 用于事件处理的线程工厂
        ProducerType.MULTI, // 生产者类型,单vs多生产者
        new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方
    // 消费者模拟-日志处理
    EventHandler journalHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(8);
        System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
      }
    };
    // 消费者模拟-复制处理
    EventHandler replicateHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
      }
    };
    // 消费者模拟-解码处理
    EventHandler unmarshallHandler = new EventHandler() { // 最慢
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        Thread.sleep(1*1000);
        if(event instanceof ExampleEvent){
          ((ExampleEvent)event).ext = "unmarshalled ";
        }
        System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
      }
    };
    // 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
    EventHandler resultHandler = new EventHandler() {
      @Override
      public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
        latch.countDown();
      }
    };
    // 定义消费链,先并行处理日志、解码和复制,再处理结果上报
    disruptor
        .handleEventsWith(
          new EventHandler[]{
              journalHandler,
              unmarshallHandler,
              replicateHandler
          }
        )
        .then(resultHandler);
    // 启动Disruptor
    disruptor.start();
  }
  public void shutdown(){
    disruptor.shutdown();
  }
  public Disruptor getDisruptor(){
    return disruptor;
  }
  public static void main(String[] args) {
    final int events = 20; // 必须为偶数
    DisruptorDSLExample disruptorDSLExample = new DisruptorDSLExample();
    final CountDownLatch latch = new CountDownLatch(events);
    disruptorDSLExample.createDisruptor(latch);
    final Disruptor disruptor = disruptorDSLExample.getDisruptor();
    // 生产线程0
    Thread produceThread0 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
      }
    });
    // 生产线程1
    Thread produceThread1 = new Thread(new Runnable() {
      @Override
      public void run() {
        int x = 0;
        while(x++ < events / 2){
          disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
      }
    });
    produceThread0.start();
    produceThread1.start();
    try {
      latch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    disruptorDSLExample.shutdown();
  }
}

构建Disruptor类

可以发现整个例子都是围绕Disruptor这个类实现的,相关内容可参见官方文档Disruptor Wizard。
其实不使用Disruptor类也是完全可以的,直接操作RingBuffer更加灵活也更麻烦。Disruptor类提供了操作RingBuffer和设置消费依赖的便捷API,如构建Ringbuffer、设置消费链、启动关闭Disruptor、暂停消费者、发布事件等。
接下来,我们把示例拆开看。

1
2
3
4
5
6
disruptor = new Disruptor<ExampleEvent>(
    new ExampleEventFactory(),  // 用于创建环形缓冲中对象的工厂
    8// 环形缓冲的大小
    threadFactory,  // 用于事件处理的线程工厂
    ProducerType.MULTI, // 生产者类型,单vs多生产者
    new BlockingWaitStrategy()); // 等待环形缓冲游标的等待策略,这里使用阻塞模式,也是Disruptor中唯一有锁的地方

这里调用构造方法创建了一个Disruptor对象,实际上创建了一个RingBuffer对象和一个Executor,并将引入传入私有化的构造方法创建了Disruptor对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Disruptor.java
public Disruptor(
        final EventFactory<T> eventFactory, // 用于创建环形缓冲中对象的工厂
        final int ringBufferSize, // 环形缓冲的大小
        final ThreadFactory threadFactory, // 用于事件处理的线程工厂
        final ProducerType producerType, // 生产者类型,单vs多生产者
        final WaitStrategy waitStrategy) // 等待环形缓冲游标的等待策略
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}
// RingBuffer.java
public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        switch (producerType) // 构建RingBuffer时通过producerType来区分单生产者或多生产者
        {
            case SINGLE:
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }
// 单生产者模式创建RingBuffer
public static <E> RingBuffer<E> createSingleProducer(
    EventFactory<E> factory,
    int bufferSize,
    WaitStrategy waitStrategy)
{
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
    return new RingBuffer<E>(factory, sequencer);
}
// 多生产者模式创建RingBuffer
public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy)
    {
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }
// RingBuffer构造器
RingBuffer(
    EventFactory<E> eventFactory,
    Sequencer sequencer)
{
    super(eventFactory, sequencer);
}

这里注意下,在构造RingBuffer时,需要传入用于创建事件对象的工厂eventFactory和记录生产者序号的sequencer。根据生产者是否是多线程生产,Sequencer又分为单、多生产者模式,后续还会讲到。
构建Disruptor实例后,需要设置Disruptor的消费者。

设置消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 消费者模拟-日志处理
EventHandler journalHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(8);
    System.out.println(Thread.currentThread().getId() + " process journal " + event + ", seq: " + sequence);
  }
};
// 消费者模拟-复制处理
EventHandler replicateHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(10);
    System.out.println(Thread.currentThread().getId() + " process replication " + event + ", seq: " + sequence);
  }
};
// 消费者模拟-解码处理
EventHandler unmarshallHandler = new EventHandler() { // 最慢
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    Thread.sleep(1*1000);
    if(event instanceof ExampleEvent){
      ((ExampleEvent)event).ext = "unmarshalled ";
    }
    System.out.println(Thread.currentThread().getId() + " process unmarshall " + event + ", seq: " + sequence);
  }
};
// 消费者处理-结果上报,只有执行完以上三种后才能执行此消费者
EventHandler resultHandler = new EventHandler() {
  @Override
  public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println(Thread.currentThread().getId() + " =========== result " + event + ", seq: " + sequence);
    latch.countDown();
  }
};

这里使用了两组消费者,第一组包含三个消费者,第二组包含一个消费者。当事件可消费后,只有当第一组全部消费者都处理完毕后,事件才能被第二组消费者处理。

1
2
3
4
5
6
7
8
9
10
// 定义消费链,先并行处理日志、解码和复制,再处理结果上报
disruptor
    .handleEventsWith(
      new EventHandler[]{
          journalHandler,
          unmarshallHandler,
          replicateHandler
      }
    )
    .then(resultHandler);

启动Disruptor

消费者设置成功后,即可启动Disruptor。

1
2
// 启动Disruptor
disruptor.start();

1
2
3
4
5
6
7
8
9
10
11
// Disruptor.java
public RingBuffer<T> start()
{
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository)
    {
        consumerInfo.start(executor);
    }
    return ringBuffer;
}

ConsumerRepository这个类实现了Iterable接口,iterator()方法返回ConsumerInfo集合的迭代器。ConsumerInfo是一个封装类,对应EventBatchProcessor和WorkProcessor有两种实现。EventProcessorInfo对应BatchEventProcessor,保存了与一个事件处理过程相关的EventProcessor、EventHandler、SequenceBarrier的引用。WorkerPoolInfo对应WorkProcessor,保存了WorkerPool、SequenceBarrier的引用以及代表消费者组是否为消费者链尾的标志endOfChain。
如果看不懂,不要着急哈,后续讲到消费者的时候就会明白了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// ConsumerRepository.java
class ConsumerRepository<T> implements Iterable<ConsumerInfo>
{
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler =
        new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); // hander引用为key
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<Sequence, ConsumerInfo>(); // 处理器的序列引用为key
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
    // 省略代码若干... 
    @Override
    public Iterator<ConsumerInfo> iterator()
    {
        return consumerInfos.iterator();
    }
}

调用ConsumerInfo.start()方法,其实就是启动了消费者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// EventProcessorInfo.java
class EventProcessorInfo<T> implements ConsumerInfo
{
    // 省略代码若干...
    @Override
    public void start(final Executor executor)
    {
        executor.execute(eventprocessor);
    }
}
// WorkerPoolInfo.java
class WorkerPoolInfo<T> implements ConsumerInfo
{
     // 省略代码若干...
    @Override
    public void start(final Executor executor)
    {
        workerPool.start(executor);
    }
}
// WorkerPool.java
public final class WorkerPool<T>
{
     // 省略代码若干...
     public RingBuffer<T> start(final Executor executor)
     {
    if (!started.compareAndSet(false, true))
    {
        throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
    }
    final long cursor = ringBuffer.getCursor();
    workSequence.set(cursor);
    for (WorkProcessor<?> processor : workProcessors)
    {
        processor.getSequence().set(cursor);
        executor.execute(processor);
    }
    return ringBuffer;    
}

至此,Disruptor的初始化和启动就完成了。主要是完成了RingBuffer数据结构的初始化、设置消费者以及启动。
后续将继续分享消费者代码。