<form id="jrvnn"></form>

        <address id="jrvnn"><listing id="jrvnn"></listing></address>
        <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

          <sub id="jrvnn"><listing id="jrvnn"></listing></sub>

                聊聊RocketMQTemplate

                小編:管理員 69閱讀 2022.07.29

                本文主要研究一下RocketMQTemplate

                RocketMQTemplate

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
                    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);
                
                    private DefaultMQProducer producer;
                
                    private ObjectMapper objectMapper;
                
                    private String charset = "UTF-8";
                
                    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();
                
                    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!
                
                    //......
                
                    @Override
                    public void afterPropertiesSet() throws Exception {
                        if (producer != null) {
                            producer.start();
                        }
                    }
                
                    @Override
                    protected void doSend(String destination, Message<?> message) {
                        SendResult sendResult = syncSend(destination, message);
                        log.debug("send message to `{}` finished. result:{}", destination, sendResult);
                    }
                
                
                
                    @Override
                    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
                        String content;
                        if (payload instanceof String) {
                            content = (String) payload;
                        } else {
                            // If payload not as string, use objectMapper change it.
                            try {
                                content = objectMapper.writeValueAsString(payload);
                            } catch (JsonProcessingException e) {
                                log.error("convert payload to String failed. payload:{}", payload);
                                throw new RuntimeException("convert to payload to String failed.", e);
                            }
                        }
                
                        MessageBuilder<?> builder = MessageBuilder.withPayload(content);
                        if (headers != null) {
                            builder.copyHeaders(headers);
                        }
                        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);
                
                        Message<?> message = builder.build();
                        if (postProcessor != null) {
                            message = postProcessor.postProcessMessage(message);
                        }
                        return message;
                    }
                
                    @Override
                    public void destroy() {
                        if (Objects.nonNull(producer)) {
                            producer.shutdown();
                        }
                
                        for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
                            if (Objects.nonNull(kv.getValue())) {
                                kv.getValue().shutdown();
                            }
                        }
                        cache.clear();
                    }
                
                    //......
                }
                復制
                • RocketMQTemplate繼承了spring-messaging的AbstractMessageSendingTemplate,實現了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法
                • afterPropertiesSet方法執行producer.start();destroy方法執行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合
                • doSend方法內部調用的是syncSend方法,返回的sendResult僅僅debug輸出;doConvert方法針對String類型的payload不做處理,其他類型使用objectMapper.writeValueAsString轉為String作為content,然后構造message,執行postProcessor.postProcessMessage,然后返回
                syncSend

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
                     *
                     * @param destination formats: `topicName:tags`
                     * @param message     {@link org.springframework.messaging.Message}
                     * @param timeout     send timeout with millis
                     * @param delayLevel  level for the delay message
                     * @return {@link SendResult}
                     */
                    public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("syncSend failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            long now = System.currentTimeMillis();
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            if (delayLevel > 0) {
                                rocketMsg.setDelayTimeLevel(delayLevel);
                            }
                            SendResult sendResult = producer.send(rocketMsg, timeout);
                            long costTime = System.currentTimeMillis() - now;
                            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                            return sendResult;
                        } catch (Exception e) {
                            log.error("syncSend failed. destination:{}, message:{} ", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                
                    /**
                     * syncSend batch messages in a given timeout.
                     *
                     * @param destination formats: `topicName:tags`
                     * @param messages    Collection of {@link org.springframework.messaging.Message}
                     * @param timeout     send timeout with millis
                     * @return {@link SendResult}
                     */
                    public SendResult syncSend(String destination, Collection<Message<?>> messages, long timeout) {
                        if (Objects.isNull(messages) || messages.size() == 0) {
                            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
                            throw new IllegalArgumentException("`messages` can not be empty");
                        }
                
                        try {
                            long now = System.currentTimeMillis();
                            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
                            org.apache.rocketmq.common.message.Message rocketMsg;
                            for (Message<?> msg:messages) {
                                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                                    log.warn("Found a message empty in the batch, skip it");
                                    continue;
                                }
                                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);
                                rmqMsgs.add(rocketMsg);
                            }
                
                            SendResult sendResult = producer.send(rmqMsgs, timeout);
                            long costTime = System.currentTimeMillis() - now;
                            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                            return sendResult;
                        } catch (Exception e) {
                            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • syncSend方法支持單個及多個org.springframework.messaging.Message,其中單個Message的接口支持delayLevel
                syncSendOrderly

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
                     *
                     * @param destination formats: `topicName:tags`
                     * @param message     {@link org.springframework.messaging.Message}
                     * @param hashKey     use this key to select queue. for example: orderId, productId ...
                     * @param timeout     send timeout with millis
                     * @return {@link SendResult}
                     */
                    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            long now = System.currentTimeMillis();
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
                            long costTime = System.currentTimeMillis() - now;
                            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
                            return sendResult;
                        } catch (Exception e) {
                            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • syncSendOrderly方法內部調用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout)方法,同步返回SendResult
                asyncSend

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
                     *
                     * @param destination  formats: `topicName:tags`
                     * @param message      {@link org.springframework.messaging.Message}
                     * @param sendCallback {@link SendCallback}
                     * @param timeout      send timeout with millis
                     * @param delayLevel   level for the delay message
                     */
                    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("asyncSend failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            if (delayLevel > 0) {
                                rocketMsg.setDelayTimeLevel(delayLevel);
                            }
                            producer.send(rocketMsg, sendCallback, timeout);
                        } catch (Exception e) {
                            log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • asyncSend方法需要傳入SendCallback,內部執行的是producer.send(rocketMsg, sendCallback, timeout)
                asyncSendOrderly

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
                     * addition.
                     *
                     * @param destination  formats: `topicName:tags`
                     * @param message      {@link org.springframework.messaging.Message}
                     * @param hashKey      use this key to select queue. for example: orderId, productId ...
                     * @param sendCallback {@link SendCallback}
                     * @param timeout      send timeout with millis
                     */
                    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
                                                 long timeout) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
                        } catch (Exception e) {
                            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • asyncSendOrderly方法內部執行的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout)
                sendOneWay

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Similar to <a >UDP</a>, this method won't wait for
                     * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
                     * <p>
                     * One-way transmission is used for cases requiring moderate reliability, such as log collection.
                     *
                     * @param destination formats: `topicName:tags`
                     * @param message     {@link org.springframework.messaging.Message}
                     */
                    public void sendOneWay(String destination, Message<?> message) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("sendOneWay failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            producer.sendOneway(rocketMsg);
                        } catch (Exception e) {
                            log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • sendOneWay方法內部執行的是producer.sendOneway(rocketMsg)
                sendOneWayOrderly

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
                     *
                     * @param destination formats: `topicName:tags`
                     * @param message     {@link org.springframework.messaging.Message}
                     * @param hashKey     use this key to select queue. for example: orderId, productId ...
                     */
                    public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
                        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
                            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
                            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
                        }
                
                        try {
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
                        } catch (Exception e) {
                            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
                            throw new MessagingException(e.getMessage(), e);
                        }
                    }
                復制
                • sendOneWayOrderly方法內部執行的是producer.sendOneway(rocketMsg, messageQueueSelector, hashKey)
                sendMessageInTransaction

                rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

                /**
                     * Send Spring Message in Transaction
                     *
                     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
                     * @param destination     destination formats: `topicName:tags`
                     * @param message         message {@link org.springframework.messaging.Message}
                     * @param arg             ext arg
                     * @return TransactionSendResult
                     * @throws MessagingException
                     */
                    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message<?> message, final Object arg) throws MessagingException {
                        try {
                            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
                            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                                charset, destination, message);
                            return txProducer.sendMessageInTransaction(rocketMsg, arg);
                        } catch (MQClientException e) {
                            throw RocketMQUtil.convert(e);
                        }
                    }
                復制
                • sendMessageInTransaction方法內部執行的是txProducer.sendMessageInTransaction(rocketMsg, arg)
                小結
                • RocketMQTemplate繼承了spring-messaging的AbstractMessageSendingTemplate,實現了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法
                • afterPropertiesSet方法執行producer.start();destroy方法執行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合
                • doSend方法內部調用的是syncSend方法,返回的sendResult僅僅debug輸出;doConvert方法針對String類型的payload不做處理,其他類型使用objectMapper.writeValueAsString轉為String作為content,然后構造message,執行postProcessor.postProcessMessage,然后返回
                doc
                關聯標簽:
                皇宫里的共享小公主

                  <form id="jrvnn"></form>

                      <address id="jrvnn"><listing id="jrvnn"></listing></address>
                      <form id="jrvnn"><nobr id="jrvnn"><meter id="jrvnn"></meter></nobr></form>

                        <sub id="jrvnn"><listing id="jrvnn"></listing></sub>