
前言
- 最近大部分学习时间都花费在Redis上,学习方式主要是阅读《Redis深度历险 核心原理与应用实践》(钱文品)、《Redis设计与实现》(黄健宏) 这两本书籍,本人是强烈推荐这两本书籍的;除了书籍,当然还会结合一些高质量的文章来学习.
- 本人是先看《Redis设计与实现》,然后《Redis深度历险 核心原理与应用实践》,一些章节会通过这两本书结合起来去重复阅读.
- 分布式锁的实现方式有几种,例如 : MySQL数据库、Zookeeper、Redis 都可以实现分布式锁,本文只学习Redis实现的分布式锁.
Redis实现的分布式锁
分布式锁的由来
- 在单体架构下,由于是同一个进程(一个JVM实例),所以可以使用Java提供的锁机制如synchronized、Lock、并发容器等来实现共享资源的安全访问;
- 在分布式架构下,由于一个应用会部署多个进程(多个JVM实例),进程与进程之间的内存空间不能互相访问,那么Java提供的锁机制如synchronized、Lock、并发容器等都失效了;当多个进程同时并发的修改共享资源时,就需要引用分布式锁来解决这种多进程并发问题.

分布式锁的实现
Redis实现的分布式锁本质上就是只能一个客户端在Redis服务器中占用这个"锁",当其他客户端访问Redis服务器,发现该"锁"被占有了,那么将重试或者放弃,这样就满足了锁的互斥性;当持有锁的客户端执行完共享资源后,需要释放这个"锁",防止死锁的出现;
Redis提供了SETNX(set if not exists)指令来执行获取锁,表示如果 key 不存在,才会设置它的值,否则什么也不做.
127.0.0.1:6379> setnx lock 1
(integer) 1 // 客户端A获取锁成功
127.0.0.1:6379> setnx lock 1
(integer) 0
127.0.0.1:6379> // 客户端B获取锁失败
当成功持有锁的客户端执行完共享资源后,必须释放锁,而Redis提供DEL指令来释放锁
127.0.0.1:6379> del lock
(integer) 1 // 成功释放锁
这里存在问题,持有锁的客户端执行过程中因为异常等问题导致没有释放锁,那么就会进入死锁状态 :

解决死锁的方式很简单,就是给个过期时间,这样可以保证即使出现异常问题没有释放锁而自动过期去释放锁.
127.0.0.1:6379> setnx lock 1
(integer) 1 // 成功获取锁
127.0.0.1:6379> expire lock 5
(integer) 1 // 设置锁的过期时间为5s
加了过期时间还是有问题,因为setnx、expire两条指令并非原子指令,即可能存在执行完setnx后,expire没来得及执行或者执行失败,这样就有潜在过期时间设置失败的风险,依旧会发生死锁问题;
如果使用事务来解决,也是不行,因为事务没有if-else分支逻辑,事务的特点是一气呵成,要么全部执行要么全部不执行,即如果setnx没获取到锁,是不能去执行expire的.
Redis2.8版本中,Redis作者就加入了set指令参数的扩展来保证setnx、expire指令可以一起执行 :
// 一条命令保证原子性执行
127.0.0.1:6379> set lock 1 ex 5 nx
OK
解决了setnx、expire两条指令一起执行的问题,还是存在问题 : 客户端1获取到锁,设置锁的过期时间为5s,但是由于获取锁和释放锁之间的逻辑执行得太长(超过5s),这样会导致锁自动到期,客户端1还在执行,而其他客户端可以乘机获取到锁了 :
- 客户端1 : 获取锁成功,执行的共享资源
- 客户端1 : 超过了锁的过期时间,锁被自动释放
- 客户端2 : 获取锁成功,执行的共享资源
- 客户端1 : 操作共享资源完成,释放锁(但释放的是客户端 2 的锁)

