Rocketmq源码分析14:事务消息

java技术探秘

共 57311字,需浏览 115分钟

 ·

2021-05-02 15:10

注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.

rocketMq支持一类特别的消息:事务消息,本文将从源码角度分析事务消息的实现原理。

1. demo 准备

事务消息的示例位于org.apache.rocketmq.example.transaction包中,我们先来看看它的使用:

1.1 准备事务监听器:TransactionListener

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

TransactionListener是事务监听接口,它有两个方法:

  • executeLocalTransaction(...):执行事务,这里是事务的内容
  • checkLocalTransaction(...):检查事务的执行状态

1.2 事务消息的producer

接着就是事务消息的生产者了,代码如下:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 这里的 producer 类型是 TransactionMQProducer
        TransactionMQProducer producer 
            = new TransactionMQProducer("please_rename_unique_group_name");
        // 准备一线程池
        ExecutorService executorService = new ThreadPoolExecutor(25100, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            }
        );

        producer.setExecutorService(executorService);
        // 设置监听
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();

        // 发送事务消息
        String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

与普通消息的producer不同的是,事务消息的producer类型是TransactionMQProducer,并且需要设置事务监听器。

有了示例demo,接着我们就来分析这其中的流程了。

2. 启动:TransactionMQProducer#start

TransactionMQProducer的启动方法为start(),内容如下:

@Override
public void start() throws MQClientException {
    // 初始化环境
    this.defaultMQProducerImpl.initTransactionEnv();
    // 调用父类DefaultMQProducer的方法
    super.start();
}

这个方法先是调用DefaultMQProducerImpl#initTransactionEnv方法进行了一些初始化操作,然后调用父类DefaultMQProducerstart()方法进行启动操作。从这里可以看出,与普通消息的producer启动流程相比,事务消息的producer仅是多了一步初始化事务环境操作。

我们进入DefaultMQProducerImpl#initTransactionEnv方法,看看它做了什么:

public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    // 整个方法就是对 checkExecutor 进行赋值
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(
            producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}

从代码中可以看到,这整个方法就是对成员变量checkExecutor进行赋值操作。

3. 发送消息:TransactionMQProducer#sendMessageInTransaction(...)

发送消息的方法为 TransactionMQProducer#sendMessageInTransaction(...),代码如下:

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg)
 throws MQClientException 
{
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null"null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

继续跟进,进入 DefaultMQProducerImpl#sendMessageInTransaction 方法:

public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)

        throws MQClientException 
{
    // 获取 TransactionListener
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null"null);
    }

    // 清除延迟级别,可以看到,事务消息不支持延迟
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 设置消息属性,指定消息类型为事务消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, 
        this.defaultMQProducer.getProducerGroup());
    try {
        // 发送消息,按同步模式发送
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // 处理返回值
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                // 这里省略了好多的判断
                ...

                // 发送成功,执行本地事务
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // 结束远程事务,注意传入的 localTransactionState
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn(...);
    }

    // 构造返回值
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    ...

    return transactionSendResult;
}

这个方法就是用来发送事务消息的方法了,这里将其中的关键点总结如下:

  1. 获取 TransactionListener,这个TransactionListener就是我们在示例demo中调用producer.setTransactionListener(...)设置的
  2. 如果延迟级别不等于0,则将其清除,这就表明事务消息不支持延迟
  3. 设置消息属性,指定消息类型为事务消息,broker在收到消息时,会对事务消息进行特别处理
  4. 发送消息,发送方式与普通消息的发送并不区别,不过需要指明的是,这里是按同步模式发送的
  5. 处理消息的发送结果,如果发送失败,则将事务状态设置为ROLLBACK_MESSAGE,表示需要回滚;发送成功则执行本地事务,也就是执行transactionListener.executeLocalTransaction(...)方法,方法返回事务状态
  6. 结束远程事务,这一步会将第5步得到的事务状态发往broker,接下来的事就由broker进行处理了

这里我们来看一眼transactionListener.executeLocalTransaction(...)方法的内容:

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    ...
}

这是示例demo中的内容,在executeLocalTransaction(...)方法中可以返回事务的执行状态,这个状态非常重要,因为这个状态之后会发往brokerbroker会根据这个状态来判断是要提交还是回滚消息。

我们再来看看结束事务的方法DefaultMQProducerImpl#endTransaction

