CAS+失败重试方式实现数据库的原子性更新

程序新视界

共 11340字,需浏览 23分钟

 ·

2022-07-31 17:15

在前面一篇文章中我们讲了Java底层基于Unsafe的CAS实现《什么是CAS?如果说不清楚,这篇文章要读一读!》,这篇文章来看看基于Spring和CAS理念来实现的乐观锁方案。

在数据库修改单条数据时,常用的方式是select for update的悲观锁机制,如果锁竞争比较大,没有获得锁的操作会阻塞。使用CAS乐观锁的方式,可以大大提高并发性。例如,在分布式服务中,多个用户并发下单操作前会先扣减库存时,服务1、服务2和服务3为不同机器上的库存服务。

库存扣减操作流程如下:

扣减库存

使用CAS方式的乐观锁,当库存还剩3个,3个用户同时下单,服务同时扣减库存,可以并发地扣减成功,提高了并发性。如果库存还剩1个,3个用户同时下单,同时扣减库存,这时只有1个用户会操作成功,其余2个会失败,避免了超卖。

在库存的CAS操作中,先进行库存查询操作,然后根据value+version方式修改库存,如果操作失败,则幂等地重复操作。通常会从操作次数和执行时间两个条件限制CAS的操作,如果两个条件中有某个条件触发,可以抛出乐观锁异常。

在Java项目中,通过AOP+注解的方式实现数据库操作的CAS幂等操作的统一处理。定义OptimisticRetry注解,标识CAS重试Spring AOP的切点,并且设置属性value(最大重试次数条件),以及属性maxExecuteTime(最大执行时间条件)。OptimisticRetryAOP定义CAS重试的切面,乐观锁的实现。OptimisticRetry注解定义代码如下:

/**
 * 乐观锁的重试
 **/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OptimisticRetry {
 
    /**
     * 最大重试次数
     */
    int value() default 200;
 
    /**
     * 最大执行时间
     */
    int maxExecuteTime() default 2 * 60 * 60 * 1000;
}

OptimisticRetryAOP切面,定义标注了OptimisticRetry注解的方法则进行,CAS幂等重试,如果达到最大重试次数限制或者最大执行时间限制,则抛出乐观锁异常OptimisticLockingFailureException。

代码如下:

/**
 * 乐观锁的重试
 **/
@Aspect
@Component
@Order(10000)
@Slf4j
public class OptimisticRetryAOP {
 
    @Pointcut("@annotation(com.smcx.winemall.common.lock.OptimisticRetry)|| @within(com.smcx.winemall.common.lock.OptimisticRetry)")
    public void optimisticRetryPointcut() {
 
    }
 
    @Around("optimisticRetryPointcut()")
    public Object doConcurrentOperation(ProceedingJoinPoint joinPoint) throws Throwable {
 
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        // 代理目标对象Class
        Class targetClazz = joinPoint.getTarget().getClass();
        Method method = targetClazz.getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
        Optional<OptimisticRetry> optimisticRetryOption = getOptimisticRetry(method.getAnnotations());
        if (!optimisticRetryOption.isPresent()) {
 
            log.warn("no OptimisticRetry Annotation to execute!");
            return joinPoint.proceed();
        }
        OptimisticRetryConfig optimisticRetryConfig = obtainOptimisticRetryConfig(optimisticRetryOption.get());
        int numAttempts = 0;
        OptimisticLockingFailureException lockFailureException;
        long startTime = System.currentTimeMillis();
        do {
            numAttempts++;
            try {
                return joinPoint.proceed();
            } catch (OptimisticLockingFailureException ex) {
 
                lockFailureException = ex;
                long executeTime = System.currentTimeMillis() - startTime;
                long maxExecuteTime = optimisticRetryConfig.getMaxExecuteTime();
                if (isLargerThanMaxExecuteTime(executeTime, maxExecuteTime)) {
 
                    log.warn("throw optimistic locking failure exception!num attempts [{}],start time [{}]," +
                                    "actual execute time [{}] ms, max execute time [{}] ms",
                            numAttempts, startTime, executeTime, maxExecuteTime);
                    throw lockFailureException;
                }
            }
        }
        while (numAttempts <= optimisticRetryConfig.getMaxTryCount());
 
        log.warn("throw optimistic locking failure exception!num attempts {} ", numAttempts);
        throw lockFailureException;
    }
 
    /**
     * 大于限制的重试执行时间
     *
     * @param executeTime
     * @return
     */
    private boolean isLargerThanMaxExecuteTime(long executeTime, long maxExecuteTime) {
 
        if (executeTime <= 0) {
            return false;
        }
        if (maxExecuteTime > executeTime) {
            return true;
        }
        return false;
    }
 
    private OptimisticRetryConfig obtainOptimisticRetryConfig(OptimisticRetry optimisticRetry) {
 
        // 从注解中获取配的值
        OptimisticRetryConfig optimisticRetryConfig = new OptimisticRetryConfig();
        optimisticRetryConfig.setMaxTryCount(optimisticRetry.value());
        optimisticRetryConfig.setMaxExecuteTime(optimisticRetry.maxExecuteTime());
        return optimisticRetryConfig;
    }
 
    private Optional<OptimisticRetry> getOptimisticRetry(Annotation[] annotations) {
 
        if (ArrayUtils.isEmpty(annotations)) {
            return Optional.empty();
        }
        for (Annotation anno : annotations) {
            if (anno.annotationType().getName().equals(OptimisticRetry.class.getName())) {
                return Optional.of((OptimisticRetry) anno);
            }
        }
        return Optional.empty();
    }
 
    /**
     * 重试配置
     */
    @Data
    private static class OptimisticRetryConfig implements Serializable {
 
        private static final long serialVersionUID = -182211651320526367L;
 
        /**
         * 最大重试次数
         */
        private int maxTryCount;
 
        /**
         * 最大执行时间
         */
        private long maxExecuteTime;
    }
}

业务代码中操作,代码如下:

  @Override
    @Transactional(rollbackFor = Exception.class)
    @OptimisticRetry
    public void decreaseStockRemainingAmount(OperateStockDto operateStock) {
 
        // 获取存储
        WinemallStock existStock = winemallStockMapper.selectByOperateStock(operateStock);
        // TODO 库存是否充足判断等条件判断
 
        // 修改库存数据
        if (0 == winemallStockMapper.updateStockRemainingAmount(existStock, 1)) {
            throw new OptimisticLockingFailureException("decrease stock remaining amount optimistic locking failure!");
        }
    }

来源:blog.csdn.net/new_com/article/details/103834871

面试被问Linux 命令su和sudo的区别?

2022-07-26

到底如何保证线程安全,你真的清楚吗?(干货推荐)

2022-07-25

4种 Redis 集群方案介绍+优缺点对比

2022-07-24

2.2w Star,这是一款什么样的Nginx可视化配置神器?

2022-07-23

什么是CAS?如果说不清楚,这篇文章要读一读!

2022-07-21





如果你觉得这篇文章不错,那么,下篇通常会更好。备注“公众号”添加微信好友(微信号:zhuan2quan)

▲ 按关注”程序新视界“,洞察技术内幕


浏览 64
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报