
21. RocketMQ Source Code Analysis: How Transactional Messages Are Implemented (Part 1)


Following the demo in the previous section, the entry point for sending a transactional message is TransactionMQProducer#sendMessageInTransaction:

public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
    if (null == this.transactionListener) { // @1
        throw new MQClientException("TransactionListener is null", null);
    }
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); // @2
}

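  Code @1: if transactionListener is null, an MQClientException is thrown immediately, so no message is sent without a listener.
  Code @2: delegates to the sendMessageInTransaction method of defaultMQProducerImpl, passing the listener along with the message and arg.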
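The guard-then-delegate shape of @1 and @2 can be sketched in isolation. This is a minimal illustration, not RocketMQ code: Listener, ClientException, ProducerImpl and Producer are hypothetical stand-ins for TransactionListener, MQClientException, DefaultMQProducerImpl and TransactionMQProducer, and the message is reduced to a String.

```java
/** Hypothetical stand-ins only; none of these are the real RocketMQ classes. */
final class TransactionEntrySketch {

    /** Stand-in for TransactionListener; only its presence matters here. */
    interface Listener {}

    /** Stand-in for MQClientException. */
    static final class ClientException extends Exception {
        ClientException(String message) {
            super(message);
        }
    }

    /** Stand-in for DefaultMQProducerImpl, the object that does the real work. */
    static final class ProducerImpl {
        String sendMessageInTransaction(String msg, Listener listener, Object arg) {
            return "sent:" + msg; // placeholder for the actual send logic
        }
    }

    /** Stand-in for TransactionMQProducer, a thin facade over ProducerImpl. */
    static final class Producer {
        private final ProducerImpl impl = new ProducerImpl();
        private Listener listener; // stays null until setListener is called

        void setListener(Listener listener) {
            this.listener = listener;
        }

        String sendMessageInTransaction(String msg, Object arg) throws ClientException {
            if (listener == null) { // @1: fail fast when no listener is registered
                throw new ClientException("TransactionListener is null");
            }
            return impl.sendMessageInTransaction(msg, listener, arg); // @2: delegate
        }
    }

    public static void main(String[] args) throws Exception {
        Producer producer = new Producer();
        try {
            producer.sendMessageInTransaction("order-1", null);
        } catch (ClientException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        producer.setListener(new Listener() {});
        System.out.println(producer.sendMessageInTransaction("order-1", null));
    }
}
```

Checking for the listener in the facade means a misconfigured producer fails on the caller's thread before anything reaches the broker.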
   DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg) throws MQClientException {

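  Step 1: First, the meaning of the parameters. final Message msg: the message to send; TransactionListener tranExecuter: the transaction listener; Object arg: an additional user-supplied argument, which is passed unchanged to the TransactionListener callback.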
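To make the role of arg concrete, here is a minimal sketch of how it reaches the listener untouched. The types are simplified stand-ins declared locally (the message is a String, and the listener has only the execute callback), the enum mirrors the three states of RocketMQ's LocalTransactionState, and OrderListener with its "positive order id" rule is purely hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins; the real types live in the RocketMQ client library. */
final class ListenerArgSketch {

    /** Mirrors the three states of RocketMQ's LocalTransactionState. */
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Reduced stand-in for TransactionListener (message type simplified to String). */
    interface TransactionListener {
        LocalTransactionState executeLocalTransaction(String msg, Object arg);
    }

    /** Hypothetical sender: hands msg and the caller's arg to the callback as-is. */
    static LocalTransactionState sendMessageInTransaction(
            String msg, TransactionListener tranExecuter, Object arg) {
        // Sending the prepared message is omitted in this sketch.
        return tranExecuter.executeLocalTransaction(msg, arg);
    }

    /** Hypothetical listener: treats arg as an order id and commits only if it is positive. */
    static final class OrderListener implements TransactionListener {
        final List<Object> seenArgs = new ArrayList<>();

        @Override
        public LocalTransactionState executeLocalTransaction(String msg, Object arg) {
            seenArgs.add(arg); // record exactly what the sender passed through
            if (arg instanceof Long && (Long) arg > 0) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    public static void main(String[] args) {
        OrderListener listener = new OrderListener();
        Object orderId = Long.valueOf(42L);
        System.out.println(sendMessageInTransaction("order created", listener, orderId));
        System.out.println(listener.seenArgs.get(0) == orderId); // same object instance
    }
}
```

Because arg is an opaque Object, the caller can use it to carry whatever the local transaction needs, such as an order id or a request object, without encoding it into the message body.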
   DefaultMQProducerImpl#sendMessageInTransaction

SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);