public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException)
 throws RemotingException, 
        MQBrokerException, InterruptedException, UnknownHostException 
{
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // 找到一个broker地址
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(
        sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // 设置消息头,根据消息状态设置 提交/回滚 标识
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + 
        localException.toString()) : null;
    // 发送方式为 oneway
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, 
        remark, this.defaultMQProducer.getSendMsgTimeout());
}

这个方法就是向broker发送结束事务操作了,代码中关键之处有两点:

  1. 根据localTransactionState状态来设置事务提交/回滚的标识,localTransactionState的值来源于事务消息的发送结果,或本地事务的执行结果
  2. 消息的发送方式为oneway,这表明rocketMq并不关心该消息的返回值,为何不关心呢?因为事务消息还有个broker反查机制,即broker定时向producer发送消息反查事务的状态,这点本文后面会分析。

到这里,producer就处理完事务消息的发送流程了,接下来我们来看看broker是如何处理事务相关消息的。

4. broker 处理事务消息

在上一节的TransactionMQProducer#sendMessageInTransaction(...)方法中,一共向broker发送了两条消息,这里我们来分析这两条消息所做的内容。

4.1 处理事务消息:SendMessageProcessor#asyncSendMessage

producerbroker发送事务消息后,处理流程同普通消息的处理流程一致,本文仅关注两者不同之处,在SendMessageProcessor#asyncSendMessage方法中,会区分普通消息与事务消息:

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, 
        RemotingCommand request,
        SendMessageContext mqtraceContext,
        SendMessageRequestHeader requestHeader)
 
{
    // 如果是事务消息
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        // 处理事务消息
        putMessageResult = this.brokerController.getTransactionalMessageService()
            .asyncPrepareMessage(msgInner);
    } else {
        // 发送普通消息
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, 
        responseHeader, mqtraceContext, ctx, queueIdInt);
}

继续跟进TransactionalMessageBridge#asyncPutHalfMessage

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(
        MessageExtBrokerInner messageInner)
 
{
    // parseHalfMessageInner(...):消息转换
    // asyncPutMessage(...):消息存储,就是保存到commitLog中
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

这个方法中有两个操作:

  • parseHalfMessageInner(...):消息转换,这个方法会将事务消息暂存到事务消息的专属队列中
  • asyncPutMessage(...):消息存储,就是保存到commitLog中,这点与普通消息并无差别

由于事务消息存储与普通消息的存储并无差别,因此这里,我们主要来看看事务消息的转换过程,进入TransactionalMessageBridge#parseHalfMessageInner方法:

/**
 * 构建消息内容
 * @param msgInner
 * @return
 */

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 保存原始的topic与queueId
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // 指定新的 topic 与 queue,其实就是暂存到事务相关的queue中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), 
        MessageSysFlag.TRANSACTION_NOT_TYPE));
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder
        .messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

这一步的操作很直观,就是将消息的topicqueueId保存下来,然后换成事务专用的topicqueueId,然后存储到commitLog中,由些,事务消息的发送也就结束了。

4.2 处理结束事务的消息:EndTransactionProcessor#processRequest

结束事务消息的codeEND_TRANSACTION,处理该code的方法为 EndTransactionProcessor#processRequest

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException 
{
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request
        .decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }

    ...

    OperationResult result = new OperationResult();
    // 事务提交操作
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // 从commitLog中获取消息
        result = this.brokerController.getTransactionalMessageService()
            .commitMessage(requestHeader);
        // 如果返回成功
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // 获取消息,在这里方法里会处理消息转换操作,即拿到真正要发送的topic与queue
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                // 省略一些setXxx(...)方法
                ...
                // 真正的投递操作
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                // 投递完成,删除消息,当然不是真正地从磁盘上删除,只是将消息标记为删除
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService()
                        .deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    // 事务回滚操作
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // 获取消息
        result = this.brokerController.getTransactionalMessageService()
            .rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // 没有投递操作,直接删除,不是真正地从磁盘上删除,只是将消息标记为删除
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(
                    result.getPrepareMessage());
            }
            return res;
        }
    }
    // 并没有处理 UNKNOW 的操作
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

在这个方法里,会分别处理事务的提交与回滚操作,

  • 在事务的提交处理中,可以看到此时事务才真正地投递出去,投递出去后,会把原本的事务消息标记为删除;
  • 在事务的回滚操作中,直接就把原本的事务消息标识为删除了

我们来看看事务消息的真正投递过程,进入EndTransactionProcessor#sendFinalMessage方法:

private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 投递操作
    final PutMessageResult putMessageResult 
        = this.brokerController.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        switch (putMessageResult.getPutMessageStatus()) {
            // 省略结果的处理
            ...
        }
    }
}

