分布式锁(三):基于Redisson的分布式锁实践

ProjectDaedalus

共 16414字,需浏览 33分钟

 ·

2022-05-10 15:11

Redisson是基于Redis的Java驻内存数据网格(In-Memory Data Grid),底层使用Netty进行实现。其提供了相应的分布式锁实现

abstract.png

RedissonLock 分布式非公平可重入互斥锁

RedissonLock是一个分布式非公平可重入互斥锁,其在获取锁的过程中,支持lock阻塞式、tryLock非阻塞式两种形式。其中,这两个方法还有多个重载版本,以支持设置锁的最大持有时间、设置获取锁的最大等待时间。具体方法如下所示

# 阻塞式获取锁
void lock();

# 阻塞式获取锁
# 支持通过leaseTime参数设置锁的最大持有时间
void lock(long leaseTime, TimeUnit unit);

# 非阻塞式获取锁
boolean tryLock();

# 非阻塞式获取锁
# 支持通过time参数设置获取锁的最大等待时间
boolean tryLock(long time, TimeUnit unit);

# 非阻塞式获取锁
# 支持通过waitTime参数设置获取锁的最大等待时间
# 支持通过leaseTime参数设置锁的最大持有时间
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

可重入性

现在我们验证下RedissonLock是一个互斥锁并且支持可重入。示例代码如下所示

/**
 * RedissonLock Demo : 分布式非公平可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonLockDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static RedissonClient redissonClient;

    @BeforeClass
    public static void init() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        redissonClient = Redisson.create( config );
    }

    /**
     * 测试: 阻塞式获取锁、可重入性
     */

    @Test
    public void testLock1() {
        final String lockName = "sellPc";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getLock(lockName);
            try{
                // 阻塞式获取锁
                lock.lock();
                info("成功获取锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));

                lock.lock();
                info("成功获取锁 #2");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                info("释放锁 #2");
                lock.unlock();

                info("释放锁 #1\n");
                lock.unlock();
            }
        };

        RLock tempLock = redissonClient.getLock(lockName);
        if( tempLock instanceof RedissonLock) {
            System.out.println("锁类型: RedissonLock");
        }
        try{ Thread.sleep( 1*1000 ); } catch (Exception e) {}
        
        for (int i=1; i<=3; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
        System.out.println("---------------------- 系统下线 ----------------------");
    }


    @AfterClass
    public static void close() {
        redissonClient.shutdown();
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,符合预期

figure 1.jpeg

显式指定锁的最大持有时间

前面提到支持通过leaseTime参数显式设置锁的最大持有时间,当业务持锁时间超过leaseTime参数值,则其持有的锁会被自动释放。但需要注意的是某个线程的锁一旦被自动释放后,此时再调用unlock方法来释放锁时,即会抛出IllegalMonitorStateException异常。原因也很简单,因为此时线程实际上并未持有锁,示例代码如下所示

/**
 * RedissonLock Demo : 分布式非公平可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonLockDemo {
    ...

    /**
     * 测试: 指定锁的最大持有时间
     */

    @Test
    public void testLock2() {
        final String lockName = "sellBooK";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getLock(lockName);
            try{
                // 阻塞式获取锁, 指定锁的最大持有时间为1秒
                lock.lock(1, TimeUnit.SECONDS);
                info("成功获取锁");
                // 模拟业务耗时: 10s
                Thread.sleep( 10 * 1000 );
            } catch (Exception e) {
                info("Happen Exception: " + e.getMessage());
            } finally {
                info("释放锁");
                try {
                    lock.unlock();
                } catch (IllegalMonitorStateException e) {
                    info("Happen Exception: " + e.getMessage());
                }
            }
        };

        for (int i=1; i<=5; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    ...
}

测试结果如下所示,符合预期。可以看到每隔1秒后,由于线程持锁时间到期了。锁被自动释放了,进而使得下一个任务拿到了锁。并且由于每个任务的锁都是自动释放的,故每次调用unlock方法均会抛出异常

figure 2.jpeg

非阻塞式获取锁

下面展示如果通过tryLock方法进行非阻塞式获取锁,示例代码如下所示

/**
 * RedissonLock Demo : 分布式非公平可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonLockDemo {
    ...

    /**
     * 测试: 非阻塞式获取锁
     */

    @Test
    public void testTryLock() {
        final String lockName = "sellPhone";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getLock(lockName);
            boolean flag = false;
            try{
                // 非阻塞式获取锁
                flag = lock.tryLock();
                if( flag ) {
                    info("成功获取锁");
                    // 模拟业务耗时
                    Thread.sleep(RandomUtils.nextLong(100500));
                } else {
                    info("未获取到锁\n");
                }
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                if( flag ) {
                    info("释放锁\n");
                    lock.unlock();
                }
            }
        };

        for (int i=1; i<=5; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    ...
}

测试结果如下所示,符合预期

figure 3.jpeg

Watch Dog看门狗机制

在介绍Redisson的Watch Dog看门狗机制之前,我们先来做个测试。如果某个线程一直持有锁执行业务逻辑,则锁是否会被自动释放呢?示例代码如下所示

/**
 * RedissonLock Demo : 分布式非公平可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonLockDemo {
    ...

    /**
     * 测试: 看门狗机制
     */

    @Test
    public void testLock3() {
        final String lockName = "sellPig";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getLock(lockName);
            try{
                info("尝试获取锁");
                // 阻塞式获取锁
                lock.lock();
                info("成功获取锁");
                // 未显式指定锁的持有时间, 则看门狗会在断开连接前一直进行续期
                while (true) {
                }
            } catch (Exception e) {
                info("Happen Exception: " + e.getMessage());
            } finally {
                info("成功释放锁");
                lock.unlock();
            }
        };

        for (int i=1; i<5; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 120*1000 ); } catch (Exception e) {}
        info("---------------------- 系统下线 ----------------------");
    }

    ...
}

