redisson 分布式锁原理

1. 工具类
package com.meta.mall.common.utils;
import lombok.extern.slf4j.slf4j;
import org.redisson.api.rlock;
import org.redisson.api.redissonclient;
import org.springframework.stereotype.component;
import javax.annotation.resource;
import java.util.concurrent.timeunit;
/**
* redisson 分布式工具类
*
* @author gaoyang
* @date 2022-05-14 08:58
*/
@slf4j
@component
public class redissonutils {
@resource
private redissonclient redissonclient;
/**
* 加锁
*
* @param lockkey
*/
public void lock(string lockkey) {
rlock lock = redissonclient.getlock(lockkey);
lock.lock();
}
/**
* 带过期时间的锁
*
* @param lockkey key
* @param leasetime 上锁后自动释放锁时间
*/
public void lock(string lockkey, long leasetime) {
rlock lock = redissonclient.getlock(lockkey);
lock.lock(leasetime, timeunit.seconds);
}
/**
* 带超时时间的锁
*
* @param lockkey key
* @param leasetime 上锁后自动释放锁时间
* @param unit 时间单位
*/
public void lock(string lockkey, long leasetime, timeunit unit) {
rlock lock = redissonclient.getlock(lockkey);
lock.lock(leasetime, unit);
}
/**
* 尝试获取锁
*
* @param lockkey key
* @return
*/
public boolean trylock(string lockkey) {
rlock lock = redissonclient.getlock(lockkey);
return lock.trylock();
}
/**
* 尝试获取锁
*
* @param lockkey key
* @param waittime 最多等待时间
* @param leasetime 上锁后自动释放锁时间
* @return boolean
*/
public boolean trylock(string lockkey, long waittime, long leasetime) {
rlock lock = redissonclient.getlock(lockkey);
try {
return lock.trylock(waittime, leasetime, timeunit.seconds);
} catch (interruptedexception e) {
log.error("redissonutils - trylock异常", e);
}
return false;
}
/**
* 尝试获取锁
*
* @param lockkey key
* @param waittime 最多等待时间
* @param leasetime 上锁后自动释放锁时间
* @param unit 时间单位
* @return boolean
*/
public boolean trylock(string lockkey, long waittime, long leasetime, timeunit unit) {
rlock lock = redissonclient.getlock(lockkey);
try {
return lock.trylock(waittime, leasetime, unit);
} catch (interruptedexception e) {
log.error("redissonutils - trylock异常", e);
}
return false;
}
/**
* 释放锁
*
* @param lockkey key
*/
public void unlock(string lockkey) {
rlock lock = redissonclient.getlock(lockkey);
lock.unlock();
}
/**
* 是否存在锁
*
* @param lockkey key
* @return
*/
public boolean islocked(string lockkey) {
rlock lock = redissonclient.getlock(lockkey);
return lock.islocked();
}
}
2. lock和trylock的区别
1.返回值
- lock 是 void;
- trylock 是 boolean。
2.时机
- lock 一直等锁释放;
- trylock 获取到锁返回true,获取不到锁并直接返回false。
lock拿不到锁会一直等待。trylock是去尝试,拿不到就返回false,拿到返回true。
trylock是可以被打断的,被中断的,lock是不可以。
3. 源码分析
3.1 lock
private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception {
// 获取当前线程 id
long threadid = thread.currentthread().getid();
// 获取锁,正常获取锁则ttl为null,竞争锁时返回锁的过期时间
long ttl = tryacquire(-1, leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return;
}
// 订阅锁释放事件
// 如果当前线程通过 redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争
rfuture<redissonlockentry> future = subscribe(threadid);
if (interruptibly) {
commandexecutor.syncsubscriptioninterrupted(future);
} else {
commandexecutor.syncsubscription(future);
}
try {
while (true) {
// 循环重试获取锁,直至重新获取锁成功才跳出循环
// 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程
ttl = tryacquire(-1, leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds);
} catch (interruptedexception e) {
if (interruptibly) {
throw e;
}
future.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds);
}
} else {
if (interruptibly) {
future.getnow().getlatch().acquire();
} else {
future.getnow().getlatch().acquireuninterruptibly();
}
}
}
} finally {
// 最后释放订阅事件
unsubscribe(future, threadid);
}
// get(lockasync(leasetime, unit));
}
<t> rfuture<t> trylockinnerasync(long waittime, long leasetime, timeunit unit, long threadid, redisstrictcommand<t> command) {
return evalwriteasync(getrawname(), longcodec.instance, command,
"if (redis.call('exists', keys[1]) == 0) then " +
"redis.call('hincrby', keys[1], argv[2], 1); " +
"redis.call('pexpire', keys[1], argv[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', keys[1], argv[2]) == 1) then " +
"redis.call('hincrby', keys[1], argv[2], 1); " +
"redis.call('pexpire', keys[1], argv[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', keys[1]);",
collections.singletonlist(getrawname()), unit.tomillis(leasetime), getlockname(threadid));
}此段脚本为一段lua脚本:
key[1]: 为你加锁的lock值
argv[2]: 为线程id
argv[1]: 为设置的过期时间
第一个if:
- 判断是否存在设置lock的key是否存在,不存在则利用redis的hash结构设置一个hash,值为1,并设置过期时间,后续返回锁。
第二个if:
- 判断是否存在设置lock的key是否存在,存在此线程的hash,则为这个锁的重入次数加1(将hash值+1),并重新设置过期时间,后续返回锁。
最后返回:
- 这个最后返回不是说最后结果返回,是代表以上两个if都没有进入,则代表处于竞争锁的情况,后续返回竞争锁的过期时间。
3.2 trylock
trylock具有返回值,true或者false,表示是否成功获取锁。
trylock前期获取锁逻辑基本与lock一致,主要是后续获取锁失败的处理逻辑与lock不一致。
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {
long time = unit.tomillis(waittime);
long current = system.currenttimemillis();
long threadid = thread.currentthread().getid();
long ttl = tryacquire(waittime, leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return true;
}
// 获取锁失败后,中途trylock会一直判断中间操作耗时是否已经消耗锁的过期时间,如果消耗完则返回false
time -= system.currenttimemillis() - current;
if (time <= 0) {
acquirefailed(waittime, unit, threadid);
return false;
}
current = system.currenttimemillis();
// 订阅锁释放事件
// 如果当前线程通过 redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争.
rfuture<redissonlockentry> subscribefuture = subscribe(threadid);
// 将订阅阻塞,阻塞时间设置为我们调用trylock设置的最大等待时间,超过时间则返回false
if (!subscribefuture.await(time, timeunit.milliseconds)) {
if (!subscribefuture.cancel(false)) {
subscribefuture.oncomplete((res, e) -> {
if (e == null) {
unsubscribe(subscribefuture, threadid);
}
});
}
acquirefailed(waittime, unit, threadid);
return false;
}
try {
time -= system.currenttimemillis() - current;
if (time <= 0) {
acquirefailed(waittime, unit, threadid);
return false;
}
// 循环获取锁,但由于上面有最大等待时间限制,基本会在上面返回false
while (true) {
long currenttime = system.currenttimemillis();
ttl = tryacquire(waittime, leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return true;
}
time -= system.currenttimemillis() - currenttime;
if (time <= 0) {
acquirefailed(waittime, unit, threadid);
return false;
}
// waiting for message
currenttime = system.currenttimemillis();
if (ttl >= 0 && ttl < time) {
subscribefuture.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds);
} else {
subscribefuture.getnow().getlatch().tryacquire(time, timeunit.milliseconds);
}
time -= system.currenttimemillis() - currenttime;
if (time <= 0) {
acquirefailed(waittime, unit, threadid);
return false;
}
}
} finally {
unsubscribe(subscribefuture, threadid);
}
// return get(trylockasync(waittime, leasetime, unit));
}
应尽量使用trylock,且携带参数,因为可设置最大等待时间以及可及时获取加锁返回值,后续可做一些其他加锁失败的业务
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论