MQClientInstance类分析
MQClientInstance是位于org.apache.rocketmq.client.impl.factory下的一个为producer和consumer提供统一和Broker进行交互的底层工具类;
创建
- MQClientManager
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
//判断当前client是否有过实例对象,有就取
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
MQClientInstance的创建是通过MQClientManager来进行创建和管理的,通过将实例和MQClientInstance放入一个ConcurrentHashMap中。默认情况下每个进程内共享同一个MQClientInstance对象连接一个集群,如果要设置多个集群需要手动指定实例名称。
start()方法
public void start() throws MQClientException {
synchronized (this) {
//判断服务状态
switch (this.serviceState) {
case CREATE_JUST://创建服务
this.serviceState = ServiceState.START_FAILED;
// 如果未指定Namesrv地址,尝试从远程服务器中获取地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// 启动各种定时任务
this.startScheduledTask();
// 启动拉去消息服务
this.pullMessageService.start();
// 启动负载服务
this.rebalanceService.start();
// 启动推送服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
MQClientInstance主要是与Broker进行通信的,因此在启动后需要存储NameServer查询出来的Topic、Broker信息。在start()中使用了一下一个线程池去执行任务:
- startScheduledTask2分钟去请求一次最新的NameServer地址、更新Topic信息、清理失效的Broker、保存消费者的offset
- 启动拉消息的线程
- 启动topic负载线程
- 启动推送消息的线程