测试结果如下所示,该锁被持有后,一直未被释放。其他任务都被阻塞住了

figure 4.jpeg

当我们调用不含leaseTime参数版本的lock()方法时,即未显式设置最大持锁时间。则其在RedissonLock类内部会将 特殊值-1 传给leaseTime参数。然后在tryAcquireAsync方法中会通过RedissonLock类的internalLockLeaseTime字段设置一个默认的最大持锁时间。最后通过RedissonLock构造器我们不难发现 internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段。其中lockWatchdogTimeout字段的默认值为30秒。换言之即使我们调用lock方法时,未显式设置最大持锁时间。但RedissonLock内部也会通过lockWatchdogTimeout字段给该锁设置一个最大持有时间,默认值为30秒

public class RedissonLock extends RedissonBaseLock {

    protected long internalLockLeaseTime;
    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        // internalLockLeaseTime 字段的值来自于Config类的lockWatchdogTimeout字段
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    
    /**
     * 未显式设置最大持锁时间的lock方法
     */

    @Override
    public void lock() {
        try {
            // 则其会将 特殊值-1 传给leaseTime参数
            lock(-1nullfalse);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        ...
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        ...
    }
    
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

    private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 未显式设置最大持锁时间 则会 通过 internalLockLeaseTime 字段设置一个默认的最大持锁时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // 未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
}

...

public class Config {

    // 时间: 30秒
    private long lockWatchdogTimeout = 30 * 1000;

    public long getLockWatchdogTimeout() {
        return lockWatchdogTimeout;
    }

    ...
}

那问题来了,为啥在我们刚刚的测试代码中即使持锁时间超过了30秒,锁也没有被自动释放呢?原因就在于Redisson的看门狗机制。在RedissonLock类的tryAcquireAsync方法中,未显式设置最大持锁时间 则会启动一个定时任务用于进行自动续期。即RedissonLock类的tryAcquireAsync方法中会调用scheduleExpirationRenewal()以启动一个定时任务用于进行自动续期。具体的续期逻辑在RedissonBaseLock类的renewExpiration方法中,其中自动续期定时任务的执行周期 是RedissonBaseLock类的internalLockLeaseTime字段值的1/3。然后通过renewExpirationAsync方法每次利用Lua脚本向Redis发送续期命令,具体地。每次续期时会将RedissonBaseLock类的internalLockLeaseTime字段值设置为新的最大持锁时间。同样地,RedissonBaseLock类的 internalLockLeaseTime 字段值也是来自于Config类的lockWatchdogTimeout字段,即默认为30秒

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {

 ...

    protected long internalLockLeaseTime;

 public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        // internalLockLeaseTime 参数的值来自于Config类的lockWatchdogTimeout变量
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
 
 protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
             // 自动续期
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }

    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                // 通过Lua脚本向Redis发送续期命令
                RFuture future = renewExpirationAsync(threadId);

                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
            // 定时任务的执行周期 是 internalLockLeaseTime字段值的 1/3
            // 即, 定时任务的执行周期 是 lockWatchdogTimeout字段值的 1/3
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

