把学习当成一种习惯
选择往往大于努力,越努力越幸运

前言

  • 最近大部分学习时间都花费在Redis上,学习方式主要是阅读《Redis深度历险 核心原理与应用实践》(钱文品)、《Redis设计与实现》(黄健宏) 这两本书籍,本人是强烈推荐这两本书籍的;除了书籍,当然还会结合一些高质量的文章来学习.
  • 本人是先看《Redis设计与实现》,然后《Redis深度历险 核心原理与应用实践》,一些章节会通过这两本书结合起来去重复阅读.
  • 分布式锁的实现方式有几种,例如 : MySQL数据库、Zookeeper、Redis 都可以实现分布式锁,本文只学习Redis实现的分布式锁.

Redis实现的分布式锁

分布式锁的由来

  • 在单体架构下,由于是同一个进程(一个JVM实例),所以可以使用Java提供的锁机制如synchronized、Lock、并发容器等来实现共享资源的安全访问;
  • 在分布式架构下,由于一个应用会部署多个进程(多个JVM实例),进程与进程之间的内存空间不能互相访问,那么Java提供的锁机制如synchronized、Lock、并发容器等都失效了;当多个进程同时并发的修改共享资源时,就需要引用分布式锁来解决这种多进程并发问题.

分布式锁的实现

  Redis实现的分布式锁本质上就是只能一个客户端在Redis服务器中占用这个"锁",当其他客户端访问Redis服务器,发现该"锁"被占有了,那么将重试或者放弃,这样就满足了锁的互斥性;当持有锁的客户端执行完共享资源后,需要释放这个"锁",防止死锁的出现;
  Redis提供了SETNX(set if not exists)指令来执行获取锁,表示如果 key 不存在,才会设置它的值,否则什么也不做.


127.0.0.1:6379> setnx lock 1
(integer) 1  // 客户端A获取锁成功

127.0.0.1:6379> setnx lock 1
(integer) 0
127.0.0.1:6379> // 客户端B获取锁失败

当成功持有锁的客户端执行完共享资源后,必须释放锁,而Redis提供DEL指令来释放锁


127.0.0.1:6379> del lock
(integer) 1   // 成功释放锁

这里存在问题,持有锁的客户端执行过程中因为异常等问题导致没有释放锁,那么就会进入死锁状态 :

解决死锁的方式很简单,就是给个过期时间,这样可以保证即使出现异常问题没有释放锁而自动过期去释放锁.


127.0.0.1:6379> setnx lock 1
(integer) 1 // 成功获取锁

127.0.0.1:6379> expire lock 5
(integer) 1   // 设置锁的过期时间为5s

  加了过期时间还是有问题,因为setnx、expire两条指令并非原子指令,即可能存在执行完setnx后,expire没来得及执行或者执行失败,这样就有潜在过期时间设置失败的风险,依旧会发生死锁问题;
  如果使用事务来解决,也是不行,因为事务没有if-else分支逻辑,事务的特点是一气呵成,要么全部执行要么全部不执行,即如果setnx没获取到锁,是不能去执行expire的.
  Redis2.8版本中,Redis作者就加入了set指令参数的扩展来保证setnx、expire指令可以一起执行 :


// 一条命令保证原子性执行
127.0.0.1:6379> set lock 1 ex 5 nx
OK 

  解决了setnx、expire两条指令一起执行的问题,还是存在问题 : 客户端1获取到锁,设置锁的过期时间为5s,但是由于获取锁和释放锁之间的逻辑执行得太长(超过5s),这样会导致锁自动到期,客户端1还在执行,而其他客户端可以乘机获取到锁了 :

  • 客户端1 : 获取锁成功,执行的共享资源
  • 客户端1 : 超过了锁的过期时间,锁被自动释放
  • 客户端2 : 获取锁成功,执行的共享资源
  • 客户端1 : 操作共享资源完成,释放锁(但释放的是客户端 2 的锁)

又衍生了新的问题 :

  • 释放了其他客户端的锁 : 客户端释放锁时,需要判断是否为当前客户端持有的锁再进行释放;
  • 锁自动过期 : 可能导致多个客户端同时获取到锁;

释放了其他客户端的锁

  • 这个问题很简单,使用UUID或者客户端ID作为唯一标识保存在value中.
  • PS : 这里假设 5s 操作共享时间完全足够,先不考虑锁自动过期的问题.

