Redis分布式锁的实现
作者:行百里er
博客:https://chendapeng.cn (opens new window)
提示
这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!
# 环境准备
我比较喜欢做全套的,一个Redis分布式锁的应用示例,我准备了Redis各种环境、SpringBoot部署两个服务、用tengine做这两个服务的负载均衡、用Jmeter做压力测试,可谓是麻雀虽小,五脏俱全。
关于Redis各种模式环境的搭建,我已经准备了一篇文章:
Redis各种模式部署及工作原理-单节点、主从复制、redis-sentinel(哨兵)以及redis-cluster(集群) (opens new window)
欢迎批评指正。
本文Redis分布式锁,从Redis单节点、主从、哨兵、集群各种环境都操练一下,其实主要玩的是配置,配置对了,调用接口就可以了。
我已经准备好了Redis各种环境,我们分布式锁代码实现就基于这一系列环境。
# 单节点
主机名称 | 角色 | IP地址 | 端口 |
---|---|---|---|
redis-standalone | 192.168.2.11 | 6379 |
# 主从(1主3从)
主机名称 | 角色 | IP地址 | 端口 |
---|---|---|---|
Redis-Master-01 | Master | 192.168.2.20 | 9736 |
Redis-Slave-02 | Slave | 192.168.2.21 | 9736 |
Redis-Slave-03 | Slave | 192.168.2.22 | 9736 |
Redis-Slave-04 | Slave | 192.168.2.23 | 9736 |
# 哨兵(1主3从3sentinel)
主机名称 | 角色 | IP地址 | 端口 |
---|---|---|---|
Redis-Master-01 | Master | 192.168.2.20 | 9736 |
Redis-Slave-02 | Slave | 192.168.2.21 | 9736 |
Redis-Slave-03 | Slave | 192.168.2.22 | 9736 |
Redis-Slave-04 | Slave | 192.168.2.23 | 9736 |
Redis-Sentinel-01 | Sentinel | 192.168.2.30 | 29736 |
Redis-Sentinel-02 | Sentinel | 192.168.2.31 | 29736 |
Redis-Sentinel-03 | Sentinel | 192.168.2.32 | 29736 |
# 集群(3主3从)
主机名称 | 角色 | IP地址 | 端口 |
---|---|---|---|
Redis-Cluster-01 | Master | 192.168.2.50 | 6379 |
Redis-Cluster-01 | Slave | 192.168.2.50 | 6380 |
Redis-Cluster-02 | Master | 192.168.2.51 | 6379 |
Redis-Cluster-02 | Slave | 192.168.2.51 | 6380 |
Redis-Cluster-03 | Master | 192.168.2.52 | 6379 |
Redis-Cluster-03 | Slave | 192.168.2.52 | 6380 |
# 分布式锁应用举例
我之前怼过基于etcd和zookeeper的分布式锁的实现,用的例子是秒杀场景,扣减库存,这也是比较经典的使用分布式锁的业务场景。
还有比Redis更骚的分布式锁的实现方式吗?有,etcd! (opens new window)
用ZooKeeper实现分布式锁 (opens new window)
本次换一个搞法,我们对一篇文章的阅读量进行分布式操作,使用Redis分布式锁对文章的阅读量这个共享资源进行控制。
# 存储阅读量
set pview 0
2
# 使用tengine(nginx)做负载均衡
tengine主机信息:
主机名称 | 角色 | IP地址 | 端口 |
---|---|---|---|
nginx-node | 负载均衡 | 192.168.2.10 | 80 |
后面做压力测试的过程中会只通过一个地址,对两个服务(8080/8090)做负载均衡,nginx简单配置如下:
...
upstream distributed-lock {
server 192.168.2.1:8080 weight=1;
server 192.168.2.1:8090 weight=1;
}
server {
listen 80;
server_name localhost;
location / {
root html;
index index.html index.htm;
proxy_pass http://distributed-lock;
}
...
}
...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# JMeter压力测试配置
模拟同一时刻发出666个请求:
# 轮子:Redisson
Redisson
对Redis分布式锁的实现有相当好的支持,其实现机制:
(1)加锁机制:根据hash节点选择一个客户端执行lua脚本
(2)锁互斥机制:再来一个客户端执行同样的lua脚本会提示已经存在锁,然后进入循环一直尝试加锁
(3)可重入机制
(4)watch dog自动延期机制
(5)释放锁机制
# 代码实现
# 不加锁
@RequestMapping("/v1/pview")
public String incrPviewWithoutLock() {
//阅读量增加1
long pview = redissonClient.getAtomicLong("pview").incrementAndGet();
LOGGER.info("{}线程执行阅读量加1,当前阅读量:{}", Thread.currentThread().getName(), pview);
return port + " increase pview end!";
}
2
3
4
5
6
7
同一时刻并发请求666个,来看一下结果:
666个请求,最终结果才是34!
# 加synchronized同步锁
从刚才的结果可以看出,在8080和8090这两个JVM进程中均有重复的,所以我们改进一下,加一个synchronized
同步锁,再看一下执行情况。
@RequestMapping("/v2/pview")
public String incrPviewWithSync() {
synchronized (this) {
//阅读量增加1
int oldPview = Integer.valueOf((String) redissonClient.getBucket("pview", new StringCodec()).get());
int newPview = oldPview + 1;
redissonClient.getBucket("pview", new StringCodec()).set(String.valueOf(newPview));
LOGGER.info("{}线程执行阅读量加1,当前阅读量:{}", Thread.currentThread().getName(), newPview);
}
return port + " increase pview end!";
}
2
3
4
5
6
7
8
9
10
11
结果并不是预期的666,而是391:
这个时候可以看到,虽然两个端口各自的服务内没有重复的了,但是8080和8090两个服务的进程有重复对同一个pview的值进行+1的。
也就是说,synchronized只能解决进程内的并发问题,不能解决分布式系统带来的操作共享资源问题。
# 主角登场-分布式锁
解决分布式系统下的操作共享资源的问题,用分布式锁。
完整代码:https://github.com/xblzer/distributedLocks
构造RedissonClient:
public PviewController(RedisConfiguration redisConfiguration) {
RedissonManager redissonManager;
switch (redisConfiguration.deployType) {
case "single":
redissonManager = new SingleRedissonManager();
break;
case "master-slave":
redissonManager = new MasterSlaveRedissonManager();
break;
case "sentinel":
redissonManager = new SentinelRedissonManager();
break;
case "cluster":
redissonManager = new ClusterRedissonManager();
break;
default:
throw new IllegalStateException("Unexpected value: " + redisConfiguration.deployType);
}
this.redissonClient = redissonManager.initRedissonClient(redisConfiguration);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这里用了一个策略模式,可根据Redis部署方式的不同选择初始化不同的RedissonClient。
RedisLock:
这里为了整合zookeeper、etcd分布式锁,我抽象出了一个AbstractLock模板方法类,该类实现了java.util.concurrent.locks.Lock。
这样后面无论用哪种分布式锁,都可以用Lock lock = new xxx()来定义。
在下面的文章中有体现:
public class RedisLock extends AbstractLock {
private RedissonClient redissonClient;
private String lockKey;
public RedisLock(RedissonClient redissonClient, String lockKey) {
this.redissonClient = redissonClient;
this.lockKey = lockKey;
}
@Override
public void lock() {
redissonClient.getLock(lockKey).lock();
}
//...略
@Override
public void unlock() {
redissonClient.getLock(lockKey).unlock();
}
//...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
请求API:
@RequestMapping("/v3/pview")
public String incrPviewWithDistributedLock() {
Lock lock = new RedisLock(redissonClient, lockKey);
try {
//加锁
lock.lock();
int oldPview = Integer.valueOf((String) redissonClient.getBucket("pview", new StringCodec()).get());
//执行业务 阅读量增加1
int newPview = oldPview + 1;
redissonClient.getBucket("pview", new StringCodec()).set(String.valueOf(newPview));
LOGGER.info("{} 成功获得锁,阅读量加1,当前阅读量:{}", Thread.currentThread().getName(), newPview);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
return port + " increase pview end!";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
执行压测结果:
从结果看,没有问题。
# RedissonLock加锁源码分析
来看一下RedissonLock
加锁的源码:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
this.internalLockLeaseTime,
this.getLockName(threadId));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
其中执行了Lua脚本,用Lua脚本的原因是
- 原子操作。Redis会将整个脚本作为一个整体执行,不会被中断。可以用来批量更新、批量插入
- 减少网络开销。多个Redis操作合并为一个脚本,减少网络时延
- 代码复用。客户端发送的脚本可以存储在Redis中,其他客户端可以根据脚本的id调用。
这里面用到了几个Redis命令:
hincrby
HINCRBY key field increment
为哈希表 key 中的域 field 的值加上增量 increment 。
增量也可以为负数,相当于对给定域进行减法操作。
如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。
如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。
返回值:
执行 HINCRBY 命令之后,哈希表 key 中域 field 的值。
pexpire
PEXPIRE key milliseconds
这个命令和 EXPIRE 命令的作用类似,但是它以毫秒为单位设置 key 的生存时间,而不像 EXPIRE 命令那样,以秒为单位。
返回值:
设置成功,返回 1
key 不存在或设置失败,返回 0
hexists
HEXISTS key field
查看哈希表 key 中,给定域 field 是否存在。
返回值:
如果哈希表含有给定域,返回 1 。
如果哈希表不含有给定域,或 key 不存在,返回 0 。
pttl
PTTL key
这个命令类似于 TTL 命令,但它以毫秒为单位返回 key 的剩余生存时间,而不是像 TTL 命令那样,以秒为单位。
返回值:
当 key 不存在时,返回 -2 。
当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以毫秒为单位,返回 key 的剩余生存时间。
现在再看那一段Lua脚本,
- 如果 KEYS[1] 不存在,
则执行hincrby KEYS[1] ARGV[2] 1
,
表示设置一个key为KEYS[1]的hash,该hash的k=ARGV[2],v=1,
(因为hincrby:如果域 field 不存在,那么在执行命令前,域的值被初始化为 0 。)
然后执行pexpire KEYS[1] ARGV[1]
设置过期时间
- 如果 KEYS[1] 存在,
执行hincrby KEYS[1] ARGV[2] 1
则表示为哈希表 key 中的域 field 的值加上1,也就是锁重入;
然后设置过期时间。
# RedisRedLock 红锁
前面的方案貌似解决了分布式系统下操作共享资源的问题,然而这是建立在Redis永不宕机的情况下的。
假如加锁使用Redis Sentinel模式,有节点宕机:
- 客户端通过MasterA获取到了锁,锁的超时时间是20秒;
- 在锁失效时间到来之前(即加锁后还未超过20秒)MasterA宕机了;
- Sentinel把其中一台Slave节点拉上来变成MasterB;
- MasterB发现没有锁,它也上锁;
- MasterB在锁失效时间内也宕机,Sentinel拉上来一个MasterC;
- MasterC上锁...
最后同时有3台实例都上了这把锁!这个坚决不能忍啊!
Redis为我们提供了RedLock红锁解决方案。
# RedLock算法步骤
在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。
以5个Redis节点为例,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉(下面用1台开5个实例来模拟)。
为了取到锁,客户端应该执行以下操作:
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。
在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。
例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。
当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
# 使用RedLock实现分布式锁
这里开5个Redis实例,使用RedLock实现分布式锁。
分布式锁使用的Redis实例列表:
# Redis分布式锁使用的redis实例
192.168.2.11 : 6479
192.168.2.11 : 6579
192.168.2.11 : 6679
192.168.2.11 : 6779
192.168.2.11 : 6889
2
3
4
5
6
为了方便,存储数据放在单节点Redis实例上(还可以是主从、哨兵、集群):
# 存储数据用的redis
192.168.2.11 : 6379
2
红锁代码实现:
// ============== 红锁 begin 方便演示才写在这里 可以写一个管理类 ==================
public static RLock create(String redisUrl, String lockKey) {
Config config = new Config();
//未测试方便 密码写死
config.useSingleServer().setAddress(redisUrl).setPassword("redis123");
RedissonClient client = Redisson.create(config);
return client.getLock(lockKey);
}
RedissonRedLock redissonRedLock = new RedissonRedLock(
create("redis://192.168.2.11:6479", "lock1"),
create("redis://192.168.2.11:6579", "lock2"),
create("redis://192.168.2.11:6679", "lock3"),
create("redis://192.168.2.11:6779", "lock4"),
create("redis://192.168.2.11:6889", "lock5")
);
@RequestMapping("/v4/pview")
public String incrPview() {
Lock lock = new RedisRedLock(redissonRedLock);
try {
//加锁
lock.lock();
//执行业务 阅读量增加1
int oldPview = Integer.valueOf((String) redissonClient.getBucket("pview", new StringCodec()).get());
int newPview = oldPview + 1;
redissonClient.getBucket("pview", new StringCodec()).set(String.valueOf(newPview));
LOGGER.info("{} 成功获得锁,阅读量加1,当前阅读量:{}", Thread.currentThread().getName(), newPview);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
return port + " increase pview end!";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
压测结果:
结果很完美!
这样我们就用Redis的RedLock红锁实现了分布式锁。
基于Reddisson实现的Redis红锁代码位于类
org.redisson.RedissonMultiLock
中:
以上。
本文完整代码:https://github.com/chendapengjava/distributedLocks
首发公众号 行百里er ,欢迎老铁们关注阅读指正。