rocketmq顺序消息的分析


rocketmq顺序消息的分析

顺序消息是指的是一组需要有序的消息集合,在同一参照系下才有意义。rocketmq的顺序消息主要是分为两个方面:

  1. product保证将一组有序的消息发送到同一个messageQueue下
  2. consumer消费时保证同一时刻一个消费组下只有一个线程在消费消息
  • 范围
  1. 分区顺序:单个messagequeue中的消息是有序的
  2. 全局顺序:单个Topic中的消息都是有序的

原理

发送时是如何保证顺序的?

发送时是通过MessageQueueSelector在生产者一端保证顺序的

java
SendResult result = producer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(0);
    }
},finalI);

MessageQueueSelector.select的执行时间是在DefaultMQProducerImpl.send时,会调用传入的MessageQueueSelector并执行select方法获取到messageQueue队列进行传值

ps.普通消息调用时候不会手动选择messageQueue,而是通过**selectOneMessageQueue()**方法进行选择的

java
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
            return mq;
    }
}

核心代码就是简单的轮询

消息存储时是如何保证顺序的?

Broker将消息按照发送顺序写入Commitlog中,从而保证了消息的写入有序

消费端有序是如何保证的?

首先消费端有序必须使用MessageListenerOrderly来监听消息的拉取事件。

通过三把锁来保证消息消费的有序:

  1. messageQueueLock用来保证同一个时刻只有一个线程能访问messageQueue

GlwV41.png
GlwV41.png

  1. Broker设置锁,用于在同一个消费组同一个时刻只能有一个消费者能进行访问

  2. processQueue.lock 必须设置锁了才能进行消费,用于将1,2步骤的结构进行check生成最终的结果