又衍生了新的问题 :
- 释放了其他客户端的锁 : 客户端释放锁时,需要判断是否为当前客户端持有的锁再进行释放;
- 锁自动过期 : 可能导致多个客户端同时获取到锁;
释放了其他客户端的锁
- 这个问题很简单,使用UUID或者客户端ID作为唯一标识保存在value中.
- PS : 这里假设 5s 操作共享时间完全足够,先不考虑锁自动过期的问题.
// 获取锁
127.0.0.1:6379> set lock $clientId ex 5 nx
OK
- 这样就可以根据value来释放锁,防止释放了别人的锁, 伪代码 : if get lock == $UUID ==> del lock .
- PS : 由于get、del是两个指令,所以需要Lua脚本来执行.
// 查看锁
127.0.0.1:6379> get lock
$clientId
// 如果为当前客户端ID,则执行删除
127.0.0.1:6379> del lock
(integer) 1
// Lua脚本
if ARGV[1] == redis.call('GET', KEYS[1])
then
return redis.call('DEL', KEYS[1])
else
return 0
end
锁自动过期 --- watch dog自动延期机制
对于锁自动过期这个问题,最粗暴的办法就是把过期时间设置长一点,但是这个办法治标不治本,还是会存在问题.
有一种叫 看门狗 的方案 : 加锁的时候,开启一条 后台线程 , 然后定时去给这个锁重新初始化过期时间.
开源的Redisson框架就有实现这种 watch dog自动延期机制,默认30s自动过期,后台线程 会每隔10秒检查一下,如果该守护线程的客户端还持有锁key,那么就会不断的延长锁key的生存时间.
极端情况下还是有问题
有了watch dog自动延期机制,在极端情况下还是会有问题 :
- 客户端1 : 获取到锁,操作共享资源,后台线程自动延期;
- 客户端1 : 与锁服务器发生失联(例如 GC、网络问题等),后台线程没能自动延期,导致锁过期
- 客户端2 : 成功获取到锁,此时与客户端1同时持有了锁;
- 锁过期时间设置较长可以防止这种极端情况,开源的Redisson框架默认锁过期时间为30s;
可重入
到目前为止,从设置过期时间防止死锁、设置value值为UUID防止误解锁,到 watch dog自动延期机制 防止 锁过期了,但是共享资源还在操作 ,我们可以得到如以下数据结构的锁 :
lock : $UUID
那如果客户端1已经持有了这把锁了,结果可重入的加锁会怎么样呢?其实很简单,改变一下数据结构 :
# 重入1次
lock : {
$UUID : 1
}
# 重入2次
lock : {
$UUID : 2
}
只要是同一个客户端获取锁\释放锁,那么只需要进行加减就行了;开源的Redisson框架就是使用的这种数据结构来实现的可重入机制.
释放锁
释放锁就相对简单了,由于是可重入锁,所以必须减少为0时才执行del命令,从redis里删除这个key.
小结
- 防止死锁 : 设置过期时间
- 误解锁 : value值设置为UUID
- 锁过期了,但是共享资源还在操作 : watch dog自动延期机制
- 可重入 : 使用hash结构,把UUID和重入次数映射起来
- 释放锁 : 可重入数减为0时执行del命令
PS : 以上是单个Redis实例的情况下的分析场景,已经实现了一个几乎完美的分布式锁,而集群环境下还是会存在问题.
基于Redisson实现分布式锁
项目结构如下图 :

