Disruptor源码分析之RingBuffer


Disruptor源码分析之RingBuffer

RingBuffer是Disruptor的核心组件之一,它是一个高效的环形缓冲区,用于在生产者和消费者之间传递事件对象,RingBuffer通过预分配内存和无锁设计实现了高性能的数据传输,本文将深入分析RingBuffer的源码实现,包括其继承结构、属性、初始化过程以及读写逻辑。

RingBuffer类

RingBuffer继承图

可用看到RingBuffer继承实现RingBufferPadCursoredSequenced,其中RingBufferPad是对RingBuffer自身结构的描述,Sequenced和Cursored接口定义RingBuffer的行为;

RingBufferPad:核心抽象

可以看到RingBuffer继承自RingBufferPad,其中RingBufferPad的作用是进行缓存行填充,防止伪共享,RingBufferPad对于缓存行填充的实现有前后两个版本,第一个版本是使用Long类型的字段进行填充,第二个版本是使用byte数组进行填充,具体代码如下:
Commit bd5d7d8

为什么要从Long类型改为byte数组呢?
因为Long类型的字段在JDK15之后由于内存布局调整,导致Long类型的继承布局结构可能会被优化,导致缓存行填充失效,而byte数组可以确保填充字段不会被优化掉,从而保证缓存行填充的效果,因此在使用padding trick技巧时需要关注不同JDK版本对继承字段的优化;

Cursored:序列接口

Cursored用于声明当前"当前最大序列号"的抽象接口,当RingBuffer继承Cursored时,需要实现getCursor()方法,用于获取当前最大序列号;

Sequenced:基本操作接口

Sequenced是RingBuffer的核心父类,它定义了RingBuffer的基本操作接口,包括获取容量、发布事件、检查是否有可用事件等方法,定义了其他类如何与RingBuffer进行交互的方式;

RingBuffer的属性

RingBuffer的核心属性都是放到RingBufferFields中的分别是:

  1. indexMask:用于计算核心环形缓冲区的索引的掩码,大小为RingBuffer长度减1,计算索引方式为:index = num & indexMask,等效与num % bufferSize,通过位运算可以提高计算效率;
    例如,RingBuffer的长度为4,则indexMask = 4 - 1 = 3(二进制为0011),当num=6(二进制为0110)时,index = 0011 & 0110 = 0010 = 2,所以索引为2;
  2. entries:核心环形缓冲区,用于存储事件对象的数组,有界数组;
  3. sequencer:生产者序列号对象,用于管理生产者的序列
    sequencer是Disruptor的核心调度器,它其实是继承Cursored(状态管理)和Sequenced(基本操作)两个接口的抽象类,它定义了生产者如何发布事件,以及如何与消费者进行协调,其中它有两个实现类:
    • SingleProducerSequencer:单生产者实现,适用于只有一个生产者的场景,它通过简单的自旋锁来实现线程间的协调,性能较高;
    • MultiProducerSequencer:多生产者实现,适用于有多个生产者的场景,它使用CAS操作来实现线程间的协调,从而避免锁的使用,性能稍低于单生产者实现;
  4. bufferSize:环形缓冲区的大小,必须是2的幂次方;

abstract class RingBufferFields<E> extends RingBufferPad
{
    //数组前后空闲填充数量,防止伪共享
    private static final int BUFFER_PAD = 32;

    //索引掩码
    private final long indexMask;
    //有界事件数组
    private final E[] entries;
    //数组大小
    protected final int bufferSize;
    //控制器
    protected final Sequencer sequencer;

}

RingBuffer初始化过程

RingBuffer初始化主要是通过Disruptor类来完成的,Disruptor.create()方法会创建RingBuffer对象,具体代码如下:

Disruptor.create

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        threadFactory);
}

RingBuffer.create()

public static <T> RingBuffer<T> create(
        final ProducerType producerType,
        final EventFactory<T> eventFactory,
        final int bufferSize,
        final WaitStrategy waitStrategy)
{
    final Sequencer sequencer;
    switch (producerType)
    {
        case SINGLE:
            //单生产者模式
            sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
            break;
        case MULTI:
            //多生产者模式
            sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
            break;
        default:
            throw new IllegalArgumentException("Unknown producer type: " + producerType);
    }

    return new RingBuffer<>(sequencer, eventFactory);
}

可以看到buffersize会被用于创建Sequencer对象,然后传递给RingBuffer的构造方法,在RingBuffer的构造方法中,会使用Sequencer对象中的buffersize属性来创建entries数组,具体代码如下:

