<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                RocketMQ存儲--日志文件創建與映射流程

                小編:管理員 29閱讀 2022.08.03

                1.問題描述

                日志目錄(可配置)/data/rocketmq/store/commitlog會有20位長度的日志文件。 1.日志文件什么時候創建的? 2.日志文件創建流程是什么? 3.日志文件和內存映射是怎么樣的?

                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:50 00000117290188144640
                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:52 00000117291261886464
                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:54 00000117292335628288
                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117293409370112
                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:57 00000117294483111936
                -rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117295556853760
                復制2.日志映射相關類初始化

                在Broker啟動時實例化了兩個類DefaultMessageStore和AllocateMappedFileService。 AllocateMappedFileService是線程類繼承了Runnable接口,該線程類持有DefaultMessageStore的引用(即:可操作管理DefaultMessageStore),并啟動該線程類。 調用鏈

                //Broker啟動時調用
                @1 BrokerStartup#main#createBrokerController()
                boolean initResult = controller.initialize();
                @2 Controller#initialize()
                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
                @3 DefaultMessageStore#DefaultMessageStore()
                //將DefaultMessageStore自身引用傳給AllocateMappedFileService
                this.allocateMappedFileService = new AllocateMappedFileService(this);
                this.allocateMappedFileService.start();//啟動該線程類
                @4 class AllocateMappedFileService extends ServiceThread
                public AllocateMappedFileService(DefaultMessageStore messageStore) {
                this.messageStore = messageStore;
                }
                復制3.線程類一直運行在干啥

                既然在Broker啟動時該線程類AllocateMappedFileService就啟動了,那么在做什么呢?run方法為while循環,即:只要服務不停止并且mmapOperation()返回true則一直運行。

                //異步處理,調用mmapOperation完成請求的處理
                public void run() {
                log.info(this.getServiceName() + " service started");
                //while循環,只要服務部停止即調用 mmapOperation方法
                while (!this.isStopped() && this.mmapOperation()) {
                }
                log.info(this.getServiceName() + " service end");
                }
                復制

                mmapOperation方法流程圖

                小結:mmapOperation方法主要做了兩件事:初始化MappedFile和預熱MappedFile。只要服務不停止和線程不被中斷,這個過程一直重復運行。

                4.提交映射文件請求(AllocateRequest)

                既然AllocateMappedFileService一直從容器(優先級隊列和ConcurrentHashMap)中獲取AllocateRequest。AllocateRequest是什么時候產生并放到容器中的呢? RocketMQ消息存儲概覽【源碼筆記】中寫入commitLog流程,獲取最新的日志文件。 調用鏈

                @1 CommitLog#putMessage
                //文件已滿或者沒有映射文件重新創建一個文件
                if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                // Mark: NewFile may be cause noise
                }
                @2 MappedFileQueue#getLastMappedFile
                復制

                流程圖

                小結:MappedFileQueue#getLastMappedFile會向線程類AllocateMappedFileServic提交兩個映射文件創建請求:分別為nextFilePath和nextNextFilePath;如果線程類AllocateMappedFileServic為null,則直接new一個MappedFile,此時只會創建一個文件。

                下面為提交兩個映射文件請求流程,即:

                AllocateMappedFileServic#putRequestAndReturnMappedFile

                調用鏈

                @1 MappedFileQueue#getLastMappedFile
                if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
                }
                @2 AllocateMappedFileService#putRequestAndReturnMappedFile
                復制

                流程圖

                小結:處理提交的映射文件請求指的是: 實例化兩個AllocateRequest并把他們提交到requestTable(ConcurrentHashMap) 和requestQueue(PriorityBlockingQueue)中,等待5秒,此段時間線程會從這兩個容器中獲取請求并創建MappedFile,并將結果返回。

                5.MappedFile初始化

                本段梳理下上文中mmapOperation方法流程圖第5步初始化MappedFile的流程 調用鏈

                @1 AllocateMappedFileService#mmapOperation
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                @2 MappedFile#init(fileName, fileSize);
                復制

                流程圖

                小結:MappedFile主要干了兩件事:1.創建日志文件。2.并將文件映射到內存中。

                6.總結

                1.日志文件什么時候創建的? 備注:在寫入消息時,需要獲取最新的日志文件(MappedFile),如果文件不存在或者已經寫滿,此時需要創建MappedFile。具體在MappedFile#init方法this.file = new File(fileName)進行創建。 2.日志文件創建流程是什么? 備注:將兩個映射創建請求(nextReq和nextNextReq)提交到requestTable(ConcurrentHashMap)和requestQueue(PriorityBlockingQueue)容器中;由線程不斷檢查并從容器中取出創建日志文件(MappedFile)。 3.日志文件和內存映射是怎么樣的? 備注:具體的映射時在MappedFile#init方法中通過this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);將日志文件映射到內存中的。

                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>