SpringBoot分布式事务整合Seata
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
1. 适用场景
一个SpringBoot单体项目整合多个关系型数据库(多数据源)的场景。
2. Seata 地址
http://seata.io/zh-cn/docs/overview/what-is-seata.html
3.案例说明
模拟一个用户下订单场景。
创建三个数据库:用户库、商品库、订单库。
SpringBoot 项目配置三个数据库。
订单controller–>订单service(调用商品service、用户service),各自service再调用各自的dao层。
4.项目结构展示
5. 代码实现
1.pom.xml 文件
"1.0" encoding="UTF-8"?>
"http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.zlw
seata-distributed-transaction
0.0.1-SNAPSHOT
seata-distributed-transaction
springboot项目测试seata分布式事务
1.8
UTF-8
UTF-8
2.3.0.RELEASE
2.2.1.RELEASE
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-actuator
org.springframework.boot
spring-boot-starter-test
test
org.junit.vintage
junit-vintage-engine
org.projectlombok
lombok
mysql
mysql-connector-java
org.mybatis.spring.boot
mybatis-spring-boot-starter
2.1.3
io.seata
seata-spring-boot-starter
1.3.0
com.baomidou
dynamic-datasource-spring-boot-starter
3.2.0
com.alibaba.nacos
nacos-client
1.3.1
org.springframework.boot
spring-boot-dependencies
${spring-boot.version}
<type>pom</type>
import
com.alibaba.cloud
spring-cloud-alibaba-dependencies
${spring-cloud-alibaba.version}
<type>pom</type>
import
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok
src/main/java
**/*.xml
src/main/resources
**/*.*
2.yml 配置文件
# Spring Boot configuration for use with a standalone (single-node) Seata TC server.
server:
  port: 18080

seata:
  config:
    type: file
  application-id: springboot-seata
  # NOTE(review): the dynamic-datasource Seata integration guide asks for this to be
  # set to false when spring.datasource.dynamic.seata is true - confirm before enabling.
  # enable-auto-data-source-proxy: false
  registry:
    type: file
    # Alternative: discover the TC server through Nacos instead of a static file.
    # nacos:
    #   application: seata-server
    #   cluster: default
    #   group: SEATA_GROUP
    #   server-addr: 127.0.0.1:8801,127.0.0.1:8802,127.0.0.1:8803 #192.168.172.128:8848
    #   namespace: le_zi_jie
    # type: nacos
  # service:
  #   vgroup-mapping:
  #     springboot-seata-group: default
  service:
    grouplist:
      default: 127.0.0.1:8091
    vgroup-mapping:
      springboot-seata-group: default
  # Seata transaction service group; vgroup-mapping above maps it to a TC cluster name.
  tx-service-group: springboot-seata-group

spring:
  application:
    name: seata-distributed-transaction
  datasource:
    dynamic:
      datasource:
        # Account data source
        account-ds:
          driver-class-name: com.mysql.cj.jdbc.Driver
          password: 123456
          url: jdbc:mysql://127.0.0.1:3306/accountdb?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false
          username: root
        # Order data source
        order-ds:
          driver-class-name: com.mysql.cj.jdbc.Driver
          password: 123456
          url: jdbc:mysql://127.0.0.1:3306/orderdb?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false
          username: root
        # Product data source
        product-ds:
          driver-class-name: com.mysql.cj.jdbc.Driver
          password: 123456
          url: jdbc:mysql://127.0.0.1:3306/productdb?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false
          username: root
      # Default data source (or data source group); the library default is "master".
      primary: order-ds # designate one data source as the default
      # Enable Seata support in dynamic-datasource.
      seata: true
3.mapper 文件
3.1 product mapper
package com.zlw.seata.mapper;
import com.zlw.seata.mode.Product;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
 * MyBatis mapper for the {@code product} table.
 *
 * <p>Methods returning {@code int} report the number of rows affected by the statement.
 */
@Mapper
public interface ProductMapper {

    // ---- read ----

    /**
     * Loads a product by primary key.
     * NOTE(review): the matching XML statement is not shown in the article; presumably it
     * returns {@code null} when no row matches - confirm.
     */
    Product selectByPrimaryKey(Integer id);

    // ---- create ----

    /** Inserts a row writing every column of {@code record}, including nulls. */
    int insert(Product record);

    /** Inserts a row writing only the non-null properties of {@code record}. */
    int insertSelective(Product record);

    // ---- update ----

