Disruptor源码分析之Sequenced


Disruptor源码分析之Sequenced

Sequenced是对于生产者和RingBuffer之间操作协调的一个抽象概念类,通过Sequenced定义生产者所需要的原子操作,保证序号分配的线程安全性和正确性,下面就对Sequenced进行详细分析

源码分析

Sequenced

Sequenced的主要方法分为三部分:管理获取地址发布,

  1. 管理方法主要是获取存储事件大小和获取可用事件数组空间大小
  2. 获取地址的方法主要是Next()或tryNext()方法,通过这个方法可用获取可以写入的事件序号
  3. 发布事件,主要是publish()方法,通过这个方法将完成的事件发布声明出去

Sequenced的实现类图

Sequenced的实现类主要分为EventSequencerSequencer,重点分析com.lmax.disruptor.Sequencer类
Sequencer.java

Sequencer的主要方法分为两类,分别是管理Sequence和创建其他协作类:
管理Sequence的方法主要有添加、移除、判断是否可用等,创建其他协作类的方法主要是创建SequenceBarrier和创建EventPoller

  • PS:Sequenced、Sequencer、SequenceBarrier、Sequence的关系
    Sequenced、Sequencer、SequenceBarrier、Sequence的关系
    Sequenced是定义序号动作的顶级接口,所有与序号相关的操作都需要实现该接口;
    Sequencer是定一操作序号的管理动作的类,它负责管理Sequence,类似于排队模型中负责管理和发放号码的角色
    Sequence只是序号,类似于排队模型中的序号
    SequenceBarrier是使用序号的角色,类似于排队模型中比较序号的角色

AbstractSequencer

AbstractSequencer

AbstractSequencer是Sequencer的各种实现的基类,负责定义和实现所有Sequencer共享的通用状态和管理逻辑;
主要实现Sequenced的getCursor、getBufferSize、addGatingSequences、removeGatingSequence、getMinimumSequence、newBarrier、newPoller这些公共方法;

AbstractSequencer的基础属性有:

//bufferSize,用于标识数组长度,在初始化过程中就是由RingBuffer设置的长度字段向下传递过来的
protected final int bufferSize;
//消费者的等待策略
protected final WaitStrategy waitStrategy;
//声明序号cursor(光标),即代表消费进度
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
//volatile修饰的Sequence数组,用于存储RingBuffer上所有消费者当前的进度标记(Sequence对象)的集合
protected volatile Sequence[] gatingSequences = new Sequence[0];
  • getMinimumSequence方法

/**
 * 获取当前最小的消费序列
 * @see Sequencer#getMinimumSequence()
 */
@Override
public long getMinimumSequence()
{
    return Util.getMinimumSequence(gatingSequences, cursor.get());
}

//Util.getMinimumSequence
public static long getMinimumSequence(final Sequence[] sequences, final long minimum)
{
    //设置当前最小序列号
    long minimumSequence = minimum;
    //递归循环比较获取最小的序列号
    for (int i = 0, n = sequences.length; i < n; i++)
    {
        long value = sequences[i].get();
        //对比获取更小值
        minimumSequence = Math.min(minimumSequence, value);
    }

    return minimumSequence;
}

SingleProducerSequencer

SingleProducerSequencer

SingleProducerSequencer在之前已经讲解过它的next()方法了,这里重点对hasAvailableCapacity方法进行分析

hasAvailableCapacity()

这个方法有两个SingleProducerSequencer自身中定义的属性参与计算

//下一个序号
long nextValue = Sequence.INITIAL_VALUE;
//最小消费者进度的本地缓存
long cachedValue = Sequence.INITIAL_VALUE;

cachedValue是本地最小消费进度变量值,由于SingleProducerSequencer是单线程生产者因此不需要用额外的线程安全手动进行保证,在判断是否还有足够容量时只需要判断是否当前的值是否满足,如果当前的值不满足时对cacheValue进行更新;

另外一个方法是sameThread(),这个方法主要是用Map判断当前线程是否生产者线程,这个map的key为singleProducerSequencer,value为currentThread;

