RocketMQ主从同步机制分析


RocketMQ主从同步机制分析

Broker分为SlaveMaster两种角色,因此Slave会从master中同步信息topicConfigConsumerOfferDelayOffsetSubscriptionGroupConfig等信息

启动同步

  • handleSlaveSynchronize

GAqiCj.png

handleSlaveSynchronize是处理Slave Broker同步Marster的方法,分别在三个地方有调用:

  1. BrokerController.start()中调用
  2. 该节点变成Marster节点时会调用
  3. 该节点变成Slaver节点时会调用

主要方法

public void syncAll() {
    this.syncTopicConfig();
    this.syncConsumerOffset();
    this.syncDelayOffset();
    this.syncSubscriptionGroupConfig();
}

syncTopicConfig

syncTopicConfig主要是同步topicConfig信息,主要步骤如下:
GEV9de.png

syncConsumerOffset

syncConsumerOffset为同步offset数据不进行判断直接覆盖,这样做是因为offset在主从结点进行切换时允许丢失部分offset信息

private void syncConsumerOffset() {
    String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            ConsumerOffsetSerializeWrapper offsetWrapper =
                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            //直接进行替换
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}

syncDelayOffset

private void syncDelayOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
              String delayOffset =
                  this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
              if (delayOffset != null) {
                  String fileName =
                      StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                          .getMessageStoreConfig().getStorePathRootDir());
                  try {
                      MixAll.string2File(delayOffset, fileName);
                  } catch (IOException e) {
                      log.error("Persist file Exception, {}", fileName, e);
                  }
              }
        }
    }

核心代码就是从Broker中获取到死信队列的offset,然后将此offset记录下来

syncSubscriptionGroupConfig

这个同步类似于Topic的同步机制,需要用版本号严格控制

同步消息体

同步消息是M/S架构中最重要的功能,主要执行代码放在rocketmq-store

  • 模块引用
    GEnEtg.png

Broker启动

在Broker启动时会先判断当前Broker是什么类型的节点,如果是Slave类型的节点会设置masterAddr

if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}

//
this.haService.updateMasterAddress(newAddr);

Slave连接Master

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            //根据Master地址注册一个TCP连接
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                //开启channel
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    //添加selector
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

Master响应Slave

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

Master注册完成一个Tcp通道,连接成功后双方通过不断对比offset来进行同步


  TOC