    /** Overwrites every non-key column of the row identified by {@code record.id}. */
    int updateByPrimaryKey(Product record);

    /** Updates only the non-null properties of the row identified by {@code record.id}. */
    int updateByPrimaryKeySelective(Product record);

    /**
     * Subtracts {@code amount} from the stock of product {@code productId}.
     * The statement only matches when the current stock is at least {@code amount},
     * so a return value of 0 means nothing was deducted.
     */
    int reduceStock(@Param("productId") Integer productId, @Param("amount") Integer amount);

    // ---- delete ----

    /** Deletes the row with the given primary key. */
    int deleteByPrimaryKey(Integer id);
}
"1.0" encoding="UTF-8"?>
"-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
"com.zlw.seata.mapper.ProductMapper">
"BaseResultMap" type="com.zlw.seata.mode.Product">
"id" jdbcType="INTEGER" property="id"/>
"name" jdbcType="VARCHAR" property="name"/>
"price" jdbcType="DECIMAL" property="price"/>
"stock" jdbcType="INTEGER" property="stock"/>
"add_time" jdbcType="TIMESTAMP" property="addTime"/>
"update_time" jdbcType="TIMESTAMP" property="updateTime"/>
"Base_Column_List">
id, name, price, stock, add_time, update_time
"deleteByPrimaryKey" parameterType="java.lang.Integer">
delete from product
where id = #{id,jdbcType=INTEGER}
"insert" parameterType="com.zlw.seata.mode.Product">
insert into product (id, name, price,
stock, add_time, update_time
)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{price,jdbcType=DECIMAL},
#{stock,jdbcType=INTEGER}, #{addTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}
)
"insertSelective" parameterType="com.zlw.seata.mode.Product">
insert into product
"(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
if>
<if test="name != null">
name,
if>
<if test="price != null">
price,
if>
<if test="stock != null">
stock,
if>
<if test="addTime != null">
add_time,
if>
<if test="updateTime != null">
update_time,
if>
"values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=INTEGER},
if>
<if test="name != null">
#{name,jdbcType=VARCHAR},
if>
<if test="price != null">
#{price,jdbcType=DECIMAL},
if>
<if test="stock != null">
#{stock,jdbcType=INTEGER},
if>
<if test="addTime != null">
#{addTime,jdbcType=TIMESTAMP},
if>
<if test="updateTime != null">
#{updateTime,jdbcType=TIMESTAMP},
if>
"updateByPrimaryKeySelective" parameterType="com.zlw.seata.mode.Product">
update product
<set>
<if test="name != null">
name = #{name,jdbcType=VARCHAR},
if>
<if test="price != null">
price = #{price,jdbcType=DECIMAL},
if>
<if test="stock != null">
stock = #{stock,jdbcType=INTEGER},
if>
<if test="addTime != null">
add_time = #{addTime,jdbcType=TIMESTAMP},
if>
<if test="updateTime != null">
update_time = #{updateTime,jdbcType=TIMESTAMP},
if>
set>
where id = #{id,jdbcType=INTEGER}
"updateByPrimaryKey" parameterType="com.zlw.seata.mode.Product">
update product
set name = #{name,jdbcType=VARCHAR},
price = #{price,jdbcType=DECIMAL},
stock = #{stock,jdbcType=INTEGER},
add_time = #{addTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
"reduceStock">
update product SET stock = stock - #{amount, jdbcType=INTEGER}
WHERE id = #{productId, jdbcType=INTEGER} AND stock >= #{amount, jdbcType=INTEGER}
3.2 OrdersMapper
package com.zlw.seata.mapper;
import com.zlw.seata.mode.Orders;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for the {@code orders} table.
 *
 * <p>Methods returning {@code int} report the number of rows affected by the statement.
 */
@Mapper
public interface OrdersMapper {

    // ---- read ----

    /**
     * Loads an order by primary key.
     * NOTE(review): the matching XML statement is not shown in the article - confirm its
     * behaviour for a missing row.
     */
    Orders selectByPrimaryKey(Integer id);

    // ---- create ----

    /** Inserts a row writing every column of {@code record}, including nulls. */
    int insert(Orders record);

    /** Inserts a row writing only the non-null properties of {@code record}. */
    int insertSelective(Orders record);

    // ---- update ----

    /** Overwrites every non-key column of the row identified by {@code record.id}. */
    int updateByPrimaryKey(Orders record);

    /** Updates only the non-null properties of the row identified by {@code record.id}. */
    int updateByPrimaryKeySelective(Orders record);

