Consumer源代码分析


Consumer源代码分析

消费流程

DefaultMQPushConsumer

DefaultMQPushConsumer是默认进行消费的工具类,类图如下:

DefaultMQPushConsumer继承结构

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

默认的构造函数参数如下

参数 含义
namespace 指定producer/consumer实例,只有在同一个实例下的消息才能被消费到
consumerGroup 消费组
defaultMQPushConsumerImpl
allocateMessageQueueStrategy 默认的消费策略
namesrvAddr namesrv的地址
subscribe(topic,subExpression) 订阅的主题和tag过滤的规则

实现原理

主要是一下几个步骤

  1. 准备环境,设置各种环境变量(例如topic,过滤策略),根据环境变量初始化pullRequest
  2. 从本地或者Broker中获取到offerSet偏移量
  3. 从本地的brokerTable中获取到对应的broker地址
  4. 调用MQClientAPIImpl操作获取消息并执行回调(将Netty的操作封装成MQ层面的业务操作)
  5. 底层调用NettyRemotingClient进行网络连接,特殊的报文结构

消息的方式是通过“长轮询”的方式进行处理的,通过服务端hold住请求(默认是15s)来实现的

源代码分析

ConsumerMessageConcurrentlyService

上面讲到如何初始化一个DefaultMQPushConsumer,但是在实际执行获取动作的是ConsumerMessageConcurrentlyService来进行的。

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    //1. 创建一个主线程池用来拉去消息
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    //2. 创建一个用来执行推迟消费的线程池
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    //3. 定时清理超时的消息
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
  • 主线程池执行的逻辑
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    //1. msgs是一批消息,先判断这一批消息是否大于最大处理量
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            //2. 放入正常队列执行业务上的消费逻辑
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            //3. 消费失败,放入延迟等待线程池中
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        //4. 如果batch消息太多会按照最大消费长度分批进行处理
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            //重复2、3的逻辑
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

消费的逻辑:

  1. 创建三个不同的线程池分别是正常消费的线程池、等待消费的线程池、清理消息的线程池
  2. 判断当前消息数量是否大于最大处理数量
  3. 尝试使用正常消费的线程池进行消费 如果失败 -> 等待消费的线程池

消费消息线程池的逻辑

  • ConsumeRequest
  1. 线程调用消息的处理逻辑,根据消息从处理逻辑返回结果进行下一步操作
  • 调用消息的逻辑
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        //listener就是消息的处理逻辑方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
  • 处理消息的逻辑
switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
  1. 将不同返回值的消息写入不同的队列中
  2. 根据不同的消费方式做不同的处理
    2.1 广播模式 消费不成功不处理
    2.2 集群模式消费不成功,推送broker,让broker通知其他consumer进行消费,还是失败进入延迟队列

ProcessQueue

ProcessQueue是consumer中每一个MessageQueue所对应的队列消耗快照。

private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

主要是由一个TreeMep和ReadWriteLock锁组成,TreeMap里以MessageQueue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到但是还未被处理的消息,读写锁控制着多个线程对TreeMap对象的并发访问。

总结

Consumer的消费过程是:

  1. 根据配置信息初始化Consumer client对象
  2. 获取Offerset偏移量
  3. 获取BrokerTable中的地址
  4. 调用NettyRemotingClint获取消息,然后业务处理完成回调

Consumer的消费的组件是:

  1. 处理正常消费的消息线程池
  2. 处理延迟消息的线程池
  3. 清理消息的线程池

每一个MessageQueue在消费时都有一个ProcessQueue来缓冲所有已消费未处理的消息,ProcessQueue底层是采用TreeMapReadWriteLock来进行实现的


  TOC