Redis中的分布式锁(稳扎稳打)
分布式锁
概述
分布式锁指的是,一切服务中的一切线程都去获取同一把锁,但只要一个线程能够成功的取得锁,其他没有取得锁的线程有必要悉数等候,直到持有锁的线程开释锁。
分布式锁是能够跨过多个实例,多个进程的锁
分布式锁具有的条件:
- 互斥性:恣意时刻,只能有一个客户端持有锁
- 锁超时开释:持有锁超时,能够开释,防止死锁
- 可重入性:一个线程获取了锁之后,能够再次对其恳求加锁
- 高可用、高性能:加锁和解锁开支要尽或许低,一起保证高可用
- 安全性:锁只能被持有该锁的服务(或运用)开释。
- 容错性:在持有锁的服务溃散时,锁仍能得到开释,防止死锁。
分布式锁完结计划
分布式锁都是通过第三方组件来完结的,现在比较盛行的分布式锁的解决计划有:
- 数据库,通过数据库能够完结分布式锁,可是在高并发的情况下对数据库压力较大,所以很少运用。
- Redis,凭借Redis也能够完结分布式锁,而且Redis的Java客户端品种许多,运用的办法也不尽相同。
- Zookeeper,Zookeeper也能够完结分布式锁,相同Zookeeper也存在多个Java客户端,运用办法也不相同
Redis完结分布式锁
SETNX
根本计划:Redis供给了setXX指令来完结分布式锁
格局: setnx key value
将key 的值设为value ,当且仅当key不存在。
若给定的 key现已存在,则SETNX不做任何动作。
设置分布式锁后,能保证并发安全,但上述代码还存在问题,假如履行过程中呈现反常,程序就直接抛出反常退出,导致锁没有开释形成终究死锁的问题。(即便将锁放在finally中开释,可是假如是履行到半途体系宕机,锁仍是没有被成功的开释掉,仍然会呈现死锁现象)
设置超时时刻
SET lock_key unique_value NX PX 10000
可是,即便设置了超时时刻后,还存在问题。
假定有多个线程,假定设置锁的过期时刻10s,线程1上锁后履行事务逻辑的时长超越十秒,锁到期开释锁,线程2就能够取得锁履行,此刻线程1履行完删去锁,删去的便是线程2持有的锁,线程3又能够获取锁,线程2履行完删去锁,删去的是线程3的锁,如此往后,这样就会出问题。
让线程只删去自己的锁
解决办法便是让线程只能删去自己的锁,即给每个线程上的锁增加仅有标识(这儿UUID完结,根本不会呈现重复),删去锁时判别这个标识:
但上述红框中因为断定和开释锁不是原子的,极点情况下,或许断定能够开释锁,在履行删去锁操作前刚好时刻到了,其他线程获取锁履行,前者线程删去锁删去的仍然是其他线程的锁,所以要让删去锁具有原子性,能够运用redis事务或lua脚本完结原子操作判别+删去
Redis的单条指令操作是原子性的,可是多条指令操作并不是原子性的,因而Lua脚本完结的便是令Redis的多条指令也完结原子操作
redis事务不是原子操作的,概况请看 Redis的事务
可是,能够运用Redis的事务和watch完结的达观锁 来监督锁的状况
@RequestMapping(" /deduct_stock")
public String deductStock() {
String REDIS_LOCK = "good_lock";
// 每个人进来先要进行加锁,key值为"good_lock"
String value = UUID.randomUUID().toString().replace("-","");
try{
// 为key加一个过期时刻
Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);
// 加锁失利
if(!flag){
return "抢锁失利!";
}
System.out.println( value+ " 抢锁成功");
String result = template.opsForValue().get("goods:001");
int total = result == null ? 0 : Integer.parseInt(result);
if (total > 0) {
// 假如在此处需求调用其他微服务,处理时刻较长。。。
int realTotal = total - 1;
template.opsForValue().set("goods:001", String.valueOf(realTotal));
System.out.println("购买产品成功,库存还剩:" + realTotal + "件, 服务端口为8002");
return "购买产品成功,库存还剩:" + realTotal + "件, 服务端口为8002";
} else {
System.out.println("购买产品失利,服务端口为8002");
}
return "购买产品失利,服务端口为8002";
}finally {
// 谁加的锁,谁才干删去
// 也能够运用redis事务
// https://redis.io/commands/set
// 运用Lua脚本,进行锁的删去
Jedis jedis = null;
try{
jedis = RedisUtils.getJedis();
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));
if("1".equals(eval.toString())){
System.out.println("-----del redis lock ok....");
}else{
System.out.println("-----del redis lock error ....");
}
}catch (Exception e){
}finally {
if(null != jedis){
jedis.close();
}
}
// redis事务
// while(true){
// template.watch(REDIS_LOCK);
// if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
// template.setEnableTransactionSupport(true);
// template.multi();
// template.delete(REDIS_LOCK);
// List<Object> list = template.exec();
// if(list == null){
// continue;
// }
// }
// template.unwatch();
// break;
// }
}
}
}
尽管这样,仍是会有问题,锁超时开释尽管能够防止死锁,但假如是事务履行耗时较长,也会导致锁的开释,但其实此刻事务还在履行中,仍是应该将事务履行完毕之后再开释锁。
续时
因而能够设定,使命不完结,锁就不开释。
能够保护一个守时线程池 ScheduledExecutorService
,每隔 2s 去扫描参加行列中的 Task,判别失效时刻是否快到了,假如快到了,则给锁续上时刻。
那怎么判别是否快到失效时刻了呢?能够用以下公式:【失效时刻】<= 【当时时刻】+【失效距离(三分之一超时)】
// 扫描的使命行列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/**
* 线程池,保护keyAliveTime
*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
// 两秒履行一次「续时」操作
SCHEDULER.scheduleAtFixedRate(() -> {
// 这儿记住加 try-catch,否者报错后守时使命将不会再履行=-=
Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
while (iterator.hasNext()) {
RedisLockDefinitionHolder holder = iterator.next();
// 判空
if (holder == null) {
iterator.remove();
continue;
}
// 判别 key 是否还有用,无效的话进行移除
if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
iterator.remove();
continue;
}
// 超时重试次数,超越时给线程设定中止
if (holder.getCurrentCount() > holder.getTryCount()) {
holder.getCurrentTread().interrupt();
iterator.remove();
continue;
}
// 判别是否进入最终三分之一时刻
long curTime = System.currentTimeMillis();
boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
if (shouldExtend) {
holder.setLastModifyTime(curTime);
redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
holder.setCurrentCount(holder.getCurrentCount() + 1);
}
}
}, 0, 2, TimeUnit.SECONDS);
}
Redisson
运用Redis + lua办法或许存在的问题
- 不行重入性。同一个线程无法屡次获取同一把锁
- 不行重试。获取锁只测验一次就回来false,没有重试机制
- 超时开释。锁超时开释尽管能够防止死锁,但假如是事务履行耗时较长,也会导致锁的开释,存在安全隐患
- 主从一致性。假如Redis是主从集群,主从同步存在推迟,当主机宕机时,从成为了主,但或许存在从此刻还未完结同步,因而从上就没有锁标识,此刻会呈现线程安全问题。
RLock是Redisson分布式锁的最中心接口,承继了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的回来值都是RFuture,是Redisson履行异步完结的中心逻辑,也是Netty发挥的首要阵地。
RLock怎么加锁解锁,完结可重入性?
从RLock进入,找到RedissonLock类,找到tryLock 办法再持续找到tryAcquireOnceAsync 办法,这是加锁的首要代码(版别纷歧此处完结有不同,和最新3.15.x有必定收支,可是中心逻辑仍然未变。此处以3.13.6为例)
// waitTime 等候时刻,多久时刻内都会在这测验获取锁
// leaseTime 加锁时是否设置过期时刻
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
此处呈现leaseTime时刻判其他2个分支,实际上便是加锁时是否设置过期时刻,未设置过期时刻(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事情的续约使命。咱们先来看有过期时刻tryLockInnerAsync 部分
evalWriteAsync办法是eval指令履行lua的进口
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
eval指令履行Lua脚本的当地,此处将Lua脚本打开
-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then
-- 新增该锁而且hash中该线程id对应的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置过期时刻
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在该key 而且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 线程重入次数++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId组合的仅有值
ARGV[2] = this.getLockName(threadId)
一共3个参数完结了一段逻辑:
- 判别该锁是否现已有对应hash表存在,
- 没有对应的hash表:则set该hash表中一个entry的key为锁称号,value为1,之后设置该hash表失效时刻为leaseTime
- 存在对应的hash表:则将该lockName的value履行+1操作,也便是核算进入次数,再设置失效时刻leaseTime
- 最终回来这把锁的ttl剩下时刻
再看看RLock怎么解锁?
看unlock办法,相同查找办法名,一路到unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}
将lua脚本打开
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 存在,计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 过期时刻重设
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 删去并发布解锁音讯
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
name 锁称号
channelName,用于pubSub发布音讯的channel称号
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
LockPubSub.UNLOCK_MESSAGE,channel发送音讯的类别,此处解锁为0
internalLockLeaseTime,watchDog装备的超时时刻,默以为30s
lockName 这儿的lockName指的是uuid和threadId组合的仅有值
详细履行过程如下:
- 假如该锁不存在则回来nil;
- 假如该锁存在则将其线程的hash key计数器-1,
- 计数器counter>0,重置下失效时刻,回来0;不然,删去该锁,发布解锁音讯unlockMessage,回来1;
加锁解锁流程总结如下:
总的来说便是通过Hash类型来存储锁的次数:
RLock的锁重试问题
需求剖析的是锁重试的,所以,在运用lock.tryLock()办法的时分,不能用无参的。
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}
在调用tryAcquire办法后,回来了一个Long的ttl
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
//省掉
持续跟着代码进去查看,最终会发现,调用tryLockInnerAsync办法。这个办法便是获取锁的Lua脚本的。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
这个lua脚本上面提到了。便是 判别,假如获取到锁,回来一个nil.也便是null。假如没有获取到,就调用 pttl,name。其实便是获取当时name锁的剩下有用期。
获取到ttl。假如回来null说获取锁成功,直接回来true.假如回来的不是null,阐明需求进行重试操作了。首要是依据时刻进行判其他。通过一系列判别后,do,while是真实履行重试相关逻辑的。如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
//假如回来null,阐明获取到了锁,直接回来
if (ttl == null) {
return true;
} else {
//当时时刻与进入办法时的时刻进行比较
//System.currentTimeMillis() - current表明前面获取锁耗费时刻
time -= System.currentTimeMillis() - current;////time是重试锁的等候时刻,
if (time <= 0L) {//剩下等候时刻,假如剩下等候时刻<=0,设置获取锁失利。
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
//再次获取当时时刻
current = System.currentTimeMillis();
//刚刚测验完获取锁失利,假如持续当即测验一般是获取不到锁的,因而这儿挑选订阅的办法
//订阅当时锁,在unlock开释锁的时分有个:redis.call('publish', KEYS[2], ARGV[1]); 所以这儿就订阅了
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
//进行等候RFuture的成果,等多久?等time的时刻
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
//time时刻过完了还没有比及锁开释的告诉
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
//假如等候超时,就撤销订阅
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
//回来获取锁失利
return false;
} else {//到这儿表明在tme时刻内取得了开释锁的告诉
boolean var16;
try {
//查看之前订阅等候的耗费时刻
time -= System.currentTimeMillis() - current;
if (time <= 0L) {//当时的剩下等候时刻
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
}
//这儿开端进行重试相关逻辑。首要便是当时时刻和进入办法时分的时刻进行比较
do {
long currentTime = System.currentTimeMillis();
//这儿便是第一次重试
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {//null表明获取锁失利
var16 = true;
return var16;
}
//再试一次
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) { //也不是一向试,等他人开释
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);//时刻还足够,持续等候
//时刻到期了,还没获取到锁,回来失利
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
}
}
首要是do while机制进行锁重试的,while会查看时刻是否还足够会持续循环。当然这个循环不是直接while(true)的盲等机制,而是运用信号量和订阅的办法完结的,会等他人开释锁,再进行测验,这种办法对cpu友爱
Redisson的超时续约
跟从tryLock代码,在RedissonLock类中的tryAcquireOnceAsync办法中,会看到如下代码:
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {//设置了锁过期时刻
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
//leaseTime = -1时,即没有设置了锁过期时刻
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//,默许30秒
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
//ttlRemainingFuture完结今后
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {//没有抛反常
if (ttlRemaining) {//获取锁成功
this.scheduleExpirationRenewal(threadId);//自动更新续期时刻的使命调度
}
}
});
return ttlRemainingFuture;
}
}
- 在运用trylock的时分,假如设置了锁过期时刻,就不会履行续命相关逻辑了。
- 其间默许的watchdogTimeout时刻是30秒。
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
//获取一个entry,将entry放到map里,getEntryName()便是当时锁称号。
//放到map里,即一个锁对应一个entry
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {//表明重入的,第2次放
oldEntry.addThreadId(threadId);
} else {//表明第一次放
entry.addThreadId(threadId);
this.renewExpiration();//第一次放,进行续约
}
}
看门狗机制:在获取锁成功今后,敞开一个守时使命,每隔一段时刻就会去重置锁的超时时刻,以保证锁是在程序