单例集成Redisson
Maven集成Redisson
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.10.6</version>
</dependency>
application.yml :
server:
port: 8999
spring:
# redis
redis:
# redis服务器ip
host:
port: 6379
password: 密码填写自己的
jedis:
pool:
# 连接池最大连接数(使用负值表示没有限制)
max-active: 100
# 连接池中的最小空闲连接
max-idle: 10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1
# 连接超时时间(毫秒)
timeout: 5000
#默认是索引为0的数据库
database: 0
RedissonConfig.java : 创建RedissonClient操作redis对象、RedisLocker操作redis服务接口
@Component
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password:}")
private String password;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
//单节点
config.useSingleServer().setAddress("redis://" + host + ":" + port);
if (StringUtils.isEmpty(password)) {
config.useSingleServer().setPassword(null);
} else {
config.useSingleServer().setPassword(password);
}
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
// 集群模式配置 setScanInterval()扫描间隔时间,单位是毫秒, //可以用"rediss://"来启用SSL连接
// config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002");
return Redisson.create(config);
}
/**
* 装配locker类,并将实例注入到RedissLockUtil中
*/
@Bean
public RedisLocker redisLocker(RedissonClient redissonClient) {
RedisLocker redisLocker = new RedisLockerServer();
redisLocker.setRedissonClient(redissonClient);
RedisLockUtil.setLocker(redisLocker);
return redisLocker;
}
}
RedisLocker接口 : 操作redis服务接口
public interface RedisLocker {
RLock lock(String lockKey);
RLock lock(String lockKey, int timeout);
RLock lock(String lockKey, TimeUnit unit, int timeout);
boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime);
void unlock(String lockKey);
void unlock(RLock lock);
RCountDownLatch getCountDownLatch(String name);
RSemaphore getSemaphore(String name);
void setRedissonClient(RedissonClient redissonClient);
}
RedisLockerServer 实现接口 :
@Configurable
public class RedisLockerServer implements RedisLocker {
@Autowired
private RedissonClient redissonClient;
@Override
public RLock lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
@Override
public RLock lock(String lockKey, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, TimeUnit.SECONDS);
return lock;
}
@Override
public RLock lock(String lockKey, TimeUnit unit, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
return lock;
}
@Override
public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
return false;
}
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
@Override
public void unlock(RLock lock) {
lock.unlock();
}
@Override
public RCountDownLatch getCountDownLatch(String name) {
return redissonClient.getCountDownLatch(name);
}
@Override
public RSemaphore getSemaphore(String name) {
return redissonClient.getSemaphore(name);
}
@Override
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
}
RedisLockUtil.java : 操作redis工具类
public class RedisLockUtil {
private static RedisLocker redisLocker;
public static void setLocker(RedisLocker locker) {
redisLocker = locker;
}
/**
* 加锁
* @param lockKey
* @return
*/
public static RLock lock(String lockKey) {
return redisLocker.lock(lockKey);
}
/**
* 释放锁
* @param lockKey
*/
public static void unlock(String lockKey) {
redisLocker.unlock(lockKey);
}
/**
* 释放锁
* @param lock
*/
public static void unlock(RLock lock) {
redisLocker.unlock(lock);
}
/**
* 带超时的锁
* @param lockKey
* @param timeout 超时时间 单位:秒
*/
public static RLock lock(String lockKey, int timeout) {
return redisLocker.lock(lockKey, timeout);
}
/**
* 带超时的锁
* @param lockKey
* @param unit 时间单位
* @param timeout 超时时间
*/
public static RLock lock(String lockKey, int timeout, TimeUnit unit ) {
return redisLocker.lock(lockKey, unit, timeout);
}
/**
* 尝试获取锁
* @param lockKey
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public static boolean tryLock(String lockKey, int waitTime, int leaseTime) {
return redisLocker.tryLock(lockKey, TimeUnit.SECONDS, waitTime, leaseTime);
}
/**
* 尝试获取锁
* @param lockKey
* @param unit 时间单位
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public static boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
return redisLocker.tryLock(lockKey, unit, waitTime, leaseTime);
}
/**
* 获取计数器
*
* @param name
* @return
*/
public static RCountDownLatch getCountDownLatch(String name){
return redisLocker.getCountDownLatch(name);
}
/**
* 获取信号量
*
* @param name
* @return
*/
public static RSemaphore getSemaphore(String name){
return redisLocker.getSemaphore(name);
}
}
单例环境下集成Redisson就完成了.
实战
我们假设 : 一个服务分别部署在两台服务器,分别是 服务A,端口号为8999、服务B,端口号为9000 ,我们现在模拟这么个简单需求 : 服务A、B需要同时向count.txt写入统计数据,这里发生了A、B(多个进程)并发修改同一份文件,为了避免操作乱序导致数据错误,此时,我们就需要引入分布式锁来解决这个问题了.

