文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

RocketMQ设计之同步刷盘的示例分析

2023-06-29 14:25

关注

这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    // Synchronization flush    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;        if (messageExt.isWaitStoreMsgOK()) {            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());            service.putRequest(request);            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());            if (!flushOK) {                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                    + " client address: " + messageExt.getBornHostString());                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);            }        } else {            service.wakeup();        }    }    // Asynchronous flush    else {        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {            flushCommitLogService.wakeup();        } else {            commitLogService.wakeup();        }    }}class GroupCommitService extends FlushCommitLogService {        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();        //提交刷盘任务到任务列表        public synchronized void putRequest(final GroupCommitRequest request) {            synchronized (this.requestsWrite) {                this.requestsWrite.add(request);            }            if (hasNotified.compareAndSet(false, true)) {                waitPoint.countDown(); // notify            }        }        private void swapRequests() {            List<GroupCommitRequest> tmp = this.requestsWrite;            this.requestsWrite = this.requestsRead;            this.requestsRead = tmp;        }        private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }        public void run() {            CommitLog.log.info(this.getServiceName() + " service started");            while (!this.isStopped()) {                try {                    this.waitForRunning(10);                    this.doCommit();                } catch (Exception e) {                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                }            }            // Under normal circumstances shutdown, wait for the arrival of the            // request, and then flush            try {                Thread.sleep(10);            } catch (InterruptedException e) {                CommitLog.log.warn("GroupCommitService Exception, ", e);            }            synchronized (this) {                this.swapRequests();            }            this.doCommit();            CommitLog.log.info(this.getServiceName() + " service end");        }        @Override        protected void onWaitEnd() {            this.swapRequests();        }        @Override        public String getServiceName() {            return GroupCommitService.class.getSimpleName();        }        @Override        public long getJointime() {            return 1000 * 60 * 5;        }    }

GroupCommitRequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

GroupCommitService每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

两个队列读写分离,requestsWrite是写队列,用户保存添加进来的刷盘任务,requestsRead是读队列,在刷盘之前会把写队列的数据放入读队列。

CommitLog的doCommit方法:

private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            //根据offset确定是否已经刷盘                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    //清空已刷盘的列表                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }

读写分离设计的目的是在刷盘时不影响任务提交到列表。

CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:

public boolean flush(final int flushLeastPages) {    boolean result = true;    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);    if (mappedFile != null) {        long tmpTimeStamp = mappedFile.getStoreTimestamp();        int offset = mappedFile.flush(flushLeastPages);        long where = mappedFile.getFileFromOffset() + offset;        result = where == this.flushedWhere;        this.flushedWhere = where;        if (0 == flushLeastPages) {            this.storeTimestamp = tmpTimeStamp;        }    }    return result;}

通过MappedFile映射的CommitLog文件写入磁盘

这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。

以上是“RocketMQ设计之同步刷盘的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网行业资讯频道!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