// 获取锁
127.0.0.1:6379> set lock $clientId ex 5 nx
OK

  • 这样就可以根据value来释放锁,防止释放了别人的锁, 伪代码 : if get lock == $UUID ==> del lock .
  • PS : 由于get、del是两个指令,所以需要Lua脚本来执行.

// 查看锁
127.0.0.1:6379> get lock
$clientId

// 如果为当前客户端ID,则执行删除
127.0.0.1:6379> del lock
(integer) 1

// Lua脚本
if ARGV[1] == redis.call('GET', KEYS[1]) 
then 
  return redis.call('DEL', KEYS[1]) 
else 
  return 0 
end

锁自动过期 --- watch dog自动延期机制

  对于锁自动过期这个问题,最粗暴的办法就是把过期时间设置长一点,但是这个办法治标不治本,还是会存在问题.
  有一种叫 看门狗 的方案 : 加锁的时候,开启一条 后台线程 , 然后定时去给这个锁重新初始化过期时间.
  开源的Redisson框架就有实现这种 watch dog自动延期机制,默认30s自动过期,后台线程 会每隔10秒检查一下,如果该守护线程的客户端还持有锁key,那么就会不断的延长锁key的生存时间.

极端情况下还是有问题

有了watch dog自动延期机制,在极端情况下还是会有问题 :

  • 客户端1 : 获取到锁,操作共享资源,后台线程自动延期;
  • 客户端1 : 与锁服务器发生失联(例如 GC、网络问题等),后台线程没能自动延期,导致锁过期
  • 客户端2 : 成功获取到锁,此时与客户端1同时持有了锁;
  • 锁过期时间设置较长可以防止这种极端情况,开源的Redisson框架默认锁过期时间为30s;

可重入

  到目前为止,从设置过期时间防止死锁、设置value值为UUID防止误解锁,到 watch dog自动延期机制 防止 锁过期了,但是共享资源还在操作 ,我们可以得到如以下数据结构的锁 :


lock : $UUID

那如果客户端1已经持有了这把锁了,结果可重入的加锁会怎么样呢?其实很简单,改变一下数据结构 :


# 重入1次
lock : {
    $UUID : 1
}

# 重入2次
lock : {
    $UUID : 2
}

  只要是同一个客户端获取锁\释放锁,那么只需要进行加减就行了;开源的Redisson框架就是使用的这种数据结构来实现的可重入机制.

释放锁

释放锁就相对简单了,由于是可重入锁,所以必须减少为0时才执行del命令,从redis里删除这个key.

小结

  • 防止死锁 : 设置过期时间
  • 误解锁 : value值设置为UUID
  • 锁过期了,但是共享资源还在操作 : watch dog自动延期机制
  • 可重入 : 使用hash结构,把UUID和重入次数映射起来
  • 释放锁 : 可重入数减为0时执行del命令

PS : 以上是单个Redis实例的情况下的分析场景,已经实现了一个几乎完美的分布式锁,而集群环境下还是会存在问题.

基于Redisson实现分布式锁

项目结构如下图 :

单例集成Redisson

Maven集成Redisson


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.10.6</version>
        </dependency>

application.yml :


server:
  port: 8999
spring:
  # redis
  redis:
    # redis服务器ip
    host: 
    port: 6379
    password: 密码填写自己的
    jedis:
      pool:
        # 连接池最大连接数(使用负值表示没有限制)
        max-active: 100
        # 连接池中的最小空闲连接
        max-idle: 10
        # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1
      # 连接超时时间(毫秒)
      timeout: 5000
      #默认是索引为0的数据库
      database: 0

RedissonConfig.java : 创建RedissonClient操作redis对象、RedisLocker操作redis服务接口


@Component
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password:}")
    private String password;


    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        //单节点
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        if (StringUtils.isEmpty(password)) {
            config.useSingleServer().setPassword(null);
        } else {
            config.useSingleServer().setPassword(password);
        }
        //添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
        // 集群模式配置 setScanInterval()扫描间隔时间,单位是毫秒, //可以用"rediss://"来启用SSL连接
// config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002");
        return Redisson.create(config);
    }

    /**
     * 装配locker类,并将实例注入到RedissLockUtil中
     */
    @Bean
    public RedisLocker redisLocker(RedissonClient redissonClient) {
        RedisLocker redisLocker = new RedisLockerServer();
        redisLocker.setRedissonClient(redissonClient);
        RedisLockUtil.setLocker(redisLocker);
        return redisLocker;
    }

}

