分布式事务解决方案之RabbitMQ

共 9233字,需浏览 19分钟

 ·

2022-02-14 21:46

点击上方 Java学习之道,选择 设为星标

每天18:30点,干货准时奉上!

作者: _沸羊羊_
来源: juejin.cn/post/7007040162676883486

Part1前言

之前介绍了分布式事务解决方案:2PC,3PC,TCC机制,今天主要介绍基于 RabbitMQ 的分布式事务解决方案,Let's go !

Part2分布式事务问题

以电商业务为例,订单服务对应的订单数据库,库存服务对应的库存数据库,多个数据源之间存在了分布式事务问题。如何保证在订单生成后,正确的扣除库存,或者在订单生成失败时,还原扣除的库存,这就是分布式事务将要解决的问题。

解决分布式事务前的业务逻辑:

@Transaction(rollbackFor=Exception.class)
public void createOrder(OrderInfo orderInfothrows Exception 
{
    // 订单系统 —— 生成订单
    orderService.saveOrder(orderInfo);
    
    // 库存系统 —— 扣除库存
    if(!"success".equals(reduceStock(orderInfo))){
        throw new Exception("订单创建失败,原因:库存扣除失败!");
    }
}

上述存在的问题:

  • 库存服务的接口调用成功,订单数据库事务提交失败,库存没有回滚,导致订单生成失败但扣除了库存
  • 库存服务的接口调用超时,订单系统数据库事务被回滚,库存系统接口继续执行,导致订单生成失败但扣除了库存

Part3解决方案

生产者:订单系统将订单信息的消息发送到 RabbitMQ 中,消息到达交换机后通过 confirm 机制将接收成功的消息返回给订单系统(生产者),生产者收到正确的 confirm 后给用户返回订单创建成功的响应。

队列:为了防止 RabbitMQ 在收到消息后还未发送给消费者处理就挂了的情况,将队列和消息分别持久化。

消费者:消费者接收到消息后,为了确保消息能够被成功正确的处理,需要考虑消息丢失问题和消息幂等性等问题。

可靠的发送消息

在订单系统创建订单的事务中添加把消息保存到订单数据库的业务逻辑,将消息在生产者中持久化。

public class OrderDBService {
    public void createOrder(OrderInfo orderInfo) throws Exception {
        // 创建订单
        String sql = "...";
        int count = jdbcTemplate.insert(sql);
        if(count != 1) {
            throw new Exception("订单创建失败,原因:数据库操作失败!");
        }
        
        // 将消息保存到 MQ
        saveMQMessage(orderInfo);
    }
    
    /** 
     * 将消息持久化,此时的消息状态字段为:未发送
     */

    public void saveMQMessage(OrderInfo orderInfo) throws Exception {
        String sql = "...";
        int count = jdbcTemplate.insert(sql);
        if(count != 1) {
            throw new Exception("消息记录失败,原因:数据库操作失败!");
        }
    }
}

创建订单成功后,将订单消息发送到 MQ 中,开启发布确定机制;生产者收到确认后在回调方法中更新本地消息表的消息状态为:已发送。

spring:
  rabbitmq:
    ......
    # 开启 confirm 模式
    publisher-confirm-type: correlated
@Component
@Slf4j
public class MQService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OrderInfo orderInfo) {
        // 发送消息到 MQ
        rabbitTemplate.convertAndSend("directEx""", orderInfo);
        // 回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送成功!");
            } else {
                log.info("消息发送失败!");
            }
        });
    }
}

消息的持久化

将队列声明为持久化

@Bean
public Queue getRoutingQueue() {
    // 参数:queueName,durable,exclusive,autoDelete
    return new Queue("routingQueue"truefalsetrue);
}

将消息声明为持久化

public void send(String msg) {
    // 持久化消息
    Message message = MessageBuilder.withBody(msg.getBytes()).build();
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    // 发送消息到 MQ
    rabbitTemplate.convertAndSend("directEx""", message);
    // 回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("消息发送成功!");
        } else {
            log.info("消息发送失败!");
        }
    });
}

可靠的消费消息

保证消费者可靠消费有两个方面,消息丢失和消息幂等性问题。

使用手动ACK机制解决消息丢失问题。

spring:
  rabbitmq:
    listener:
        simple:
            # 开启手动ACK
            acknowledge-mode: manual
@Component
@Slf4j
@RabbitListener(queues = "routingQueue")
public class Consumer {

    @RabbitHandler
    public void process(String msg, Message message, Channel channel) throws IOException {
        try {
            log.info("consumer received msg : " + msg);
            // 手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.info("消息入队失败");
            // 重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
        }
    }
}

配置 RabbitMQ 的 retry 机制,当消费者出现发送 NACK或异常时,会根据配置的 retry 机制进行重试。

解决消息的幂等性问题可以给消息添加 messageID,并且消费者消费成功后放入将 messageID 放入 redis 中。

// 生产者发送消息前设置 messageID
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
@Component
@Slf4j
@RabbitListener(queues = "routingQueue")
public class Consumer {

    @Autowired
    private RedisTemplate redisTemplate;

    @RabbitHandler
    public void process(String msg, Message message, Channel channel) throws IOException {
        // redis 的 setnx 命令,如果 messageID 已存在代表此消息已被消费过
        if (!redisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(), msg)) {
            return;
        }
        try {
            log.info("consumer received msg : " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.out.println("消息入队失败");
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
        }
    }
}

综上,使用手动 ACK 和 retry 机制解决了消息丢失的问题,使用 messageID 存到 redis 解决了消息幂等性问题。

Part4总结

-- END --

 | 更多精彩文章 -



加我微信,交个朋友
长按/扫码添加↑↑↑

浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报