<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                RocketMQ之Pull Consumer負載均衡拉取正確姿勢?

                小編:管理員 29閱讀 2022.08.03

                RocketMQ分布式消息中間件Pull模式下,我們的一般消費步驟如下:

                1. 讀取 topic的消息隊列message queue的信息; 2. 按隊列去拉取一定數目的消息; 3.(持久化message queue的消費進度consume offset)

                首先有一些關鍵概念,我們需要理清楚:

                consume offset 是基于topic 以及 消費組 consumer group的,意思是什么?

                意思就是當采用集群消費模型(CLUSTERING),我們的consume offset 默認是存儲在broker服務器上的config/consumerOffset.json文件。

                {
                	"offsetTable":{
                		"TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578
                		}
                	}
                }
                復制

                (TopicTest為某個topic,初始化為4個隊列, pullConsumerGroupTest為某個消費組)

                那么當同一個consumer group的多個consumer instance默認是共享一個消費進度。

                那么問題來了,我們同一個consumer group的多個consumer instance在第一個步驟的時候,如何快速感知和分配合適的消息隊列message queue,給每一個consumer instance消費呢??

                ---即 負載均衡問題如何解決?

                廢話不多說,貼個代碼:

                public Set<MessageQueue> fetchMessageQueuesForPullOperation(String topic)
                			throws MQClientException, InterruptedException {
                		DefaultMQPullConsumer pullConsumer; // please init
                		long fetchQueueTimeoutMillis = 5000l;
                		long fetchQueueNextDelayTimeMillis = 200l;
                
                		Set<MessageQueue> msgQueues = null;
                		switch (pullConsumer.getMessageModel()) {
                			case BROADCASTING:
                				msgQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
                				break;
                			case CLUSTERING:
                				msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
                				// 未獲取到負載均衡的時候,等待fetchQueueNextDelayTimeMillis毫秒重新獲取,直到超時
                				long timeout = 0L;
                				while (CollectionUtils.isEmpty(msgQueues) && timeout < fetchQueueTimeoutMillis) {
                					Thread.sleep(fetchQueueNextDelayTimeMillis);
                					timeout += fetchQueueNextDelayTimeMillis;
                					msgQueues = pullConsumer.fetchMessageQueuesInBalance(topic);
                				}
                				break;
                			default:
                				break;
                		}
                		return msgQueues;
                	}
                復制

                當然還有一種內置的

                MQPullConsumerScheduleService
                復制

                也是可以實現。不同的是這個是回調模式。

                by 斯武丶風晴 https://my.oschina.net/langxSpirit

                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>