分布式事务中使用RocketMQ的事务消息机制优化事务的处理逻辑

java1234

共 20438字,需浏览 41分钟

 ·

2021-06-03 22:34

点击上方蓝色字体,选择“标星公众号”

优质文章,第一时间送达

1、事务消费介绍

我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了。

上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证?!在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证?!等等,相信大家或多或多少都能碰到相似情景。

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功。

如果是单机系统(数据库实例也在同一个系统上)的话,我们可以用本地事务轻松解决:

还是以支付宝转账余额宝为例(比如转账10000块钱),假设有

支付宝账户表:A(id,userId,amount)
余额宝账户表:B(id,userId,amount)
用户的userId=1;

从支付宝转账1万块钱到余额宝的动作分为两步:

1)支付宝表扣除1万:update A set amount=amount-10000 where userId=1;
2)余额宝表增加1万:update B set amount=amount+10000 where userId=1;

如何确保支付宝余额宝收支平衡呢?

有人说这个很简单嘛,可以用事务解决。

Begin transaction 
  update A set amount=amount-10000 where userId=1;
 update B set amount=amount+10000 where userId=1;
End transaction 
commit;

这样确实能解决,如果你使用spring的话一个注解就能搞定上述事务功能。

@Transactional(rollbackFor=Exception.class) 
public void update() { 
      //更新A表 
      updateATable();
      //更新B表
      updateBTable();
}

如果系统规模较小,数据表都在一个数据库实例上,上述本地事务方式可以很好地运行,但是如果系统规模较大,比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。

下面我们来看看比较主流的两种方案:

2、分布式事务—————— 两阶段提交协议

两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器TC和若干事务执行者两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。

我们根据上面的图来看看主要流程:

1) 我们的应用程序(client)发起一个开始请求到TC(transaction);

2) TC先将prepare消息写到本地日志,之后向所有的Si发起prepare消息。以支付宝转账到余额宝为例,TC给A的prepare消息是通知支付宝数据库相应账目扣款1万,TC给B的prepare消息是通知余额宝数据库相应账目增加1w。为什么在执行任务前需要先写本地日志,主要是为了故障后恢复用,本地日志起到现实生活中凭证的效果,如果没有本地日志(凭证),出问题容易死无对证;

3) Si收到prepare消息后,执行具体本机事务,但不会进行commit,如果成功返回yes,不成功返回no。同理,返回前都应把要返回的消息写到日志里,当作凭证。

4) TC收集所有执行器返回的消息,如果所有执行器都返回yes,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。

注:TC或Si把发送或接收到的消息先写到日志里,主要是为了故障后恢复用。如某一Si从故障中恢复后,先检查本机的日志,如果已收到commit,则提交,如果abort则回滚。如果是yes,则再向TC询问一下,确定下一步。如果什么都没有,则很可能在prepare阶段Si就崩溃了,因此需要回滚。

现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/),来快速实现。)

不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?

1)两阶段提交涉及多次节点间的网络通信,通信时间太长!
2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!

正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。

3、使用消息队列来避免分布式事务

如果仔细观察生活的话,生活的很多场景已经给了我们提示。

比如在北京很有名的姚记炒肝点了炒肝并付了钱后,他们并不会直接把你点的炒肝给你,而是给你一张小票,然后让你拿着小票到出货区排队去取。为什么他们要将付钱和取货两个动作分开呢?原因很多,其中一个很重要的原因是为了使他们接待能力增强(并发量更高)。

还是回到我们的问题,只要这张小票在,你最终是能拿到炒肝的。同理转账服务也是如此,当支付宝账户扣除1万后,我们只要生成一个凭证(消息)即可,这个凭证(消息)上写着“让余额宝账户增加1万”,只要这个凭证(消息)能可靠保存,我们最终是可以拿着这个凭证(消息)让余额宝账户增加1万的,即我们能依靠这个凭证(消息)完成最终一致性。

那么我们如何可靠保存凭证(消息)有两种方法:

1)业务与消息耦合的方式

支付宝在完成扣款的同时,同时记录消息数据,这个消息数据与业务数据保存在同一数据库实例里(消息记录表表名为message)。

Begin transaction 
       update A set amount=amount-10000 where userId=1; 
       insert into message(userId, amount,status) values(1, 10000, 1); 
End transaction 
commit;

上述事务能保证只要支付宝账户里被扣了钱,消息一定能保存下来。

当上述事务提交成功后,我们通过实时消息服务将此消息通知余额宝,余额宝处理成功后发送回复成功消息,支付宝收到回复后删除该条消息数据。