RingBufferFields(
        final EventFactory<E> eventFactory,
        final Sequencer sequencer)
    {
        this.sequencer = sequencer;
        //使用sequencer中的BufferSize作为数组长度
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        //创建事件数组
        this.entries = (E[]) new Object[bufferSize + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

在创建entries数组时,可以看到使用bufferSize加上了2倍的BUFFER_PAD,其中BUFFER_PAD是为了进行缓存行填充,防止伪共享;然后就进行了fill操作,用于初始化entries数组,具体代码如下:

private void fill(final EventFactory<E> eventFactory)
{
    for (int i = 0; i < bufferSize; i++)
    {
        //对数组进行填充,可以看到前BUFFER_PAD和后BUFFER_PAD的位置没有进行填充
        entries[i + BUFFER_PAD] = eventFactory.newInstance();
    }
}

对空数组进行填充使用EventFactory创建事件对象,eventFactory是用户自定义的,用于创建事件对象的工厂接口,用户可以实现自己的EventFactory来创建不同类型的事件对象;

小结

RingBuffer的初始化主要是通过Disruptor类来完成的,在创建RingBuffer时会根据生产者类型(单生产者或者多生产者)创建不同的Sequencer对象,然后传递给RingBuffer的构造方法,在构造方法中会使用Sequencer中的bufferSize来创建entries数组,并使用EventFactory进行初始化填充,同时还会在数组中使用空的位置防止伪共享;

RingBuffer的读写逻辑

RingBuffer的读写逻辑是整个Disruptor的关键,主要是通过Sequencer来实现的,生产者通过Sequencer来发布事件,消费者通过Sequencer来获取可用事件;

写入流程

生产者写入事件的流程主要包括以下几个步骤:

  1. 获取下一个可用的序列号:生产者通过调用sequencer.next()
  2. 通过第一步获取的序列号获取事件对象:生产者通过ringBuffer.get(sequence)获取对应的事件对象;
  3. 填充事件对象:生产者对获取到的事件对象进行填充;
  4. 发布事件:生产者通过sequencer.publish(sequence)将事件发布出去;

可以参考:Disruptor1P1C

根据这个流程依次进行分析:

  • 获取下一个可用序号的

Sequencer#next方法是实现Sequenced接口的next方法,有两种不同的实现,分别是SingleProducerSequencer和MultiProducerSequencer,先对SingleProducerSequencer进行分析:

SingleProducerSequencer#next


@Override
 public long next(final int n)
 {
     //判断线程和生产者是否是最开始的匹配关系,首次获取nextSeq时,会将两者之间存到Map中
     assert sameThread() : "Accessed by two threads - use ProducerType.MULTI!";

     //检测是否超过有效索引长度
     if (n < 1 || n > bufferSize)
     {
         throw new IllegalArgumentException("n must be > 0 and < bufferSize");
     }

     //this.nextValue作为上一次写入索引
     long nextValue = this.nextValue;

     //关注点1:下一次索引+n
     long nextSequence = nextValue + n;

     //关注点2:wrapPoint:理论上的最晚可以消费的索引地址
     long wrapPoint = nextSequence - bufferSize;
     //获取缓存下来的最小消费者序号
     long cachedGatingSequence = this.cachedValue;

     //关注点3:判断nextIndex是否超过当前最小的消费者序号
     if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
     {
         //重新设置内存屏障
         cursor.setVolatile(nextValue);  // StoreLoad fence

         //设置局部变量用于暂存最慢的消费索引
         long minSequence;

         //关注点4:循环等待,生产者等待最慢的消费者消费出空闲的位置
         while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
         {
             LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
         }
         //重新设置位置
         this.cachedValue = minSequence;
     }

     this.nextValue = nextSequence;

     return nextSequence;
 }

上述代码中的四个关注点参考这个图:
获取下一个位置

在获取下一个可用位置时,由于是单线程写入,因此在校验过是单线程写入后,后续主要做的事情就是在保证有空闲位置时,直接返回下一个可用位置,如果没有空闲位置,则通过循环等待的方式等待消费者消费出空闲位置;

ps:循环等待是通过LockSupport.parkNanos(1L);这种方式实现的;

  • MultiProducerSequencer#next
    @TODO 待补充

  • 获取对应的事件对象

获取事件对象是通过RingBuffer#get方法实现的,具体代码如下:


@Override
public E get(final long sequence)
{
    return elementAt(sequence);
}

protected final E elementAt(final long sequence)
{
    //先加数组前BUFFER_PAD位插入的空白,在进行位运算返回事件
    return entries[BUFFER_PAD + (int) (sequence & indexMask)];
}

获取事件很简单,直接获取entries数组中对应位置的事件对象即可,需要注意的是由于entries数组前后有BUFFER_PAD个空白位置,因此在获取时需要加上BUFFER_PAD;

获取完成事件后,对事件进行填充,这个过程是用户自行实现的,填充完成后,需要发布事件通知消费者;

  • 发布事件

RingBuffer的pushlish方法是调用Sequencer的publish方法实现的,具体代码如下:

@Override
public void publish(final long sequence)
{
    sequencer.publish(sequence);
}
  • Sequencer#publish
    Sequencer的publish方法同样有两个实现,分别是SingleProducerSequencer和MultiProducerSequencer,先对SingleProducerSequencer进行分析:

  • SingleProducerSequencer#publish(long)

@Override
public void publish(final long sequence)
{
    //设置最大序列号
    cursor.set(sequence);
    //通知消费者等待策略唤醒
    waitStrategy.signalAllWhenBlocking();
}

cursor.set(sequence)的作用是发布当前事件的序列号,类似于声明该事件已处理完成,消费者可以开始消费该事件,cursor对象是Sequence的一个实例对象,cursor.set(sequence)方法会将cursor的值设置为传入的sequence值,表示当前已发布的最大序列号;

public Sequence(final long initialValue)
{
    //写屏障,保证写操作不会被指令重排序
    VarHandle.releaseFence();
    this.value = initialValue;
}

调用waitStrategy.signalAllWhenBlocking()方法通知等待策略唤醒消费者线程,从而让消费者开始消费事件;

signalAllWhenBlocking方法不同的消费者有不同的实现,例如BusySpinWaitStrategy从不释放CPU,也不存在唤醒
com.lmax.disruptor.BusySpinWaitStrategy#signalAllWhenBlocking

BlockingWaitStrategy使用信号量mutex来控制释放CPU时,对于signalAllWhenBlocking需要进行notifyAll操作

com.lmax.disruptor.BlockingWaitStrategy#signalAllWhenBlocking

@Override
public void signalAllWhenBlocking()
{
    synchronized (mutex)
    {
        mutex.notifyAll();
    }
}

读取流程

先从消费者如何注册开始分析,消费者注册分为两种注册方式分别是handleEventsWith()注册广播消息处理器和handleEventsWithWorkerPool注册事件消费组的方式;

  • 广播模式
    广播模式指的是消息会被每一个注册的消费器处理一次,需要继承EventHandler接口,实现onEvent方法;

  • 事件消费组模式
    事件消费模式指的是消息会被事件消费组中的某一个消费者处理一次,需要实现WorkHandler接口,实现onEvent方法;
    ps:WorkerPool的方式在Disruptor中已被废弃,建议使用EventHandler的方式;

下面基于disruptor4.0版本对EventHandler进行分析;

  1. 注册流程
    Disruptor#handleEventsWith() -> createEventProcessors()

EventHandlerGroup<T> createEventProcessors(
            final Sequence[] barrierSequences,
            final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();

        //每个消费者一个Sequence
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

        //依次处理EventHandler
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            //取出EventHandler包装成BatchEventProcessor
            final EventHandler<? super T> eventHandler = eventHandlers[i];

            final BatchEventProcessor<T> batchEventProcessor =
                    new BatchEventProcessorBuilder().build(ringBuffer, barrier, eventHandler);
            //设置异常处理器
            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }

            //核心关键点1:用consumerRepository来存储EventProcessor
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //对序列数组进行初始化
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        //核心关键点2:更新消费者门闩,防止生产者将未消费位置覆盖
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

这个方法中有两个地方值得注意,第一个地方是在于consumerRepository,用于保存消费者对象的容器,内部是通过Map的方式进行实现的

  • ConsumerRepository

class ConsumerRepository
{
    private final Map<EventHandlerIdentity, EventProcessorInfo> eventProcessorInfoByEventHandler =
        new IdentityHashMap<>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence =
        new IdentityHashMap<>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<>();

    //省略......
}

PS:IdentityHashMap中的key是比较对象引用地址

第二个关键点是updateGatingSequencesForNextInChain方法,这个方法的作用是在于防止生产者提前将未消费的事件覆盖掉;

private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
    //消费者处理序列长度大于0时,才进行处理
    if (processorSequences.length > 0)
    {
        // 1. 把新的消费者序列 (B) 加入 RingBuffer 的监控名单
        ringBuffer.addGatingSequences(processorSequences);
        // 2. 把旧的屏障序列 (A) 从监控名单移除
        // 因为 B 依赖 A,所以 B 肯定比 A 慢,RingBuffer 只要盯着 B 就够了
        for (final Sequence barrierSequence : barrierSequences)
        {
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        // 3. 同时也告诉这个消费者组,下次如果再有人接在 B 后面(比如 .then(C)),
        // B 就变成了由于旧的序列,需要被移除
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

因为disruptor支持消费者之间相互依赖进行消费,因此对于有相互依赖关系的两个消费者,例如A消费者完成在then B消费者这种场景,只需要判断B消费者是否完成消费即可,如果B完成表示A肯定完成,因此只需要监控B消费者的Sequences即可;

第三步设置处理链结束的方法是EventProcessorInfo#markAsUsedInBarrier
将endOfChain设置未false即可;

  1. 启动消费者流程

disruptor的启动方法是[Disruptor#start],这个方法会调用ConsumerRepository#startAll

public void startAll(final ThreadFactory threadFactory)
{
    consumerInfos.forEach(c -> c.start(threadFactory));
}

EventProcessorInfo#start

public void start(final ThreadFactory threadFactory)
{
    //通过线程工厂创建一个指定好任务的线程
    final Thread thread = threadFactory.newThread(eventprocessor);
    if (null == thread)
    {
        throw new RuntimeException("Failed to create thread to run: " + eventprocessor);
    }

    //立即启动线程执行任务
    thread.start();
}

通过这一个步骤又和注册consumerhandler关联到一起了,注册时将eventHandler包装成BatchEventProcessor,ConsumerInfo启动后会执行BatchEventProcessor子类的run方法,下面以BatchEventProcessor来进行分析:
BatchEventProcessor.run()方法主要做的事为:清除屏障标识、开始循环处理事件


private void processEvents()
   {
       T event = null;
       //设置需要处理的下一个序列号
       long nextSequence = sequence.get() + 1L;

       while (true)
       {
           final long startOfBatchSequence = nextSequence;
           //外层try-catch处理系统异常
           try
           {
               //内层try-catch处理可重试异常
               try
               {
                   //关键点1:检测是否可以进行消费:是否有可消费的消息,是否前置消费处理器已处理完成
                   //返回的最大的可消费位置
                   final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                   //设置单次消费的最大限制
                   final long endOfBatchSequence = min(nextSequence + batchLimitOffset, availableSequence);

                   //判断是否有消费,有消费时通知消费事件
                   if (nextSequence <= endOfBatchSequence)
                   {
                       //通知开始处理
                       eventHandler.onBatchStart(endOfBatchSequence - nextSequence + 1, availableSequence - nextSequence + 1);
                   }

                   //循环开始处理
                   while (nextSequence <= endOfBatchSequence)
                   {
                       //关键点2:从RingBuffer数组中获取元素
                       event = dataProvider.get(nextSequence);
                       //通知事件
                       eventHandler.onEvent(event, nextSequence, nextSequence == endOfBatchSequence);
                       nextSequence++;
                   }

                   retriesAttempted = 0;

                   //设置本次读取的终止位置
                   sequence.set(endOfBatchSequence);
               }
               //省略...
           }
           //省略...
       }
   }

这个方法有两个重要的关键点,分别是关键点1负责检测是否可以进行消费和关键点2从RingBuffer数组中获取元素进行消费的动作
关键点1sequenceBarrier.waitFor方法的作用是在于使用传入的等待策略来等待可消费的消息;
关键点2dataProvider.get(nextSequence)方法的作用是去取RingBuffer中的event数组中的数据,然后在将event传递到eventHandler.onEvent方法中

整个消费流程如下所示:

eventHandler消费流程

  • 小结
    eventHandler的注册和消费流程可以说都是围绕这EventHandler的操作类BatchEventProcessor来进行的,其中注册流程主要是通过Disruptor将EventHandler封装成EventProcess在注册到consumerRepository中;
    消费流程是通过EventProcess来启动线程任务来实现的,需要注意的是processEvents()方法中会执行消费等待策略和执行消费消息的方法;

总结

RingBuffer从逻辑上是一个环形结构可重复填充的数组,通过Sequenced来对RingBuffer的写入和访问进行控制,同时对于所以操作采用无锁的方法极限提高性能;

RingBuffer_操作流程图

参考资料

Disruptor—3.核心源码实现分析
环形缓冲器有何特别之处


  TOC