Spring编程式事务 + Java 多线程实现批量操作的回滚!

业余草

共 6554字,需浏览 14分钟

 ·

2022-06-19 20:04

你知道的越多,不知道的就越多,业余的像一棵小草!

你来,我们一起精进!你不来,我和你的竞争对手一起精进!

编辑:业余草

推荐:https://www.xttblog.com/?p=5347

前两天,我发了一篇推文《Java多线程+List分段完美解决导入等批量更新场景问题!》。

Java多线程批量更新

有热心好学的朋友留言到:

看到他的留言,我才知道,原来我们早已成了微信好友😊。

关于他提到的问题,今天我再整个简单的 demo,实现单事务多线程操作数据的案例。

核心问题

多线程更新或插入操作是很快,但是运行之后有网友发现,这些线程并不存在于一个事务中。他们想要一个报错,全体回滚。但是暂时又不明白该怎么实现,因此我写了本文(水文😊)。

伪代码

多线程插入伪代码:

@Transactional
public void test() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(5);
    for (int i = 0; i <5 ; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                AreaController.this.insert();//插入一条数据
                countDownLatch.countDown();
            }
        }).start();
    }
    countDownLatch.await();
    int i =1/0//主线程报错
}

无法回滚的原因

了解 Spring 事务的网友,应该知道。Spring 相关数据库连接信息都放在了 threadLocal 中。所以不同的线程享用不同的连接信息。所以前面的多线程操作不存在于一个事务中。

那么我们则么做到同步呢?

多线程单事务

要解决这个问题,就要让多个线程共用同一个事务。

这样以来,我们就不能使用 Spring 提供的注解了。我们需要自己操控事务管理。

简单的案例代码如下所示:

@Autowired
private PlatformTransactionManager transactionManager;

public void test() throws InterruptedException {
    CountDownLatch rollBackLatch = new CountDownLatch(1);
    CountDownLatch mainThreadLatch = new CountDownLatch(5);
    AtomicBoolean rollbackFlag = new AtomicBoolean(false);
    for (int i = 0; i < 5; i++) {
        int t = i;
        new Thread(() -> {
            if (rollbackFlag.get()) return//如果其他线程已经报错 就停止线程
            //设置一个事务
            DefaultTransactionDefinition def = new DefaultTransactionDefinition();
            def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
            TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
            try {
                AreaController.this.insert();//插入一条数据
                // if (t==4)throw new RuntimeException("报错");
                mainThreadLatch.countDown();
                rollBackLatch.await();//线程等待
                if (rollbackFlag.get()) {
                    transactionManager.rollback(status);
                } else {
                    transactionManager.commit(status);
                }
            } catch (Exception e) {
                e.printStackTrace();
                //如果出错了 就放开锁 让别的线程进入提交/回滚  本线程进行回滚
                rollbackFlag.set(true);
                rollBackLatch.countDown();
                mainThreadLatch.countDown();
                transactionManager.rollback(status);
            }
        }).start();
    }
    try {
//            int i = 1 / 0; //主线程执行相关业务报错
    } catch (Exception e) {
        rollbackFlag.set(true);
        rollBackLatch.countDown();
    }
    //主线程业务执行完毕 如果其他线程也执行完毕 且没有报异常 正在阻塞状态中 唤醒其他线程 提交所有的事务
    //如果其他线程或者主线程报错 则不会进入if 会触发回滚
    if (!rollbackFlag.get()){
        mainThreadLatch.await();
        rollBackLatch.countDown();
    }
}

这里面,最主要的是你要了解 Spring 声明式事务 @Transactional 的实现原理《Spring源码阅读,@Transactional实现原理》。

首先对于 Spring 的声明式事务 @Transactional 在多线程环境下明显是失效了的,原因是这些方法无法被加入到同一个事务中。

但是 Spring 作为业界的标杆,它提供了另一种控制粒度更加细的编程式事务,可以通过事务管理对象 PlatformTransactionManager 来操作事务的提交与回滚,我们可以把事务相关提交回滚的代码写在每个线程之中。

上面我们使用了一个回滚标志 rollbackFlag 来控制事务是提交还是回滚,并且使用了 countDownLatch 来进行线程阻塞的控制。只有所有的任务都正常的执行完毕了,才执行事务提交,如果其中一个线程报错了,则进行全体执行回滚。

水文,不喜轻喷!

浏览 152
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报