文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ Broker消息如何刷盘源码解析

2023-05-19 14:51

关注

前言

我们在学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个刷盘策略

同步刷盘即Broker消息已经被持久化到硬盘后才会向客户端返回成功。同步刷盘的优点是能保证消息不丢失,但是这是以牺牲写入性能为代价的。

异步刷盘是指Broker将信息存储到pagecache后就立即向客户端返回成功,然后会有一个异步线程定时将内存中的数据写入磁盘,默认时间间隔为500ms。

Broker中的刷盘策略是通过Broker配置文件中flushDiskType进行配置,可以配置ASYNC_FLUSH(异步刷盘)和SYNC_FLUSH(同步刷盘),默认配置是ASYNC_FLUSH

Broker的刷盘采用基于JDK NIO技术,消息首先会存储到内存中,然后再根据不同的刷盘策略在不同时间刷盘,如果有不了解的小伙伴可以参考这篇文章《【NIO实战】深入理解FileChannel》

刷盘相关类介绍

CommitLog中的内部类FlushCommitLogService及其子类CommitRealTimeService、GroupCommitService、FlushRealTimeService分别是用于不同场景下用于刷盘的刷盘行为,他们会单独或者配合起来使用。具体类图如下所示。

如果是同步刷盘会使用GroupCommitService。如果是异步刷盘,并且关闭了堆外缓存(TransientStorePool),则采用FlushRealTimeService刷盘。如果是异步刷盘,并且开启了堆外缓存,则会使用FlushRealTimeService与CommitRealTimeService配合刷盘。

默认的输盘策略是异步关闭堆外缓存,因此默认是采用FlushRealTimeService进行刷盘

Broker刷盘源码分析

消息刷盘相关逻辑都是围绕在CommitLog,因此要想知道消息时如何刷盘的关键是研究CommitLog

CommitLog构造&属性赋值

CommitLog中与刷盘相关的属性有flushCommitLogService、commitLogService。如果是同步刷盘则在构造函数中会给flushCommitLogService赋值GroupCommitService,如果是异步刷盘则给flushCommitLogService赋值FlushRealTimeService。commitLogService的值是CommitRealTimeService,从上面我们可以很明显的看出它只有在异步且开启TransientStorePoolEnabled时才会被使用。

public class CommitLog {
  // 如果是同步刷盘,则是GroupCommitService。如果是异步刷盘则是FlushRealTimeService
  // 默认是异步刷盘,因此是CommitLog$FlushRealTimeService
  private final FlushCommitLogService flushCommitLogService;
  // 开启TransientStorePoolEnable时使用CommitRealTimeService
  private final FlushCommitLogService commitLogService;
	// 构造函数
  public CommitLog(final DefaultMessageStore defaultMessageStore) {
      // 默认是异步刷盘,因此这里是false
      if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
          this.flushCommitLogService = new GroupCommitService();
      } else {
          this.flushCommitLogService = new FlushRealTimeService();
      }
      this.commitLogService = new CommitRealTimeService();
      // 消息回调
      this.appendMessageCallback = new DefaultAppendMessageCallback();
      flushDiskWatcher = new FlushDiskWatcher();
  }
}

TransientStorePoolEnabled介绍

transientStorePoolEnabled配置的默认值为false,开启transientStorePoolEnabled需要手动开启。如果开启transientStorePoolEnabled会开启堆外内存存储池,Broker在启动时会申请5个与CommitLog大小(1GB)相同的堆外内存交给TransientStorePool,创建MappedFile时会向TransientStorePool“借”一个堆外内存ByteBuffer,保存消息时会先将消息保存到堆外内存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盘中。TransientStorePool属性和一些核心方法源码如下,堆外内存ByteBuffer都是由它来管理。

// org.apache.rocketmq.store.TransientStorePool
public class TransientStorePool {
    // 存储池大小,默认是5
    private final int poolSize;
    // CommitLog MappedFile文件大小,默认1GB
    private final int fileSize;
    // 默认存5个ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;
    // 消息存储配置
    private final MessageStoreConfig storeConfig;
		// TransientStorePool初始化
    public void init() {
        // 默认是5
        for (int i = 0; i < poolSize; i++) {
            // 分配1GB的直接内存
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            // 生成的缓存保存到队列中
            availableBuffers.offer(byteBuffer);
        }
    }
    // 归还缓冲
    public void returnBuffer(ByteBuffer byteBuffer) {
        // 修改position和limit,"清空"缓冲
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
      	// 缓冲入队
        this.availableBuffers.offerFirst(byteBuffer);
    }
    // 向TransientStorePool借缓冲
    public ByteBuffer borrowBuffer() {
      	// 缓冲出队
        ByteBuffer buffer = availableBuffers.pollFirst();
        return buffer;
    }
}