    protected RFuture renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                // 每次续期时会将internalLockLeaseTime字段值作为新的最大持锁时间
                Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));
    }

    ...

}

至此我们应该比较清楚Redisson的看门狗机制了:

  1. Redisson的Watch Dog看门狗机制只会在未显式设置最大持锁时间才会生效。换言之,一旦调用lock方法时指定了leaseTime参数值,则该锁到期后即会自动释放。Redisson的Watch Dog看门狗不会对该锁进行自动续期
  2. 当我们未显式设置Config类的lockWatchdogTimeout字段值时,使用默认的30秒。此时如果加锁时未显式设置最大持锁时间,即Watch Dog看门狗机制会生效的场景中。该锁实际上一开始也会设置一个默认的最大持锁时间,即30秒。然后看门狗每隔10秒(30秒 * 1/3 = 10秒)会将该锁的最大持锁时间再次设置为30秒,以达到自动续期的目的。这样只要持锁线程的业务还未执行完,则该锁就一直有效、不会被自动释放。当然一旦持锁的服务实例发生宕机后,看门狗的定时任务自然也无法续期。这样锁到期后也就释放掉了,避免了死锁的发生

RedissonFairLock 分布式公平可重入互斥锁

由于RedissonLock是非公平的,故Redisson提供了一个分布式公平可重入互斥锁——RedissonFairLock。示例代码如下所示