RedissonController.java :
@RestController
@RequestMapping("/redisson")
public class RedissonController {
private static final String ENCODING = "UTF-8";
private static final File FILE = new File("D:\\count.txt");
// 锁
private static final String LOCK_KEY = "lock_key";
/**
* 访问count.txt、并进行统计
*/
@PutMapping("/count")
public void count() {
try {
// 获取锁
RedisLockUtil.lock(LOCK_KEY);
Long count = Long.parseLong(FileUtils.readLines(FILE, ENCODING).get(0));
++count;
FileUtils.writeStringToFile(FILE, String.valueOf(count), ENCODING);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 释放锁
RedisLockUtil.unlock(LOCK_KEY);
}
}
/**
* 获取文件统计的个数
*
* @return 文件统计的个数
* @throws IOException 文件IO异常
*/
@GetMapping("/getCount")
public String getCount() throws IOException {
List<String> readLines = new ArrayList<>(1);
while (readLines.size() == 0) {
readLines = FileUtils.readLines(FILE, ENCODING);
}
long count = Long.parseLong(readLines.get(0));
return " Count : " + count;
}
/**
* 重置为0
*
* @return 重置信息
* @throws IOException 文件IO异常
*/
@GetMapping("/reset")
public String reset() throws IOException {
if (!FILE.exists()) {
FILE.createNewFile();
}
FileUtils.writeStringToFile(FILE, "0", ENCODING);
return "已经完成重置, Count : " + FileUtils.readLines(FILE, ENCODING).get(0);
}
}
分别启动两个服务,端口号为 8999、9000 :


调用 /redisson/reset 初始化、重置 :

调用 /redisson/getCount 查看count.txt 文件记录数 :

启动Jmeter,分别向 /redisson/count 1秒内压测 2000 QPS :


调用 /redisson/getCount 查看count.txt 文件记录数 : 结果符合预期

测试看门狗策略 : 在 RedissonController.java 新增接口 :
/**
* 测试看门狗策略是否能保证共享资源的安全操作
*/
@PutMapping("/testWatchDog")
public void testWatchDog() {
try {
// 由于默认过期时间是30s,为了方便测试,我们这边设置为3s
RedisLockUtil.lock(LOCK_KEY,3);
// 模仿执行业务需要 4s ,超过了锁过期
Thread.sleep(4000);
Long count = Long.parseLong(FileUtils.readLines(FILE, ENCODING).get(0));
++count;
FileUtils.writeStringToFile(FILE, String.valueOf(count), ENCODING);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
RedisLockUtil.unlock(LOCK_KEY);
}
}
Jmeter 分别压测 10 QPS , 调用 /redisson/getCount 查看是否等于20 :

总结
Redis分布式锁的不足
在集群环境下,它不是绝对安全的;在Sentinel集群中,当主节点挂掉时,从节点取而代之,但是客户端并不知道 : 客户端1在主节点成功获取锁,操作共享资源,但这把锁还没来得及同步到从节点,主节点突然挂掉了,然后从节点变为主节点,新的节点并没有这个锁,其他客户端过来请求锁的时候,可以获取到,这就导致了同一把锁被两个客户端同时持有.

作者引入了Redlock来解决集群下的问题,使用Redlock需要提供多个Redis实例,并且没有主从关系,需要更多的Redis实例,性能肯定下降了,而且很多地方需要考虑,例如运维上时钟跳跃问题,所以使用前需再三斟酌.
PS : 本文不对Redlock详细叙述,想了解更多的可以到 怎么实现Redis分布式锁 的@Kaito 的评论.
其他总结
- 一个分布式锁,在极端情况下,不一定是安全的,所以对业务比较敏感的一定要注意这个问题.
- 使用分布式锁,在上层完成互斥目的,虽然极端情况下锁会失效,但它可以最大程度把并发请求阻挡在最上层,减轻操作资源层的压力;
- 对于要求数据绝对正确的业务,在资源层一定要做好兜底,例如使用mysql实现的乐观锁 :
- 客户端1使用Redlock拿到锁
- 先在mysql表中更新当前的UUID
- 根据where条件是否为当前UUID来操作共享资源
- UPDATE table T set val = $new_value where id = $id AND uuid = $current_uuid
- 如果操作的共享资源的服务器不是MySQL,上面的方案就无能为力了,这对操作的共享资源的服务器提出了更高的要求.
相关参考资料
- Redis深度历险 核心原理与应用实践【书籍】
- Redis设计与实现【书籍】
- 怎么实现Redis分布式锁【链接】