分布式事务中使用RocketMQ的事务消息机制优化事务的处理逻辑
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
1、事务消费介绍
支付宝账户表:A(id,userId,amount)
余额宝账户表:B(id,userId,amount)
用户的userId=1;
1)支付宝表扣除1万:update A set amount=amount-10000 where userId=1;
2)余额宝表增加1万:update B set amount=amount+10000 where userId=1;
Begin transaction
update A set amount=amount-10000 where userId=1;
update B set amount=amount+10000 where userId=1;
End transaction
commit;
@Transactional(rollbackFor=Exception.class)
public void update() {
//更新A表
updateATable();
//更新B表
updateBTable();
}
2、分布式事务—————— 两阶段提交协议
1)两阶段提交涉及多次节点间的网络通信,通信时间太长!
2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!
3、使用消息队列来避免分布式事务
1)业务与消息耦合的方式
Begin transaction
update A set amount=amount-10000 where userId=1;
insert into message(userId, amount,status) values(1, 10000, 1);
End transaction
commit;
2)业务与消息解耦方式
4、那么如何解决消息重复投递的问题?
For each msg in queue
Begin transaction
select count(*) as cnt from message_apply where msg_id=msg.msg_id;
if cnt==0 then
update B set amount=amount+10000 where userId=1;
insert into message_apply(msg_id) values(msg.msg_id);
end if
End transaction
commit;
End For
5、大事务 = 小事务 + 异步
6、Rocket事务流程处理分析
// =============================发送事务消息的一系列准备工作========================================
// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
// ================================事务消息的发送过程=============================================
public TransactionSendResult sendMessageInTransaction(.....) {
// 逻辑代码,非实际代码
// 1.发送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息发送成功,处理与消息关联的本地事务单元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.结束事务
this.endTransaction(sendResult, localTransactionState, localException);
}
根据sendResult找到Prepared消息 ,sendResult包含事务消息的ID
根据localTransaction更新消息的最终状态
7、交易事务处理示例
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
//设置用于事务消息的处理线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setExecutorService(executorService);
//设置事务监听器,监听器实现接口org.apache.rocketmq.client.producer.TransactionListener
//监听器中实现需要处理的交易业务逻辑的处理,以及MQ Broker中未确认的事务与业务的确认逻辑
producer.setTransactionListener(transactionListener);
producer.start();
//生成不同的Tag,用于模拟不同的处理场景
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
//组装产生消息
Message msg =
new Message("TopicTransaction", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//以事务发送消息,并在事务消息被成功预写入到RocketMQ中后,执行用户定义的交易逻辑,
//交易逻辑执行成功后,再实现实现业务消息的提交逻辑
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
System.out.printf("%s%n", sendResult.getTransactionId());
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 该方法会在消息成功预写入RocketMQ后被执行
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("开始处理业务逻辑...");
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
switch (status) {
case 0:
//LocalTransactionState.UNKNOW表示未知的事件,需要RocketMQ进一步服务业务进行确认该交易的处理
//结果,确认消息被调用的方法为下方的checkLocalTransaction。
//注:RocketMQ与业务确认消息的执行状态的功能已经被移除了,在早期3.0.8的版本中有该功能,因而如果
//返回的状态为UNKNOW,则该消息不会被提交
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
/**
* 该方法用于RocketMQ与业务确认未提交事务的消息的状态,不过该方法已经的实现在RocketMQ中已经
* 被删除了,因而其功能也就没有意义了。
* 不过如果使用阿里云的企业的RocketMQ服务,该功能会起作用。
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
int mod = msg.getTransactionId().hashCode() % 2;
if (null != status) {
switch (mod) {
case 0:
return LocalTransactionState.ROLLBACK_MESSAGE;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTransaction", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
private Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("获取到消息开始消费:"+msg + " , content : " + new String(msg.getBody()));
}
try {
// 模拟业务处理
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (Exception e) {
e.printStackTrace();
//返回处理失败,该消息后续可以继续被消费
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//返回处理成功,该消息就不会再次投递过来了
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ! ");
}
}
8、如何保证扣钱与加钱的事务的最终一致性
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:
https://blog.csdn.net/fenglibing/article/details/92417739
评论