2)业务与消息解耦方式

上述保存消息的方式使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。为了解耦,可以采用以下方式。

a)支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送,只有消息发送成功后才会提交事务;

b)当支付宝扣款事务被提交成功后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送该消息;

c)当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。在得到取消发送指令后,该消息将不会被发送;

d)对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态并进行更新。为什么需要这一步骤,举个例子:假设在第2步支付宝扣款事务被成功提交后,系统挂了,此时消息状态并未被更新为“确认发送”,从而导致消息不能被发送。

优点:消息数据独立存储,降低业务系统与消息系统间的耦合;
缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。

4、那么如何解决消息重复投递的问题?

还有一个很严重的问题就是消息重复投递,以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了(上面讲顺序消费是讲过,这里再提一下)。

为什么相同的消息会被重复投递?比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg。

解决方法很简单,在余额宝这边增加消息应用状态表(message_apply),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务) 。

For each msg in queue 
 Begin transaction 
   select count(*) as cnt from message_apply where msg_id=msg.msg_id; 
   if cnt==0 then 
  update B set amount=amount+10000 where userId=1; 
  insert into message_apply(msg_id) values(msg.msg_id); 
   end if
 End transaction 
 commit;
End For

为了方便大家理解,我们再来举一个银行转账的示例(和上一个例子差不多):

比如,Bob向Smith转账100块。

在单机环境下,执行事务的情况,大概是下面这个样子:

当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:

这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那如何来规避这个问题?

5、大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:

图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

首先看下先发送消息的情况,大致的示意图如下:

存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。

先发消息不行,那就先扣款吧,大致的示意图如下:

存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。

RocketMQ支持事务消息,下面来看看RocketMQ是怎样来实现的?

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?

RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

6、Rocket事务流程处理分析

那我们来看下RocketMQ源码,是如何处理事务消息的。

客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer)

// =============================发送事务消息的一系列准备工作========================================
// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();


接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

//  ================================事务消息的发送过程=============================================
public TransactionSendResult sendMessageInTransaction(.....) {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}
 

endTransaction方法会将请求发往broker(mq server)去更新事务消息的最终状态:

  • 根据sendResult找到Prepared消息 ,sendResult包含事务消息的ID

  • 根据localTransaction更新消息的最终状态

如果endTransaction方法执行失败,数据没有发送到broker,导致事务消息的 状态更新失败,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOneway让broker来更新消息的最终状态。

再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题,解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

消费事务消息
这样基本上可以解决消费端超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

我们需要注意的是,在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息。也就是说,消息失败不会进行检查。

7、交易事务处理示例

下面我们来看一个简单的例子:

消息生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
        //设置用于事务消息的处理线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        //设置事务监听器,监听器实现接口org.apache.rocketmq.client.producer.TransactionListener
        //监听器中实现需要处理的交易业务逻辑的处理,以及MQ Broker中未确认的事务与业务的确认逻辑
        producer.setTransactionListener(transactionListener);
        producer.start();
 
        //生成不同的Tag,用于模拟不同的处理场景
        String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
        for (int i = 0; i < 10; i++) {
            try {
             //组装产生消息
                Message msg =
                    new Message("TopicTransaction", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //以事务发送消息,并在事务消息被成功预写入到RocketMQ中后,执行用户定义的交易逻辑,
                //交易逻辑执行成功后,再实现实现业务消息的提交逻辑
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                System.out.printf("%s%n", sendResult.getTransactionId());
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
 
        producer.shutdown();
    }
}


业务实现类TransactionListenerImpl:

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
 
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
 
    /**
     * 该方法会在消息成功预写入RocketMQ后被执行
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        System.out.println("开始处理业务逻辑...");
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        switch (status) {
         case 0:
          //LocalTransactionState.UNKNOW表示未知的事件,需要RocketMQ进一步服务业务进行确认该交易的处理
          //结果,确认消息被调用的方法为下方的checkLocalTransaction。
          //注:RocketMQ与业务确认消息的执行状态的功能已经被移除了,在早期3.0.8的版本中有该功能,因而如果
          //返回的状态为UNKNOW,则该消息不会被提交
          return LocalTransactionState.UNKNOW;
         case 1:
          return LocalTransactionState.COMMIT_MESSAGE;
         case 2:
          return LocalTransactionState.ROLLBACK_MESSAGE;
         default:
          return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
 
    /**
     * 该方法用于RocketMQ与业务确认未提交事务的消息的状态,不过该方法已经的实现在RocketMQ中已经
     * 被删除了,因而其功能也就没有意义了。
     * 不过如果使用阿里云的企业的RocketMQ服务,该功能会起作用。
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        int mod = msg.getTransactionId().hashCode() % 2;
        if (null != status) {
            switch (mod) {
                case 0:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


消息消费者:

public class Consumer {
 public static void main(String[] args) throws MQClientException {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer");
  consumer.setNamesrvAddr("127.0.0.1:9876");
  /**
   * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
   * 如果非第一次启动,那么按照上次消费的位置继续消费
   */
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  consumer.subscribe("TopicTransaction""*");
  consumer.registerMessageListener(new MessageListenerOrderly() {
   private Random random = new Random();
 
   @Override
   public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    // 设置自动提交
    context.setAutoCommit(true);
    for (MessageExt msg : msgs) {
     System.out.println("获取到消息开始消费:"+msg + " , content : " + new String(msg.getBody()));
    }
    try {
     // 模拟业务处理
     TimeUnit.SECONDS.sleep(random.nextInt(5));
    } catch (Exception e) {
     e.printStackTrace();
     //返回处理失败,该消息后续可以继续被消费
     return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
    //返回处理成功,该消息就不会再次投递过来了
    return ConsumeOrderlyStatus.SUCCESS;
   }
  });
  consumer.start();
  System.out.println("consumer start ! ");
 }
}


