<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                RocketMQ深入淺出-詳細介紹與安裝

                小編:管理員 39閱讀 2022.08.02

                寫這個專題的初衷:一直都用MQ,但是沒有系統的學習過,當和同事交流時,MQ的很多細節都說不上來,最近刷了兩遍RocketMQ的視頻,打算把筆記整理出來,和大家一起打卡,探究RocketMQ的真面目

                一、RocketMQ概述

                1.1 簡介

                RocketMQ是一個統一消息引擎、輕量級數據處理平臺。 RocketMQ是?款阿?巴巴開源的消息中間件。2016年11?28?,阿?巴巴向 Apache 軟件基?會捐贈RocketMQ,成為 Apache 孵化項?。2017 年 9 ? 25 ?,Apache 宣布 RocketMQ孵化成為 Apache 頂級項?(TLP ),成為國內?個互聯?中間件在 Apache 上的頂級項?。它使用Java語言開發,在阿里內部,RocketMQ承接了例如“雙11”等高并發場景的消息流轉,能夠處理萬億級別的消息。

                1.2 發展歷程

                2007年,阿里開始五彩石項目,Notify作為項目中交易核心消息流轉系統,應運而生。Notify系統是RocketMQ的雛形。

                2010年,B2B大規模使用ActiveMQ作為阿里的消息內核。阿里急需一個具有海量堆積能力的消息系統。

                2011年初,Kafka開源。淘寶中間件團隊在對Kafka進行了深入研究后,開發了一款新的MQ,MetaQ。

                2012年,MetaQ發展到了v3.0版本,在它基礎上進行了進一步的抽象,形成了RocketMQ,然后就將其進行了開源。

                2015年,阿里在RocketMQ的基礎上,又推出了一款專門針對阿里云上用戶的消息系統Aliware MQ。2016年雙十一,RocketMQ承載了萬億級消息的流轉,跨越了一個新的里程碑。11?28?,阿?巴巴向 Apache 軟件基?會捐贈 RocketMQ,成為 Apache 孵化項?。

                2017 年 9 ? 25 ?,Apache 宣布 RocketMQ孵化成為 Apache 頂級項?(TLP ),成為國內?個互聯?中間件在 Apache 上的頂級項?。

                1.3 系統架構

                1.3.1 Producer

                消息生產者,負責生產消息,即往MQ中發送消息的角色。Producer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。

                例如,業務系統產生的日志寫入到MQ的過程,就是消息生產的過程 再如,電商平臺中用戶提交的秒殺請求寫入到MQ的過程,就是消息生產的過程

                RocketMQ中的消息生產者都是以生產者組(Producer Group)的形式出現的。生產者組是同一類生產者的集合,這類Producer發送相同Topic類型的消息,通俗的講就是producer集群部署,多個producer構成一個生產者組具有相同的組名。一個生產者組可以同時發送多個主題的消息。

                1.3.2 consumer

                消息消費者,負責消費消息,即監聽MQ,從MQ中獲取消費進行業務處理的角色。一個消息消費者會從Broker服務器中獲取到消息,并對消息進行相關業務處理。

                例如,QoS系統從MQ中讀取日志,并對日志進行解析處理的過程就是消息消費的過程。再如,電商平臺的業務系統從MQ中讀取到秒殺請求,并對請求進行處理的過程就是消息消費的 過程。

                RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現的。消費者組是同一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息,通俗的講就是消費者集群部署,具有相同的消費者組名。消費者組使得在消息消費方面,實現負載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著消費原Consumer消費的Queue)的目標變得非常容易。

                消費者組中Consumer的數量應該小于等于訂閱Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。上圖Consumer3沒有多余的queue給它消費。

                不過,一個Topic類型的消息可以被多個消費者組同時消費。

                注意: 1)消費者組只能消費一個Topic的消息,不能同時消費多個Topic消息 2)一個消費者組中的消費者必須訂閱完全相同的Topic

                1.3.3 name server

                功能介紹

                NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態注冊與發現。

                RocketMQ的思想來自于Kafka,而Kafka是依賴了Zookeeper的。所以,在RocketMQ的早期版本,即在MetaQ v1.0與v2.0版本中,也是依賴于Zookeeper的。從MetaQ v3.0,即RocketMQ開始去掉了Zookeeper依賴,使用了自己的NameServer。

                主要包括兩個功能:

                ?Broker管理:接受Broker集群的注冊信息并且保存下來作為路由信息的基本數據;提供心跳檢測機制,檢查Broker是否還存活。

                ?路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用于客戶端查詢的隊列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進行消息的投遞和消費。

                路由注冊

                NameServer通常也是以集群的方式部署,不過,NameServer是無狀態的,即NameServer集群中的各個節點間是無差異的,各節點間相互不進行信息通訊。那各節點中的數據是如何進行數據同步的呢?在Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起注冊請求。在NameServer內部維護著?個Broker列表,用來動態存儲Broker的信息。

                注意: 這是與其它像zk、Eureka、Nacos等注冊中心不同的地方。 這種NameServer的無狀態方式,有什么優缺點: 優點:NameServer集群搭建簡單,擴容簡單。 缺點:對于Broker,必須明確指出所有NameServer地址。否則未指出的將不會去注冊。也正因 為如此,NameServer并不能隨便擴容。因為,若Broker不重新配置,新增的NameServer對于 Broker來說是不可見的,其不會向這個NameServer進行注冊。

                Broker節點為了證明自己是活著的,為了維護與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、 Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。

                路由剔除

                由于Broker關機、宕機或網絡抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。

                NameServer中有?個定時任務,每隔10秒就會掃描?次Broker表,查看每一個Broker的最新心跳時間戳距離當前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。

                擴展:對于RocketMQ日常運維工作,例如Broker升級,需要停掉Broker的工作。OP需要怎么做? OP需要將Broker的讀寫權限禁掉。一旦client(Consumer或Producer)向broker發送請求,都會收 到broker的NO_PERMISSION響應,然后client會進行對其它Broker的重試。 當OP觀察到這個Broker沒有流量后,再關閉它,實現Broker從NameServer的移除。 OP:運維工程師 SRE:Site Reliability Engineer,現場可靠性工程師

                路由發現

                RocketMQ的路由發現采用的是Pull模型。當Topic路由信息出現變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取主題最新的路由。默認客戶端每30秒會拉取一次最新的路由。

                擴展: 1)Push模型:推送模型。其實時性較好,是一個“發布-訂閱”模型,需要維護一個長連接。而長連接的維護是需要資源成本的。該模型適合于的場景: ?實時性要求較高 ?Client數量不多,Server數據變化較頻繁 2)Pull模型:拉取模型。存在的問題是,實時性較差。 3)Long Polling模型:長輪詢模型。其是對Push與Pull模型的整合,充分利用了這兩種模型的優 勢,屏蔽了它們的劣勢。

                客戶端NameServer選擇策略

                這里的客戶端指的是Producer與Consumer

                客戶端在配置時必須要寫上NameServer集群的地址,那么客戶端到底連接的是哪個NameServer節點呢?客戶端首先會生產一個隨機數,然后再與NameServer節點數量取模,此時得到的就是所要連接的節點索引,然后就會進行連接。如果連接失敗,則會采用round-robin策略,逐個嘗試著去連接其它節點。

                首先采用的是隨機策略進行的選擇,失敗后采用的是輪詢策略。

                擴展:Zookeeper Client是如何選擇Zookeeper Server的? 簡單來說就是,經過兩次Shuf?e,然后選擇第一臺Zookeeper Server。 詳細說就是,將配置文件中的zk server地址進行第一次shuf?e,然后隨機選擇一個。這個選擇出 的一般都是一個hostname。然后獲取到該hostname對應的所有ip,再對這些ip進行第二次 shuf?e,從shuf?e過的結果中取第一個server地址進行連接。

                1.3.4 broker

                功能介紹

                Broker充當著消息中轉角色,負責存儲消息、轉發消息。Broker在RocketMQ系統中負責接收并存儲從生產者發送來的消息,同時為消費者的拉取請求作準備。Broker同時也存儲著消息相關的元數據,包括消費者組消費進度偏移offset、主題、隊列等。

                Kafka 0.8版本之后,offset是存放在Broker中的,之前版本是存放在Zookeeper中的。

                模塊構成

                下圖為Broker Server的功能模塊示意圖。

                Remoting Module: 整個Broker的實體,負責處理來自clients端的請求。而這個Broker實體則由以下模塊構成。

                Client Manager: 客戶端管理器。負責接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護Consumer的Topic訂閱信息

                Store Service: 存儲服務。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。

                HA Service: 高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。

                Index Service: 索引服務。根據特定的Message key,對投遞到Broker的消息進行索引服務,同時也提供根據Message Key對消息進行快速查詢的功能。

                集群部署

                為了增強Broker性能與吞吐量,Broker一般都是以集群形式出現的。各集群節點中可能存放著相同Topic的不同Queue。不過,這里有個問題,如果某Broker節點宕機,如何保證數據不丟失呢?其解決方案是,將每個Broker集群節點進行橫向擴展,即將Broker節點再建為一個HA集群,解決單點問題。

                Broker節點集群是一個主從集群,即集群中具有Master與Slave兩種角色。Master負責處理讀寫操作請求,Slave負責對Master中的數據進行備份。當Master掛掉了,Slave則會自動切換為Master去工作。所以這個Broker集群是主備集群。一個Master可以包含多個Slave,但一個Slave只能隸屬于一個Master。Master與Slave 的對應關系是通過指定相同的BrokerName、不同的BrokerId來確定的。BrokerId為0表 示Master,非0表示Slave。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。

                1.4 基本概念

                1.4.1 消息(Message)

                消息是指,消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。

                1.4.2 主題(Topic)

                Topic表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。

                topic:message 1:n message:topic 1:1

                一個生產者可以同時發送多種Topic的消息;而一個消費者只對某種特定的Topic感興趣,即只可以訂閱和消費一種Topic的消息。

                producer:topic 1:n consumer:topic 1:1

                1.4.3 標簽(Tag)

                為消息設置的標簽,用于同一主題下區分不同類型的消息。

                來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。

                Topic是消息的一級分類,Tag是消息的二級分類。

                比如用topic標識隊列中消息的類型是貨物,而用tag標記是發往不同目的地 Topic:貨物 tag=上海 tag=江蘇 tag=浙江

                ------- 消費者 ----- 消費者訂閱topic,根據tag的不同做不同的業務處理 topic=貨物 tag = 上海 topic=貨物 tag = 上海|浙江 topic=貨物 tag = *

                1.4.4 隊列(Queue)

                存儲消息的物理實體。

                一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。一個Topic的Queue也被稱為一個Topic中消息的分區(Partition)。

                一個Topic的Queue中的消息只能被一個消費者組(多個具備相同業務處理邏輯的消費者,用相同的消費者組名標識)中的一個消費者消費。一個Queue中的消息不允許同一個消費者組中的多個消費者同時消費。

                在學習參考其它相關資料時,還會看到一個概念:分片(Sharding)。 分片不同于分區。 在RocketMQ中,分片指的是存放相應Topic的Broker,分片將同一個topic的消息根據負載均衡策略分發到不同的RocketMQ實例上。每個分片中會創建出相應數量的分區,即Queue,每個Queue的大小都是相同的。

                1.4.5 消息標識(MessageId/Key)

                RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業務標識的Key,以方便對消息的查詢。

                不過需要注意的是,MessageId有兩個:在生產者send()消息時會自動生成一個MessageId(msgId),當消息到達Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。

                msgId、offsetMsgId與key都稱為消息標識。

                msgId:由producer端生成,其生成規則為:producerIp + 進程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當前時間 + AutomicInteger自增計數器

                offsetMsgId:由broker端生成,其生成規則為:brokerIp + 物理分區的offset(Queue中的偏移量)

                key:由用戶指定的業務相關的唯一標識

                1.5 工作流程

                1.5.1 具體流程

                1)啟動NameServer,NameServer啟動后開始監聽端口,等待Broker、Producer、Consumer連接。

                2)啟動Broker時,Broker會與所有的NameServer建立并保持長連接,然后每30秒向NameServer定時 發送心跳包。

                3)發送消息前,可以先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,當然,在創建Topic時也會將Topic與Broker的關系寫入到NameServer中。不過,這步是可選的,也可以在發送消息時自動創建Topic。

                4)Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取路由信息,即當前發送的Topic消息的Queue與Broker的地址(IP+Port)的映射關系。然后根據算法策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發消息。當然,在獲取到路由信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。

                5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據算法策略從路由信息中獲取到其所要消費的Queue,然后直接跟Broker建立長連接,開始消費其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會向Broker發送心跳,以確保Broker的存活狀態。

                1.5.2 Topic的創建模式

                手動創建Topic時,有兩種模式:

                ?集群模式:該模式下創建的Topic在該集群中,所有Broker中的Queue數量是相同的。

                ?Broker模式:該模式下創建的Topic在該集群中,每個Broker中的Queue數量可以不同。

                自動創建Topic時,默認采用的是Broker模式,會為每個Broker默認創建4個Queue。

                1.5.3 讀/寫隊列

                從物理上來講,讀/寫隊列是同一個隊列。所以,不存在讀/寫隊列數據同步問題。讀/寫隊列是邏輯上進 行區分的概念。一般情況下,讀/寫隊列數量是相同的。

                例如,創建Topic時設置的寫隊列數量為8,讀隊列數量為4,此時系統會創建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到這8個隊列,但Consumer只會消費0 1 2 3這4個隊列中的消息,4 5 6 7中的消息是不會被消費到的。

                再如,創建Topic時設置的寫隊列數量為4,讀隊列數量為8,此時系統會創建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到0 1 2 3 這4個隊列,但Consumer只會消費0 1 2 3 4 5 6 7這8個隊列中 的消息,但是4 5 6 7中是沒有消息的。此時假設Consumer Group中包含兩個Consuer,Consumer1消 費0 1 2 3,而Consumer2消費4 5 6 7。但實際情況是,Consumer2是沒有消息可消費的。

                也就是說,當讀/寫隊列數量設置不同時,總是有問題的。那么,為什么要這樣設計呢?

                其這樣設計的目的是為了,方便Topic的Queue的縮容。

                例如,原來創建的Topic中包含16個Queue,如何能夠使其Queue縮容為8個,還不會丟失消息?可以動態修改寫隊列數量為8,讀隊列數量不變。此時新的消息只能寫入到前8個隊列,而消費都消費的卻是16個隊列中的數據。當發現后8個Queue中的消息消費完畢后,就可以再將讀隊列數量動態設置為8。整個縮容過程,沒有丟失任何消息。

                perm用于設置對當前創建Topic的操作權限:2表示只寫,4表示只讀,6表示讀寫。

                二、安裝

                2.1 準備工作

                2.1.1 下載RocketMQ

                RocketMQ最新版本:4.9.1

                下載地址https://rocketmq.apache.org/release_notes/release-notes-4.9.1/

                2.2.2 環境說明

                Linux64位系統,centos7 JDK1.8(64位) 源碼安裝需要安裝Maven 3.2.x

                2.2 安裝RocketMQ

                2.2.1 安裝步驟

                本教程以二進制包方式安裝

                下載后上傳到Linux,然后解壓安裝包,進入安裝目錄

                2.2.2 目錄介紹

                bin:啟動腳本,包括shell腳本和CMD腳本 conf:實例配置文件 ,包括broker配置文件、logback配置文件等 lib:依賴jar包,包括Netty、commons-lang、FastJSON等

                2.3 啟動RocketMQ

                Tips:RocketMQ默認的虛擬機內存較大,啟動Broker如果因為內存不足失敗,需要編輯如下兩個配置文件,修改JVM內存大小

                編輯runbroker.sh和runserver.sh修改默認JVM大小

                vi runbroker.sh
                vi runserver.sh
                復制

                參考設置:

                JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m  -XX:MaxMetaspaceSize=320m
                復制

                啟動NameServer

                1.啟動NameServer

                nohup sh bin/mqnamesrv &
                復制

                2.查看啟動日志

                tail -f ~/logs/rocketmqlogs/namesrv.log
                復制

                啟動Broker

                1.啟動Broker

                nohup sh bin/mqbroker -n localhost:9876 &
                復制

                2.查看啟動日志

                tail -f ~/logs/rocketmqlogs/broker.log
                復制

                2.4 測試RocketMQ

                2.4.1 發送消息

                1.設置環境變量

                export NAMESRV_ADDR=localhost:9876
                復制

                2.使用安裝包的Demo發送消息

                sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
                復制

                2.4.2 接收消息

                1.設置環境變量

                export NAMESRV_ADDR=localhost:9876
                復制

                2.接收消息

                sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
                復制

                2.5 關閉RocketMQ

                1.關閉NameServer

                sh bin/mqshutdown namesrv
                復制

                2.關閉Broker

                sh bin/mqshutdown broker
                復制
                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>