NameServer源代码分析
周末继续阅读RocketMq的学习,在上周已经简单的把RocketMq的安装和使用学习了一下。本周主要来看一下NameServer的源代码和设计
NameServer启动
NamesrvController启动
public static NamesrvController main0(String[] args) {
try {
//创建NamesrvController()
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
在RocketMQ的源代码NamesrvStartup.java中启动,调用时序主要是: main0() -> createNamesrvController()
在createNamesrvController方法中,主要执行NamesrvController初始化、NamesrvController.start()
NamesrvController初始化
NamesrvController初始化的步骤主要是:
- 获取到namesrvConfig、nettyServerConfig等配置信息
- 在根据配置生成NamesrvController初始化对象
NamesrvController.start()
//省略代码....
//初始化controller
boolean initResult = controller.initialize();
//添加关闭的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//省略代码....
//start
controller.start();
return controller;
NamesrvController.start()主要做了两个工作:
- 通过controller.initialize()设置好controller的配置环境
1.1 开启Netty服务
1.2 启动线程池(remotingExecutor、两个定时执行的线程,一个用来扫描失效的Broker(scanNotActiveBroker),另一个用来打印配置信息(printAllPeriodically)) - remotingServer.start()
2.1 使用netty开启服务
2.2 prepareSharableHandlers中注册NettyServerHandler将NettyRequestProcessor注册到处理器中
Namesrv的处理逻辑
Namesrv的处理逻辑主要是通过NettyRequestProcessor的子类DefaultRequestProcessor来进行实现的
- DefaultRequestProcessor
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
根据不同的请求返回不同的处理函数
RocketMQ元数据存储
NameSrv中信息都是存储在内存中的,而且每一个NameSrv保存的信息都是全量的。RouteInfoManager是保存这些信息的类
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
在这里由于Namesev中的业务场景是读多写少,因此用可重入的读写锁来保证并发的安全性
总结
- 加载配置文件转换为NameServerConfig、NettyConfig
- 根据配置文件生成NamesrvController
- 创建线程池和定时扫描线程
- 开启Netty服务
- 用NettyRequestProcessor处理请求
- RouteInfoManager中存储信息
参考资料
- 《RocketMQ实战与原理解析》
- RocketMQ 系列文章