如何通过事务消息保障抢购业务的分布式一致性?
前言
事务一致性原理回顾
脏读:事务 A 读到了事务 B 还没有提交的数据。
不可重复读:在一个事务里面对某个数据读取了两次,读出来的数据不一致。
幻读:在一个事务对某个数据集用同样的方式读取了两次,数据集的条目数量不一致。
READ_UNCOMMITTED(读未提交):最低的隔离级别,可以读到未提交的数据,无法解决脏读、不可重复读、幻读中的任何一种。
READ_COMMITED (读已提交):能够防止脏读,但是无法解决不可重复读和幻读的问题。
REPEATABLE_READ (重复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可重复读的问题,但是幻读的问题还是无法解决。
SERLALIZABLE ( 串行化):最高的事务隔离级别,避免了事务的并行执行,解决了脏读、不可重复读和幻读的问题,但性能最低。
抢购业务中的分布式事务
分布式事务的实现方式
传统分布式事务
提供了强一致性保证,在业务执行的任何时间点都能确保事务一致性。
使用简单。常见的关系型数据库都提供了对XA协议的支持,通过引入事务协调器,业务代码跟使用单机事务相比基本上没有差别。
柔性事务
事务消息原理分析
抢购业务场景拆解
引入消息异步通知机制
先执行本地事务,还是先发送异步消息?
如何确保远程事务能执行成功?
消息队列集群在将异步消息投递到远程事务参与方的时候,由于网络不稳定,消息没能投递成功。
消息投递成功了,但远程事务参与方还没来得及执行远程事务,就宕机了。
完整流程
事务消息实战
消息队列 RocketMQ
开通 RocketMQ 服务
创建资源
本地事务参与方的业务代码
1、初始化 TransactionProducer
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.2.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.13.1</version>
</dependency>
TransactionProducer
,用于异步消息的发送,需要填入如下信息:Group ID:之前创建的用于本地事务参与方的 Group ID。 Access key和Secret Key:RAM 用户对应的密钥信息,从 RAM 用户控制台获得。 Nameserver Address:RocketMQ 实例的接入点信息,从 RocketMQ 控制台获得。
Properties properties = new Properties();
// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
2、获取全局唯一的交易流水号
3、实现本地事务回查逻辑
LocalTransactionChecker
接口的LocalTransactionCheckerImpl
类,实现其中的check(Message)
方法,该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文讨论的范围之前,我们将其封装在BusinessService
类中。package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);
private static BusinessService businessService = new BusinessService();
public TransactionStatus check(Message msg) {
// 从消息体中获得的交易ID
String transactionKey = msg.getKey();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean isCommit = businessService.checkbusinessService(transactionKey);
if (isCommit) {
transactionStatus = TransactionStatus.CommitTransaction;
} else {
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
LOGGER.error("Transaction Key:{}", transactionKey, e);
}
LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());
return transactionStatus;
}
}
4、执行本地事务并发送异步消息
LocalTransactionExecuter
接口的匿名类,通过send
方法进行发送,这就是本地事务参与方所需要实现的所有业务代码了。当然,这个匿名类实现了TransactionStatus execute.execute()
方法,其中包含了对于本地事务的执行。完整代码如下:package transaction;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TransactionProducerClient {
private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
private static final BusinessService businessService = new BusinessService();
private static final String TOPIC = "create_order";
private static final TransactionProducer producer = null;
static {
Properties properties = new Properties();
// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
// LocalTransactionCheckerImpl本地事务回查类的实现
TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
}
public static void main(String[] args) throws InterruptedException {
String transactionKey = getGlobalTransactionKey();
String messageContent = String.format("lock inventory for: %s", transactionKey);
Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());
try {
SendResult sendResult = producer.send(message, (msg, arg) -> {
// 此处用Lambda表示,实际是实现TransactionStatus execute(final Message msg, final Object arg)方法
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
boolean localTransactionOK = businessService.execbusinessService(transactionKey);
if (localTransactionOK) {
transactionStatus = TransactionStatus.CommitTransaction;
} else {
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
LOGGER.error("Transaction Key:{}", transactionKey, e);
}
LOGGER.warn("Transaction Key:{}", transactionKey);
return transactionStatus;
}, null);
LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);
} catch (Exception e) {
LOGGER.info("send message failed, Transaction Key:{}", message.getKey());
}
// demo example防止进程退出
TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
}
private static String getGlobalTransactionKey() {
// TODO
return "";
}
}
远程事务参与方的业务代码
package transaction;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class TransactionConsumerClient {
private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);
private static final BusinessService businessService = new BusinessService();
private static final String TOPIC = "create_order";
private static final Consumer consumer = null;
static {
Properties properties = new Properties();
// 在控制台创建的Group ID,不同于本地事务参与方使用的Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.start();
}
public static void main(String[] args) {
consumer.subscribe(TOPIC, "*", (message, context) -> {
LOGGER.info("Receive: " + message);
businessService.doBusiness(message);
// 返回CommitMessage,代表给予消息队列集群异步消息已经得到正常处理的回馈
return Action.CommitMessage;
}
);
}
}
事务回滚
技术异常:远程事务参与方宕机、网络故障、数据库故障等。
业务异常:远程逻辑在业务上无法执行、代码业务逻辑错误等。
多个事务参与方
有可能发生业务异常的:比如锁定库存的操作,有可能因为库存不足而执行失败。又比如扣除积分的操作,有可能因为用户积分不足而无法扣除。
不太可能发生业务异常的:比如删除购物车条目的操作,除非是技术类故障,一定可以执行成功,即便对应的条目并不存在,也没有关系。又比如积分增加的操作,只要对应的用户没有注销,是不可能遇到业务异常的。
其他注意事项
消息幂等
每日对账
1、消息重试多次后,依然不成功:当消费者完全无法正常工作的时候,RocketMQ 不可能永无止境地重试消息,事实上,如果16次重试后异步消息依然没有办法被正常处理,RocketMQ 会停止尝试,将消息放到一个特殊的队列中。
2、未处理的业务异常:比如给某个账号加积分的时候,发现此账号被注销了,这是一个非常罕见的业务现象,有可能事先对此并没有健壮的处理机制。
3、幂等校验失败:处理幂等所依赖的系统比如 Redis 发生了故障,导致某些消息被重复处理。
4、其他严重的系统故障:比如网络长时间中断,留下了大量执行到一半的事务。
5、其他漏网之鱼。
总结
干货分享
最近将个人学习笔记整理成册,使用PDF分享。关注我,回复如下代码,即可获得百度盘地址,无套路领取!
•001:《Java并发与高并发解决方案》学习笔记;•002:《深入JVM内核——原理、诊断与优化》学习笔记;•003:《Java面试宝典》•004:《Docker开源书》•005:《Kubernetes开源书》•006:《DDD速成(领域驱动设计速成)》•007:全部•008:加技术群讨论
近期热文
•LinkedBlockingQueue vs ConcurrentLinkedQueue•解读Java 8 中为并发而生的 ConcurrentHashMap•Redis性能监控指标汇总•最全的DevOps工具集合,再也不怕选型了!•微服务架构下,解决数据库跨库查询的一些思路•聊聊大厂面试官必问的 MySQL 锁机制
关注我
喜欢就点个"在看"呗^_^