Disruptor源码分析之Sequenced
Sequenced是对于生产者和RingBuffer之间操作协调的一个抽象概念类,通过Sequenced定义生产者所需要的原子操作,保证序号分配的线程安全性和正确性,下面就对Sequenced进行详细分析
源码分析
Sequenced
Sequenced的主要方法分为三部分:管理、获取地址、发布,
- 管理方法主要是获取存储事件大小和获取可用事件数组空间大小
- 获取地址的方法主要是Next()或tryNext()方法,通过这个方法可用获取可以写入的事件序号
- 发布事件,主要是publish()方法,通过这个方法将完成的事件发布声明出去

Sequenced的实现类主要分为EventSequencer和Sequencer,重点分析com.lmax.disruptor.Sequencer类
Sequencer.java
Sequencer的主要方法分为两类,分别是管理Sequence和创建其他协作类:
管理Sequence的方法主要有添加、移除、判断是否可用等,创建其他协作类的方法主要是创建SequenceBarrier和创建EventPoller
- PS:Sequenced、Sequencer、SequenceBarrier、Sequence的关系

Sequenced是定义序号动作的顶级接口,所有与序号相关的操作都需要实现该接口;
Sequencer是定一操作序号的管理动作的类,它负责管理Sequence,类似于排队模型中负责管理和发放号码的角色
Sequence只是序号,类似于排队模型中的序号
SequenceBarrier是使用序号的角色,类似于排队模型中比较序号的角色
AbstractSequencer
AbstractSequencer是Sequencer的各种实现的基类,负责定义和实现所有Sequencer共享的通用状态和管理逻辑;
主要实现Sequenced的getCursor、getBufferSize、addGatingSequences、removeGatingSequence、getMinimumSequence、newBarrier、newPoller这些公共方法;
AbstractSequencer的基础属性有:
//bufferSize,用于标识数组长度,在初始化过程中就是由RingBuffer设置的长度字段向下传递过来的
protected final int bufferSize;
//消费者的等待策略
protected final WaitStrategy waitStrategy;
//声明序号cursor(光标),即代表消费进度
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//volatile修饰的Sequence数组,用于存储RingBuffer上所有消费者当前的进度标记(Sequence对象)的集合
protected volatile Sequence[] gatingSequences = new Sequence[0];
- getMinimumSequence方法
/**
* 获取当前最小的消费序列
* @see Sequencer#getMinimumSequence()
*/
@Override
public long getMinimumSequence()
{
return Util.getMinimumSequence(gatingSequences, cursor.get());
}
//Util.getMinimumSequence
public static long getMinimumSequence(final Sequence[] sequences, final long minimum)
{
//设置当前最小序列号
long minimumSequence = minimum;
//递归循环比较获取最小的序列号
for (int i = 0, n = sequences.length; i < n; i++)
{
long value = sequences[i].get();
//对比获取更小值
minimumSequence = Math.min(minimumSequence, value);
}
return minimumSequence;
}
SingleProducerSequencer
SingleProducerSequencer在之前已经讲解过它的next()方法了,这里重点对hasAvailableCapacity方法进行分析
这个方法有两个SingleProducerSequencer自身中定义的属性参与计算
//下一个序号
long nextValue = Sequence.INITIAL_VALUE;
//最小消费者进度的本地缓存
long cachedValue = Sequence.INITIAL_VALUE;
cachedValue是本地最小消费进度变量值,由于SingleProducerSequencer是单线程生产者因此不需要用额外的线程安全手动进行保证,在判断是否还有足够容量时只需要判断是否当前的值是否满足,如果当前的值不满足时对cacheValue进行更新;
另外一个方法是sameThread(),这个方法主要是用Map判断当前线程是否生产者线程,这个map的key为singleProducerSequencer,value为currentThread;
MultiProducerSequencer
MultiProducerSequencer 用于 “多个生产者并发发布事件到 RingBuffer” 的场景,负责安全、按顺序地分配序号。
MultiProducerSequencer主要分析三个方法:hasAvailableCapacity、next、tryNext
- hasAvailableCapacity
判断是否还有可用容量的方法,这个方法的实现与SingleProducerSequencer类中的实现基本类似,因为MultiProducerSequencer保证多线程访问并发安全性的是通过next方法实现的,并不是通过hasAvailableCapacity方法实现的;
- next方法
next方法是指定传入所需数组大小后进行申请的的方法;
public long next(final int n)
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
//计算生产者进度,当前生产者进度+n赋值到cursor上,返回原cursor的值
long current = cursor.getAndAdd(n);
//得到写入之后的值
long nextSequence = current + n;
//得到循环点
long wrapPoint = nextSequence - bufferSize;
//得到当前最慢的消费点
long cachedGatingSequence = gatingSequenceCache.get();
//关键点1:判断如果循环点大于消费点说明,本次写入会覆盖,因此需要等待消费位点
//如果最慢的消费位点大于当前生产者位点,说明GatingSequence已过期
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence;
//关键点2:获取最新的消费位点,如果循环点还是大于消费位点,证明本次写入还是会覆盖,继续等待
while (wrapPoint > (gatingSequence = Util.getMinimumSequence(gatingSequences, current)))
{
LockSupport.parkNanos(1L); // TODO, should we spin based on the wait strategy?
}
//将gatingSequenceCache设置成最新的值
gatingSequenceCache.set(gatingSequence);
}
//返回写入之后的值
return nextSequence;
}
关键点1:通过VarHandle来保证多线程访问的安全性,cursor.getAndAdd方法是原子性的,因此将cursor更新后不会存在回滚的可能性,因此生产者需要一直等待有空闲位置来写入数据,这个改动的commit,之前的实现是通过cursor.compareAndSet(),CAS的方式来实现的,在大量高并发场景下,会高频CAS,但是VarHandle.getAndAdd()是原子操作避免了这种场景,同时value还是整个缓冲行,性能更快;
关键点2:是进行两个判断,第一个判断是判断循环点是否大于当前最慢的消费点,上图中的例子所示wrapPoint=14,但是当前最慢的消费点是12,因此满足条件,证明本次写入会将未消费的[12,14]的数据进行覆盖,因此生产者需要等待;
另外一个是当最慢消费者序列大于等于生产者序列时,说明最慢消费组序列号已过期,需要更新后在进行检查;
关键点3:是判断获取最新的消费位点后,如果循环点还是大于消费位点,证明本次写入还是会覆盖,继续等待
next方法当RingBuffer数组没有可用空间时采取的做法是等待;
- tryNext方法
tryNext方法是指定传入所需数组大小后进行申请的的方法,容量不够时直接抛出异常;
do
{
//设置当前生产位点
current = cursor.get();
//设置本次生产更新后的位点
next = current + n;
//容量不够时直接抛出异常
if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
//通过CAS的方式更新
while (!cursor.compareAndSet(current, next));
可以看到这里就使用的是CAS的方式进行获取原始,并且如果获取不到时之前抛出异常;
总结
Sequenced 提供统一的序号分配接口;
Sequencer 负责管理序号和消费者进度;
SingleProducerSequencer 用简单变量实现无锁高效分配;
MultiProducerSequencer 通过 atomic getAndAdd 实现多线程安全、高性能的序号分配。