    // ---- delete ----

    /** Deletes the row with the given primary key. */
    int deleteByPrimaryKey(Integer id);
}
"1.0" encoding="UTF-8"?>
"-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
"com.zlw.seata.mapper.OrdersMapper">
"BaseResultMap" type="com.zlw.seata.mode.Orders">
"id" jdbcType="INTEGER" property="id" />
"user_id" jdbcType="INTEGER" property="userId" />
"product_id" jdbcType="INTEGER" property="productId" />
"pay_amount" jdbcType="DECIMAL" property="payAmount" />
"add_time" jdbcType="TIMESTAMP" property="addTime" />
"update_time" jdbcType="TIMESTAMP" property="updateTime" />
"Base_Column_List">
id, user_id, product_id, pay_amount, add_time, update_time
"deleteByPrimaryKey" parameterType="java.lang.Integer">
delete from orders
where id = #{id,jdbcType=INTEGER}
"insert" parameterType="com.zlw.seata.mode.Orders">
insert into orders (id, user_id, product_id,
pay_amount, add_time, update_time
)
values (#{id,jdbcType=INTEGER}, #{userId,jdbcType=INTEGER}, #{productId,jdbcType=INTEGER},
#{payAmount,jdbcType=DECIMAL}, #{addTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}
)
"insertSelective" parameterType="com.zlw.seata.mode.Orders">
insert into orders
"(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
if>
<if test="userId != null">
user_id,
if>
<if test="productId != null">
product_id,
if>
<if test="payAmount != null">
pay_amount,
if>
<if test="addTime != null">
add_time,
if>
<if test="updateTime != null">
update_time,
if>
"values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=INTEGER},
if>
<if test="userId != null">
#{userId,jdbcType=INTEGER},
if>
<if test="productId != null">
#{productId,jdbcType=INTEGER},
if>
<if test="payAmount != null">
#{payAmount,jdbcType=DECIMAL},
if>
<if test="addTime != null">
#{addTime,jdbcType=TIMESTAMP},
if>
<if test="updateTime != null">
#{updateTime,jdbcType=TIMESTAMP},
if>
"updateByPrimaryKeySelective" parameterType="com.zlw.seata.mode.Orders">
update orders
<set>
<if test="userId != null">
user_id = #{userId,jdbcType=INTEGER},
if>
<if test="productId != null">
product_id = #{productId,jdbcType=INTEGER},
if>
<if test="payAmount != null">
pay_amount = #{payAmount,jdbcType=DECIMAL},
if>
<if test="addTime != null">
add_time = #{addTime,jdbcType=TIMESTAMP},
if>
<if test="updateTime != null">
update_time = #{updateTime,jdbcType=TIMESTAMP},
if>
set>
where id = #{id,jdbcType=INTEGER}
"updateByPrimaryKey" parameterType="com.zlw.seata.mode.Orders">
update orders
set user_id = #{userId,jdbcType=INTEGER},
product_id = #{productId,jdbcType=INTEGER},
pay_amount = #{payAmount,jdbcType=DECIMAL},
add_time = #{addTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
3.3 AccountMapper
package com.zlw.seata.mapper;
import com.zlw.seata.mode.Account;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.math.BigDecimal;
/**
 * MyBatis mapper for the {@code account} table.
 *
 * <p>Methods returning {@code int} report the number of rows affected by the statement.
 */
@Mapper
public interface AccountMapper {

    // ---- read ----

    /**
     * Loads an account by primary key.
     * NOTE(review): the matching XML statement is not shown in the article - confirm its
     * behaviour for a missing row.
     */
    Account selectByPrimaryKey(Integer id);

    /**
     * Loads the account belonging to {@code userId}.
     * NOTE(review): the matching XML statement is not shown in the article - confirm its
     * behaviour when the user has no account or more than one.
     */
    Account selectAccountByUserId(Integer userId);

    // ---- create ----

    /** Inserts a row writing every column of {@code record}, including nulls. */
    int insert(Account record);

    /** Inserts a row writing only the non-null properties of {@code record}. */
    int insertSelective(Account record);

    // ---- update ----

    /** Overwrites every non-key column of the row identified by {@code record.id}. */
    int updateByPrimaryKey(Account record);

    /** Updates only the non-null properties of the row identified by {@code record.id}. */
    int updateByPrimaryKeySelective(Account record);

