RocketMQ主从同步机制分析
Broker分为Slave、Master两种角色,因此Slave会从master中同步信息、topicConfig、ConsumerOffer、DelayOffset、SubscriptionGroupConfig等信息
启动同步
- handleSlaveSynchronize
handleSlaveSynchronize是处理Slave Broker同步Marster的方法,分别在三个地方有调用:
- BrokerController.start()中调用
- 该节点变成Marster节点时会调用
- 该节点变成Slaver节点时会调用
主要方法
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
syncTopicConfig
syncTopicConfig主要是同步topicConfig信息,主要步骤如下:
syncConsumerOffset
syncConsumerOffset为同步offset数据不进行判断直接覆盖,这样做是因为offset在主从结点进行切换时允许丢失部分offset信息
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
//直接进行替换
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
syncDelayOffset
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName =
StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
.getMessageStoreConfig().getStorePathRootDir());
try {
MixAll.string2File(delayOffset, fileName);
} catch (IOException e) {
log.error("Persist file Exception, {}", fileName, e);
}
}
}
}
核心代码就是从Broker中获取到死信队列的offset,然后将此offset记录下来
syncSubscriptionGroupConfig
这个同步类似于Topic的同步机制,需要用版本号严格控制
同步消息体
同步消息是M/S架构中最重要的功能,主要执行代码放在rocketmq-store中
- 模块引用
Broker启动
在Broker启动时会先判断当前Broker是什么类型的节点,如果是Slave类型的节点会设置masterAddr
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
//
this.haService.updateMasterAddress(newAddr);
Slave连接Master
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
//根据Master地址注册一个TCP连接
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
//开启channel
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
//添加selector
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
Master响应Slave
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
Master注册完成一个Tcp通道,连接成功后双方通过不断对比offset来进行同步