MultiProducerSequencer

MultiProducerSequencer 用于 “多个生产者并发发布事件到 RingBuffer” 的场景,负责安全、按顺序地分配序号。

MultiProducerSequencer主要分析三个方法:hasAvailableCapacitynexttryNext

  • hasAvailableCapacity
    判断是否还有可用容量的方法,这个方法的实现与SingleProducerSequencer类中的实现基本类似,因为MultiProducerSequencer保证多线程访问并发安全性的是通过next方法实现的,并不是通过hasAvailableCapacity方法实现的;

hasAvailableCapacity

  • next方法
    next方法是指定传入所需数组大小后进行申请的的方法;
public long next(final int n)
{
    if (n < 1 || n > bufferSize)
    {
        throw new IllegalArgumentException("n must be > 0 and < bufferSize");
    }

    //计算生产者进度,当前生产者进度+n赋值到cursor上,返回原cursor的值
    long current = cursor.getAndAdd(n);
    //得到写入之后的值
    long nextSequence = current + n;
    //得到循环点
    long wrapPoint = nextSequence - bufferSize;
    //得到当前最慢的消费点
    long cachedGatingSequence = gatingSequenceCache.get();

    //关键点1:判断如果循环点大于消费点说明,本次写入会覆盖,因此需要等待消费位点
    //如果最慢的消费位点大于当前生产者位点,说明GatingSequence已过期
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
    {
        long gatingSequence;
        //关键点2:获取最新的消费位点,如果循环点还是大于消费位点,证明本次写入还是会覆盖,继续等待
        while (wrapPoint > (gatingSequence = Util.getMinimumSequence(gatingSequences, current)))
        {
            LockSupport.parkNanos(1L); // TODO, should we spin based on the wait strategy?
        }
        //将gatingSequenceCache设置成最新的值
        gatingSequenceCache.set(gatingSequence);
    }

    //返回写入之后的值
    return nextSequence;    
}

MultiProducerSequencer.next

关键点1:通过VarHandle来保证多线程访问的安全性,cursor.getAndAdd方法是原子性的,因此将cursor更新后不会存在回滚的可能性,因此生产者需要一直等待有空闲位置来写入数据,这个改动的commit,之前的实现是通过cursor.compareAndSet(),CAS的方式来实现的,在大量高并发场景下,会高频CAS,但是VarHandle.getAndAdd()是原子操作避免了这种场景,同时value还是整个缓冲行,性能更快;

关键点2:是进行两个判断,第一个判断是判断循环点是否大于当前最慢的消费点,上图中的例子所示wrapPoint=14,但是当前最慢的消费点是12,因此满足条件,证明本次写入会将未消费的[12,14]的数据进行覆盖,因此生产者需要等待;
另外一个是当最慢消费者序列大于等于生产者序列时,说明最慢消费组序列号已过期,需要更新后在进行检查;

关键点3:是判断获取最新的消费位点后,如果循环点还是大于消费位点,证明本次写入还是会覆盖,继续等待

next方法当RingBuffer数组没有可用空间时采取的做法是等待;

  • tryNext方法
    tryNext方法是指定传入所需数组大小后进行申请的的方法,容量不够时直接抛出异常;
do
{
    //设置当前生产位点
    current = cursor.get();
    //设置本次生产更新后的位点
    next = current + n;
    //容量不够时直接抛出异常
    if (!hasAvailableCapacity(gatingSequences, n, current))
    {
        throw InsufficientCapacityException.INSTANCE;
    }
}
//通过CAS的方式更新
while (!cursor.compareAndSet(current, next));

可以看到这里就使用的是CAS的方式进行获取原始,并且如果获取不到时之前抛出异常;

总结

Sequenced 提供统一的序号分配接口;
Sequencer 负责管理序号和消费者进度;
SingleProducerSequencer 用简单变量实现无锁高效分配;
MultiProducerSequencer 通过 atomic getAndAdd 实现多线程安全、高性能的序号分配。


  TOC