    /**
     * Subtracts {@code money} from the balance of the account owned by {@code userId}.
     * The statement only matches when the current balance is at least {@code money},
     * so a return value of 0 means nothing was debited.
     */
    int reduceBalance(@Param("userId") Integer userId, @Param("money") BigDecimal money);

    // ---- delete ----

    /** Deletes the row with the given primary key. */
    int deleteByPrimaryKey(Integer id);
}
"1.0" encoding="UTF-8"?>
"-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
"com.zlw.seata.mapper.AccountMapper">
"BaseResultMap" type="com.zlw.seata.mode.Account">
"id" jdbcType="INTEGER" property="id"/>
"user_id" jdbcType="INTEGER" property="userId"/>
"balance" jdbcType="DECIMAL" property="balance"/>
"update_time" jdbcType="TIMESTAMP" property="updateTime"/>
"Base_Column_List">
id, user_id, balance, update_time
"deleteByPrimaryKey" parameterType="java.lang.Integer">
delete from account
where id = #{id,jdbcType=INTEGER}
"insert" parameterType="com.zlw.seata.mode.Account">
insert into account (id, user_id, balance,
update_time)
values (#{id,jdbcType=INTEGER}, #{userId,jdbcType=INTEGER}, #{balance,jdbcType=DOUBLE},
#{updateTime,jdbcType=TIMESTAMP})
"insertSelective" parameterType="com.zlw.seata.mode.Account">
insert into account
"(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
if>
<if test="userId != null">
user_id,
if>
<if test="balance != null">
balance,
if>
<if test="updateTime != null">
update_time,
if>
"values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=INTEGER},
if>
<if test="userId != null">
#{userId,jdbcType=INTEGER},
if>
<if test="balance != null">
#{balance,jdbcType=DOUBLE},
if>
<if test="updateTime != null">
#{updateTime,jdbcType=TIMESTAMP},
if>
"updateByPrimaryKeySelective" parameterType="com.zlw.seata.mode.Account">
update account
<set>
<if test="userId != null">
user_id = #{userId,jdbcType=INTEGER},
if>
<if test="balance != null">
balance = #{balance,jdbcType=DOUBLE},
if>
<if test="updateTime != null">
update_time = #{updateTime,jdbcType=TIMESTAMP},
if>
set>
where id = #{id,jdbcType=INTEGER}
"updateByPrimaryKey" parameterType="com.zlw.seata.mode.Account">
update account
set user_id = #{userId,jdbcType=INTEGER},
balance = #{balance,jdbcType=DOUBLE},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
<!--
  Debits #{money} from the balance of the account owned by #{userId}.
  The balance guard in the WHERE clause makes the statement match zero rows when
  funds are insufficient. Both occurrences of money are bound with #{} so the value
  is passed as a prepared-statement parameter rather than spliced into the SQL text.
-->
<update id="reduceBalance">
    update account
    SET balance = balance - #{money}
    WHERE user_id = #{userId, jdbcType=INTEGER}
    AND balance >= #{money}
</update>
4.service
4.1 ProductService
package com.zlw.seata.service;
import com.zlw.seata.mode.Product;
public interface ProductService {
/**
 * Deducts stock for a product.
 *
 * @param productId product ID
 * @param amount quantity to deduct
 * @return the product whose stock was deducted (the implementation shown in the article
 *         returns the row as it was read before the deduction)
 * @throws Exception if the deduction fails
 */
Product reduceStock(Integer productId, Integer amount) throws Exception;
}
package com.zlw.seata.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.zlw.seata.mapper.ProductMapper;
import com.zlw.seata.mode.Product;
import com.zlw.seata.service.ProductService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * {@link ProductService} implementation backed by {@link ProductMapper}; its database
 * access is routed to the {@code product-ds} data source.
 */
@Slf4j
@Service
public class ProductServiceImpl implements ProductService {
    @Autowired
    private ProductMapper productMapper;

    /**
     * Deducts {@code amount} units from the stock of product {@code productId}.
     *
     * @param productId product ID
     * @param amount quantity to deduct; must be positive
     * @return the product as read before the deduction (its stock value is not refreshed)
     * @throws IllegalArgumentException if {@code amount} is null or not positive
     * @throws Exception if the product does not exist or its stock is insufficient
     */
    @Override
    @DS(value = "product-ds")
    public Product reduceStock(Integer productId, Integer amount) throws Exception {
        log.info("当前 XID: {}", RootContext.getXID());
        // A null amount would NPE on unboxing and a negative one would add stock.
        if (amount == null || amount <= 0) {
            throw new IllegalArgumentException("amount must be a positive integer: " + amount);
        }
        // Fast-fail pre-check; the guarded UPDATE below is what actually enforces the limit.
        Product product = productMapper.selectByPrimaryKey(productId);
        if (product == null) {
            throw new Exception("商品不存在");
        }
        if (product.getStock() < amount) {
            throw new Exception("库存不足");
        }
        // Deduct stock; the statement matches no row if stock dropped below amount meanwhile.
        int updateCount = productMapper.reduceStock(productId, amount);
        // Zero affected rows means the deduction did not happen.
        if (updateCount == 0) {
            throw new Exception("库存不足");
        }
        log.info("扣除 {} 库存成功", productId);
        return product;
    }
}
4.2 OrderService
package com.zlw.seata.service;
public interface OrderService {
/**
 * Places an order.
 *
 * @param userId user ID
 * @param productId product ID
 * @return order ID
 * @throws Exception if the order cannot be created
 */
Integer createOrder(Integer userId, Integer productId) throws Exception;
}
package com.zlw.seata.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.zlw.seata.mapper.OrdersMapper;
import com.zlw.seata.mode.Orders;
import com.zlw.seata.mode.Product;
import com.zlw.seata.service.AccountService;
import com.zlw.seata.service.OrderService;
import com.zlw.seata.service.ProductService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
 * {@link OrderService} implementation that places an order inside a Seata global
 * transaction covering the product, account and order data sources.
 */
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrdersMapper ordersMapper;
    @Autowired
    private AccountService accountService;
    @Autowired
    private ProductService productService;

    /**
     * Creates an order for one unit of {@code productId} on behalf of {@code userId}:
     * deducts stock, debits the user's balance, then inserts the order row.
     *
     * <p>NOTE(review): the returned ID is read back from the inserted entity; that only
     * works if the {@code insertSelective} statement populates generated keys, which the
     * mapper XML shown in the article does not visibly configure - confirm.
     *
     * @param userId user ID
     * @param productId product ID
     * @return ID of the created order
     * @throws Exception if stock or balance is insufficient, or any step fails
     */
    @Override
    @DS(value = "order-ds")
    @GlobalTransactional // Seata global transaction annotation
    public Integer createOrder(Integer userId, Integer productId) throws Exception {
        final int amount = 1; // purchase quantity is fixed at 1 for now
        log.info("当前 XID: {}", RootContext.getXID());

        // Deduct stock: in-process call, routed to the product data source.
        Product product = productService.reduceStock(productId, amount);

        // Compute the total once so the debit and the recorded pay amount always agree.
        BigDecimal payAmount = product.getPrice().multiply(BigDecimal.valueOf(amount));

        // Debit balance: in-process call, routed to the account data source.
        accountService.reduceBalance(userId, payAmount);

        // Insert the order row into the order data source.
        Orders order = new Orders();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setPayAmount(payAmount);
        ordersMapper.insertSelective(order);
        log.info("下订单: {}", order.getId());

        return order.getId();
    }
}
4.3 AccountService
package com.zlw.seata.service;
import java.math.BigDecimal;
public interface AccountService {
/**
 * Debits a user's balance.
 *
 * @param userId user ID
 * @param money amount to debit
 * @throws Exception if the debit fails
 */
void reduceBalance(Integer userId, BigDecimal money) throws Exception;
}
package com.zlw.seata.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.zlw.seata.mapper.AccountMapper;
import com.zlw.seata.mode.Account;
import com.zlw.seata.service.AccountService;
import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
/**
 * {@link AccountService} implementation backed by {@link AccountMapper}; its database
 * access is routed to the {@code account-ds} data source.
 */
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    /**
     * Debits {@code money} from the balance of the account owned by {@code userId}.
     *
     * @param userId user ID
     * @param money amount to debit; must be non-null and non-negative
     * @throws IllegalArgumentException if {@code money} is null or negative
     * @throws Exception if the account does not exist or its balance is insufficient
     */
    @Override
    @DS(value = "account-ds")
    public void reduceBalance(Integer userId, BigDecimal money) throws Exception {
        log.info("当前 XID: {}", RootContext.getXID());
        // A null amount would NPE below and a negative one would credit the account.
        if (money == null || money.signum() < 0) {
            throw new IllegalArgumentException("money must be non-null and non-negative: " + money);
        }
        Account account = accountMapper.selectAccountByUserId(userId);
        if (account == null || account.getBalance() == null) {
            throw new Exception("账户不存在");
        }
        // Fast-fail pre-check only; the guarded UPDATE below is what actually enforces the limit.
        if (account.getBalance().doubleValue() < money.doubleValue()) {
            throw new Exception("余额不足");
        }
        // Debit; the statement matches no row if the balance dropped below money meanwhile.
        int updateCount = accountMapper.reduceBalance(userId, money);
        // Zero affected rows means the debit did not happen.
        if (updateCount == 0) {
            throw new Exception("余额不足");
        }
        log.info("扣除用户 {} 余额成功", userId);
    }
}
5 controller 调用
package com.zlw.seata.contorller;
import com.zlw.seata.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that exposes order placement.
 */
