<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                RocketMQ存儲--同步刷盤和異步刷盤

                小編:管理員 41閱讀 2022.08.02

                目錄
                一、問題思考
                二、Broker啟動刷盤有關調用鏈
                1.調用鏈
                2.線程類關系圖
                三、線程類工作流程
                1.堆外內存線程類CommitRealTimeService工作流程
                2.同步刷盤線程類GroupCommitService工作流程
                3.異步刷盤線程類FlushRealTimeService工作流程
                四、消息追加與線程類的交互
                1.調用鏈
                2.同步刷盤主要代碼
                3.異步刷盤主要代碼
                五、刷盤方式示意圖
                1.同步刷盤示意圖
                2.異步刷盤未開啟堆外緩存示意圖
                3.異步刷盤開啟堆外緩存示意圖
                六、文章總結
                七、主要源碼類清單
                復制一、問題思考

                1.同步刷盤是怎么工作的? 2.異步刷盤是怎么工作的? 3.上篇文章的疑問,寫入堆外內存的消息如何落盤的?

                二、Broker啟動刷盤有關調用鏈 1.調用鏈
                //初始化鏈條
                @1 BrokerStartup#main
                start(createBrokerController(args));
                @2 BrokerStartup#createBrokerController
                final BrokerController controller = new BrokerController(...)
                boolean initResult = controller.initialize();
                @3 BrokerController#initialize
                this.messageStore = new DefaultMessageStore(...);
                @4 DefaultMessageStore#DefaultMessageStore()
                this.commitLog = new CommitLog(this);
                @5 CommitLog#CommitLog()
                if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
                .getFlushDiskType()) {
                this.flushCommitLogService = new GroupCommitService();
                } else {
                this.flushCommitLogService = new FlushRealTimeService();
                }
                this.commitLogService = new CommitRealTimeService();
                //啟動鏈條
                @6 BrokerStartup#start
                controller.start();
                @7 BrokerController#start()
                this.messageStore.start();
                @8 DefaultMessageStore#start()
                this.commitLog.start();
                @9 CommitLog#start()
                this.flushCommitLogService.start();
                if (defaultMessageStore.getMessageStoreConfig()
                .isTransientStorePoolEnable()) {
                this.commitLogService.start();
                }
                復制

                小結:由調用鏈可以看出,初始化并啟動了以下線程類

                1.同步刷盤 GroupCommitService

                2.異步刷盤 FlushRealTimeService

                3.如果開啟堆外內存并且為異步刷盤 CommitRealTimeService

                2.線程類關系圖 三、線程類工作流程

                既然線程類在Broker啟動時就啟動了,他們在做啥呢?

                1.堆外內存線程類CommitRealTimeService工作流程

                小結: 1.CommitRealTimeService主要工作是將寫入堆外內存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道

                2.committedPosition用于記錄將writeBuffer數據寫入到fileChannel中的內存位點(相對偏移量offset) 3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對偏移量offset)

                2.同步刷盤線程類GroupCommitService工作流程注1:

                1.執行onWaitEnd時交換讀寫容器,該線程類提供兩個容器來裝GroupCommitRequest

                2.requestsWrite和requestsRead,每次執行提交(刷盤)前都會進行容器交換

                3.好處:讀寫請求容器分離,避免潛在的鎖競爭

                private void swapRequests() {
                List<GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
                }
                復制注2:

                1.flushedPosition 標記已經刷盤內存的位點。即刷盤相對偏移量,刷盤到什么位置了,下次從此處刷盤即可

                2.flushedWhere 標記已經刷盤的物理偏移量,根據此位置可精確查找到文件中消息的存儲位置。flushedWhere = 當前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition

                注3:流程圖中標記紅色部分,將刷盤結果通知給等待線程

                小結:同步刷盤線程類GroupCommitService主要工作 將請求從讀容器中取出并通過mappedByteBuffer.force()將數據落盤。

                3.異步刷盤線程類FlushRealTimeService工作流程

                小結:FlushRealTimeService主要工作 1.不開啟堆外外內存刷盤方式為mappedByteBuffer.force() 2.開啟堆外內存刷盤方式為fileChannel.force

                疑問:同步刷盤線程類GroupCommitService每執行一次都會交換讀寫容器,那刷盤請求什么時候放到寫容器(requestsWrite)呢?

                四、消息追加與線程類的交互

                分析完線程類后,把鏡頭切換到消息追加,看看消息進來后是如何跟線程類交互的?

                1.調用鏈
                @1 CommitLog#putMessage
                //同步刷盤或者異步刷盤
                handleDiskFlush(result, putMessageResult, msg);
                @2 CommitLog#handleDiskFlush
                復制2.同步刷盤主要代碼

                同步刷盤時構造刷盤請求,將請求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結果。

                if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                //等待MappedFile刷盤成功狀態通過countDownLatch來控制
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                }
                }
                復制3.異步刷盤主要代碼

                未開啟堆外內存喚醒FlushRealTimeServicee,開啟堆外內存喚醒CommitRealTimeService。

                if (!this.defaultMessageStore.getMessageStoreConfig()
                .isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
                } else {
                commitLogService.wakeup();
                }
                復制五、刷盤方式示意圖 1.同步刷盤示意圖 2.異步刷盤未開啟堆外緩存示意圖 3.異步刷盤開啟堆外緩存示意圖 六、文章總結

                1.同異步刷盤通過Broker屬性flushDiskType來設置,默認為ASYNC_FLUSH,同步刷盤配置為SYNC_FLUSH 2.同步刷盤是怎么工作的? 注:見GroupCommitService工作流程及與消息追加交互 3.異步刷盤是怎么工作的? 注:

                見FlushRealTimeService和CommitRealTimeService工作流程及與消息追加交互

                4.上篇文章的疑問,寫入堆外內存的消息如何落盤的? 注:見異步刷盤開啟堆外緩存示意圖

                七、主要源碼類清單
                • CommitLog.java
                • CommitLog#putMessage
                • CommitLog#GroupCommitService
                • CommitLog#FlushRealTimeService
                • CommitLog#CommitRealTimeService
                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>