RedisLocker接口 : 操作redis服务接口


public interface RedisLocker {
    RLock lock(String lockKey);

    RLock lock(String lockKey, int timeout);

    RLock lock(String lockKey, TimeUnit unit, int timeout);

    boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime);

    void unlock(String lockKey);

    void unlock(RLock lock);

    RCountDownLatch getCountDownLatch(String name);

    RSemaphore getSemaphore(String name);

    void setRedissonClient(RedissonClient redissonClient);
}

RedisLockerServer 实现接口 :


@Configurable
public class RedisLockerServer implements RedisLocker {


    @Autowired
    private RedissonClient redissonClient;


    @Override
    public RLock lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
        return lock;
    }

    @Override
    public RLock lock(String lockKey, int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, TimeUnit.SECONDS);
        return lock;

    }

    @Override
    public RLock lock(String lockKey, TimeUnit unit, int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, unit);
        return lock;
    }

    @Override
    public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            return lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    @Override
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.unlock();
    }

    @Override
    public void unlock(RLock lock) {
        lock.unlock();
    }

    @Override
    public RCountDownLatch getCountDownLatch(String name) {
        return redissonClient.getCountDownLatch(name);
    }

    @Override
    public RSemaphore getSemaphore(String name) {
        return redissonClient.getSemaphore(name);
    }

    @Override
    public void setRedissonClient(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }

}

RedisLockUtil.java : 操作redis工具类


public class RedisLockUtil {

    private static RedisLocker redisLocker;

    public static void setLocker(RedisLocker locker) {
        redisLocker = locker;
    }

    /**
     * 加锁
     * @param lockKey
     * @return
     */
    public static RLock lock(String lockKey) {
        return redisLocker.lock(lockKey);
    }

    /**
     * 释放锁
     * @param lockKey
     */
    public static void unlock(String lockKey) {
        redisLocker.unlock(lockKey);
    }

    /**
     * 释放锁
     * @param lock
     */
    public static void unlock(RLock lock) {
        redisLocker.unlock(lock);
    }

    /**
     * 带超时的锁
     * @param lockKey
     * @param timeout 超时时间   单位:秒
     */
    public static RLock lock(String lockKey, int timeout) {
        return redisLocker.lock(lockKey, timeout);
    }

    /**
     * 带超时的锁
     * @param lockKey
     * @param unit 时间单位
     * @param timeout 超时时间
     */
    public static RLock lock(String lockKey, int timeout, TimeUnit unit ) {
        return redisLocker.lock(lockKey, unit, timeout);
    }

    /**
     * 尝试获取锁
     * @param lockKey
     * @param waitTime 最多等待时间
     * @param leaseTime 上锁后自动释放锁时间
     * @return
     */
    public static boolean tryLock(String lockKey, int waitTime, int leaseTime) {
        return redisLocker.tryLock(lockKey, TimeUnit.SECONDS, waitTime, leaseTime);
    }

    /**
     * 尝试获取锁
     * @param lockKey
     * @param unit 时间单位
     * @param waitTime 最多等待时间
     * @param leaseTime 上锁后自动释放锁时间
     * @return
     */
    public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
        return redisLocker.tryLock(lockKey, unit, waitTime, leaseTime);
    }

    /**
     * 获取计数器
     *
     * @param name
     * @return
     */
    public static RCountDownLatch getCountDownLatch(String name){
        return redisLocker.getCountDownLatch(name);
    }

    /**
     * 获取信号量
     *
     * @param name
     * @return
     */
    public static RSemaphore getSemaphore(String name){
        return redisLocker.getSemaphore(name);
    }

}

单例环境下集成Redisson就完成了.

实战

我们假设 : 一个服务分别部署在两台服务器,分别是 服务A,端口号为8999、服务B,端口号为9000 ,我们现在模拟这么个简单需求 : 服务A、B需要同时向count.txt写入统计数据,这里发生了A、B(多个进程)并发修改同一份文件,为了避免操作乱序导致数据错误,此时,我们就需要引入分布式锁来解决这个问题了.

RedissonController.java :


@RestController
@RequestMapping("/redisson")
public class RedissonController {

    private static final String ENCODING = "UTF-8";
    
    private static final File FILE = new File("D:\\count.txt");
    
