炸了!Java多线程批量操作,居然有人不做事务控制
往期热门文章:
文章来源:https://c1n.cn/by3nt
目录
前言 循环操作的代码 使用手动事务的操作代码 尝试多线程进行数据修改 基于两个 CountDownLatch 控制多线程事务提交 基于 TransactionStatus 集合来控制多线程事务提交 使用 union 连接多个 select 实现批量 update 总结
前言
项目代码基于:MySQL 数据 开发框架为:SpringBoot、Mybatis 开发语言为:Java8
https://gitee.com/john273766764/springboot-mybatis-threads
循环操作的代码
/***
* 一条一条依次对50000条数据进行更新操作
* 耗时:2m27s,1m54s
*/
@Test
void updateStudent() {
ListallStudents = studentMapper.getAll();
allStudents.forEach(s -> {
//更新教师信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
}
使用手动事务的操作代码
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
/**
* 由于希望更新操作 一次性完成,需要手动控制添加事务
* 耗时:24s
* 从测试结果可以看出,添加事务后插入数据的效率有明显的提升
*/
@Test
void updateStudentWithTrans() {
ListallStudents = studentMapper.getAll();
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
allStudents.forEach(s -> {
//更新教师信息
String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
} catch (Throwable e) {
dataSourceTransactionManager.rollback(transactionStatus);
throw e;
}
}
尝试多线程进行数据修改
@Service
public class StudentServiceImpl implements StudentService {
@Autowired
private StudentMapper studentMapper;
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Override
public void updateStudents(Liststudents, CountDownLatch threadLatch) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子线程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新教师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
dataSourceTransactionManager.commit(transactionStatus);
threadLatch.countDown();
} catch (Throwable e) {
e.printStackTrace();
dataSourceTransactionManager.rollback(transactionStatus);
}
}
}
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private TransactionDefinition transactionDefinition;
@Autowired
private StudentService studentService;
/**
* 对用户而言,27s 任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度
* 预计创建10个线程,每个线程进行5000条数据修改操作
* 耗时统计
* 1 线程数:1 耗时:25s
* 2 线程数:2 耗时:14s
* 3 线程数:5 耗时:15s
* 4 线程数:10 耗时:15s
* 5 线程数:100 耗时:15s
* 6 线程数:200 耗时:15s
* 7 线程数:500 耗时:17s
* 8 线程数:1000 耗时:19s
* 8 线程数:2000 耗时:23s
* 8 线程数:5000 耗时:29s
*/
@Test
void updateStudentWithThreads() {
//查询总数据
ListallStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 100;
//每个线程处理的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创建多线程处理任务
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
// 每个线程处理的数据
ListthreadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
studentService.updateStudents(threadDatas, threadLatchs);
});
}
try {
// 倒计时锁设置超时时间 30s
threadLatchs.await(30, TimeUnit.SECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("主线程完成");
}
基于两个 CountDownLatch 控制多线程事务提交
@Override
public void updateStudentsThread(Liststudents, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
System.out.println("子线程:" + Thread.currentThread().getName());
try {
students.forEach(s -> {
// 更新教师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
} catch (Throwable e) {
taskStatus.setIsError();
} finally {
threadLatch.countDown(); // 切换到主线程执行
}
try {
mainLatch.await(); //等待主线程执行
} catch (Throwable e) {
taskStatus.setIsError();
}
// 判断是否有错误,如有错误 就回滚事务
if (taskStatus.getIsError()) {
dataSourceTransactionManager.rollback(transactionStatus);
} else {
dataSourceTransactionManager.commit(transactionStatus);
}
}
/**
* 由于每个线程都是单独的事务,需要添加对线程事务的统一控制
* 我们这边使用两个 CountDownLatch 对子线程的事务进行控制
*/
@Test
void updateStudentWithThreadsAndTrans() {
//查询总数据
ListallStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 4;
//每个线程处理的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创建多线程处理任务
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量
CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交
StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误
for (int i = 0; i < threadCount; i++) {
// 每个线程处理的数据
ListthreadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength)
.collect(Collectors.toList());
studentThreadPool.execute(() -> {
studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
});
}
try {
// 倒计时锁设置超时时间 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
if (!await) { // 等待超时,事务回滚
taskStatus.setIsError();
}
} catch (Throwable e) {
e.printStackTrace();
taskStatus.setIsError();
}
mainLatch.countDown(); // 切换到子线程执行
studentThreadPool.shutdown(); //关闭线程池
System.out.println("主线程完成");
}
Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
... 7 more
# 连接池中允许的最小连接数。缺省值:10
spring.datasource.hikari.minimum-idle=10
# 连接池中允许的最大连接数。缺省值:10
spring.datasource.hikari.maximum-pool-size=100
# 自动提交
spring.datasource.hikari.auto-commit=true
# 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟
spring.datasource.hikari.idle-timeout=30000
# 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒
spring.datasource.hikari.max-lifetime=1800000
# 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒
再次执行测试发现没有报错,修改线程数为 20 又执行了一下,同样执行成功了。
基于 TransactionStatus 集合来控制多线程事务提交
@Service
public class StudentsTransactionThread {
@Autowired
private StudentMapper studentMapper;
@Autowired
private StudentService studentService;
@Autowired
private PlatformTransactionManager transactionManager;
ListtransactionStatuses = Collections.synchronizedList(new ArrayList ());
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentWithThreadsAndTrans() throws InterruptedException {
//查询总数据
ListallStudents = studentMapper.getAll();
// 线程数量
final Integer threadCount = 2;
//每个线程处理的数据量
final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;
// 创建多线程处理任务
ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
CountDownLatch threadLatchs = new CountDownLatch(threadCount);
AtomicBoolean isError = new AtomicBoolean(false);
try {
for (int i = 0; i < threadCount; i++) {
// 每个线程处理的数据
ListthreadDatas = allStudents.stream()
.skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
studentThreadPool.execute(() -> {
try {
try {
studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}finally {
threadLatchs.countDown();
}
} catch (Exception e) {
e.printStackTrace();
isError.set(true);
}
});
}
// 倒计时锁设置超时时间 30s
boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
// 判断是否超时
if (!await) {
isError.set(true);
}
} catch (Throwable e) {
e.printStackTrace();
isError.set(true);
}
if (!transactionStatuses.isEmpty()) {
if (isError.get()) {
transactionStatuses.forEach(s -> transactionManager.rollback(s));
} else {
transactionStatuses.forEach(s -> transactionManager.commit(s));
}
}
System.out.println("主线程完成");
}
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void updateStudentsTransaction(PlatformTransactionManager transactionManager, ListtransactionStatuses, List students) {
// 使用这种方式将事务状态都放在同一个事务里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
transactionStatuses.add(status);
students.forEach(s -> {
// 更新教师信息
// String teacher = s.getTeacher();
String newTeacher = "TNO_" + new Random().nextInt(100);
s.setTeacher(newTeacher);
studentMapper.update(s);
});
System.out.println("子线程:" + Thread.currentThread().getName());
}
使用 union 连接多个 select 实现批量 update
update student,(
(select 1 as id,'teacher_A' as teacher) union
(select 2 as id,'teacher_A' as teacher) union
(select 3 as id,'teacher_A' as teacher) union
(select 4 as id,'teacher_A' as teacher)
/* ....more data ... */
) as new_teacher
set
student.teacher=new_teacher.teacher
where
student.id=new_teacher.id
总结
对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率 多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在 2-5 个线程时操作时间最快。 对于多线程阻塞事务提交时,线程数量不能过多 如果能有办法实现批量更新那是最好
往期热门文章:
1、巨坑,常见的 update 语句很容易造成Bug 2、完爆90%的数据库性能毛病! 3、Spring Boot性能太差,教你几招轻松搞定 4、Fastjson 2 来了,性能继续提升,还能再战十年 5、笑死!程序员延寿指南开源了 6、用 Dubbo 传输文件?被老板一顿揍! 7、45 个 Git 经典操作场景,专治不会合代码! 8、@Transactional 注解失效的3种原因及解决办法 9、小学生们在B站讲算法,网友:我只会阿巴阿巴 10、Spring爆出比Log4j2还大的漏洞?
评论