<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                RocketMq實戰-不同類型生產者(DefaultMQProducer)

                小編:管理員 33閱讀 2022.08.01

                前言 本文來介紹RocketMQ生產者發送消息默認使用的DefaultMQProducer類。

                生產者 向消息隊列里寫入消息,不 同的業務場景需要生產者采用不同的寫入策略 。 比如同步發送、異步發送、 延遲發送、 發送事務消息等。

                正文 我們結合代碼來了解一下,

                代碼位置在package org.apache.rocketmq.example.simple;

                public class AsyncProducer {
                    public static void main(
                        String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
                
                        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
                        producer.setNamesrvAddr("127.0.0.1:9876");
                        producer.start();
                        producer.setRetryTimesWhenSendAsyncFailed(0);
                
                        for (int i = 0; i < 10000000; i++) {
                            try {
                                final int index = i;
                                Message msg = new Message("Jodie_topic_1023",
                                    "TagA",
                                    "OrderID188",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                                producer.send(msg, new SendCallback() {
                                    @Override
                                    public void onSuccess(SendResult sendResult) {
                                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                                    }
                
                                    @Override
                                    public void onException(Throwable e) {
                                        System.out.printf("%-10d Exception %s %n", index, e);
                                        e.printStackTrace();
                                    }
                                });
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        producer.shutdown();
                    }
                }
                復制

                代碼講解

                發送消息要經過幾個步驟 :

                (1 )設置 Producer 的 GroupName。

                (2 )設置 lnstanceName,當一個 Jvm 需要啟動多個 Producer 的時候,通過設置不同的 InstanceName來區分,不設置的話系統使用默認名稱“DEFAULT”。(本例沒有寫)

                ( 3 )設置發送失敗重試次數,當網絡出現異常的時候,這個次數影響消息的重復投遞次數。想保證不丟消息,可以設置多重試幾次 。

                (4 )設置 NameServer 地址 。 (5 )組裝消息并發送 。

                消息的發送有同步和異步兩種方式,上面的代碼使用的是異步方式 。消息發送的返回狀態有如下四種 : FLUSH_DISK_TIMEOUT 、 FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK,不同狀態在不同的刷盤策略和同步策略的配置下含義是不同的 。

                FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要 Broker 的刷盤策被設置成 SYNC_FLUSH 才會報這個錯誤) 。 FLUSH_SLAVE_TIMEOUT :表示在主備方式下,并且 Broker被設 置 成 SYNC_MASTER 方式,沒有在設定時間內完成 主從同步 。 SLAVE_NOT_AVAILABLE : 這個狀態 產生的場景和 FLUSH_SLAVE_TIMEOUT 類似, 表示在主備 方式下,并且 Broker被設置成 SYNC_MASTER,但是沒有找到被配置成 Slave 的 Broker。 SEN_ OK :表示發送成功,發送成功的具體含義,比如消息是否已經 被存儲到融盤?消息是否被同步到了 Slave上?消息在 Slave上是否被 寫人磁盤?需要結合所配置的刷盤策略、主從策略來定 。 這個狀態還可 以簡單理解為,沒有發生上面列出的 三個問題狀態就是 SEND OK。

                發送延遲消息

                Broker收到這類消息后 ,延遲一段時間再處理, 使消息在規定的一段時間后生效。

                使用方法:在創建 Message對象時,調用 setDelayTimeLevel ( int level) 方法設置延遲時間, 然后再把這個消息發送 出去。 目前延遲的時間不支 持任意設置,僅支持預設值的時間長度 ( 1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/ 7m/8m/9m/1Om/20m/30m/1h/2h)。 比如 setDelayTimeLevel(3)表示延遲 10s。

                自定義消息發送規則

                一個 Topic會有多個 Message Queue,如果使用 Producer的默認配置,這 個 Producer 會輪流向各個 Message Queue 發 送 消息 。 Consumer 在消費消息的 時候,會根據負載均衡策略,消費被分配到的 Message Queue,如果不經過特 定的設置,某條消息被發往哪個 Message Queue,被哪個 Consumer 消費是未 知的。

                如果業務 需 要我們把消息 發 送到指定的 Message Queue 里,比如把同 一 類型 的消息都發 往 相同的 Message Queue,可以用 Message- QueueSelector。

                發送消息的時候,把 MessageQueueSelector 的對象作為參數,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函 數發送消 息即可 。 在 MessageQueueSelector 的實現中,根據傳人的 Object參 數,或者根據 Message 消息內容確定把消息發往那個 Message Queue,返回被 選中的 Message Queue。

                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>