这里的brokerController.getMessageStore().putMessage(...)操作,就是把消息再一次写入到commitLog,不过此时的topicqueueId就是最初的了,接下来consumer就能对其进行消费了。

5. 事务的反查机制

前面我们分析了broker是如何处理事务消息的COMMIT_MESSAGEROLLBACK_MESSAGE状态,实际上,事务消息除了以上两种状态外,还有第三种状态:UNKNOW,从EndTransactionProcessor#processRequest方法来看,broker并没有处理这种状态!

当出现UNKNOW状态时,rocketMq该怎么办呢?实际上,EndTransactionProcessor#processRequest没有处理UNKNOW状态,这就表明UNKNOW状态的事务消息既不会执行提交操作,也不会提交回滚操作,它会由一个单独的线程来进行操作,这个线程就是事务消息的检查线程。

5.1 检查线程的启动

broker的启动流程中,BrokerController#start会执行这样一个方法:

public void start() throws Exception {
    ...
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // 启动一些处理器
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(truefalsetrue);
    }
    ...
}

/**
 * 在这里会启动 事务消息的检查线程
 */

private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}

在这个方法里会TransactionalMessageCheckServicestart()方法,我们先来看看这个操作做了什么,然后就来到了ServiceThread#start方法:

public abstract class ServiceThread implements Runnable {
    ...
    public void start() {
        log.info(...);
        if (!started.compareAndSet(falsetrue)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
    ...
}

可以看到,TransactionalMessageCheckServicestart()方法来自于ServiceThread,在ServiceThreadstart()方法中,会启动一个线程来处理操作。这里我们直接进入TransactionalMessageCheckService#run方法看看这个线程做了什么:

@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig()
        .getTransactionCheckInterval();
    while (!this.isStopped()) {
        // 运行操作
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

跟进ServiceThread#waitForRunning方法:

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(truefalse)) {
        // 执行操作
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // 检查操作
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, 
        this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}"
        System.currentTimeMillis() - begin);
}

最终会执行到TransactionalMessageServiceImpl#check方法,这个方法就是用来处理事务消息的检查操作的:

public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener)
 
{
    try {
        // 事务消息的队列名
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        // 获取要检查的消息队列
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            // 省略了好多的内容
            ...

            // 从队列上获取事务消息
            GetResult getResult = getHalfMsg(messageQueue, i);
            
            // 省略了好多的内容
            ...

            // 检查事务状态
            listener.resolveHalfMsg(msgExt);

            // 依然是省略了好多的内容
            ...
                    
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}

这个方法中省略了大量代码,关键操作就两个:

  1. 从事务消息的topic上获取消息
  2. 检查消息的事务状态

5.2 broker发送检查消息

这里直接来看检查事务状态的操作,进入Broker2Client#checkProducerTransactionState方法:

 public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt)
 throws Exception 
{
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        // 发送检测消息到producer,code 为 CHECK_TRANSACTION_STATE
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error(...);
    }
}

从上面的代码来看,检查消息的codeCHECK_TRANSACTION_STATE,请求方式为Oneway这表明broker并不关心该消息的返回结果。

producer收到broker发送过来的检查消息后,又会怎么处理呢?下一小节我们再揭晓。

5.3 producer处理检查消息

从上一小节的分析可知,broker发送的检查消息的codeCHECK_TRANSACTION_STATEproducer处理该code的方法为ClientRemotingProcessor#processRequest

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request)
 throws RemotingCommandException 
{
    switch (request.getCode()) {
        // 检查事务状态
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        // 省略其他
        ...
        default:
            break;
    }
    return null;
}

我们跟进ClientRemotingProcessor#checkTransactionState方法:

public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request)
 throws RemotingCommandException 
{
    final CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) 
        request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
            messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), 
                this.mqClientFactory.getClientConfig().getNamespace()));
        }
        String transactionId = messageExt
            .getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            // 获得一个producer
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                // 检查状态
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        } else {
            log.warn("checkTransactionState, pick producer group failed");
        }
    } else {
        log.warn("checkTransactionState, decode message failed");
    }

    return null;
}

这个方法虽然有点长,但主要操作就两个:

  1. 获得一个producerthis.mqClientFactory.selectProducer(group)
  2. 检查事务状态:producer.checkTransactionState(...)

这里我们直接看检查事务状态的操作,进入DefaultMQProducerImpl#checkTransactionState方法:

public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header)
 