我们先启动消费端,然后启动生产端:

在运行之前,我们先来看一下,web控制台的消息:

Topic transaction_producer中没有未消费的消息,下面开始执行代码逻辑。

生产端:

通过日志可以看到其发送了四条消息,交易逻辑被调用了4次,其中只有一条消息反馈的结果为COMMIT_MESSAGE。

再次查看控制台:

可以看到其Topic transaction_producer只有一条待消费的消息,这个和发送端只一条消息被COMMIT的结论相符合。

消费端:

启动消费端,在控制台只看到消费了一条消息。

生产者总共生产了四条消息,原因如下:

 

 

这就是为什么我们生产了四条消息,最后却只消费了一条,再次确认业务实现类TransactionListenerImpl中的方法checkLocalTransaction被没有调用。

8、如何保证扣钱与加钱的事务的最终一致性

在上面的转账交易逻辑中,存在两个问题:

1)如果没有使用RocketMQ的企业版本,那就可能会发生扣钱的事务成功了,但是扣钱的消息由于生产方发生了故障,导致交易消息没有在扣钱的事务提交成功后往RocketMQ中确认该条消息可以被提交,就会导致该条消息不会递交给消费方,导致Bob的钱被扣了,但是Smith的钱却没有增加。

2)生产方的全部逻辑都处理完成了,扣钱的事务在数据库中被成功的提交,扣钱的消息在RocketMQ被成功的确认,但是消费方在消费消息的时候,自己本身发生了故障,或者处理该条消息发生了逻辑错误,导致Smith的钱没有被正确的加上。

以上两个问题虽然发生的机率都很低,但是只要存在着发生的机率就会一定在某个时间点发生,只是故障发生时间点的早晚问题。在金融系统中,每日都会跑系统日志执行对账操作,用于核对当日总共的支付与收入是否是平衡的、每个单笔交易结果是否都满足借贷平衡等,因而为了避免以上两个问题的发生,我的处理方式还是引入金融系统对账的业务逻辑来进行处理。

其业务处理逻辑如下:

在发送交易事务消息过后,发送一个交易对账消息到对账Topic中,该对账消息为非事务消息,发送成功即表示成功保存到了RocketMQ中,该交易对账消息不会用于消费者消费,后续的交易对账系统会消费该队列中的对账信息,其分别会和交易的生产方和消费方进行交易核对,核对逻辑如下:

交易对账系统首先和交易的消费方进行核对,如果消费方消费成功,则可以说明整个交易结果满足最终一致性,因为消息是生产者成功处理后,然后再发送的交易确认消息,因而只要产生了事务确认的交易消息,则可以肯定生产方已经正常执行完了扣款的逻辑。

只有交易消息在消费方处理失败或者消息方没有消费该消息的情况下,才需要再次和生产方进行确认,如果生产方成功执行了扣款操作,则需要回滚这笔扣款交易;如果没有扣除成功,则表示两边都没有消费这边交易,就不用做任何操作了。


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:

https://blog.csdn.net/fenglibing/article/details/92417739








浏览 22
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报