消息保存源码分析

前面文章《【RocketMQ | 源码分析】Broker是如何保存消息的? 》我们虽然介绍了消息的保存过程,但是开启或者关闭TransientStorePoolEnabled时,消息保存的细节是不同的,我们再打开消息保存MappedFile的源码如下,下面代码中如果writeBuffer不空,则会将消息先追加到writeBuffer,否者直接写入到MappedFile的内存映射文件中。

// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 如果写文件位置小于文件size
    if (currentPos < this.fileSize) {
        // 如果writeBuffer不空,则获取writeBuffer的浅拷贝,否则获取MappedFile的内存映射(MappedByteBuffer)的浅拷贝
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是单条消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } // ...如果是批量消息
        return result;
    }
}

那么什么情况下MappedFile中的writeBuffer为空,什么情况下writeBuffer不为空呢?我们可以先来了解MappedFile是如何创建的,MappedFile是由AllocateMappedFileService创建的,具体源码如下,如果开启了TransientStorePoolEnabled,则在创建MappedFile时会向TransientStorePool“借”一个ByteBuffer,如果没有开启TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存数据时会将数据直接保存到MappedFile的直接内存映射(MappedByteBuffer)中。

private boolean mmapOperation() {
  // ...
  if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
      try {
          mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        	// 初始化mappedFile会向TransientStorePool"借"一个writeBuffer
          mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      } catch (RuntimeException e) {
          mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      }
  } else {
    	// 创建MappedFile,没有writeBuffer
      mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
  }
  // ...
}

由上可知,消息保存如下图所示

消息刷盘入口方法源码分析

消息保存和刷盘的入口方法CommitLog#asyncPutMessage,消息保存到mappedFile的缓存后,最后会调用submitFlushRequest方法提交刷盘请求,Broker会根据刷盘策略进行刷盘。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    //... 保存消息
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    // ...
    // 提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 提交复制请求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 合并提交刷盘请求和提交复制请求结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

提交了刷盘请求后,根据刷盘策略,是否开启堆外缓存,推送消息中是否要等待消息保存有如下四种刷盘方式

异步刷盘(关闭TransientStorePoolEnabled)是默认的刷盘方案,这个刷盘方案先会**异步唤醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于关闭了TransientStorePoolEnabled,消息是保存到MappedFile中的内存映射文件MappedByteBuffer,FlushRealTimeService将定时MappedByteBuffer刷到磁盘。

异步刷盘(开启TransientStorePoolEnabled)会先**异步唤醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于开启了TransientStorePoolEnabled,消息会保存到MappedFile中的内存映射文件ByteBuffer,CommitRealTimeService定时将ByteBuffer中的数据刷到FileChannel中。

同步刷盘(等待消息保存)会先创建一个刷盘请求(GroupCommitRequest),然后向GroupCommitService提交刷盘请求,最后等待刷盘结果并返回

同步刷盘(不等待消息保存)也是通过GroupCommitService刷盘,与等待消息保存不同的是不等待的方式异步唤醒(wakeup)GroupCommitService后,直接返回消息保存成功。

四种刷盘方式源码如下所示

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 获取同步刷盘Service
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // 创建GroupCommitRequest 刷盘偏移量nextOffset = 当前写入偏移量 + 当前消息写入大小
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // 向刷盘监视器(flushDistWatch)提交刷盘请求
            flushDiskWatcher.add(request);
            // 提交刷盘请求,并且唤醒同步刷盘线程
            service.putRequest(request);
            return request.future();
        } else {
            // 同步刷盘,但是不需要等待刷盘结果,那么唤醒同步刷盘线程
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 异步刷盘
    else {
        // 是否启动了堆外缓存
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // 如果没有启动堆外缓存,则唤醒异步刷盘服务 flushRealTimeService
            flushCommitLogService.wakeup();
        } else  {
            // 如果启动了堆外缓存,则唤醒异步转存服务CommitRealTimeService
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

将上面四种场景及调用关系如下图所示

总结

本篇文章介绍了TransientStorePool机制以及开启和管理队消息保存的影响,我们还介绍了RocketMQ中四种刷盘策略

以上就是RocketMQ Broker消息如何刷盘源码解析的详细内容,更多关于RocketMQ Broker消息刷盘的资料请关注编程网其它相关文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