Understanding Disruptor -- Source Code Walkthrough (2): The Producer

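Previously we went through Disruptor's initialization and startup code. In this part we analyze the code the producer uses to publish events. If you are not yet familiar with Disruptor, I suggest first reading my earlier translated and annotated articles on how Disruptor works. In particular, make sure you know what each of the key terms is for.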

1 Producer threads

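The producer is usually one of our application threads. When publishing, it typically uses an EventTranslator to move data onto the RingBuffer. Because the translator involves no shared data and no instance state, the same EventTranslator instance is usually reused. (Note: "translate" usually means rendering in another language, but it also means "move from one place or condition to another", i.e. to transfer or convert.)
Depending on how many arguments an event needs, you can pick a different translator interface to receive them.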


/**
* When publishing an event, the producer uses the translator to copy the raw value into the event object held by the RingBuffer
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
@Override
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
}
}
// Producer thread 0
Thread produceThread0 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){ // only the line below matters; everything else is scaffolding
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
// Producer thread 1
Thread produceThread1 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
produceThread0.start();
produceThread1.start();

In the demo we create and start two threads that produce events into the Disruptor.
Next, let's follow the source code step by step.

2 Overall logic of publishing an event

// Disruptor.java
/**
* Publish an event to the ring buffer, using the given event translator.
*
* @param eventTranslator the translator that will load data into the event.
* @param arg A single argument to load into the event
*/
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
{
ringBuffer.publishEvent(eventTranslator, arg);
}
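As mentioned before, the Disruptor class is a helper class. Publishing is actually delegated to the RingBuffer.
RingBuffer.publishEvent() works in roughly two steps. First it claims an available slot on the RingBuffer, which we'll call "claiming a slot". Then it publishes the data into that slot, which we'll call "filling the slot".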
// RingBuffer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
final long sequence = sequencer.next(); // step 1: claim a slot
translateAndPublish(translator, sequence, arg0); // step 2: fill the slot
}

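In step two, once the slot has been filled, the publish method of the Sequencer interface must also be called to make the event visible outside. Why is that? Let's leave the question open for now.
In step one, Sequencer.next() is called to obtain the next usable sequence number of the RingBuffer instance.
AbstractSequencer is an abstract class that implements the Sequencer interface. It is the parent class of both the single-producer and the multi-producer Sequencer.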
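The split into "claim, fill, publish" can be made concrete with a toy, single-threaded sketch. ToyRingBuffer, OneArgTranslator and IntEvent are hypothetical names, not Disruptor's API. The toy next() also skips the wrap check discussed later. The sketch mirrors the try/finally shape of RingBuffer's translateAndPublish: a claimed sequence is published even if the translator throws. Presumably this is because a slot that has been claimed but never published would leave consumers waiting at it.

```java
import java.util.function.Supplier;

// Toy sketch of "claim a slot, fill it, publish it" (hypothetical names, not Disruptor code).
public class TwoPhasePublishSketch {

    // Hypothetical counterpart of EventTranslatorOneArg.
    interface OneArgTranslator<E, A> {
        void translateTo(E event, long sequence, A arg0);
    }

    static final class ToyRingBuffer<E> {
        private final Object[] entries;
        private final int indexMask;
        private long nextValue = -1L; // last claimed sequence
        private long cursor = -1L;    // last published sequence

        ToyRingBuffer(int bufferSize, Supplier<E> factory) {
            if (Integer.bitCount(bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
            entries = new Object[bufferSize];
            indexMask = bufferSize - 1;
            for (int i = 0; i < bufferSize; i++) {
                entries[i] = factory.get(); // events are preallocated up front
            }
        }

        long next() { return ++nextValue; } // claim; this toy has no wrap check

        @SuppressWarnings("unchecked")
        E get(long sequence) { return (E) entries[(int) sequence & indexMask]; }

        void publish(long sequence) { cursor = sequence; } // make the slot visible

        long getCursor() { return cursor; }

        <A> void publishEvent(OneArgTranslator<E, A> translator, A arg0) {
            long sequence = next();                                    // step 1: claim a slot
            try {
                translator.translateTo(get(sequence), sequence, arg0); // step 2: fill it
            } finally {
                publish(sequence); // publish even if the translator throws
            }
        }
    }

    static final class IntEvent { int data; }

    public static void main(String[] args) {
        ToyRingBuffer<IntEvent> rb = new ToyRingBuffer<>(4, IntEvent::new);
        OneArgTranslator<IntEvent, Integer> t = (event, seq, v) -> event.data = v;
        for (int x = 1; x <= 3; x++) {
            rb.publishEvent(t, x * 10);
        }
        System.out.println(rb.getCursor() + " " + rb.get(2).data); // 2 30
    }
}
```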

3 The core of Disruptor: the Sequencer interface

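Why is the Sequencer the core of Disruptor? This is not my own claim. The official Disruptor wiki Introduction says so:
[Image: excerpt from the Disruptor wiki Introduction]
The Sequencer ensures that data passes between producers and consumers both correctly and quickly. Let's first look at what the Sequencer does from the producer's point of view.
First, a class diagram.
[Figure: Sequencer class diagram]
Below are the definitions of the Sequencer interface and its parent interfaces Cursored and Sequenced.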

// Sequencer
/**
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
*/
public interface Sequencer extends Cursored, Sequenced
{
/**
* Set to -1 as sequence starting point
* Starting position of the sequence
*/
long INITIAL_CURSOR_VALUE = -1L;
/**
* Claim a specific sequence. Only used if initialising the ring buffer to
* a specific value.
*
* @param sequence The sequence to initialise too.
* Claims a specific sequence. Only used to initialise the RingBuffer to a specific value; rarely used in practice
*/
void claim(long sequence);
/**
* Confirms if a sequence is published and the event is available for use; non-blocking.
*
* @param sequence of the buffer to check
* @return true if the sequence is available for use, false if not
* Checks, without blocking, whether a sequence has been published and its event is available
*/
boolean isAvailable(long sequence);
/**
* Add the specified gating sequences to this instance of the Disruptor. They will
* safely and atomically added to the list of gating sequences.
*
* @param gatingSequences The sequences to add.
* Adds gating sequences (consumer sequences) so the producer does not lap the consumers from behind when producing
*/
void addGatingSequences(Sequence... gatingSequences);
/**
* Remove the specified sequence from this sequencer.
*
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
* Removes the specified sequence from the gating sequences
*/
boolean removeGatingSequence(Sequence sequence);
/**
* Create a new SequenceBarrier to be used by an EventProcessor to track which messages
* are available to be read from the ring buffer given a list of sequences to track.
*
* @param sequencesToTrack
* @return A sequence barrier that will track the specified sequences.
* @see SequenceBarrier
* Used by consumers to track the specified sequences (usually those of the upstream group of consumers)
*/
SequenceBarrier newBarrier(Sequence... sequencesToTrack);
/**
* Get the minimum sequence value from all of the gating sequences
* added to this ringBuffer.
*
* @return The minimum gating sequence or the cursor sequence if
* no sequences have been added.
* Gets the smallest of the tracked (gating) sequences
*/
long getMinimumSequence();
/**
* Get the highest sequence number that can be safely read from the ring buffer. Depending
* on the implementation of the Sequencer this call may need to scan a number of values
* in the Sequencer. The scan will range from nextSequence to availableSequence. If
* there are no available values <code>>= nextSequence</code> the return value will be
* <code>nextSequence - 1</code>. To work correctly a consumer should pass a value that
* is 1 higher than the last sequence that was successfully processed.
*
* @param nextSequence The sequence to start scanning from.
* @param availableSequence The sequence to scan to.
* @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
*
* In short: scans from nextSequence to availableSequence and returns the last sequence before
* the first one that is not yet available, or nextSequence - 1 if nextSequence itself is not available.
*/
long getHighestPublishedSequence(long nextSequence, long availableSequence);
<T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}
// Cursored.java
/**
* Implementors of this interface must provide a single long value
* that represents their current cursor value. Used during dynamic
* add/remove of Sequences from a
* {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
* Cursor interface, used to get the producer's current cursor position
*/
public interface Cursored
{
/**
* Get the current cursor value.
*
* @return current cursor value
*/
long getCursor();
}
// Sequenced.java
public interface Sequenced
{
/**
* The capacity of the data structure to hold entries.
*
* @return the size of the RingBuffer.
* Gets the size of the ring buffer
*/
int getBufferSize();
/**
* Has the buffer got capacity to allocate another sequence. This is a concurrent
* method so the response should only be taken as an indication of available capacity.
*
* @param requiredCapacity in the buffer
* @return true if the buffer has the capacity to allocate the next sequence otherwise false.
* Whether the buffer has at least the specified free capacity
*/
boolean hasAvailableCapacity(final int requiredCapacity);
/**
* Get the remaining capacity for this sequencer.
*
* @return The number of slots remaining.
* Remaining capacity
*/
long remainingCapacity();
/**
* Claim the next event in sequence for publishing.
*
* @return the claimed sequence value
* Called by the producer when publishing, to claim the next sequence
*/
long next();
/**
* Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
* requires a little care and some math.
* <pre>
* int n = 10;
* long hi = sequencer.next(n);
* long lo = hi - (n - 1);
* for (long sequence = lo; sequence <= hi; sequence++) {
* // Do work.
* }
* sequencer.publish(lo, hi);
* </pre>
*
* @param n the number of sequences to claim
* @return the highest claimed sequence value
* Claims n sequences, for batch publishing
*/
long next(int n);
/**
* Attempt to claim the next event in sequence for publishing. Will return the
* number of the slot if there is at least <code>requiredCapacity</code> slots
* available.
*
* @return the claimed sequence value
* @throws InsufficientCapacityException
* Non-blocking variant of next()
*/
long tryNext() throws InsufficientCapacityException;
/**
* Attempt to claim the next n events in sequence for publishing. Will return the
* highest numbered slot if there is at least <code>requiredCapacity</code> slots
* available. Have a look at {@link Sequencer#next()} for a description on how to
* use this method.
*
* @param n the number of sequences to claim
* @return the claimed sequence value
* @throws InsufficientCapacityException
* Non-blocking variant of next(n)
*/
long tryNext(int n) throws InsufficientCapacityException;
/**
* Publishes a sequence. Call when the event has been filled.
*
* @param sequence
* Publishes this sequence once its data has been filled in
*/
void publish(long sequence);
/**
* Batch publish sequences. Called when all of the events have been filled.
*
* @param lo first sequence number to publish
* @param hi last sequence number to publish
* Publishes a batch of sequences
*/
void publish(long lo, long hi);
}
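The capacity methods of Sequenced come down to simple arithmetic on sequences. The sketch below illustrates that arithmetic and is hypothetical. It assumes the producer sequence is the last claimed sequence and that each gating sequence is the last sequence its consumer has processed. minimumSequence plays the role of Disruptor's Util.getMinimumSequence, but it is not the library's code.

```java
// Hypothetical helpers showing the arithmetic behind remainingCapacity()/hasAvailableCapacity().
public class CapacitySketch {

    // Smallest consumer (gating) sequence, never above `minimum`;
    // with no consumers it simply returns `minimum`.
    static long minimumSequence(long[] gatingSequences, long minimum) {
        for (long s : gatingSequences) {
            minimum = Math.min(minimum, s);
        }
        return minimum;
    }

    // Free slots = ring size minus events claimed but not yet processed by the slowest consumer.
    static long remainingCapacity(int bufferSize, long producerSequence, long[] gatingSequences) {
        long consumed = minimumSequence(gatingSequences, producerSequence);
        return bufferSize - (producerSequence - consumed);
    }

    static boolean hasAvailableCapacity(int bufferSize, long producerSequence,
                                        long[] gatingSequences, int requiredCapacity) {
        return remainingCapacity(bufferSize, producerSequence, gatingSequences) >= requiredCapacity;
    }

    public static void main(String[] args) {
        long[] consumers = {3L, 5L};                                       // slowest consumer is at 3
        System.out.println(remainingCapacity(8, 9L, consumers));           // 2
        System.out.println(hasAvailableCapacity(8, 9L, consumers, 3));     // false
    }
}
```

With a ring of 8, a producer at sequence 9 and the slowest consumer at 3, six events are still in flight, so only two more slots can be claimed.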

3.1 Publishing events with a single producer

First, let's see exactly how the single-producer SingleProducerSequencer claims a slot.

// SingleProducerSequencer.java
@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
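// copy the sequence claimed last time
long nextValue = this.nextValue;
// add n to get the sequence to claim this time; n is 1 when publishing a single event
long nextSequence = nextValue + n; // the value to check this time
// the point where wrapping may happen: the sequence to claim minus one lap of the ring
long wrapPoint = nextSequence - bufferSize; // On a 400 m track, Xiao Ming has run 599 m and Xiao Hong 200 m. If Xiao Hong stays put, the point where one more metre would make Xiao Ming knock her over is the wrap point.
long cachedGatingSequence = this.cachedValue; // cached smallest gating sequence, i.e. the slowest consumer
// If wrapPoint is greater than cachedGatingSequence, the producer would lap the ring and overwrite unconsumed events from behind.
// So wait if the producer is about to lap the slowest consumer (the sequence to claim is a full lap ahead of it), or if the cached consumer sequence is ahead of the producer. The latter should not normally happen.
// With no free slot, spin in the wait loop below.
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// the loop can only be left once the consumers have consumed and moved forward
// the outer check used the cached minimum consumer sequence; here the real consumer sequences are checked, and the fresh result is cached after leaving the loop
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // wake up consumers that are blocked waiting
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// once the consumers have moved on, update the cached minimum sequence
this.cachedValue = minSequence;
}
// store the successfully claimed sequence in the instance field
this.nextValue = nextSequence;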
return nextSequence;
}
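The wrap-point check can be tried out without threads. Below is a hypothetical, simplified SingleProducerClaimer. It keeps nextValue and cachedValue as above, but returns -1 instead of parking when the ring is full, roughly the role tryNext plays. A LongSupplier stands in for the gating sequences. It is a sketch of the logic, not Disruptor's class.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical, single-threaded sketch of the wrap-point check in SingleProducerSequencer.next(n).
public class WrapPointSketch {

    static final class SingleProducerClaimer {
        private final int bufferSize;
        private final LongSupplier slowestConsumer; // stands in for the gating sequences
        private long nextValue = -1L;   // last claimed sequence
        private long cachedValue = -1L; // cached slowest-consumer sequence

        SingleProducerClaimer(int bufferSize, LongSupplier slowestConsumer) {
            this.bufferSize = bufferSize;
            this.slowestConsumer = slowestConsumer;
        }

        long tryNext(int n) {
            long nextSequence = nextValue + n;
            long wrapPoint = nextSequence - bufferSize;
            if (wrapPoint > cachedValue || cachedValue > nextValue) {
                // the cache says "maybe full": read the real consumer position once
                long minSequence = Math.min(slowestConsumer.getAsLong(), nextValue);
                cachedValue = minSequence;
                if (wrapPoint > minSequence) {
                    return -1L; // claiming would overwrite an unconsumed event
                }
            }
            nextValue = nextSequence;
            return nextSequence;
        }
    }

    public static void main(String[] args) {
        // The 400 m track: claiming 600 with the slowest consumer at 200 is fine, 601 is not.
        System.out.println((600L - 400) > 200L); // false: no wrap yet
        System.out.println((601L - 400) > 200L); // true: would run into the consumer

        AtomicLong consumer = new AtomicLong(-1L);
        SingleProducerClaimer claimer = new SingleProducerClaimer(4, consumer::get);
        for (int i = 0; i < 4; i++) {
            System.out.println(claimer.tryNext(1)); // 0, 1, 2, 3
        }
        System.out.println(claimer.tryNext(1));     // -1: a ring of 4 is full
        consumer.set(0L);                           // slowest consumer finished sequence 0
        System.out.println(claimer.tryNext(1));     // 4
    }
}
```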

When next() succeeds in claiming a slot, it returns the slot number. Back in RingBuffer's publishEvent method, translateAndPublish is then called to fill the slot and publish it.

// RingBuffer.java
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
try
{
translator.translateTo(get(sequence), sequence);
}
finally
{
sequencer.publish(sequence);
}
}

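The translator parameter is the user-defined implementation of the EventTranslator interface. The overload shown above is the one for a plain EventTranslator. The overload used for the demo's EventTranslatorOneArg works the same way and additionally passes arg0 to translateTo.
The EventTranslator interface was introduced above. Besides EventTranslator there are EventTranslatorOneArg, EventTranslatorTwoArg, EventTranslatorThreeArg and EventTranslatorVararg. All of them fill the given data into the object in the specified slot, because the RingBuffer has already preallocated those objects. They differ only in the number of arguments. Here is a quick look at the EventTranslatorOneArg interface definition.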

public interface EventTranslatorOneArg<T, A>
{
/**
* Translate a data representation into fields set in given event
*
* @param event into which the data should be translated.
* @param sequence that is assigned to event.
* @param arg0 The first user specified argument to the translator
*/
void translateTo(final T event, long sequence, final A arg0);
}

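Once the data is in place, the sequencer's publish method can be called to publish it. It first updates the current cursor and then notifies any waiting consumers, which continue consuming. Consumer wait strategies will be covered later.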

// SingleProducerSequencer.java
@Override
public void publish(long sequence)
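{ // when publishing this position as available, update the Sequencer's internal cursor; with a blocking wait strategy, also notify consumers waiting for available events so they continue consuming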
cursor.set(sequence);
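// signalAllWhenBlocking is a no-op in the non-blocking wait strategies; blocking ones such as BlockingWaitStrategy below actually signal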
waitStrategy.signalAllWhenBlocking();
}
// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
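The publish-then-notify handshake can be sketched with a ReentrantLock and a Condition, the same mechanism BlockingWaitStrategy uses above. SimpleBlockingWait and its waitFor signature are hypothetical simplifications: there is no sequence barrier, no dependent sequences and no alert handling. The point is that the consumer re-checks the cursor while holding the lock. Because of that re-check, a signal sent after the cursor moves cannot be lost.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, hypothetical version of the lock/condition idea behind BlockingWaitStrategy.
public class BlockingWaitSketch {

    static final class SimpleBlockingWait {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition published = lock.newCondition();

        // Consumer side: block until the producer cursor reaches `sequence`.
        long waitFor(long sequence, AtomicLong cursor) throws InterruptedException {
            if (cursor.get() < sequence) {
                lock.lock();
                try {
                    while (cursor.get() < sequence) { // re-check under the lock: no lost wake-ups
                        published.await();
                    }
                } finally {
                    lock.unlock();
                }
            }
            return cursor.get();
        }

        // Producer side: wake every blocked consumer.
        void signalAllWhenBlocking() {
            lock.lock();
            try {
                published.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    // Mirrors SingleProducerSequencer.publish: move the cursor first, then signal.
    static void publish(long sequence, AtomicLong cursor, SimpleBlockingWait waitStrategy) {
        cursor.lazySet(sequence); // ordered write, like Sequence#set
        waitStrategy.signalAllWhenBlocking();
    }

    // Starts a consumer waiting for sequence 0, publishes 0 and returns what the consumer saw.
    static long demo() {
        AtomicLong cursor = new AtomicLong(-1L);
        SimpleBlockingWait waitStrategy = new SimpleBlockingWait();
        AtomicLong seen = new AtomicLong(Long.MIN_VALUE);
        Thread consumer = new Thread(() -> {
            try {
                seen.set(waitStrategy.waitFor(0L, cursor));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        publish(0L, cursor, waitStrategy);
        try {
            consumer.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 0
    }
}
```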

3.2 An aside: Sequence, Disruptor's efficient AtomicLong

Note the cursor: it is not a plain long, but Disruptor's own Sequence class.

class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
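{ // value has 7 long fields before and after it as cache-line padding; 7 on each side guarantee that whenever a 64-byte cache line containing value is loaded, no other variable shares that line, which solves false sharing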
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
* Sequence can be thought of as an AtomicLong, except that it eliminates false sharing and is therefore more efficient
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
/**
* Create a sequence initialised to -1.
*/
public Sequence()
{
this(INITIAL_VALUE);
}
/**
* Create a sequence with a specified initial value.
*
* @param initialValue The initial value for this sequence.
*/
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
/**
* Perform a volatile read of this sequence's value.
*
* @return The current value of the sequence.
*/
public long get()
{
return value;
}
/**
* Perform an ordered write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* store.
*
* @param value The new value for the sequence.
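* This method is equivalent to AtomicLong#lazySet(long newValue).
* Compared with a plain write to the volatile value, it skips the StoreLoad barrier and is cheaper, but other threads may see the new value slightly later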
*/
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Performs a volatile write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* write and a Store/Load barrier between this write and any
* subsequent volatile read.
*
* @param value The new value for the sequence.
*/
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
* Perform a compare and set operation on the sequence.
*
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
/**
* Atomically increment the sequence by one.
*
* @return The value after the increment
*/
public long incrementAndGet()
{
return addAndGet(1L);
}
/**
* Atomically add the supplied value.
*
* @param increment The value to add to the sequence.
* @return The value after the increment.
*/
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}

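Sequence is essentially an AtomicLong. The main difference is that Sequence avoids false sharing. In addition, Sequence#set corresponds to AtomicLong#lazySet.
That completes the flow of publishing an event with a single producer.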
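Here is a JDK-only sketch of the same idea, for readers who cannot use sun.misc.Unsafe. It relies on AtomicLongFieldUpdater, which provides lazySet (the ordered write behind Sequence#set), compareAndSet and addAndGet on a padded volatile long. The class names are hypothetical. As in Sequence, the padding is split across a class hierarchy, because HotSpot has traditionally placed superclass fields before subclass fields. The JVM specification does not guarantee any field layout, so the padding should be treated as a best-effort measure.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical JDK-only sketch of a padded, AtomicLong-like sequence (not Disruptor's Sequence).
public class SequenceSketch {

    // 7 longs (56 bytes) of padding before the hot field
    static class LhsPadding { long p1, p2, p3, p4, p5, p6, p7; }

    static class Value extends LhsPadding { volatile long value; }

    // 7 longs of padding after the hot field
    static class RhsPadding extends Value { long p9, p10, p11, p12, p13, p14, p15; }

    static final class PaddedSequence extends RhsPadding {
        private static final AtomicLongFieldUpdater<Value> VALUE =
                AtomicLongFieldUpdater.newUpdater(Value.class, "value");

        PaddedSequence(long initialValue) { VALUE.lazySet(this, initialValue); }

        long get() { return value; }                 // volatile read
        void set(long v) { VALUE.lazySet(this, v); } // ordered write, like Sequence#set
        void setVolatile(long v) { value = v; }      // full volatile write

        boolean compareAndSet(long expected, long update) {
            return VALUE.compareAndSet(this, expected, update);
        }

        long addAndGet(long increment) { return VALUE.addAndGet(this, increment); }

        long incrementAndGet() { return addAndGet(1L); }
    }

    public static void main(String[] args) {
        PaddedSequence s = new PaddedSequence(-1L);
        System.out.println(s.incrementAndGet());     // 0
        System.out.println(s.compareAndSet(0L, 5L)); // true
        s.set(7L);
        System.out.println(s.addAndGet(3L));         // 10
    }
}
```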

3.3 Publishing events with multiple producers

With multiple producers, claiming a slot calls MultiProducerSequencer.next() instead.

@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get(); // current cursor value; -1 after initialization
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
// same wrap check as the single producer, but the cache is shared by all producers
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
// enough room: claim (current, next] by CAS; if another producer got there first, loop and retry
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
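The CAS loop above can be tried out with a hypothetical MultiProducerClaimer. It uses AtomicLong for the cursor and for the gating cache, and a LongSupplier stands in for the slowest consumer. The demo makes consumers infinitely fast, so it only exercises the CAS path: several threads claim concurrently, and every sequence must be handed out exactly once.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

// Hypothetical sketch of MultiProducerSequencer.next(n): claim by CAS on one shared cursor.
public class MultiProducerClaimSketch {

    static final class MultiProducerClaimer {
        private final int bufferSize;
        private final LongSupplier slowestConsumer; // stands in for the gating sequences
        private final AtomicLong cursor = new AtomicLong(-1L);
        private final AtomicLong gatingSequenceCache = new AtomicLong(-1L);

        MultiProducerClaimer(int bufferSize, LongSupplier slowestConsumer) {
            this.bufferSize = bufferSize;
            this.slowestConsumer = slowestConsumer;
        }

        long next(int n) {
            while (true) {
                long current = cursor.get();
                long next = current + n;
                long wrapPoint = next - bufferSize;
                long cachedGatingSequence = gatingSequenceCache.get();
                if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                    long gatingSequence = Math.min(slowestConsumer.getAsLong(), current);
                    if (wrapPoint > gatingSequence) {
                        LockSupport.parkNanos(1L); // ring full: back off and retry
                        continue;
                    }
                    gatingSequenceCache.set(gatingSequence);
                } else if (cursor.compareAndSet(current, next)) {
                    return next; // this thread now owns (current, next]
                }
                // cache refreshed or CAS lost to another producer: loop again
            }
        }
    }

    // Runs `threads` producers that each claim `perThread` single sequences and returns
    // true if every sequence in [0, threads * perThread) was claimed exactly once.
    static boolean claimsAreUnique(int threads, int perThread) {
        int total = threads * perThread;
        // consumers never lag in this demo, so the ring never fills up
        MultiProducerClaimer claimer = new MultiProducerClaimer(1024, () -> Long.MAX_VALUE);
        AtomicIntegerArray counts = new AtomicIntegerArray(total);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counts.incrementAndGet((int) claimer.next(1));
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (int i = 0; i < total; i++) {
            if (counts.get(i) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(claimsAreUnique(4, 10_000)); // true
    }
}
```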

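As you can see, claiming slots and placing data with multiple producers differs little from the single-producer case. The exception is that the claim advances the shared cursor with a CAS retry loop instead of a plain field write. The main difference lies in the final publish call that publishes the slot.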

// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // offset of the first element of an int[] from the start of the array object
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // space taken by each element (may also return 0); BASE and SCALE are both used to address availableBuffer
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // initially all -1
private final int indexMask;
private final int indexShift;
@Override
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking(); // only notifies when BlockingWaitStrategy is used; otherwise does nothing
}
@Override
public void publish(long lo, long hi)
{
for (long l = lo; l <= hi; l++)
{
setAvailable(l);
}
waitStrategy.signalAllWhenBlocking();
}
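/**
* Sets the availability flag in availableBuffer.
* The main reason for this approach is to avoid publisher threads sharing a single sequence object.
* The difference between the cursor and the minimum gating sequence must never exceed the RingBuffer size (to stop producers from running too far ahead and overwriting unconsumed data).
*/
private void setAvailable(final long sequence)
{ // calculateIndex computes the remainder (%), calculateAvailabilityFlag the quotient (/)
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{ // the element is updated with Unsafe, which works on raw memory, so the element's memory address bufferAddress must be computed
long bufferAddress = (index * SCALE) + BASE;
// availableBuffer is the int array marking available slots, initially all -1. As sequence keeps increasing, the flag stored at a given slot (the quotient of sequence / bufferSize) keeps growing.
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
private int calculateAvailabilityFlag(final long sequence)
{ // the quotient sequence / bufferSize, where bufferSize = 2^indexShift
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence)
{ // the index is the remainder: sequence AND the mask (a power of two minus 1, i.e. all ones in binary), equivalent to sequence % bufferSize, where bufferSize = indexMask + 1
return ((int) sequence) & indexMask;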
}

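Compared with SingleProducerSequencer's publish, MultiProducerSequencer's publish does not set the cursor. In the multi-producer case the cursor has already been advanced by the CAS in next(), so it only says that a slot has been claimed, not that it has been filled. Instead, publish sets the corresponding slot of the internal availableBuffer array. availableBuffer records the state of every RingBuffer slot. Taking sequence modulo the RingBuffer size gives the slot index, and dividing sequence by the RingBuffer size gives the lap the sequence belongs to, which is stored as the flag. Rather than the modulo and division operators, the cheaper bitwise AND and right shift are used, which relies on the buffer size being a power of two.
The rest of MultiProducerSequencer works much like SingleProducerSequencer, so I won't repeat it here.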
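For example, with bufferSize = 8 (indexMask = 7, indexShift = 3), sequence 13 maps to slot 13 & 7 = 5 with flag 13 >>> 3 = 1. The sketch below models availableBuffer with an AtomicIntegerArray, whose lazySet stands in for UNSAFE.putOrderedInt. The method names follow MultiProducerSequencer, but the bodies are a simplified illustration, not the library code. The sketch also shows why per-slot flags are needed: a consumer can only advance up to the first sequence that has not been published yet, even if later ones already have been.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical sketch of availableBuffer: one int "lap number" per ring slot.
public class AvailableBufferSketch {

    static final class AvailabilityBuffer {
        private final int indexMask;
        private final int indexShift;
        private final AtomicIntegerArray availableBuffer;

        AvailabilityBuffer(int bufferSize) {
            if (Integer.bitCount(bufferSize) != 1) {
                throw new IllegalArgumentException("bufferSize must be a power of 2");
            }
            indexMask = bufferSize - 1;                             // e.g. 8 -> 0b111
            indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2(bufferSize)
            availableBuffer = new AtomicIntegerArray(bufferSize);
            for (int i = 0; i < bufferSize; i++) {
                availableBuffer.set(i, -1);                         // nothing published yet
            }
        }

        int calculateIndex(long sequence) {            // sequence % bufferSize
            return ((int) sequence) & indexMask;
        }

        int calculateAvailabilityFlag(long sequence) { // sequence / bufferSize
            return (int) (sequence >>> indexShift);
        }

        void publish(long sequence) {
            availableBuffer.lazySet(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
        }

        // A slot holds `sequence` only if its stored lap number matches this sequence's lap.
        boolean isAvailable(long sequence) {
            return availableBuffer.get(calculateIndex(sequence)) == calculateAvailabilityFlag(sequence);
        }

        // Highest sequence in [lowerBound, availableSequence] with no unpublished gap before it.
        long getHighestPublishedSequence(long lowerBound, long availableSequence) {
            for (long s = lowerBound; s <= availableSequence; s++) {
                if (!isAvailable(s)) {
                    return s - 1;
                }
            }
            return availableSequence;
        }
    }

    public static void main(String[] args) {
        AvailabilityBuffer buf = new AvailabilityBuffer(8);
        System.out.println(buf.calculateIndex(13) + " " + buf.calculateAvailabilityFlag(13)); // 5 1
        buf.publish(0);
        buf.publish(1);
        buf.publish(3);                                            // sequence 2 is still being filled
        System.out.println(buf.getHighestPublishedSequence(0, 3)); // 1
        buf.publish(2);
        System.out.println(buf.getHighestPublishedSequence(0, 3)); // 3
    }
}
```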

4 Dissecting the design of SingleProducerSequencer

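We have now walked through Disruptor's main event-publishing flow. Curious as you are, you must be left wanting more. If not, that is a failing in my explanation, not a reflection on Disruptor itself.
Next, let's talk about the design of SingleProducerSequencer. It shows the actual code Disruptor uses to deal with false sharing.
SingleProducerSequencer extends the abstract class SingleProducerSequencerFields, which in turn extends the abstract class SingleProducerSequencerPad. SingleProducerSequencerFields is where the useful instance fields actually live.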

// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
/**
* Set to -1 as sequence starting point
*/
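protected long nextValue = Sequence.INITIAL_VALUE; // the sequence most recently claimed by the producer
protected long cachedValue = Sequence.INITIAL_VALUE; // cached result of the last comparison: the smaller of the gating sequences' minimum and nextValue (the slowest consumer's sequence)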
}
/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
* <p>
* <p>Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
* to {@link Sequencer#publish(long)} is made.
*/
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
// ...omitted
}

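Notice that there are 7 long fields before and after the two instance fields. Why? Readers familiar with CPU caches will already know... yes, it is to solve the false sharing problem.
When the CPU loads memory into a cache line, that line contains at most these two useful fields. This avoids, as far as possible, the cache invalidations caused by false sharing and the performance loss they bring.
To explain this more clearly, let's look at the memory layout of a SingleProducerSequencer instance.
Using HSDB (the HotSpot Debugger, started with java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB on Windows; on Linux or macOS use : as the classpath separator), attach to the HotSpot process of the demo paused at a breakpoint. Filter the SingleProducerSequencer instance in the Object Histogram, then inspect it with the Inspector tool.
In this example, 0x00000000828026f8 is the start address of the com.lmax.disruptor.SingleProducerSequencer instance in the JVM. Starting from this address, the mem command shows the contents of the following 30 words. Why 30? In fact 20 is enough. The "Object Histogram" shows that the SingleProducerSequencer instance is 160 bytes, and mem prints one word per line, which is 8 bytes on my 64-bit machine. So a length of at least 160 / 8 = 20 shows the full memory layout of the instance.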

[Figure: HSDB screenshot of the SingleProducerSequencer instance's memory]

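The addresses 0x0000000082802750 and 0x0000000082802758 in the left red box correspond to the nextValue and cachedValue instance fields in the right red box. Before and after them are 7 consecutive long zeros each. The CPU loads contiguous memory into its cache in units of cache lines, which are usually 64 bytes. The padding keeps the real fields from sharing a cache line with any other data, which solves the false sharing problem.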
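The "7 longs on each side" argument can be checked with the offsets from the dump. Relative to the object start 0x00000000828026f8, nextValue is at +88 and cachedValue at +96, and the padding occupies +32..+87 and +104..+159. The hypothetical check below assumes 64-byte cache lines and 8-byte-aligned objects. It tries every possible position of the object relative to a cache-line boundary and confirms that the line holding either hot field never reaches the AbstractSequencer fields or past the end of the object.

```java
// Worked check of the padding argument using the offsets read from the HSDB dump.
// Assumes 64-byte cache lines and 8-byte object alignment.
public class PaddingCheckSketch {

    static final int CACHE_LINE = 64;
    static final int PADDING_START = 32;      // p1 of SingleProducerSequencerPad
    static final int OBJECT_END = 160;        // end of SingleProducerSequencer's trailing p7
    static final int[] HOT_FIELDS = {88, 96}; // nextValue, cachedValue

    // For every 8-byte-aligned object start relative to a cache line, check that the line
    // holding each hot field only covers object bytes in [PADDING_START, OBJECT_END).
    static boolean hotFieldsIsolated() {
        for (int base = 0; base < CACHE_LINE; base += 8) {
            for (int offset : HOT_FIELDS) {
                int address = base + offset;
                int lineStart = address - (address % CACHE_LINE);
                int lineEnd = lineStart + CACHE_LINE; // exclusive
                if (lineStart - base < PADDING_START || lineEnd - base > OBJECT_END) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hotFieldsIsolated()); // true
        long start = 0x00000000828026f8L;
        System.out.println((0x0000000082802750L - start) + " " + (0x0000000082802758L - start)); // 88 96
    }
}
```

Because every field is 8-byte aligned, a 64-byte line that contains a hot field can begin at most 56 bytes (7 longs) before it, which is exactly what the padding absorbs.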
To check the cache line size on Linux, use the following command:
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
On Windows, you can use CPU-Z.

[Figure: CPU-Z screenshot]

Appendix: Java object memory layout

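Finally, a few words on the memory layout of Java objects. This is only loosely related to the topic of the article and can be skipped.
HotSpot object memory layout
In HotSpot, an object (other than an array) is laid out roughly as: object header (Mark Word + klass pointer) + instance data + padding to keep 8-byte alignment. The Mark Word and the klass pointer in the header are each one machine word long: 32 bits (4 bytes) on a 32-bit machine and 64 bits (8 bytes) on a 64-bit machine. If compressed pointers are enabled on a 64-bit JVM, the klass pointer is compressed to 4 bytes.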
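Checking whether compressed pointers are enabled
Run jinfo -flag UseCompressedOops pid; a result of -XX:+UseCompressedOops means it is enabled. Alternatively, jinfo -flags pid lists all options.
In this example it returned -XX:+UseCompressedOops, so compressed pointers are enabled (the default in JDK 1.8). Ordinary object pointers (references) are then compressed to 4 bytes.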
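On a HotSpot JVM the same flags can also be read from inside the running process, through the com.sun.management.HotSpotDiagnosticMXBean platform MXBean. The sketch assumes a HotSpot-based JVM. On other JVMs, or when a flag does not exist, it reports "unknown". UseCompressedClassPointers is the separate flag that governs compression of the klass pointer described above.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

// Reads VM flags from inside the JVM, as an alternative to `jinfo -flag`.
// Assumes a HotSpot-based JVM exposing HotSpotDiagnosticMXBean.
public class CompressedOopsCheck {

    static String vmOption(String name) {
        try {
            HotSpotDiagnosticMXBean bean = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
            return bean == null ? "unknown" : bean.getVMOption(name).getValue();
        } catch (IllegalArgumentException e) { // not HotSpot, or flag unknown on this JVM
            return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println("UseCompressedOops = " + vmOption("UseCompressedOops"));
        System.out.println("UseCompressedClassPointers = " + vmOption("UseCompressedClassPointers"));
    }
}
```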
Below is a concrete example using SingleProducerSequencer.
Fields of SingleProducerSequencer:

[Figure: fields of SingleProducerSequencer]

Inspecting the instance with the HSDB Inspector:

[Figure: HSDB Inspector view of the instance]

Viewing the object's memory contents

hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word: the object's runtime data, such as hash code, GC generational age, lock state flags, lock held by a thread, biased thread ID, bias epoch
0x0000000082802700: 0x000000082000de38 // upper 4 bytes (82802704~82802707): int bufferSize = 8; lower 4 bytes (82802700~82802703): 2000de38. Because compressed pointers are enabled, the lower 4 bytes are the klass pointer; since this is JDK 1.8, the klass metadata lives in Metaspace
0x0000000082802708: 0x828028e082809e98 // upper 4 bytes: ref cursor; lower 4 bytes: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // lower 4 bytes: ref gatingSequences (ObjArray); upper 4 bytes: alignment gap before the long fields
0x0000000082802718: 0x0000000000000000 // this line and the next 6: p1~p7 defined in SingleProducerSequencerPad
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue = 1
0x0000000082802758: 0xffffffffffffffff // cachedValue = -1
0x0000000082802760: 0x0000000000000000 // this line and the next 6: p1~p7 defined in SingleProducerSequencer
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000

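Computing the object's shallow heap size and retained heap size
The object occupies 20 * 8 = 160 B of memory in total, which is its shallow heap size. It can also be computed by hand: mark word 8 + klass pointer 4 + int bufferSize 4 + 2 refs (waitStrategy, cursor) 2 * 4 + gatingSequences ref 4 + 4 B alignment gap + 16 longs 16 * 8 = 160 B.
The retained heap size = shallow heap size + the shallow heap sizes of the objects referenced by this object that are not also reachable by other paths (for example from GC roots).
The references involved here are: cursor 0x00000000828028e0, waitStrategy 0x0000000082809e98 and gatingSequences 0x000000008284b390.

Looking up the reverse pointers of each with the revptrs command shows that only gatingSequences is referenced solely by this object. So we compute the shallow heap size of gatingSequences (com.lmax.disruptor.Sequence[1]) = 12 + 4 + 1 * 4 + 4 = 24 B: a 12-byte header, 4 bytes for the array length and one 4-byte reference (compressed pointers are enabled), which makes 20 B, plus 4 bytes of padding to reach 24 B. The object's retained heap size is therefore 160 + 24 = 184 B.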

hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0

Shallow heap size of an array object = 12-byte object header + 4 bytes for the array length + array length * element size (4 B per compressed reference here) + padding
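The hand calculations above can be written down as a small calculator. It assumes a 64-bit HotSpot JVM with compressed oops, compressed class pointers and 8-byte alignment, which are the settings of this example. It simply sums header and field sizes and then rounds up. It does not model where HotSpot places internal gaps. That shortcut only works here because the 4-byte gap before the long fields equals the final padding.

```java
// Worked version of the shallow/retained size arithmetic (assumes compressed oops and
// compressed class pointers on a 64-bit HotSpot JVM, 8-byte object alignment).
public class ShallowSizeSketch {

    static final int MARK_WORD = 8;
    static final int KLASS_POINTER = 4; // compressed klass pointer
    static final int REFERENCE = 4;     // compressed oop
    static final int ARRAY_LENGTH = 4;  // int length field in an array header
    static final int ALIGNMENT = 8;

    static long alignUp(long size) {
        return (size + ALIGNMENT - 1) / ALIGNMENT * ALIGNMENT;
    }

    // Header + fields, rounded up to the alignment; internal gap placement is not modelled.
    static long instanceSize(int refs, int ints, int longs) {
        return alignUp(MARK_WORD + KLASS_POINTER + refs * REFERENCE + ints * 4L + longs * 8L);
    }

    // Reference array: header + length + one reference per element, then padding.
    static long referenceArraySize(int length) {
        return alignUp(MARK_WORD + KLASS_POINTER + ARRAY_LENGTH + (long) length * REFERENCE);
    }

    public static void main(String[] args) {
        // SingleProducerSequencer: cursor, waitStrategy, gatingSequences; bufferSize;
        // nextValue, cachedValue and 14 padding longs
        long sequencer = instanceSize(3, 1, 16);
        long gating = referenceArraySize(1); // Sequence[1]
        System.out.println(sequencer + " " + gating + " " + (sequencer + gating)); // 160 24 184
    }
}
```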

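One last question. Since Java 8, Metaspace has replaced PermGen for storing class metadata. With Java 7's HSDB the universe command shows PermGen information, but with Java 8 it shows no Metaspace information, as the output below illustrates.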

Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]

That's all for the producer side of Disruptor's source code. Next time we'll dig into the consumer.