    // 锁
    private static final String LOCK_KEY = "lock_key";

    /**
     * 访问count.txt、并进行统计
     */
    @PutMapping("/count")
    public void count() {
        try {
            // 获取锁
            RedisLockUtil.lock(LOCK_KEY);
            Long count = Long.parseLong(FileUtils.readLines(FILE, ENCODING).get(0));
            ++count;
            FileUtils.writeStringToFile(FILE, String.valueOf(count), ENCODING);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            RedisLockUtil.unlock(LOCK_KEY);
        }
    }

    /**
     * 获取文件统计的个数
     *
     * @return 文件统计的个数
     * @throws IOException 文件IO异常
     */
    @GetMapping("/getCount")
    public String getCount() throws IOException {
        List<String> readLines = new ArrayList<>(1);
        while (readLines.size() == 0) {
            readLines = FileUtils.readLines(FILE, ENCODING);
        }
        long count = Long.parseLong(readLines.get(0));
        return " Count : " + count;

    }


    /**
     * 重置为0
     *
     * @return 重置信息
     * @throws IOException 文件IO异常
     */
    @GetMapping("/reset")
    public String reset() throws IOException {
        if (!FILE.exists()) {
            FILE.createNewFile();
        }
        FileUtils.writeStringToFile(FILE, "0", ENCODING);
        return "已经完成重置, Count : " + FileUtils.readLines(FILE, ENCODING).get(0);
    }

}

分别启动两个服务,端口号为 8999、9000 :

调用 /redisson/reset 初始化、重置 :

调用 /redisson/getCount 查看count.txt 文件记录数 :

启动Jmeter,分别向 /redisson/count 1秒内压测 2000 QPS :

调用 /redisson/getCount 查看count.txt 文件记录数 : 结果符合预期

测试看门狗策略 : 在 RedissonController.java 新增接口 :


/**
     * 测试看门狗策略是否能保证共享资源的安全操作
     */
    @PutMapping("/testWatchDog")
    public void testWatchDog() {
        try {
            // 由于默认过期时间是30s,为了方便测试,我们这边设置为3s
            RedisLockUtil.lock(LOCK_KEY,3);
            // 模仿执行业务需要 4s ,超过了锁过期
            Thread.sleep(4000);
            Long count = Long.parseLong(FileUtils.readLines(FILE, ENCODING).get(0));
            ++count;
            FileUtils.writeStringToFile(FILE, String.valueOf(count), ENCODING);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            RedisLockUtil.unlock(LOCK_KEY);
        }
    }

Jmeter 分别压测 10 QPS , 调用 /redisson/getCount 查看是否等于20 :

总结

Redis分布式锁的不足

  在集群环境下,它不是绝对安全的;在Sentinel集群中,当主节点挂掉时,从节点取而代之,但是客户端并不知道 : 客户端1在主节点成功获取锁,操作共享资源,但这把锁还没来得及同步到从节点,主节点突然挂掉了,然后从节点变为主节点,新的节点并没有这个锁,其他客户端过来请求锁的时候,可以获取到,这就导致了同一把锁被两个客户端同时持有.

  作者引入了Redlock来解决集群下的问题,使用Redlock需要提供多个Redis实例,并且没有主从关系,需要更多的Redis实例,性能肯定下降了,而且很多地方需要考虑,例如运维上时钟跳跃问题,所以使用前需再三斟酌.
  PS : 本文不对Redlock详细叙述,想了解更多的可以到 怎么实现Redis分布式锁 的@Kaito 的评论.

其他总结

  • 一个分布式锁,在极端情况下,不一定是安全的,所以对业务比较敏感的一定要注意这个问题.
  • 使用分布式锁,在上层完成互斥目的,虽然极端情况下锁会失效,但它可以最大程度把并发请求阻挡在最上层,减轻操作资源层的压力;
  • 对于要求数据绝对正确的业务,在资源层一定要做好兜底,例如使用mysql实现的乐观锁 :
    • 客户端1使用Redlock拿到锁
    • 先在mysql表中更新当前的UUID
    • 根据where条件是否为当前UUID来操作共享资源
    • UPDATE table T set val = $new_value where id = $id AND uuid = $current_uuid
  • 如果操作的共享资源的服务器不是MySQL,上面的方案就无能为力了,这对操作的共享资源的服务器提出了更高的要求.

相关参考资料


目录