MQClientInstance类分析


MQClientInstance类分析

MQClientInstance是位于org.apache.rocketmq.client.impl.factory下的一个为producer和consumer提供统一和Broker进行交互的底层工具类;

创建

  • MQClientManager
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        //判断当前client是否有过实例对象,有就取
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

MQClientInstance的创建是通过MQClientManager来进行创建和管理的,通过将实例和MQClientInstance放入一个ConcurrentHashMap中。默认情况下每个进程内共享同一个MQClientInstance对象连接一个集群,如果要设置多个集群需要手动指定实例名称。

GFEx1K.png

start()方法

public void start() throws MQClientException {

        synchronized (this) {
            //判断服务状态
            switch (this.serviceState) {
                case CREATE_JUST://创建服务
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果未指定Namesrv地址,尝试从远程服务器中获取地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // 启动各种定时任务
                    this.startScheduledTask();
                    // 启动拉去消息服务
                    this.pullMessageService.start();
                    // 启动负载服务
                    this.rebalanceService.start();
                    // 启动推送服务                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

MQClientInstance主要是与Broker进行通信的,因此在启动后需要存储NameServer查询出来的TopicBroker信息。在start()中使用了一下一个线程池去执行任务:

  1. startScheduledTask2分钟去请求一次最新的NameServer地址、更新Topic信息、清理失效的Broker、保存消费者的offset
  2. 启动拉消息的线程
  3. 启动topic负载线程
  4. 启动推送消息的线程

  TOC