前言
我们在学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个刷盘策略
- 同步刷盘
同步刷盘即Broker消息已经被持久化到硬盘后才会向客户端返回成功。同步刷盘的优点是能保证消息不丢失,但是这是以牺牲写入性能为代价的。
- 异步刷盘
异步刷盘是指Broker将信息存储到pagecache后就立即向客户端返回成功,然后会有一个异步线程定时将内存中的数据写入磁盘,默认时间间隔为500ms。
Broker中的刷盘策略是通过Broker配置文件中flushDiskType
进行配置,可以配置ASYNC_FLUSH
(异步刷盘)和SYNC_FLUSH
(同步刷盘),默认配置是ASYNC_FLUSH
。
Broker的刷盘采用基于JDK NIO技术,消息首先会存储到内存中,然后再根据不同的刷盘策略在不同时间刷盘,如果有不了解的小伙伴可以参考这篇文章《【NIO实战】深入理解FileChannel》
刷盘相关类介绍
CommitLog中的内部类FlushCommitLogService及其子类CommitRealTimeService、GroupCommitService、FlushRealTimeService分别是用于不同场景下用于刷盘的刷盘行为,他们会单独或者配合起来使用。具体类图如下所示。
如果是同步刷盘会使用GroupCommitService。如果是异步刷盘,并且关闭了堆外缓存(TransientStorePool),则采用FlushRealTimeService刷盘。如果是异步刷盘,并且开启了堆外缓存,则会使用FlushRealTimeService与CommitRealTimeService配合刷盘。
默认的输盘策略是异步且关闭堆外缓存,因此默认是采用FlushRealTimeService进行刷盘
Broker刷盘源码分析
消息刷盘相关逻辑都是围绕在CommitLog,因此要想知道消息时如何刷盘的关键是研究CommitLog
CommitLog构造&属性赋值
CommitLog中与刷盘相关的属性有flushCommitLogService、commitLogService。如果是同步刷盘则在构造函数中会给flushCommitLogService赋值GroupCommitService,如果是异步刷盘则给flushCommitLogService赋值FlushRealTimeService。commitLogService的值是CommitRealTimeService,从上面我们可以很明显的看出它只有在异步且开启TransientStorePoolEnabled时才会被使用。
public class CommitLog {
// 如果是同步刷盘,则是GroupCommitService。如果是异步刷盘则是FlushRealTimeService
// 默认是异步刷盘,因此是CommitLog$FlushRealTimeService
private final FlushCommitLogService flushCommitLogService;
// 开启TransientStorePoolEnable时使用CommitRealTimeService
private final FlushCommitLogService commitLogService;
// 构造函数
public CommitLog(final DefaultMessageStore defaultMessageStore) {
// 默认是异步刷盘,因此这里是false
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
// 消息回调
this.appendMessageCallback = new DefaultAppendMessageCallback();
flushDiskWatcher = new FlushDiskWatcher();
}
}
TransientStorePoolEnabled介绍
transientStorePoolEnabled配置的默认值为false,开启transientStorePoolEnabled需要手动开启。如果开启transientStorePoolEnabled会开启堆外内存存储池,Broker在启动时会申请5个与CommitLog大小(1GB)相同的堆外内存交给TransientStorePool,创建MappedFile时会向TransientStorePool“借”一个堆外内存ByteBuffer,保存消息时会先将消息保存到堆外内存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盘中。TransientStorePool属性和一些核心方法源码如下,堆外内存ByteBuffer都是由它来管理。
// org.apache.rocketmq.store.TransientStorePool
public class TransientStorePool {
// 存储池大小,默认是5
private final int poolSize;
// CommitLog MappedFile文件大小,默认1GB
private final int fileSize;
// 默认存5个ByteBuffer
private final Deque<ByteBuffer> availableBuffers;
// 消息存储配置
private final MessageStoreConfig storeConfig;
// TransientStorePool初始化
public void init() {
// 默认是5
for (int i = 0; i < poolSize; i++) {
// 分配1GB的直接内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
// 生成的缓存保存到队列中
availableBuffers.offer(byteBuffer);
}
}
// 归还缓冲
public void returnBuffer(ByteBuffer byteBuffer) {
// 修改position和limit,"清空"缓冲
byteBuffer.position(0);
byteBuffer.limit(fileSize);
// 缓冲入队
this.availableBuffers.offerFirst(byteBuffer);
}
// 向TransientStorePool借缓冲
public ByteBuffer borrowBuffer() {
// 缓冲出队
ByteBuffer buffer = availableBuffers.pollFirst();
return buffer;
}
}
消息保存源码分析
前面文章《【RocketMQ | 源码分析】Broker是如何保存消息的? 》我们虽然介绍了消息的保存过程,但是开启或者关闭TransientStorePoolEnabled时,消息保存的细节是不同的,我们再打开消息保存MappedFile的源码如下,下面代码中如果writeBuffer不空,则会将消息先追加到writeBuffer,否者直接写入到MappedFile的内存映射文件中。
// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
// 如果写文件位置小于文件size
if (currentPos < this.fileSize) {
// 如果writeBuffer不空,则获取writeBuffer的浅拷贝,否则获取MappedFile的内存映射(MappedByteBuffer)的浅拷贝
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
// 如果是单条消息
if (messageExt instanceof MessageExtBrokerInner) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} // ...如果是批量消息
return result;
}
}
那么什么情况下MappedFile中的writeBuffer为空,什么情况下writeBuffer不为空呢?我们可以先来了解MappedFile是如何创建的,MappedFile是由AllocateMappedFileService创建的,具体源码如下,如果开启了TransientStorePoolEnabled,则在创建MappedFile时会向TransientStorePool“借”一个ByteBuffer,如果没有开启TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存数据时会将数据直接保存到MappedFile的直接内存映射(MappedByteBuffer)中。
private boolean mmapOperation() {
// ...
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
// 初始化mappedFile会向TransientStorePool"借"一个writeBuffer
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 创建MappedFile,没有writeBuffer
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
// ...
}
由上可知,消息保存如下图所示
消息刷盘入口方法源码分析
消息保存和刷盘的入口方法CommitLog#asyncPutMessage
,消息保存到mappedFile的缓存后,最后会调用submitFlushRequest方法提交刷盘请求,Broker会根据刷盘策略进行刷盘。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
//... 保存消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
// ...
// 提交刷盘请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// 提交复制请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// 合并提交刷盘请求和提交复制请求结果
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
提交了刷盘请求后,根据刷盘策略,是否开启堆外缓存,推送消息中是否要等待消息保存有如下四种刷盘方式
- 异步刷盘(关闭TransientStorePoolEnabled)
异步刷盘(关闭TransientStorePoolEnabled)是默认的刷盘方案,这个刷盘方案先会**异步唤醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于关闭了TransientStorePoolEnabled,消息是保存到MappedFile中的内存映射文件MappedByteBuffer,FlushRealTimeService将定时MappedByteBuffer刷到磁盘。
- 异步刷盘(开启TransientStorePoolEnabled)
异步刷盘(开启TransientStorePoolEnabled)会先**异步唤醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于开启了TransientStorePoolEnabled,消息会保存到MappedFile中的内存映射文件ByteBuffer,CommitRealTimeService定时将ByteBuffer中的数据刷到FileChannel中。
- 同步刷盘(等待消息保存)
同步刷盘(等待消息保存)会先创建一个刷盘请求(GroupCommitRequest),然后向GroupCommitService提交刷盘请求,最后等待刷盘结果并返回
- 同步刷盘(不等待消息保存)
同步刷盘(不等待消息保存)也是通过GroupCommitService刷盘,与等待消息保存不同的是不等待的方式异步唤醒(wakeup)GroupCommitService后,直接返回消息保存成功。
四种刷盘方式源码如下所示
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 获取同步刷盘Service
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// 创建GroupCommitRequest 刷盘偏移量nextOffset = 当前写入偏移量 + 当前消息写入大小
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// 向刷盘监视器(flushDistWatch)提交刷盘请求
flushDiskWatcher.add(request);
// 提交刷盘请求,并且唤醒同步刷盘线程
service.putRequest(request);
return request.future();
} else {
// 同步刷盘,但是不需要等待刷盘结果,那么唤醒同步刷盘线程
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// 异步刷盘
else {
// 是否启动了堆外缓存
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// 如果没有启动堆外缓存,则唤醒异步刷盘服务 flushRealTimeService
flushCommitLogService.wakeup();
} else {
// 如果启动了堆外缓存,则唤醒异步转存服务CommitRealTimeService
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
将上面四种场景及调用关系如下图所示
总结
本篇文章介绍了TransientStorePool机制以及开启和管理队消息保存的影响,我们还介绍了RocketMQ中四种刷盘策略
- 同步刷盘-等待消息保存到磁盘
- 同步刷盘-不等待消息保存到磁盘上
- 异步刷盘-开启堆外缓存
- 异步刷盘-不开启堆外缓存
以上就是RocketMQ Broker消息如何刷盘源码解析的详细内容,更多关于RocketMQ Broker消息刷盘的资料请关注编程网其它相关文章!