rocketmq顺序消息的分析
顺序消息是指的是一组需要有序的消息集合,在同一参照系下才有意义。rocketmq的顺序消息主要是分为两个方面:
- product保证将一组有序的消息发送到同一个messageQueue下
- consumer消费时保证同一时刻一个消费组下只有一个线程在消费消息
- 范围
- 分区顺序:单个messagequeue中的消息是有序的
- 全局顺序:单个Topic中的消息都是有序的
原理
发送时是如何保证顺序的?
发送时是通过MessageQueueSelector在生产者一端保证顺序的
java
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
},finalI);
MessageQueueSelector.select的执行时间是在DefaultMQProducerImpl.send时,会调用传入的MessageQueueSelector并执行select方法获取到messageQueue队列进行传值
ps.普通消息调用时候不会手动选择messageQueue,而是通过**selectOneMessageQueue()**方法进行选择的
java
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
核心代码就是简单的轮询
消息存储时是如何保证顺序的?
Broker将消息按照发送顺序写入Commitlog中,从而保证了消息的写入有序
消费端有序是如何保证的?
首先消费端有序必须使用MessageListenerOrderly来监听消息的拉取事件。
通过三把锁来保证消息消费的有序:
- messageQueueLock用来保证同一个时刻只有一个线程能访问messageQueue

GlwV41.png
-
Broker设置锁,用于在同一个消费组同一个时刻只能有一个消费者能进行访问
-
processQueue.lock 必须设置锁了才能进行消费,用于将1,2步骤的结构进行check生成最终的结果