rocketmq的存储实现原理之commitlog
在之前的《rocketmq的存储原理》文章中分析了rocketmq的存储过程主要是通过两个类来实现的分别是commitlog和MappedFile,这篇文章重点分析commitlog这个类的实现
初始化过程
comitlog对象是通过三个方法来负责初始化处理过程分别是构造方法commitLog()/加载方法load()/启动方法start(),这里将commitlog的对象初始化过程和load过程以及启动-start过程都划分为初始化过程中
commitLog()构造方法
CommitLog()构造方法的调用链如图所示

可以看到在Broker启动过程中会通过DefaultMessageStore来调用Commitlog构造方法,commitlog的构造方法主要是做初始化日志环境的功能,下面详细的看一下commitLog()方法的实现
- CommitLog构造方法
public CommitLog(final DefaultMessageStore defaultMessageStore) {
//1. 初始化文件信息在内存中的映射的queue
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
//2. 设置默认的消息存储
this.defaultMessageStore = defaultMessageStore;
//3. 设置刷盘策略
//TODO 这里的刷盘策略'FlushCommitLogService'是使用final关键字进行修饰的,在初始化完成以后就不允许更新刷盘策略的。暂时还不知道为什么这样做
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//设置同步刷盘策略
this.flushCommitLogService = new GroupCommitService();
} else {
//设置异步刷盘策略
this.flushCommitLogService = new FlushRealTimeService();
}
//4. 设置消息提交策略为实时提交,CommitRealTimeService继承FlushCommitLogService并作为默认的提交策略,具体实现类还有GroupCommitService/FlushRealTimeService
//TODO 这里的抽象不是很好,将提交策略通过内部类的方式来隐藏实现
this.commitLogService = new CommitRealTimeService();
//5. 设置默认响应策略
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
//6. 初始化提交消息线程
putMessageThreadLocal = ThreadLocal.withInitial(() -> new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()));
//7. 初始化提交消息的锁,TODO 会根据配置初始化出可重入锁或自旋锁??
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
步骤1~7是初始化日志环境的工作,依次是
- 初始化messagequeue\messafestore
- 设置刷盘策略、提交策略、响应策略
- 初始化提交线程池
load()方法
在构造方法完成后BrokerController会继续调用DefaultMessageStoreload()方法,来对CommitLog对象进行加载
- BrokerController.load
//initialize()方法执行message加载动作
public boolean initialize() throws CloneNotSupportedException {
//省略...
result = result && this.messageStore.load();
//省略...
}
- DefaultMessageStore.load()
public boolean load() {
boolean loadResult = true;
try {
//1. 通过是否存在临时文件判断是否上一次正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
//2. TODO 这里的判断是多余的
if (null != scheduleMessageService) {
//执行调度
loadResult = loadResult && this.scheduleMessageService.load();
}
//3. 加载CommitLog
loadResult = loadResult && this.commitLog.load();
//4.加载loadConsumeQueue
loadResult = loadResult && this.loadConsumeQueue();
//5.创建成功后续操作
if (loadResult) {
//初始化文件存储的检查对象
this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//加载index的管理服务
this.indexService.load(lastExitOK);
//根据上次服务是否异常中断进行状态恢复
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
loadResult = false;
}
//加载失败时,关闭 TODO 这里就只有文件映射服务(MappedFileService)需要关闭吗?
if (!loadResult) {
this.allocateMappedFileService.shutdown();
}
return loadResult;
}
在DefaultMessageStore.load的第3步会将commitLog进行加载,其他方法是对异常恢复、延迟消息处理队列服务、ConsumerQueue进行加载。
下面介绍一下commitLog.loan的加载过程
- CommitLog.load()
public boolean load() {
//加载mappedFileQueue
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
可以看到CommitLog.load()方法就是加载MappedFileQueue
- MappedFileQueue.load()
/**
* 加载commit log文件
*/
public boolean load() {
//默认地址为System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; @see org.apache.rocketmq.store.config.MessageStoreConfig.storePathCommitLog
File dir = new File(this.storePath);
File[] files = dir.listFiles();
//文件不存在时,直接返回
if (Objects.isNull(files) || files.length == 0) {
return true;
}
//对文件按照升序进行排序 TODO 这里也是多余的文件的命名规则是有序
Arrays.sort(files);
for (File file : files) {
//commitLog的文件大小默认为1G TODO 这个大小是介于性能和容量之间的一个选择
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually");
return false;
}
try {
//设置MappedFile对象信息
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
//将MappedFile对象添加到mappedFiles,注意是通过CopyOnWriteArrayList容器进行保存
this.mappedFiles.add(mappedFile);
log.info("CommitLog call MappedFileQueue load {} :OK", file.getPath());
} catch (IOException e) {
log.error("CommitLog call MappedFileQueue load {} :ERROR", file.getPath(), e);
return false;
}
}
return true;
}
mappedFiles.load()方法主要通过指定的文件路径加载文件,并将文件对象通过CopyOnWriteArrayList容器进行存放
CopyOnWriteArrayList是读无锁-写有锁的容器,存储commitLog信息
start()方法
- commitLog.Start()
public void start() {
//1.启动刷盘线程
this.flushCommitLogService.start();
//2.判断是否启动异步提交
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
commitLog.start()方法主要是启动刷盘线程和判断是否启动异步提交
调用过程
CommitLog底层是通过asyncPutMessage()方法来实现异步向文件系统提交的,下面我们先根据asyncPutMessage()来分析从上至下的调用链过程
调用链
通过Arthas分析asyncPutMessage()的调用过程如图所示

- arthas cmd
stack org.apache.rocketmq.store.CommitLog asyncPutMessage -n 5
从图中可以看处理整个调用过程从网络I/O到文件I/O是非常短的
@org.apache.rocketmq.store.CommitLog.asyncPutMessage()
at org.apache.rocketmq.store.DefaultMessageStore.asyncPutMessage(DefaultMessageStore.java:435)
at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncSendMessage(SendMessageProcessor.java:314)
at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncProcessRequest(SendMessageProcessor.java:101)
at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncProcessRequest(SendMessageProcessor.java:82)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:225)
at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80)
NettyRemotingAbstract -> SendMessageProcessor -> CommitLog 三个类就完成了处理