<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                MQ44# RocketMQ幾個最近被問的問題

                小編:管理員 44閱讀 2022.08.03

                前言

                本周有點瑣事,沒有源碼文章輸出,掉一次鏈子。整理了幾個最近被問的問題,大家隨便看看。

                問題一

                問: RocketMQ消費者訂閱了tag,但卻收不到消息無法消費,并且根據 msgid 去查詢,發現這條消息的狀態為 CONSUMED_BUT_FILTERED,那這是為什么?

                答: 在RocketMQ中,一個消費組能同時訂閱多個 tag,但一個消費組的不同消費者不能分開訂閱不同的tag,即同一個消費組的訂閱關系必須保持一樣。例如:常見錯誤使用方式同一個項目中,一段消費代碼訂閱tagA,然后拷貝到這段代碼再更改為tagB。

                正確用法

                public void subscribe(){
                 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer");
                 consumer.subscribe("melon_online_test","tag1 || tag2 || tag3");
                }
                復制

                錯誤用法

                public class SubscribeTest {
                  public void subscribeA(){
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer");
                    consumer.subscribe("melon_online_test","tag1");
                  } 
                
                  public void subscribeB(){
                    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer");
                    consumer.subscribe("melon_online_test","tag2");
                  } 
                }
                復制

                問題二

                問: 發現大量的RocketMQ client 大量的info日志輸出,我不關心,如何禁用呢?

                答: 嘗試以下設置,項目中使用了Slf4j

                @1 可以配置RocketmqClient的logger設置優先級為warn

                @2 也可以通過-Drocketmq.client.logUseSlf4j=false 和 -Drocketmq.client.logLevel=WARN 關閉MQ客戶端使用Slf4j并提高日志等級

                項目中沒有使用Slf4j,可以通過-Drocketmq.client.logLevel=WARN調高日志等級。

                問題三

                問: 我的服務消費后需要調用第三方接口,別人的接口調用有限制,Rocketmq消費可以限流嗎?

                答: RocketMQ本身沒有類似每秒消費多少條數據的精確限流,我們可以結合Sentienl來實現,示例代碼如下:

                private String KEY = "melon_topic:melon_consumer"; // 資源名稱由topic和消費組構成
                    
                    public static void main(String[] args) throws InterruptedException, MQClientException {
                        initFlowControlRule(); // Sentinel流控規則
                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_consumer");
                        consumer.setNamesrvAddr("localhost:9876");
                        consumer.subscribe("melon_topic", "*");
                        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                        consumer.registerMessageListener(new MessageListenerConcurrently() {
                            @Override
                            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                for (MessageExt msg : msgs) {
                                    Entry entry = null;
                                    try {
                                        ContextUtil.enter(KEY); // 定義資源
                                        entry = SphU.entry(KEY, EntryType.OUT);
                                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                                    } catch (BlockException ex) {
                                        // Blocked.被限流后消息重試
                                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                    } finally {
                                        if (entry != null) {
                                            entry.exit();
                                        }
                                        ContextUtil.exit();
                                    }
                                }
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        });
                        consumer.start();
                        System.out.printf("Consumer Started.%n");
                    }
                
                    private static void initFlowControlRule() {
                        FlowRule rule = new FlowRule();
                        rule.setResource(KEY);
                        rule.setCount(5);// 每秒通過5條消息
                        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
                        rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
                        rule.setMaxQueueingTimeMs(5 * 1000); // 排隊超時時間5秒
                        FlowRuleManager.loadRules(Collections.singletonList(rule));
                    }
                復制

                問題四

                問:RocketMQ默認延遲等級有18個,我可以擴增嗎?

                答: 可以的,但是不建議擴增太多等級,可以通過修改broker屬性messageDelayLevel來實現,注意修改了后需要重啟broker。例如:

                messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 3d 7d 14d 21d
                復制
                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>