<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                分布式事務之消息最終一致性事務——RocketMQ的實現

                小編:管理員 69閱讀 2022.07.29

                號外:最近整理了一下以前編寫的一系列Spring Boot內容,整了個《Spring Boot基礎教程》的PDF,關注我,回復:001,快來領取吧~!更多內容持續整理中,幫助大家更好的學習Spring相關的系列內容!

                上一篇《我說分布式事務之消息最終一致性事務(一):原理及實現》中,我們講解了可靠消息最終一致性的實現原理及如何基于一款開源的消息中間件,實現一個可靠消息服務的思路。

                本文,我們講解如何利用開源消息中間件RocketMQ的特性–事務消息,實現基于消息一致性的最終一致的分布式事務。

                RocketMQ是阿里巴巴開源的一款高性能、高可靠的消息中間件,經歷過雙11等大流量高并發的大考,是國內開源界的翹楚,在業界有著廣泛的應用。

                我假設你對RocketMQ有著一定的了解,就不對它的基礎概念及使用做進一步的展開,如果需要,請參考官方文檔做進一步的學習了解RocketMQ官網。

                按照我們的套路,先上圖。

                原理簡介
                1. RocketMQ提供了類似X/Open XA的分布事務功能,通過MQ的事務消息能達到分布式事務的最終一致。
                2. 發送方在業務執行開始會先向消息隊列中投遞 “半消息” ,半消息即暫時不會真正投遞的消息,當發送方(即生產者)將消息成功發送給了MQ服務端且并未將該消息的二次確認結果返回,此時消息狀態是“暫時不可投遞”狀態(可以認為是狀態未知)。該狀態下的消息即半消息。
                3. 如果出現網絡閃斷、生產者應用重啟等原因導致事務消息二次確認丟失,MQ服務端會通過掃描發現某條消息長期處于 “半消息” 狀態,MQ服務端會主動向生產者查詢該消息的最終狀態是處于Commit(消息提交)還是Rollback(消息回滾)。這個過程稱為消息回查。

                有了上述的概念,我們詳細解釋一下事務消息交互的過程。

                1. 首先,MQ發送方向MQ服務(即RocketMQ的Broker)發送半消息。
                2. MQ服務端會將消息做持久化處理,并發送ACK確認消息已經發送成功。
                3. MQ發送方執行本地事務
                4. MQ發送方根據本地事務執行的結果向MQ服務提交二次確認:如果本地事務執行成功,則提交消息狀態為Commit,否則為Rollback。MQ服務端收到Commit狀態的消息將消息標記為可投遞狀態,訂閱方最終會收到該條消息。如果收到的是Rollback,最終MQ服務端會刪除該條半消息,訂閱方不會接收到這條消息。
                5. 如果出現網絡閃斷、應用重啟等情況,4階段替提交的二次確認最終并未能到達MQ服務端,一定時間之后,MQ服務端會對此消息發起回查操作,確認發送方本地事務的執行狀態。
                6. 發送方需要實現服務回查邏輯供MQ服務端進行回調。當發送方收到回查后,需要檢查對應消息的本地事務執行的最終結果,此處也需要根據本地事務的成功或失敗返回Commit或者Rollback,即再次提交消息狀態的二次確認,MQ服務端仍會按照步驟4對該半消息進行操作。

                注意 1-4 為事務消息的發送過程, 5-6 為事務消息的回查過程。

                如何使用

                此處我引用官網的demo,進行簡單說明。

                事務狀態:rocketmq定義了三種事務狀態

                1. TransactionStatus.CommitTransaction:消息提交,當消息狀態為 CommitTransaction,表示允許消費者允許消費當前消息
                2. TransactionStatus.RollbackTransaction:消息回滾,表示MQ服務端將會刪除當前半消息,不允許消費者消費。
                3. TransactionStatus.Unknown:中間狀態,表示MQ服務需要發起回查操作,檢測當前發送方本地事務的執行狀態。

                發送事務消息

                (1)創建事務消息生產者

                使用TransactionMQProducer創建消息發送客戶端。并指定一個唯一的生產者組producerGroup,當執行完本地事務,需要返回給MQ服務端執行結果,返回上面的三種事務狀態。CommitTransaction、RollbackTransaction、Unknown

                public class TransactionProducer {
                    public static void main(String[] args) throws MQClientException, InterruptedException {
                        TransactionListener transactionListener = new TransactionListenerImpl();
                        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
                        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                             @Override
                             public Thread newThread(Runnable r) {
                                 Thread thread = new Thread(r);
                                 thread.setName("client-transaction-msg-check-thread");
                                 return thread;
                             }
                        });
                
                        producer.setExecutorService(executorService);
                        producer.setTransactionListener(transactionListener);
                        producer.start();
                
                        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                        for (int i = 0; i < 10; i++) {
                            try {
                                Message msg =
                                      new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                                          ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                                   SendResult sendResult = producer.sendMessageInTransaction(msg, null);                   System.out.printf("%s%n", sendResult);
                                Thread.sleep(10);
                            } catch (MQClientException | UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                        }
                           for (int i = 0; i < 100000; i++) {               hread.sleep(1000);        }
                        producer.shutdown();
                    }
                }
                復制

                (2) 實現TransactionListener接口

                實現executeLocalTransaction方法。消息生產者需要在executeLocalTransaction中執行本地事務當事務半消息提交成功,執行完畢后需要返回事務狀態碼。 實現checkLocalTransaction方法,該方法用于進行本地事務執行情況回查,并回應事務狀態給MQ的broker,執行完成之后需要返回對應的事務狀態碼。

                public class TransactionListenerImpl implements TransactionListener {
                
                
                   private AtomicInteger transactionIndex = new AtomicInteger(0);
                
                   private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
                
                   @Override
                   public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                       int value = transactionIndex.getAndIncrement();
                       int status = value % 3;
                       localTrans.put(msg.getTransactionId(), status);
                       return LocalTransactionState.UNKNOW;
                   }
                
                   @Override
                   public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                       Integer status = localTrans.get(msg.getTransactionId());
                       if (null != status) {
                           switch (status) {
                               case 0:
                                   return LocalTransactionState.UNKNOW;
                               case 1:
                                   return LocalTransactionState.COMMIT_MESSAGE;
                               case 2:
                                   return LocalTransactionState.ROLLBACK_MESSAGE;
                           }
                       }
                       return LocalTransactionState.COMMIT_MESSAGE;
                   }
                }
                復制

                對于消費者,需要通過業務參數保證消費的冪等。

                附錄:RocketMQ事務消息實現原理

                有些同學想要深入了解RocketMQ實現事務消息操作的原理,這里我引用一下官方的一段博客的內容,具體的地址為:里程碑|Apache RocketMQ 正式開源分布式事務消息

                RocketMQ事務消息在實現上充分利用了RocketMQ本身機制,在實現零依賴的基礎上,同樣實現了高性能、可擴展、全異步等一系列特性。 在具體實現上,RocketMQ通過使用Half Topic 以及Operation Topic 兩個內部隊列來存儲事務消息推進狀態,如下圖所示:

                其中,Half Topic對應隊列中存放著prepare消息,Operation Topic對應的隊列則存放了prepare message對應的commit/rollback消息,消息體中則是prepare message對應的offset,服務端通過比對兩個隊列的差值來找到尚未提交的超時事務,進行回查。 在具體實現上,事務消息作為普通消息的一個應用場景,在實現過程中進行了分層抽象,從而避免了對RocketMQ原有存儲機制的修改,如下圖所示:

                從用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可;而在service層,則對事務消息的兩階段提交進行了抽象,同時針對超時事務實現了回查邏輯,通過不斷掃描當前事務推進狀態,來不斷反向請求Producer端獲取超時事務的執行狀態,在避免事務掛起的同時,也避免了Producer端的單點故障。而在存儲層,RocketMQ通過Bridge封裝了與底層隊列存儲的相關操作,用以操作兩個對應的內部隊列,用戶也可以依賴其它他存儲介質實現自己的service,RocketMQ會通過ServiceProvider加載進來。 從上述事務消息設計中可以看到,RocketMQ事務消息較好的解決了事務的最終一致性問題,事務發起方僅需要關注本地事務執行以及實現回查接口給出事務狀態判定等實現,而且在上游事務峰值高時,可以通過消息隊列,避免對下游服務產生過大壓力。 事務消息不僅適用于上游事務對下游事務無依賴的場景,還可以與一些傳統分布式事務架構相結合,而MQ的服務端作為天生的具有高可用能力的協調者,使得我們未來可以基于RocketMQ提供一站式輕量級分布式事務解決方案,用以滿足各種場景下的分布式事務需求。

                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>