@Slf4j // Lombok-generated logger
@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * Handles {@code /order}: logs the request and delegates to {@link OrderService}.
     *
     * @param userId ID of the ordering user (request parameter {@code userId})
     * @param productId ID of the product to order (request parameter {@code productId})
     * @return ID of the created order as returned by the service
     * @throws Exception propagated unchanged from the service
     */
    @RequestMapping("/order")
    public Integer createOrder(
            @RequestParam("userId") Integer userId,
            @RequestParam("productId") Integer productId) throws Exception {
        log.info("请求下单, 用户:{}, 商品:{}", userId, productId);
        final Integer createdOrderId = orderService.createOrder(userId, productId);
        return createdOrderId;
    }
}
6.表数据
-- Account table: holds a balance per user_id; read and debited by AccountMapper.
-- NOTE(review): balance is int(11) here while the mapper's result map declares it
-- DECIMAL - confirm the intended column type.
CREATE TABLE `account` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`balance` int(11) DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
-- Orders table: one row per placed order, written by OrdersMapper.
-- NOTE(review): pay_amount is int(11) here while the mapper's result map declares it
-- DECIMAL - confirm the intended column type.
CREATE TABLE `orders` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`product_id` int(11) DEFAULT NULL,
`pay_amount` int(11) DEFAULT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=50 DEFAULT CHARSET=utf8mb4;
-- Product table: price and stock per product; stock is decremented by ProductMapper.reduceStock.
-- NOTE(review): price is int(11) here while the mapper's result map declares it
-- DECIMAL - confirm the intended column type.
CREATE TABLE `product` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` int(11) DEFAULT NULL,
`stock` int(11) DEFAULT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
account 表:
-- Seed row: account 1 for user 1 with a balance of 200.
INSERT INTO `account` (`id`, `user_id`, `balance`, `update_time`) VALUES ('1', '1', '200', '2021-01-15 00:02:17');
product 表:
-- Seed row: product 1 with price 10 and stock 67.
INSERT INTO `product` (`id`, `name`, `price`, `stock`, `add_time`, `update_time`) VALUES ('1', '电池', '10', '67', '2021-01-15 00:00:32', '2021-01-15 00:00:35');
每个库都需要有一个undo_log表
-- Seata undo log table; per the article it must be created in every database.
-- Each row is unique per (xid, branch_id) and carries the rollback data in rollback_info.
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=137 DEFAULT CHARSET=utf8;
7.seata的配置
单体SpringBoot使用默认的文件进行seata TCServer的日志记录(集群的时候使用MySQL),此处也可以使用MySQL。
## Transaction log store; only used by seata-server.
store {
## Store mode: file, db or redis.
mode = "file"
## Properties of the file store.
file {
## Directory where the store keeps its data.
dir = "sessionStore"
# Branch session size; if exceeded, lock-key compression is tried first, and an exception is thrown if it is still exceeded.
maxBranchSessionSize = 16384
# Global session size; an exception is thrown if exceeded.
maxGlobalSessionSize = 512
# File write buffer size; a new buffer is allocated if exceeded.
fileWriteBufferCacheSize = 16384
# Batch read size used during recovery.
sessionReloadReadSize = 100
# Disk flush mode: async or sync.
flushDiskMode = async
}
}
6. 调用测试
6.1 启动seata,seata默认端口是8091
win 使用cmd 运行seata-server.bat 文件
6.2 启动项目
可以看到8091 说明项目连接seata TCServer正常,项目启动正常。
浏览器发送请求:
http://localhost:18080/order?userId=1&productId=1
请求之前数据:
正常请求:
模拟异常:
二阶段回滚
版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
本文链接:
https://blog.csdn.net/qq_32691791/article/details/112727500
粉丝福利:Java从入门到入土学习路线图
👇👇👇
👆长按上方微信二维码 2 秒
感谢点赞支持下哈