{

    Runnable request = new Runnable() {
        // 省略了一堆的内容
        ...
    };

    this.checkExecutor.submit(request);
}

DefaultMQProducerImpl#checkTransactionState方法中,先是创建了一个Runnable对象,然后将该对象提交到checkExecutor线程池中,在本文的一开始,我们在分析TransactionMQProducer的启动流程中就提到过,它的赋值在DefaultMQProducerImpl#initTransactionEnv方法,现在看到了它的使用。

根据线程池的运行流程,它运行的内容主要就是Runnablerun()方法了,它的run()方法内容如下:

Runnable request = new Runnable() {
    private final String brokerAddr = addr;
    private final MessageExt message = msg;
    private final CheckTransactionStateRequestHeader checkRequestHeader = header;
    private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

    @Override
    public void run() {
        // 获得 checkListener
        TransactionCheckListener transactionCheckListener 
            = DefaultMQProducerImpl.this.checkListener();
        // 1. 获取 listener
        TransactionListener transactionListener = getCheckListener();
        if (transactionCheckListener != null || transactionListener != null) {
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable exception = null;
            try {
                if (transactionCheckListener != null) {
                    localTransactionState = transactionCheckListener
                        .checkLocalTransactionState(message);
                } else if (transactionListener != null) {
                    log.debug("Used new check API in transaction message");
                    // 2. 检查事务状态
                    localTransactionState = transactionListener.checkLocalTransaction(message);
                } else {
                    log.warn(...);
                }
            } catch (Throwable e) {
                log.error(...);
                exception = e;
            }
            // 处理事务状态
            this.processTransactionState(localTransactionState, group, exception);
        } else {
            log.warn(...);
        }
    }

    private void processTransactionState(
        final LocalTransactionState localTransactionState,
        final String producerGroup,
        final Throwable exception)
 
{
        final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
        thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
        thisHeader.setProducerGroup(producerGroup);
        thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
        // 设置检查标记
        thisHeader.setFromTransactionCheck(true);

        String uniqueKey = message.getProperties()
            .get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (uniqueKey == null) {
            uniqueKey = message.getMsgId();
        }
        thisHeader.setMsgId(uniqueKey);
        thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
        // 3. 处理事务 提交/回滚 状态
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                // 设置提交状态为:提交
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                // 设置提交状态为:回滚
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                log.warn(...);
                break;
            case UNKNOW:
                thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                log.warn(...);
                break;
            default:
                break;
        }

        String remark = null;
        if (exception != null) {
            remark = "checkLocalTransactionState Exception: " 
                + RemotingHelper.exceptionSimpleDesc(exception);
        }

        try {
            // 4. 发送消息,结束事务
            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl()
                .endTransactionOneway(brokerAddr, thisHeader, remark, 3000);
        } catch (Exception e) {
            log.error("endTransactionOneway exception", e);
        }
    }
};

这部分的代码有点长,不过关键部分就4点:

  1. 获取 listener,这就是我们一开始在demo里设置的TransactionListenerImpl
  2. 检查事务状态,这里就是运行我们自己指定的方法:TransactionListenerImpl#checkLocalTransaction
  3. 处理事务 提交/回滚 状态,这一步就是根据TransactionListenerImpl#checkLocalTransaction方法的执行结果,来设置 提交/回滚 状态
  4. broker发送结束事务的消息,这个同前面DefaultMQProducerImpl#sendMessageInTransaction 方法中的操作是一致的

6. 总结

分析完事务消息的流程后,我们来对整个流程做个总结:

这是官网提供的一张图,流程如下:

  1. producer 发送一条“半消息”,broker收到后,返回“ok”,进入第2步
  2. 执行本地事务,得到执行结果,成功则进行第3步,失败则进行第4步
  3. 本地事务执行成功,发送“commit”消息到broker,此时第1步发送的“半消息”才真正投递出去
  4. 本地事务执行失败,发送“rollback”消息到broker,此第1步发送的“半消息”就取消了,再也不会进行发送了

正常情况下,以上4步就满足事务消息的流程了,但实际中可能会异常情况:第3步或第4步发送失败了,导致broker中的半消息迟迟收不到回滚或提交的通知,此时就会用到回查机制:

  1. broker迟迟收不到回滚或提交的通知,发送一条单向消息给producer,通知producer反查本地的事务执行结果
  2. producer收到broker的消息后,调用回查方法,检查本地事务状态
  3. producer得到本地事务的状态,再发一条单向消息告知broker此前的"半消息"是提交还是回滚

限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 14
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报