rocketmq的存储实现原理之commitlog


rocketmq的存储实现原理之commitlog

在之前的《rocketmq的存储原理》文章中分析了rocketmq的存储过程主要是通过两个类来实现的分别是commitlog和MappedFile,这篇文章重点分析commitlog这个类的实现

初始化过程

comitlog对象是通过三个方法来负责初始化处理过程分别是构造方法commitLog()/加载方法load()/启动方法start(),这里将commitlog的对象初始化过程和load过程以及启动-start过程都划分为初始化过程中

commitLog()构造方法

CommitLog()构造方法的调用链如图所示
CommitLog()
可以看到在Broker启动过程中会通过DefaultMessageStore来调用Commitlog构造方法,commitlog的构造方法主要是做初始化日志环境的功能,下面详细的看一下commitLog()方法的实现

  • CommitLog构造方法
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    //1. 初始化文件信息在内存中的映射的queue
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
    //2. 设置默认的消息存储
    this.defaultMessageStore = defaultMessageStore;
    //3. 设置刷盘策略
    //TODO 这里的刷盘策略'FlushCommitLogService'是使用final关键字进行修饰的,在初始化完成以后就不允许更新刷盘策略的。暂时还不知道为什么这样做
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        //设置同步刷盘策略
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //设置异步刷盘策略
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //4. 设置消息提交策略为实时提交,CommitRealTimeService继承FlushCommitLogService并作为默认的提交策略,具体实现类还有GroupCommitService/FlushRealTimeService
    //TODO 这里的抽象不是很好,将提交策略通过内部类的方式来隐藏实现
    this.commitLogService = new CommitRealTimeService();

    //5. 设置默认响应策略
    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());

    //6. 初始化提交消息线程
    putMessageThreadLocal = ThreadLocal.withInitial(() -> new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()));
    //7. 初始化提交消息的锁,TODO  会根据配置初始化出可重入锁或自旋锁??
    this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}

步骤1~7是初始化日志环境的工作,依次是

  • 初始化messagequeue\messafestore
  • 设置刷盘策略、提交策略、响应策略
  • 初始化提交线程池

load()方法

在构造方法完成后BrokerController会继续调用DefaultMessageStoreload()方法,来对CommitLog对象进行加载

  • BrokerController.load
//initialize()方法执行message加载动作
public boolean initialize() throws CloneNotSupportedException {
    //省略...
    result = result && this.messageStore.load();
    //省略...
}
  • DefaultMessageStore.load()
public boolean load() {
    boolean loadResult = true;

    try {
        //1. 通过是否存在临时文件判断是否上一次正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        //2. TODO 这里的判断是多余的
        if (null != scheduleMessageService) {
            //执行调度
            loadResult = loadResult && this.scheduleMessageService.load();
        }

        //3. 加载CommitLog
        loadResult = loadResult && this.commitLog.load();

        //4.加载loadConsumeQueue
        loadResult = loadResult && this.loadConsumeQueue();

        //5.创建成功后续操作
        if (loadResult) {
            //初始化文件存储的检查对象
            this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加载index的管理服务
            this.indexService.load(lastExitOK);
            //根据上次服务是否异常中断进行状态恢复
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        loadResult = false;
    }

    //加载失败时,关闭 TODO 这里就只有文件映射服务(MappedFileService)需要关闭吗?
    if (!loadResult) {
        this.allocateMappedFileService.shutdown();
    }
    return loadResult;
}

在DefaultMessageStore.load的第3步会将commitLog进行加载,其他方法是对异常恢复、延迟消息处理队列服务、ConsumerQueue进行加载。
下面介绍一下commitLog.loan的加载过程

  • CommitLog.load()
public boolean load() {
    //加载mappedFileQueue
    boolean result = this.mappedFileQueue.load();
    log.info("load commit log " + (result ? "OK" : "Failed"));
    return result;
}

可以看到CommitLog.load()方法就是加载MappedFileQueue

  • MappedFileQueue.load()

/**
 * 加载commit log文件
 */
public boolean load() {
    //默认地址为System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; @see org.apache.rocketmq.store.config.MessageStoreConfig.storePathCommitLog
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    //文件不存在时,直接返回
    if (Objects.isNull(files) || files.length == 0) {
        return true;
    }
    //对文件按照升序进行排序 TODO 这里也是多余的文件的命名规则是有序
    Arrays.sort(files);
    for (File file : files) {
        //commitLog的文件大小默认为1G TODO 这个大小是介于性能和容量之间的一个选择
        if (file.length() != this.mappedFileSize) {
            log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually");
            return false;
        }

        try {
            //设置MappedFile对象信息
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            //将MappedFile对象添加到mappedFiles,注意是通过CopyOnWriteArrayList容器进行保存
            this.mappedFiles.add(mappedFile);
            log.info("CommitLog  call MappedFileQueue load {} :OK", file.getPath());
        } catch (IOException e) {
            log.error("CommitLog  call MappedFileQueue load {} :ERROR", file.getPath(), e);
            return false;
        }
    }
    return true;
}

mappedFiles.load()方法主要通过指定的文件路径加载文件,并将文件对象通过CopyOnWriteArrayList容器进行存放
CopyOnWriteArrayList是读无锁-写有锁的容器,存储commitLog信息

start()方法

  • commitLog.Start()
public void start() {
    //1.启动刷盘线程
    this.flushCommitLogService.start();
    //2.判断是否启动异步提交
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

commitLog.start()方法主要是启动刷盘线程和判断是否启动异步提交

调用过程

CommitLog底层是通过asyncPutMessage()方法来实现异步向文件系统提交的,下面我们先根据asyncPutMessage()来分析从上至下的调用链过程

调用链

通过Arthas分析asyncPutMessage()的调用过程如图所示

/调用链路.jpg

  • arthas cmd
stack org.apache.rocketmq.store.CommitLog asyncPutMessage  -n 5 

从图中可以看处理整个调用过程从网络I/O到文件I/O是非常短的

@org.apache.rocketmq.store.CommitLog.asyncPutMessage()
    at org.apache.rocketmq.store.DefaultMessageStore.asyncPutMessage(DefaultMessageStore.java:435)
    at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncSendMessage(SendMessageProcessor.java:314)
    at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncProcessRequest(SendMessageProcessor.java:101)
    at org.apache.rocketmq.broker.processor.SendMessageProcessor.asyncProcessRequest(SendMessageProcessor.java:82)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:225)
    at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80)

NettyRemotingAbstract -> SendMessageProcessor -> CommitLog 三个类就完成了处理


  TOC