/**
 * RedissonFairLock Demo : 分布式公平可重入互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonFairLockDemo {

    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    /**
     * 测试: 可重入性
     */

    @Test
    public void test1() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        RedissonClient redissonClient = Redisson.create( config );

        final String lockName = "sellWatch";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getFairLock(lockName);
            try{
                // 阻塞式获取锁
                lock.lock();
                info("成功获取锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));

                lock.lock();
                info("成功获取锁 #2");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                info("释放锁 #2");
                lock.unlock();

                info("释放锁 #1\n");
                lock.unlock();
            }
        };

        RLock tempLock = redissonClient.getFairLock(lockName);
        if( tempLock instanceof RedissonFairLock ) {
            System.out.println("锁类型: RedissonFairLock");
        }
        try{ Thread.sleep( 4*1000 ); } catch (Exception e) {}

        for (int i=1; i<=3; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
        redissonClient.shutdown();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,符合预期

figure 5.jpeg

RReadWriteLock 分布式读写锁

读写测试

RReadWriteLock是一个分布式可重入读写锁。其中读锁为可重入的共享锁、写锁为可重入的互斥锁,且读写互斥。示例代码如下所示

/**
 * RReadWriteLock Demo : 分布式可重入读写锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RReadWriteLockDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static RedissonClient redissonClient;

    @BeforeClass
    public static void init() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        redissonClient = Redisson.create( config );
    }

    /**
     * 测试: 读锁为共享锁, 读锁具有可重入性
     */

    @Test
    public void test1Read() {
        System.out.println("\n---------------------- Test 1 : Read ----------------------");
        String lockName = "sellCat";
        for(int i=1; i<=3; i++) {
            String taskName = "读任务#"+i;
            Runnable task = new ReadTask(taskName, redissonClient, lockName);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 写锁为互斥锁, 写锁具有可重入性
     */

    @Test
    public void test2Write() {
        System.out.println("\n---------------------- Test 2 : Write ----------------------");
        String lockName = "sellDog";
        for(int i=1; i<=3; i++) {
            String taskName = "写任务#"+i;
            Runnable task = new WriteTask(taskName, redissonClient, lockName);
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    /**
     * 测试: 读写互斥
     */

    @Test
    public void test3ReadWrite() {
        System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
        String lockName = "sellLion";
        for(int i=1; i<=5; i++) {
            Runnable task = null;
            Boolean isReadTask = RandomUtils.nextBoolean();
            if( isReadTask ) {
                task = new ReadTask( "读任务#"+i, redissonClient, lockName);
            } else {
                task = new WriteTask( "写任务#"+i, redissonClient, lockName);
            }
            threadPool.execute( task );
        }
        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 10*1000 ); } catch (Exception e) {}
    }

    @AfterClass
    public static void close() {
        redissonClient.shutdown();
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    /**
     * 读任务
     */

    private static class ReadTask implements Runnable {
        private String taskName;

        private RedissonReadLock readLock;

        public ReadTask(String taskName, RedissonClient redissonClient, String lockName) {
            this.taskName = taskName;
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
            this.readLock = (RedissonReadLock) readWriteLock.readLock();
        }

        @Override
        public void run() {
            try{
                readLock.lock();
                info(taskName + ": 成功获取读锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));

                readLock.lock();
                info(taskName + ": 成功获取读锁 #2");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放读锁 #2");
                readLock.unlock();

                info(taskName + ": 释放读锁 #1");
                readLock.unlock();
            }
        }
    }

    /**
     * 写任务
     */

    private static class WriteTask implements Runnable {
        private String taskName;

        private RedissonWriteLock writeLock;

        public WriteTask(String taskName, RedissonClient redissonClient, String lockName) {
            this.taskName = taskName;
            RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
            this.writeLock = (RedissonWriteLock) readWriteLock.writeLock();
        }

        @Override
        public void run() {
            try{
                writeLock.lock();
                info(taskName + ": 成功获取写锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));

                writeLock.lock();
                info(taskName + ": 成功获取写锁 #2");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println( taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放写锁 #2");
                writeLock.unlock();

                info(taskName + ": 释放写锁 #1\n");
                writeLock.unlock();
            }
        }
    }
}

读锁测试结果如下所示,符合预期

figure 6.jpeg

写锁测试结果如下所示,符合预期

figure 7.jpeg

读写测试结果如下所示,符合预期

figure 8.jpeg

锁升级、锁降级


所谓锁升级指的是读锁升级为写锁。当一个线程先获取到读锁再去申请写锁,显然其是不支持的。理由也很简单,读锁是可以多个服务实例同时持有的。若其中一个服务实例此锁线程能够进行锁升级,成功获得写锁。显然与我们之前的所说的读写互斥相违背。因为其在获得写锁的同时,其他服务实例依然持有读锁;反之,其是支持锁降级的,即写锁降级为读锁。当一个服务实例的线程在获得写锁后,该线程依然可以获得读锁。这个时候当其释放写锁,则将只持有读锁,即完成了锁降级过程。锁降级的使用价值也很大,其一方面保证了安全,读锁在写锁释放前获取;另一方面保证了高效,因为读锁是共享的。

锁升级示例代码如下所示

/**
 * RReadWriteLock Demo : 分布式可重入读写锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RReadWriteLockDemo {
    
    ...

    /**
     * 测试: 锁升级
     */

    @Test
    public void test4Read2Write() {
        System.out.println("---------------------- Test 4 : Read -> Write ----------------------\n");
        String lockName = "sellTiger";
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
        RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
        RedissonWriteLock  writeLock = (RedissonWriteLock) readWriteLock.writeLock();

        try {
            readLock.lock();
            info("成功获取读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.lock();
            info("成功获取写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.unlock();
            info("成功释放读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.unlock();
            info("成功释放写锁");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }
        System.out.println("---------------------- 系统下线 ----------------------");
    }    
}

测试结果如下所示,在持有读锁的情况下,继续尝试获取写锁会被一直阻塞

锁降级示例代码如下所示

/**
 * RReadWriteLock Demo : 分布式可重入读写锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RReadWriteLockDemo {
    
    ...

    /**
     * 测试: 锁降级
     */

    @Test
    public void test5Write2Read() {
        System.out.println("---------------------- Test 2 : Write -> Read ----------------------");
        String lockName = "sellChicken";
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(lockName);
        RedissonReadLock readLock = (RedissonReadLock) readWriteLock.readLock();
        RedissonWriteLock  writeLock = (RedissonWriteLock) readWriteLock.writeLock();

        try {
            writeLock.lock();
            info("成功获取写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.lock();
            info("成功获取读锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            writeLock.unlock();
            info("成功释放写锁");
            // 模拟业务耗时
            Thread.sleep(RandomUtils.nextLong(100500));

            readLock.unlock();
            info("成功释放读锁");
        } catch (Exception e) {
            System.out.println("Happen Exception: " + e.getMessage());
        }
        System.out.println("---------------------- 系统下线 ----------------------");
    }
}

测试结果如下所示,符合预期

figure 9.jpeg

RedissonSpinLock 分布式非公平可重入自旋互斥锁

RedissonSpinLock则是一个分布式非公平可重入自旋互斥锁。示例代码如下所示

/**
 * RedissonSpinLock Demo : 分布式非公平可重入自旋互斥锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonSpinLockDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    /**
     * 测试: 可重入性、互斥锁
     */

    @Test
    public void test1() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        RedissonClient redissonClient = Redisson.create( config );

        final String lockName = "sellKeyword";
        Runnable task = () -> {
            // 设置分布式锁
            RLock lock = redissonClient.getSpinLock(lockName);
            try{
                // 阻塞式获取锁
                lock.lock();
                info("成功获取锁 #1");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));

                lock.lock();
                info("成功获取锁 #2");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextLong(100500));
            } catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            } finally {
                info("释放锁 #2");
                lock.unlock();

                info("释放锁 #1\n");
                lock.unlock();
            }
        };

        RLock tempLock = redissonClient.getSpinLock(lockName);
        if( tempLock instanceof RedissonSpinLock) {
            System.out.println("锁类型: RedissonSpinLock");
        }
        try{ Thread.sleep( 2*1000 ); } catch (Exception e) {}

        for (int i=1; i<=3; i++) {
            threadPool.execute( task );
        }

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
        redissonClient.shutdown();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }
}

测试结果如下所示,符合预期

figure 10.jpeg

RedissonCountDownLatch 分布式闩锁

RedissonCountDownLatch是一个分布式的CountDownLatch闩锁。比如我们的业务系统会依赖很多其他基础服务,这样在业务系统启动过程中,需要等待其他基础服务全部启动完毕。示例代码如下所示

/**
 * RedissonCountDownLatch Demo : 分布式闩锁
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonCountDownLatchDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    private static RedissonClient redissonClient;

    @BeforeClass
    public static void init() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        redissonClient = Redisson.create( config );
    }

    @Test
    public void test1() throws InterruptedException {
        final String countDownLatchName = "systemInit";
        int count = 5;

        for (int i=1; i<=count; i++) {
            String serviceName = "基础服务 #"+i;
            BasicService basicService = new BasicService(serviceName, redissonClient, countDownLatchName, count);
            threadPool.execute( basicService );
        }

        RedissonCountDownLatch countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
        countDownLatch.trySetCount(count);
        // 阻塞等待基础服务全部启动完成
        countDownLatch.await();

        info("系统初始化已完成, 业务系统启动 ...");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    /**
     * 基础服务
     */

    private static class BasicService implements Runnable {
        private String serviceName;

        private RedissonCountDownLatch countDownLatch;

        public BasicService(String serviceName, RedissonClient redissonClient, String countDownLatchName, Integer count) {
            this.serviceName = serviceName;
            this.countDownLatch = (RedissonCountDownLatch) redissonClient.getCountDownLatch(countDownLatchName);
            this.countDownLatch.trySetCount( count );
        }

        @Override
        public void run() {
            try{
                info(serviceName + ": 启动中");
                // 模拟 基础服务启动 耗时
                Thread.sleep( RandomUtils.nextLong(15) * 1000 );
                countDownLatch.countDown();
                info(serviceName + ": 启动完成");
            } catch (Exception e) {
                System.out.println( serviceName + ": Happen Exception: " + e.getMessage());
            }
        }
    }
}

测试结果如下所示,符合预期

figure 11.jpeg

RedissonSemaphore 分布式信号量

RedissonSemaphore是一个分布式的信号量,示例代码如下所示

/**
 * RedissonSemaphore Demo : 分布式信号量
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonSemaphoreDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    @Test
    public void test1() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        RedissonClient redissonClient = Redisson.create( config );

        final String lockName = "sellAnimal";
        // 系统最大并发处理量
        int maxLimit = 3;
        IntStream.rangeClosed(1,8)
            .mapToObj( num -> new UserReq("用户#"+num, redissonClient, lockName, maxLimit) )
            .forEach( threadPool::execute );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
        redissonClient.shutdown();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    private static class UserReq implements Runnable {
        private String name;

        private RedissonSemaphore semaphore;

        public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
            this.name = name;
            this.semaphore = (RedissonSemaphore) redissonClient.getSemaphore(lockName);
            // 设置信号量的许可数
            semaphore.trySetPermits( maxLimit );
        }

        @Override
        public void run() {
            try {
                // 模拟用户不定时发起请求
                Thread.sleep(RandomUtils.nextLong(5002000));
                info( name + ": 发起请求" );

                // 阻塞等待,直到获取许可
                semaphore.acquire();
                info(name + ": 系统开始处理请求");
                // 模拟业务耗时
                Thread.sleep(RandomUtils.nextInt(520)*1000);

                // 用户请求处理完毕,释放许可
                semaphore.release();
                info(name + ": 系统处理完毕");
            }catch (Exception e) {
                System.out.println("Happen Exception: " + e.getMessage());
            }
        }
    }
}

测试结果如下所示,符合预期

figure 12.jpeg

RedissonPermitExpirableSemaphore 分布式支持有效期的信号量

相比较于RedissonSemaphore而言,RedissonPermitExpirableSemaphore在获取许可的acquire方法中,增加了一个支持leaseTime参数的重载版本。以实现指定许可的最大持有时间。一旦业务持许可时间超过leaseTime参数值,则其持有的许可会被自动释放。但需要注意的是某个线程的许可一旦被自动释放后,此时再调用release方法来释放许可时,即会抛出异常。原因也很简单,因为此时线程实际上并未持有许可,示例代码如下所示。示例代码如下所示

/**
 * RedissonPermitExpirableSemaphore Demo : 分布式支持有效期的信号量
 * @author Aaron Zhu
 * @date 2022-04-04
 */

public class RedissonPermitExpirableSemaphoreDemo {
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

    @Test
    public void test1() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setPassword("123456");
        RedissonClient redissonClient = Redisson.create( config );

        final String lockName = "sellMilk";
        // 系统最大并发处理量
        int maxLimit = 3;
        IntStream.rangeClosed(1,8)
            .mapToObj( num -> new UserReq("用户 #"+num, redissonClient, lockName, maxLimit) )
            .forEach( threadPool::execute );

        // 主线程等待所有任务执行完毕
        try{ Thread.sleep( 40*1000 ); } catch (Exception e) {}
        redissonClient.shutdown();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * 打印信息
     * @param msg
     */

    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "["+time+"] "" <"+ thread +"> " + msg;
        System.out.println(log);
    }

    private static class UserReq implements Runnable {
        private String name;

        private RedissonPermitExpirableSemaphore semaphore;

        public UserReq(String name, RedissonClient redissonClient, String lockName, Integer maxLimit) {
            this.name = name;
            this.semaphore = (RedissonPermitExpirableSemaphore) redissonClient.getPermitExpirableSemaphore(lockName);
            // 设置信号量的许可数
            semaphore.trySetPermits( maxLimit );
        }

        @Override
        public void run() {
            try {
                // 模拟用户不定时发起请求
                if( !name.equals("用户 #1") ) {
                    Thread.sleep( RandomUtils.nextLong(10002000) );
                }
                info( name + ": 发起请求" );

                // 阻塞等待直到获取许可, 指定信号量的最大持有时间为2秒
                String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
                info(name + ": 系统开始处理请求");

                // 模拟业务耗时
                if( name.equals("用户 #1") ) {
                    Thread.sleep( 5 * 1000 );
                } else {
                    Thread.sleep(RandomUtils.nextInt(5001000));
                }

                // 用户请求处理完毕,释放许可
                semaphore.release(permitId);
                info(name + ": 系统处理完毕");
            }catch (Exception e) {
                info( name + ": Happen Exception: " + e.getCause().getMessage());
            }
        }
    }
}

测试结果如下所示,符合预期

figure 13.jpeg


浏览 54
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报