NameServer源代码分析


NameServer源代码分析

周末继续阅读RocketMq的学习,在上周已经简单的把RocketMq的安装和使用学习了一下。本周主要来看一下NameServer的源代码和设计

NameServer启动

NamesrvController启动

public static NamesrvController main0(String[] args) {

        try {
            //创建NamesrvController()
            NamesrvController controller = createNamesrvController(args);
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

在RocketMQ的源代码NamesrvStartup.java中启动,调用时序主要是: main0() -> createNamesrvController()

createNamesrvController方法中,主要执行NamesrvController初始化NamesrvController.start()

NamesrvController初始化

NamesrvController初始化的步骤主要是:

  1. 获取到namesrvConfignettyServerConfig等配置信息
  2. 在根据配置生成NamesrvController初始化对象

NamesrvController.start()

//省略代码....
//初始化controller
boolean initResult = controller.initialize();

//添加关闭的钩子函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

//省略代码....

//start
controller.start();

return controller;

NamesrvController.start()主要做了两个工作:

  1. 通过controller.initialize()设置好controller的配置环境
    1.1 开启Netty服务
    1.2 启动线程池(remotingExecutor、两个定时执行的线程,一个用来扫描失效的
    Broker(scanNotActiveBroker)
    ,另一个用来打印配置信息(printAllPeriodically))
  2. remotingServer.start()
    2.1 使用netty开启服务
    2.2 prepareSharableHandlers中注册NettyServerHandlerNettyRequestProcessor注册到处理器中

Namesrv的处理逻辑

Namesrv的处理逻辑主要是通过NettyRequestProcessor的子类DefaultRequestProcessor来进行实现的

  • DefaultRequestProcessor
 switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINTO_BY_TOPIC:
        return this.getRouteInfoByTopic(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return deleteTopicInNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}

根据不同的请求返回不同的处理函数

RocketMQ元数据存储

NameSrv中信息都是存储在内存中的,而且每一个NameSrv保存的信息都是全量的。RouteInfoManager是保存这些信息的类


public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

在这里由于Namesev中的业务场景是读多写少,因此用可重入的读写锁来保证并发的安全性

总结

  1. 加载配置文件转换为NameServerConfig、NettyConfig
  2. 根据配置文件生成NamesrvController
  3. 创建线程池和定时扫描线程
  4. 开启Netty服务
  5. NettyRequestProcessor处理请求
  6. RouteInfoManager中存储信息

参考资料


  TOC