RocketMQ Source Code Analysis 14: Transaction Messages
Note: this series of source-code analyses is based on RocketMQ 4.8.0. Gitee repository: https://gitee.com/funcy/rocketmq.git
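RocketMQ supports a special kind of message, the transaction message. This article works through the source code to show how transaction messages are implemented.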
1. Preparing the demo
The transaction message example lives in the org.apache.rocketmq.example.transaction package. Let's first look at how it is used.
1.1 Preparing the transaction listener: TransactionListener
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
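TransactionListener is the transaction listener interface. It has two methods:
- executeLocalTransaction(...): runs the local transaction. This is where the transaction's actual work goes.
- checkLocalTransaction(...): reports the execution state of the local transaction. It is called when the broker checks back, as described in section 5.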
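The demo picks outcomes with a counter and keeps them in an in-memory map. The JDK-only model below shows how a real application might pair the two methods instead. It rests on several assumptions. `LocalState` is a local stand-in for RocketMQ's `LocalTransactionState`. The `txLog` map stands in for a durable transaction log such as a database table. The class and method signatures are illustrative and are not RocketMQ API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical model of a TransactionListener; LocalState mirrors the values of
// RocketMQ's LocalTransactionState but is defined here only for illustration.
class TxListenerSketch {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Stand-in for a durable local transaction log keyed by transactionId.
    private final Map<String, LocalState> txLog = new ConcurrentHashMap<>();

    // Plays the role of executeLocalTransaction: run the business logic,
    // record its outcome, and return it.
    LocalState executeLocalTransaction(String transactionId, BooleanSupplier businessLogic) {
        LocalState state;
        try {
            state = businessLogic.getAsBoolean()
                    ? LocalState.COMMIT_MESSAGE
                    : LocalState.ROLLBACK_MESSAGE;
        } catch (RuntimeException e) {
            // the business work failed, so the half message must not be delivered
            state = LocalState.ROLLBACK_MESSAGE;
        }
        txLog.put(transactionId, state);
        return state;
    }

    // Plays the role of checkLocalTransaction: answer the broker's check-back from the log.
    // No record means the outcome is not known yet, so the broker should ask again later.
    LocalState checkLocalTransaction(String transactionId) {
        return txLog.getOrDefault(transactionId, LocalState.UNKNOW);
    }
}
```

A real application would need to write this record durably, in the same local transaction as the business data, because the check-back can arrive after the producer has restarted. Returning UNKNOW for a missing record lets the broker retry rather than guess.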
1.2 The transaction message producer
Next comes the producer for transaction messages:
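import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // the producer type here is TransactionMQProducer
        TransactionMQProducer producer
            = new TransactionMQProducer("please_rename_unique_group_name");
        // prepare a thread pool
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            }
        );
        producer.setExecutorService(executorService);
        // set the transaction listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        // send transaction messages
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}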
Unlike a producer for ordinary messages, the transaction message producer is of type TransactionMQProducer and must be given a transaction listener.
With the demo in place, let's walk through the flow behind it.
2. Startup: TransactionMQProducer#start
TransactionMQProducer is started by its start() method:
@Override
public void start() throws MQClientException {
    // initialize the transaction environment
    this.defaultMQProducerImpl.initTransactionEnv();
    // delegate to the parent class DefaultMQProducer
    super.start();
}
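This method first calls DefaultMQProducerImpl#initTransactionEnv to do some initialization. It then calls start() on the parent class DefaultMQProducer to perform the regular startup. Compared with starting an ordinary producer, a transaction producer therefore adds only one step: initializing the transaction environment.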
Let's step into DefaultMQProducerImpl#initTransactionEnv to see what it does:
public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    // the whole method only assigns checkExecutor
    if (producer.getExecutorService() != null) {
        // use the pool supplied through setExecutorService(...)
        this.checkExecutor = producer.getExecutorService();
    } else {
        // otherwise build a default pool over a bounded queue
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(
            producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}
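As the code shows, the entire method does nothing but assign the member variable checkExecutor. If a thread pool was supplied through producer.setExecutorService(...), as in the demo, that pool is used. Otherwise a default ThreadPoolExecutor is created over a queue bounded by checkRequestHoldMax.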
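The fallback branch builds a ThreadPoolExecutor over a bounded LinkedBlockingQueue and does not set a rejection handler, so the JDK default (AbortPolicy) applies. The JDK-only sketch below reproduces that construction to show what happens when check requests pile up. The pool sizes are illustrative parameters, and `countRejected` is a hypothetical helper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Mirrors the shape of the fallback pool in initTransactionEnv; sizes are illustrative.
class CheckExecutorSketch {
    // Submits `tasks` blocking "check requests" and counts how many are rejected.
    static int countRejected(int coreSize, int maxSize, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreSize, maxSize, 1000 * 60, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity)); // default AbortPolicy
        AtomicInteger rejected = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            try {
                // each task blocks until released, so workers and queue stay occupied
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected.incrementAndGet(); // queue full and pool at maxSize
            }
        }
        release.countDown(); // let the blocked tasks finish
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected.get();
    }
}
```

Take one worker and a queue capacity of 2 as an example. The first task occupies the worker, the next two wait in the queue, and the fourth is rejected. In the real producer, a rejected submission would presumably surface as an exception on the thread handling the broker's check request. The broker simply checks again on a later pass.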
3. Sending messages: TransactionMQProducer#sendMessageInTransaction(...)
Messages are sent with TransactionMQProducer#sendMessageInTransaction(...):
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
Following the call, we arrive at DefaultMQProducerImpl#sendMessageInTransaction:
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
    // obtain the TransactionListener
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // clear the delay level: transaction messages do not support delayed delivery
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    // set message properties that mark it as a transaction message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
        this.defaultMQProducer.getProducerGroup());
    try {
        // send the message synchronously
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // handle the send result
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                // many checks omitted here
                ...
                // the send succeeded: execute the local transaction
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    try {
        // end the remote transaction; note the localTransactionState passed in
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn(...);
    }
    // build the return value
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    ...
    return transactionSendResult;
}
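This is the method that sends the transaction message. Its key points are:
1. Obtain the TransactionListener. This is the listener set with producer.setTransactionListener(...) in the demo.
2. If the delay level is not 0, clear it. Transaction messages do not support delayed delivery.
3. Set message properties that mark the message as a transaction message. When the broker receives the message, it handles it specially because of these properties.
4. Send the message. This works the same way as an ordinary send, except that the message is always sent synchronously.
5. Handle the send result:
   - If the status is FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT or SLAVE_NOT_AVAILABLE, the transaction state is set to ROLLBACK_MESSAGE, meaning the message must be rolled back.
   - If the status is SEND_OK, the local transaction runs through transactionListener.executeLocalTransaction(...), which returns the transaction state. If that call throws, the state stays UNKNOW.
6. End the remote transaction. The state obtained in step 5 is sent to the broker, and the broker takes over from there.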
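Steps 2, 3 and 5 can be modelled without RocketMQ on the classpath. The sketch below is hypothetical: `SendStatus` and `LocalState` are local stand-ins for RocketMQ's enums. It also assumes that the property keys `"DELAY"`, `"TRAN_MSG"` and `"PGROUP"` are the string values behind `MessageConst.PROPERTY_DELAY_TIME_LEVEL`, `PROPERTY_TRANSACTION_PREPARED` and `PROPERTY_PRODUCER_GROUP`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the decision logic in sendMessageInTransaction.
class SendInTxSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Steps 2-3: drop the delay level and tag the message as a prepared (half) message.
    static Map<String, String> prepareProperties(Map<String, String> props, String producerGroup) {
        Map<String, String> out = new HashMap<>(props);
        out.remove("DELAY");              // assumed key of PROPERTY_DELAY_TIME_LEVEL
        out.put("TRAN_MSG", "true");      // assumed key of PROPERTY_TRANSACTION_PREPARED
        out.put("PGROUP", producerGroup); // assumed key of PROPERTY_PRODUCER_GROUP
        return out;
    }

    // Step 5: turn the synchronous send status into the state reported by endTransaction.
    static LocalState decide(SendStatus status, Supplier<LocalState> localTransaction) {
        LocalState state = LocalState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    LocalState result = localTransaction.get();
                    state = (result == null) ? LocalState.UNKNOW : result; // treat null as UNKNOW
                } catch (Throwable e) {
                    // as in the source above, an exception leaves the state UNKNOW
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                state = LocalState.ROLLBACK_MESSAGE; // local transaction never runs
                break;
            default:
                break;
        }
        return state;
    }
}
```

The design choice is visible in `decide`. The local transaction only runs once the half message is known to be stored on the broker, and a failed local transaction that throws is reported as UNKNOW rather than as ROLLBACK_MESSAGE.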
Here is a quick look at transactionListener.executeLocalTransaction(...) again:
public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    ...
}
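This comes from the demo. executeLocalTransaction(...) returns the execution state of the transaction, and that state matters a great deal. It is later sent to the broker, which uses it to decide whether to commit or roll back the message. Note that the demo always returns UNKNOW, so every demo message is resolved through the broker's check-back mechanism described in section 5.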
Next, the method that ends the transaction, DefaultMQProducerImpl#endTransaction:
public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException,
        MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // find the broker address
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(
        sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // set the commit/rollback flag in the header according to the local state
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " +
        localException.toString()) : null;
    // sent oneway
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader,
        remark, this.defaultMQProducer.getSendMsgTimeout());
}
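This method sends the end-transaction request to the broker. Two points in the code are worth noting:
- The commit/rollback flag is set from localTransactionState. That value comes either from the send result of the transaction message or from the result of the local transaction.
- The request is sent oneway, so RocketMQ does not care about its response. It can afford not to care because transaction messages also have a broker check-back mechanism: the broker periodically asks the producer for the transaction state. This mechanism is analyzed later in the article.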
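The commitOrRollback field is a small bit-flag encoding. The sketch below assumes the MessageSysFlag constants of RocketMQ 4.x: TRANSACTION_NOT_TYPE = 0, PREPARED = 0x1 << 2, COMMIT = 0x2 << 2, ROLLBACK = 0x3 << 2. These constants share bits 2 and 3 of the sysFlag int, and ROLLBACK doubles as the mask. The class and enum are local stand-ins.

```java
// Sketch of how the local state becomes the commitOrRollback flag.
// Constant values are assumed to mirror MessageSysFlag in RocketMQ 4.x.
class EndTxFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 12, also the two-bit mask

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Same mapping as the switch in endTransaction.
    static int commitOrRollback(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE; // UNKNOW: the broker takes no action
        }
    }

    // Extract the transaction type from a full sysFlag value.
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }
}
```

Because the transaction type occupies its own two bits, other sysFlag bits, such as the compression flag in bit 0, can be set alongside it without interfering.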
At this point the producer has finished sending the transaction message. Next, let's see how the broker handles transaction-related messages.
4. How the broker handles transaction messages
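In TransactionMQProducer#sendMessageInTransaction(...), covered in the previous section, the producer sends two messages to the broker: the transaction (half) message itself and the end-transaction request. Let's look at how the broker handles each of them.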
4.1 Handling the transaction message: SendMessageProcessor#asyncSendMessage
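When the producer sends a transaction message to the broker, it is processed the same way as an ordinary message, so this article covers only the differences. SendMessageProcessor#asyncSendMessage distinguishes between ordinary and transaction messages: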
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx,
        RemotingCommand request,
        SendMessageContext mqtraceContext,
        SendMessageRequestHeader requestHeader) {
    // ... (omitted: building response and msgInner, reading transFlag from PROPERTY_TRANSACTION_PREPARED)
    // transaction message
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // handle the transaction message
        putMessageResult = this.brokerController.getTransactionalMessageService()
            .asyncPrepareMessage(msgInner);
    } else {
        // store an ordinary message
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner,
        responseHeader, mqtraceContext, ctx, queueIdInt);
}
TransactionalMessageServiceImpl#asyncPrepareMessage delegates to TransactionalMessageBridge#asyncPutHalfMessage, so we continue there:
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(
        MessageExtBrokerInner messageInner) {
    // parseHalfMessageInner(...): convert the message
    // asyncPutMessage(...): store the message, i.e. write it to the commitLog
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
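This method performs two operations:
- parseHalfMessageInner(...): converts the message so that it is parked in the queue dedicated to transaction messages.
- asyncPutMessage(...): stores the message in the commitLog, exactly as for an ordinary message.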
Since storing a transaction message is no different from storing an ordinary one, we focus on the conversion step, TransactionalMessageBridge#parseHalfMessageInner:
/**
 * Build the half message
 * @param msgInner
 * @return
 */
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // save the original topic and queueId as properties
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // reset the transaction bits of sysFlag to TRANSACTION_NOT_TYPE
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
            MessageSysFlag.TRANSACTION_NOT_TYPE));
    // switch to the transaction-specific topic and queue, i.e. park the message there
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder
        .messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
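This step is straightforward. The message's original topic and queueId are saved, and then replaced with the transaction-specific topic (RMQ_SYS_TRANS_HALF_TOPIC, the topic the check service scans in section 5.1) and queueId 0. The message is then stored in the commitLog. With that, sending the transaction message is complete.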
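The rewrite can be modelled as a pure function on a message record. The sketch below is hypothetical. `Msg` stands in for `MessageExtBrokerInner`. The property keys `"REAL_TOPIC"`/`"REAL_QID"` and the topic name are assumed to match `MessageConst.PROPERTY_REAL_TOPIC`, `PROPERTY_REAL_QUEUE_ID` and `TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC`. `restore()` is only a rough approximation of what the commit path in section 4.2 does. As far as I can tell, the transaction bits are reset to TRANSACTION_NOT_TYPE because the store does not build consume-queue entries for prepared messages. Resetting the bits keeps the half message readable from its half topic, so the check service can find it later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of parseHalfMessageInner and its inverse.
class HalfMessageSketch {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // mask of the two transaction bits
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    static final class Msg {
        String topic;
        int queueId;
        int sysFlag;
        final Map<String, String> props = new HashMap<>();

        Msg(String topic, int queueId, int sysFlag) {
            this.topic = topic;
            this.queueId = queueId;
            this.sysFlag = sysFlag;
        }
    }

    // Clear the two transaction bits of sysFlag, then set them to `type`.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    // Park the message: remember where it should go, then redirect it to the half topic.
    static Msg toHalf(Msg m) {
        m.props.put("REAL_TOPIC", m.topic);
        m.props.put("REAL_QID", String.valueOf(m.queueId));
        m.sysFlag = resetTransactionValue(m.sysFlag, TRANSACTION_NOT_TYPE);
        m.topic = HALF_TOPIC;
        m.queueId = 0;
        return m;
    }

    // Rough inverse used when the transaction is committed.
    static Msg restore(Msg half) {
        half.topic = half.props.get("REAL_TOPIC");
        half.queueId = Integer.parseInt(half.props.get("REAL_QID"));
        return half;
    }
}
```

For example, a message for TopicTest1234 queue 3 with sysFlag 5 (prepared bit plus bit 0) becomes a half message in queue 0 with sysFlag 1. Its original destination survives only in the two properties.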
4.2 Handling the end-transaction request: EndTransactionProcessor#processRequest
The end-transaction request has the code END_TRANSACTION, and EndTransactionProcessor#processRequest handles it:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request
        .decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    ...
    OperationResult result = new OperationResult();
    // commit
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // look up the half message in the commitLog
        result = this.brokerController.getTransactionalMessageService()
            .commitMessage(requestHeader);
        // lookup succeeded
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // rebuild the message, restoring the real topic and queue it should go to
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                // some setXxx(...) calls omitted
                ...
                // the actual delivery
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // delivered: "delete" the half message, which only marks it as deleted on disk
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService()
                        .deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // rollback: look up the half message
        result = this.brokerController.getTransactionalMessageService()
            .rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // no delivery: just mark the half message as deleted (not removed from disk)
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(
                    result.getPrepareMessage());
            }
            return res;
        }
    }
    // UNKNOW (TRANSACTION_NOT_TYPE) is not handled at all
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
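This method handles commit and rollback separately:
- On commit, the message is finally delivered. After delivery, the original half message is marked as deleted.
- On rollback, the original half message is simply marked as deleted.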
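The three outcomes can be captured in a small in-memory model. It is hypothetical throughout. `halfStore` stands in for the half topic and is keyed by commitLog offset. `opDeleted` stands in for the delete marks written by `deletePrepareMessage(...)`, and `delivered` stands in for the real topic. The flag values assume MessageSysFlag's 0/8/12.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of EndTransactionProcessor#processRequest.
class EndTxProcessorSketch {
    static final int NOT = 0, COMMIT = 8, ROLLBACK = 12; // assumed MessageSysFlag values

    final Map<Long, String> halfStore = new HashMap<>(); // commitLogOffset -> half message body
    final Set<Long> opDeleted = new HashSet<>();         // offsets marked as deleted
    final List<String> delivered = new ArrayList<>();    // messages visible to consumers

    void process(long commitLogOffset, int commitOrRollback) {
        String half = halfStore.get(commitLogOffset);
        if (half == null) {
            return; // lookup failed: nothing to commit or roll back
        }
        if (commitOrRollback == COMMIT) {
            delivered.add(half);            // sendFinalMessage: write to the real topic
            opDeleted.add(commitLogOffset); // then mark the half message deleted
        } else if (commitOrRollback == ROLLBACK) {
            opDeleted.add(commitLogOffset); // no delivery, only the delete mark
        }
        // NOT (UNKNOW): nothing happens; the check service deals with it later
    }
}
```

Note that the half message is never physically removed in either branch. Only a marker is recorded, which matches the "marked as deleted" comments in the source.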
Let's look at the actual delivery of a transaction message, EndTransactionProcessor#sendFinalMessage:
private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // delivery: store the message again
    final PutMessageResult putMessageResult
        = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        switch (putMessageResult.getPutMessageStatus()) {
            // handling of the result omitted
            ...
        }
    }
    // remaining handling and return omitted
    ...
}
The call brokerController.getMessageStore().putMessage(...) writes the message into the commitLog a second time. This time the topic and queueId are the original ones, so from now on consumers can consume the message.
5. The transaction check-back mechanism
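So far we have seen how the broker handles the COMMIT_MESSAGE and ROLLBACK_MESSAGE states. Besides these two, a transaction message has a third state, UNKNOW. As EndTransactionProcessor#processRequest shows, the broker does nothing with this state!

What does RocketMQ do with an UNKNOW transaction? Because EndTransactionProcessor#processRequest ignores it, the message is neither committed nor rolled back at that point. Instead, a separate thread takes care of it: the transaction message check thread.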
5.1 Starting the check thread
During broker startup, BrokerController#start runs the following:
public void start() throws Exception {
    ...
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // start role-dependent processors
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }
    ...
}

/**
 * Starts the transaction message check thread (only on non-slave brokers)
 */
private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}
This method calls TransactionalMessageCheckService#start(). TransactionalMessageCheckService extends ServiceThread, so the call lands in ServiceThread#start:
public abstract class ServiceThread implements Runnable {
    ...
    public void start() {
        log.info(...);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
    ...
}
As shown, the start() method of TransactionalMessageCheckService is inherited from ServiceThread. That method starts a new thread whose Runnable is the service itself. So let's go straight to TransactionalMessageCheckService#run to see what the thread does:
@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig()
        .getTransactionCheckInterval();
    while (!this.isStopped()) {
        // wait up to checkInterval, then run a check
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}
Next, ServiceThread#waitForRunning. The onWaitEnd() shown after it is the override in TransactionalMessageCheckService:
protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        // already woken up: run immediately
        this.onWaitEnd();
        return;
    }
    //entry to wait
    waitPoint.reset();
    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        // run after the wait ends
        this.onWaitEnd();
    }
}

// TransactionalMessageCheckService#onWaitEnd, overriding ServiceThread#onWaitEnd
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // run the check
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax,
        this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}",
        System.currentTimeMillis() - begin);
}
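The wait/wake-up pattern is easier to see in isolation. The following is a simplified, hypothetical model of ServiceThread#waitForRunning and wakeup(). RocketMQ uses its own resettable CountDownLatch2. This sketch instead creates a fresh `java.util.concurrent.CountDownLatch` for each wait, and `onWaitEnd()` here only counts passes rather than checking messages.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of ServiceThread's wait loop; not the RocketMQ class itself.
class WaitLoopSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    final AtomicInteger checks = new AtomicInteger();

    // Stands in for TransactionalMessageCheckService#onWaitEnd: one check pass.
    protected void onWaitEnd() {
        checks.incrementAndGet();
    }

    // Ask the loop to run now instead of waiting for the full interval.
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd(); // a wake-up arrived before we started waiting: run now
            return;
        }
        waitPoint = new CountDownLatch(1); // stands in for waitPoint.reset()
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd(); // interval elapsed (or woken up): run the check
        }
    }
}
```

With no wake-up, each call to waitForRunning blocks for the interval and then runs exactly one pass. In the broker the interval is transactionCheckInterval, so the check runs periodically.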
This eventually reaches TransactionalMessageServiceImpl#check, which performs the check on transaction messages:
public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
    try {
        // topic of the transaction (half) messages
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // the message queues to check
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            // a lot omitted
            ...
            // fetch a transaction message from the queue
            GetResult getResult = getHalfMsg(messageQueue, i);
            // a lot omitted
            ...
            // check the transaction state
            listener.resolveHalfMsg(msgExt);
            // still more omitted
            ...
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}
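A large amount of code is omitted from this method, but it has only two key operations:
- fetch messages from the transaction message topic;
- check the transaction state of each message.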
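Much of check() is elided above, so the following is only a heavily simplified, hypothetical model of one pass over a half queue. It is not the actual body of the method. It assumes three things:
- messages already carrying a delete mark from deletePrepareMessage(...) are skipped;
- a message checked transactionCheckMax times is given up on;
- messages younger than transactionTimeout are left for a later pass.

The parameter names follow the `getTransactionTimeOut()`/`getTransactionCheckMax()` settings read in onWaitEnd(). Progress offsets and the real order of operations are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of one check pass; not TransactionalMessageServiceImpl#check.
class CheckPassSketch {
    static final class Half {
        final long offset;
        final long bornTime;
        int checkTimes;

        Half(long offset, long bornTime, int checkTimes) {
            this.offset = offset;
            this.bornTime = bornTime;
            this.checkTimes = checkTimes;
        }
    }

    // Returns the offsets for which a check request would be sent to the producer.
    static List<Long> checkPass(List<Half> halfQueue, Set<Long> resolved, long now,
                                long transactionTimeout, int transactionCheckMax,
                                List<Long> discarded) {
        List<Long> toCheck = new ArrayList<>();
        for (Half h : halfQueue) {
            if (resolved.contains(h.offset)) {
                continue;                // already committed or rolled back
            }
            if (h.checkTimes >= transactionCheckMax) {
                discarded.add(h.offset); // checked too often: give up on it
                continue;
            }
            if (now - h.bornTime < transactionTimeout) {
                break;                   // too young; entries after it are younger still
            }
            h.checkTimes++;
            toCheck.add(h.offset);       // would go to listener.resolveHalfMsg(...)
        }
        return toCheck;
    }
}
```

The `break` relies on the half queue being append-only, so entries are roughly in send order. Once one message is too young to check, the rest of the queue can wait for the next pass.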
5.2 The broker sends the check request
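Now for the state check itself. listener.resolveHalfMsg(...) hands the message to a thread pool, and the request to the producer is eventually sent by Broker2Client#checkProducerTransactionState: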
public void checkProducerTransactionState(
        final String group,
        final Channel channel,
        final CheckTransactionStateRequestHeader requestHeader,
        final MessageExt messageExt) throws Exception {
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        // send the check request to the producer, with code CHECK_TRANSACTION_STATE
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error(...);
    }
}
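The code shows that the check request has the code CHECK_TRANSACTION_STATE and is sent oneway, which means the broker does not care about its response.

What does the producer do when it receives this check request from the broker? The next subsection answers that.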
5.3 The producer handles the check request
As the previous subsection showed, the check request sent by the broker has the code CHECK_TRANSACTION_STATE. On the producer, ClientRemotingProcessor#processRequest handles this code:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        // check the transaction state
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        // other cases omitted
        ...
        default:
            break;
    }
    return null;
}
We follow it into ClientRemotingProcessor#checkTransactionState:
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader)
        request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
            messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(),
                this.mqClientFactory.getClientConfig().getNamespace()));
        }
        String transactionId = messageExt
            .getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            // pick a producer
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                // check the state
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        } else {
            log.warn("checkTransactionState, pick producer group failed");
        }
    } else {
        log.warn("checkTransactionState, decode message failed");
    }
    return null;
}
The method is a bit long, but it does two main things:
- pick a producer: this.mqClientFactory.selectProducer(group)
- check the transaction state: producer.checkTransactionState(...)
Let's go straight to the state check, DefaultMQProducerImpl#checkTransactionState:
public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        // a lot omitted
        ...
    };
    this.checkExecutor.submit(request);
}
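DefaultMQProducerImpl#checkTransactionState first creates a Runnable and then submits it to the checkExecutor thread pool. At the start of this article, while analyzing how TransactionMQProducer starts, we saw that checkExecutor is assigned in DefaultMQProducerImpl#initTransactionEnv. Here we finally see it being used.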
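As with any task handed to a thread pool, the real work happens in the Runnable's run() method. Here it is, together with its helper processTransactionState(...):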
Runnable request = new Runnable() {
    private final String brokerAddr = addr;
    private final MessageExt message = msg;
    private final CheckTransactionStateRequestHeader checkRequestHeader = header;
    private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

    @Override
    public void run() {
        // the older TransactionCheckListener, if one was set
        TransactionCheckListener transactionCheckListener
            = DefaultMQProducerImpl.this.checkListener();
        // 1. get the listener
        TransactionListener transactionListener = getCheckListener();
        if (transactionCheckListener != null || transactionListener != null) {
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable exception = null;
            try {
                if (transactionCheckListener != null) {
                    localTransactionState = transactionCheckListener
                        .checkLocalTransactionState(message);
                } else if (transactionListener != null) {
                    log.debug("Used new check API in transaction message");
                    // 2. check the transaction state
                    localTransactionState = transactionListener.checkLocalTransaction(message);
                } else {
                    log.warn(...);
                }
            } catch (Throwable e) {
                log.error(...);
                exception = e;
            }
            // handle the transaction state
            this.processTransactionState(localTransactionState, group, exception);
        } else {
            log.warn(...);
        }
    }

    private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
        final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
        thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
        thisHeader.setProducerGroup(producerGroup);
        thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
        // mark the request as coming from a transaction check
        thisHeader.setFromTransactionCheck(true);
        String uniqueKey = message.getProperties()
            .get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (uniqueKey == null) {
            uniqueKey = message.getMsgId();
        }
        thisHeader.setMsgId(uniqueKey);
        thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
        // 3. handle the commit/rollback state
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                // set the state to: commit
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                // set the state to: rollback
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                log.warn(...);
                break;
            case UNKNOW:
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                log.warn(...);
                break;
            default:
                break;
        }
        String remark = null;
        if (exception != null) {
            remark = "checkLocalTransactionState Exception: "
                + RemotingHelper.exceptionSimpleDesc(exception);
        }
        try {
            // 4. send the request that ends the transaction
            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl()
                .endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
        } catch (Exception e) {
            log.error("endTransactionOneway exception", e);
        }
    }
};
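This code is fairly long, but it comes down to four steps:
1. Get the listener. This is the TransactionListenerImpl we set in the demo.
2. Check the transaction state. This runs our own method, TransactionListenerImpl#checkLocalTransaction.
3. Handle the commit/rollback state. The commit/rollback flag is set from the result of TransactionListenerImpl#checkLocalTransaction.
4. Send the end-transaction request to the broker. This is the same oneway call made earlier in DefaultMQProducerImpl#sendMessageInTransaction.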
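The four steps can be condensed into a hypothetical, JDK-only model of the Runnable's job:
- run the user's check on an Executor, which plays the role of checkExecutor;
- fall back to UNKNOW on an exception or a null result;
- map the state to the assumed MessageSysFlag values 8/12/0;
- hand the flag to a fire-and-forget callback, which plays the role of endTransactionOneway.

`onCheckRequest` and its parameters are illustrative names, not RocketMQ API.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the Runnable submitted by DefaultMQProducerImpl#checkTransactionState.
class ProducerCheckSketch {
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static void onCheckRequest(Executor checkExecutor, String transactionId,
                               Function<String, LocalState> checkLocalTransaction,
                               Consumer<Integer> endTransactionOneway) {
        checkExecutor.execute(() -> {
            LocalState state = LocalState.UNKNOW;
            try {
                LocalState result = checkLocalTransaction.apply(transactionId); // step 2
                if (result != null) {
                    state = result;
                }
            } catch (Throwable e) {
                // keep UNKNOW; the real code also passes the exception text as a remark
            }
            int commitOrRollback;                  // step 3
            switch (state) {
                case COMMIT_MESSAGE:
                    commitOrRollback = 0x2 << 2;   // TRANSACTION_COMMIT_TYPE
                    break;
                case ROLLBACK_MESSAGE:
                    commitOrRollback = 0x3 << 2;   // TRANSACTION_ROLLBACK_TYPE
                    break;
                default:
                    commitOrRollback = 0;          // TRANSACTION_NOT_TYPE
            }
            endTransactionOneway.accept(commitOrRollback); // step 4: no response awaited
        });
    }
}
```

Passing `Runnable::run` as the executor runs the check synchronously, which is convenient for trying the model out. A real producer would pass its thread pool instead.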
6. Summary
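Having walked through the transaction message flow, let's summarize the whole process. The official documentation provides a diagram of it (not reproduced here). The flow is: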
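1. The producer sends a "half message". The broker receives it and returns "ok".
2. The producer executes the local transaction and gets its result. On success it proceeds to step 3; on failure, to step 4.
3. The local transaction succeeded, so the producer sends a "commit" message to the broker. Only now is the half message from step 1 actually delivered.
4. The local transaction failed, so the producer sends a "rollback" message to the broker. The half message from step 1 is cancelled and will never be delivered.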
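Normally these four steps are all a transaction message needs. In practice things can go wrong: the message in step 3 or 4 may fail to arrive, so the half message on the broker never receives a commit or rollback notification. This is where the check-back mechanism comes in: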
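1. When the broker has gone too long without a commit or rollback notification, it sends a oneway message to the producer, asking it to check the result of the local transaction.
2. On receiving the broker's message, the producer calls the check method to look up the local transaction state.
3. Once it has the local transaction state, the producer sends another oneway message telling the broker whether the earlier half message should be committed or rolled back.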
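The summary can be tied together in one small, hypothetical simulation. Everything runs in a single JVM, and "oneway" calls are plain method calls that the model can drop. The half store is a map keyed by transaction id, and the broker's check-back simply asks about every unresolved half message, with no timeout or retry limit. None of these classes correspond to RocketMQ types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical end-to-end model of the transaction message flow and its check-back.
class TxFlowSketch {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    static final class Broker {
        final Map<String, String> half = new LinkedHashMap<>(); // txId -> body (half topic)
        final List<String> realTopic = new ArrayList<>();       // visible to consumers

        void putHalf(String txId, String body) {
            half.put(txId, body);
        }

        void endTransaction(String txId, State state) {
            String body = half.get(txId);
            if (body == null) {
                return;
            }
            if (state == State.COMMIT_MESSAGE) {
                realTopic.add(body);  // deliver
                half.remove(txId);    // stands in for the delete mark
            } else if (state == State.ROLLBACK_MESSAGE) {
                half.remove(txId);    // cancel without delivery
            }
            // UNKNOW: leave it for the check-back
        }

        // Check-back: ask the producer about every unresolved half message.
        void checkBack(Function<String, State> producerCheck) {
            for (String txId : new ArrayList<>(half.keySet())) {
                endTransaction(txId, producerCheck.apply(txId));
            }
        }
    }

    static final class Producer {
        final Map<String, State> localLog = new HashMap<>();
        final Broker broker;

        Producer(Broker broker) {
            this.broker = broker;
        }

        // Steps 1-4; dropEnd simulates the commit/rollback message getting lost.
        void send(String txId, String body, boolean localOk, boolean dropEnd) {
            broker.putHalf(txId, body);                                          // 1
            State s = localOk ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;   // 2
            localLog.put(txId, s);
            if (!dropEnd) {
                broker.endTransaction(txId, s);                                  // 3 or 4
            }
        }

        // Called by the broker's check-back.
        State checkLocalTransaction(String txId) {
            return localLog.getOrDefault(txId, State.UNKNOW);
        }
    }
}
```

In this model, a lost commit leaves the half message parked until `checkBack` asks the producer. The producer's local log then decides its fate, which is the role the check-back mechanism plays in the real system.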
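Given the limits of my own expertise, there are bound to be mistakes in this article; corrections are welcome! Original content: for commercial reprints, please contact the author for permission; for non-commercial reprints, please credit the source.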
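This article was first published on the WeChat official account 「Java技术探秘」. If you enjoyed it, follow the account and let's explore the world of technology together!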