<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                快手基于 RocketMQ 的在線消息系統建設實踐

                小編:管理員 22閱讀 2022.08.03

                為什么建設在線消息系統

                在引入 RocketMQ 之前,快手已經在大量的使用 Kafka 了,但并非所有情況下 Kafka 都是最合適的,比如以下場景:

                • 業務希望個別消費失敗以后可以重試,并且不堵塞后續其它消息的消費。
                • 業務希望消息可以延遲一段時間再投遞。
                • 業務需要發送的時候保證數據庫操作和消息發送是一致的(也就是事務發送)。
                • 為了排查問題,有的時候業務需要一定的單個消息查詢能力。

                為了應對以上這類場景,我們需要建設一個主要面向在線業務的消息系統,作為 Kafka 的補充。在考察的一些消息中間件中,RocketMQ 和業務需求匹配度比較高,同時部署結構簡單,使用的公司也比較多,于是最后我們就采用了 RocketMQ。

                部署模式和落地策略

                在一個已有的體系內落地一個開源軟件,通常大概有兩種方式:

                方式一:在開源軟件的基礎上做深度修改,很容易實現公司內需要的定制功能。但和社區開源版本分道揚鑣,以后如何升級?

                方式二:盡量不修改社區版本(或減少不兼容的修改),而是在它的外圍或者上層進一步包裝來實現公司內部需要的定制功能。

                注:上圖方式一的圖畫的比較極端,實際上很多公司是方式一、方式二結合的。

                我們選擇了方式二。最早的時候,我們使用的是 4.5.2 版本,后來社區 4.7 版本大幅減小了同步復制的延遲,正好我們的部署模式就是同步復制,于是就很輕松的升級了 4.7 系列,享受了新版本的紅利。

                在部署集群的時候,還會面臨很多部署策略的選擇:

                • 大集群 vs 小集群
                • 選擇副本數
                • 同步刷盤 vs 異步刷盤
                • 同步復制 vs 異步復制
                • SSD vs 機械硬盤

                大集群會有更好的性能彈性,而小集群具有更好的隔離型,此外小集群可以不需要跨可用區 /IDC 部署,所以會有更好的健壯性。我們非?粗胤定性,因此選擇了小集群。集群同步復制異步刷盤,首選 SSD。

                客戶端封裝策略

                如上所述,我們沒有在 RocketMQ 里面做深度修改,所以需要提供一個 SDK 來實現公司內需要的定制功能,這個 SDK 大概是這樣的:

                對外只提供最基本的 API,所有訪問必須經過我們提供的接口。簡潔的 API 就像冰山的一個角,除了對外的簡單接口,下面所有的東西都可以升級更換,而不會破壞兼容性。

                業務開發起來也很簡單,只要需要提供 Topic(全局唯一)和 Group 就可以生產和消費,不用提供環境、NameServer 地址等。SDK 內部會根據 Topic 解析出集群 NameServer 的地址,然后連接相應的集群。生產環境和測試環境環境會解析出不同的地址,從而實現了隔離。

                上圖分為 3 層,第二層是通用的,第三層才對應具體的 MQ 實現,因此,理論上可以更換為其它消息中間件,而客戶端程序不需要修改。

                SDK 內部集成了熱變更機制,可以在不重啟 Client 的情況下做動態配置,比如下發路由策略(更換集群 NameServer 的地址,或者連接到別的集群去),Client 的線程數、超時時間等。通過 Maven 強制更新機制,可以保證業務使用的 SDK 基本上是最新的。

                集群負載均衡 & 機房災備

                所有的 Topic 默認都分配到兩個可用區,生產者和消費者會同時連接至少兩個獨立集群(分布在不同的可用區),如下圖:

                生產者同時連接兩個集群,如果可用區 A 出現故障,流量就會自動切換到可用區 B 的集群 2 去。我們開發了一個小組件來實現自適應的集群負載均衡,它包含以下能力:

                • 千萬級 OPS
                • 靈活的權重調整策略
                • 健康檢查支持/事件通知
                • 并發度控制(自動降低響應慢的服務器的請求數)
                • 資源優先級(類似 Envoy,實現本地機房優先,或是被調服務器很多的時候選取一個子集來調用)
                • 自動優先級管理
                • 增量熱變更

                實際上它并不僅僅用于消息生產者,而是一個通用的主調方負載均衡類庫,可以在 Github 上找到:https://github.com/PhantomThief/simple-failover-java。

                核心的 SimpleFailover 接口和 PriorityFailover 類沒有傳遞第三方依賴,非常容易整合。

                多樣的消息功能
                延遲消息

                延遲消息是非常重要的業務功能,不過 RocketMQ 內置的延遲消息只能支持幾個固定的延遲級別,所以我們又開發了單獨的 Delay Server 來調度延遲消息:

                上圖這個結構沒有直接將延遲消息發到 Delay Server,而是更換 Topic 以后存入 RocketMQ。這樣的好處是可以復用現有的消息發送接口(以及上面的所有擴展能力)。對業務來說,只需要在構造消息的時候額外指定一個延遲時間字段即可,其它用法都不變。

                事務消息

                RocketMQ 4.3 版本以后支持了事務消息,可以保證本地事務和消費發送同時成功或者失敗,對于一些業務場景很有幫助。事務消息的用法和原理有很多資料,這里就不細述了。但關于事務消息的實踐網上資料較少,我們可以給出一些建議。

                首先,事務消息功能一直在不斷完善,應該使用最新的版本,至少是 4.6.1 以后的版本,可以避免很多問題。

                其次,事務消息性能是不如普通消息的,它在內部實際上會生成 3 個消息(一階段 1 個,二階段 2 個),所以性能大約只有普通消息的 1/3,如果事務消息量大的話,要做好容量規劃;夭檎{度線程也只有 1 個,不要用極限壓力去考驗它。

                最后有一些參數注意事項。在 Broker 的配置中:

                • transientStorePoolEnable 這個參數必須保持默認值 false,否則會有嚴重的問題。
                • endTransactionThreadPoolNums是事務消息二階段處理線程大小,sendMessageThreadPoolNums 則指定一階段處理線程池大小。如果二階段的處理速度跟不上一階段,就會造成二階段消息丟失導致大量回查,所以建議 endTransactionThreadPoolNums 應該大于 sendMessageThreadPoolNums,建議至少 4 倍。
                • useReentrantLockWhenPutMessage 設置為 true(默認值是 false),以免線程搶鎖出現嚴重的不公平,導致二階段處理線程長時間搶不到鎖。
                • transactionTimeOut 默認值 6 秒太短了,如果事務執行時間超過 6 秒,就可能導致消息丟失。建議改到 1 分鐘左右。

                生產者 Client 也有一個注意事項,如果有多組 Broker,并且是 2 副本(有 1 個 Slave),應該打開 retryAnotherBrokerWhenNotStoreOK,以免某個 Slave 出現故障以后,大量消息發送失敗。

                分布式對賬監控

                除了比較一些常規的監控手段以外,我們開發了一個監控程序做分布式對賬?梢园l現我們的集群以及我們提供的 SDK 是否有異常。

                具體做法是在每個 Broker 上都建立一個監控專用的 Topic,監控程序使用我們自己提供的 SDK 框架來連接集群(就像我們的業務用戶那樣),監控生產者會給每個集群發送少量消息。然后檢查發送是否成功:

                發送成功

                成功

                刷盤超時

                Slave 超時

                Slave 不可用

                發送失敗

                具體錯誤碼

                生產者只對這些結果進行打點,不判斷是否正常,具體到監控(或者演練)場景可以配置不同的報警規則。

                消費者收到了消息會通過 TCP 旁路 ACK 生產者,生產者這邊會做分布式對賬,將對賬結果打點:

                • 收到消息
                • 消息丟失(或超時未收到消息)
                • 重復收到消息
                • 消息生成到最終消費的時間差
                • ACK 生產者失。ㄓ上M者打點)

                同樣監控程序只負責打點,報警規則可另外配置。

                這套機制也可以用于分布式性能壓測和故障演練。在做壓測的時候,每個消息都 ACK 的話,對生產者的內存壓力很大,因為它發出去的消息,需要在內存中保留一段時間(直到到達這個消息的對賬時間),這段時間消費者 ACK 或者重復 ACK 都需要記錄。所以我們實現了按比例抽樣對賬的功能,開啟以后只有需要對賬的消息才會在內存中保留一段時間。

                順便說一下,我們做壓測時,合格的標準是異步生產不失敗、消費不延遲、每一個消息都不丟失。這樣做是為了保證壓測時能給出更加準確的,可供線上系統參考的性能數字,而不是制造理想條件,追求一個大的數字。比如異步生產比同步生產更脆弱(壓測 Client 如果同步生產,Broker 抖動的時候,同步 Client 會被堵塞導致發送速度降低,于是降低了 Broker 壓力,消息發送不容易失敗,但是會看到發送速率在波動),更貼近生產環境的實際情況,我們就選擇異步生產來評估。

                性能優化

                Broker 默認的參數在我們的場景下(SSD、同步復制、異步刷盤)不是最優的,有的參數也許在大多數場景下都不是最優的。我們列出一些重要的參數,供大家參考:

                參數

                默認值

                說明

                flushCommitLogTimed

                False

                默認值不合理,異步刷盤這個參數應該設置成 true,導致頻繁刷盤,對性能影響極大。

                deleteWhen

                04

                幾點刪除過期文件的時間,刪除文件時有很多磁盤讀,這個默認值是合理的,有條件的話還是建議低峰刪除。

                sendMessageThreadPoolNums

                1

                處理生產消息的線程數,這個線程干的事情很多,建議設置為 2~4,但太多也沒有什么用。因為最終寫 commit log 的時候只有一個線程能拿到鎖。

                useReentrantLockWhenPutMessage

                False

                如果前一個參數設置比較大,這個最好設置為 true,避免高負載下自旋鎖空轉消耗 CPU。

                sendThreadPoolQueueCapacity

                10000

                處理生產消息的隊列大小,默認值可能有點小,比如 5 萬 TPS(異步發送)的情況下,卡 200ms 就會爆。設置比較小的數字可能是擔心有大量大消息撐爆內存(比如 100K 的話, 1 萬個的消息大概占用 1G 內存,也還好),具體可以自己算,如果都是小消息,可以把這個數字改大?梢孕薷 Broker 參數限制 Client 發送大消息。

                brokerFastFailureEnable

                True

                Broker 端快速失。ㄏ蘖鳎,和下面兩個參數配合。這個機制可能有爭議,client 設置了超時時間,如果 client 還愿意等,并且 sendThreadPoolQueue 還沒有滿,不應該失敗,sendThreadPoolQueue 滿了自然會拒絕新的請求。但如果 Client 設置的超時時間很短,沒有這個機制可能導致消息重復?梢宰孕袥Q定是否開啟。理想情況下,能根據 Client 設置的超時時間來清理隊列是最好的。

                waitTimeMillsInSendQueue

                200

                200ms 很容易導致發送失敗,建議改大,比如 1000ms。

                osPageCacheBusyTimeOutMills

                1000

                Page cache 超時時間,如果內存比較多,比如 32G 以上,建議改大點。

                總結


                得益于簡單、幾乎 0 依賴的部署模式,使得我們部署小集群的成本非常低;不對社區版本進行魔改,保證我們可以及時升級;統一 SDK 入口方便集群維護和功能升級;通過復合小集群+自動負載均衡實現多機房多活;充分利用 RocketMQ 的功能,比如事務消息、延遲消息(增強)來滿足業務的多樣性需求;通過自動的分布式對賬,對每一個 Broker 以及我們的 SDK 進行正確性監控。

                本文也進行了一些性能參數的分享,但寫的比較簡單,基本只說了怎么調,但沒能細說為什么,以后我們會另寫文章詳述。目前 RocketMQ 已經應用在公司在大多數業務線,期待將來會有更好的發展!

                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>