redis和redission分布式锁原理及区别
我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。
结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。
1、有的同伴想到了synchronized关键字锁
暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:

2、有的小伙伴可能想到了乐观锁
没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:
- 我们在车的表中添加一个字段:version(int类型)(建议使用这个名称,这样别人看到就会直觉这是乐观锁字段,也可以使用别的名称)
- 查询出该车的数据,数据中就有version字段,假如version=1
select * from u_car where car_id = 10;
- 修改该车的状态为锁定
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1
在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败
乐观锁不是本次的终点,但还是简单说下;
3、使用redis的分布式锁
public boolean lock(string key, v v, int expiretime){
//获取锁
//在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有bug
//后来redis将加锁和设置时间用同一个命令
//这里是重点,redis.setnx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了
boolean b = false;
try {
b = redis.setnx(key, v, expiretime);
} catch (exception e) {
log.error(e.getmessage(), e);
}
return b;
}
public boolean unlock(string key){
return redis.delete(key);
}
}
但是这样写还是存在bug的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setnx(key, v, expiretime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的bug;
实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置n秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;
4、使用redission的分布式锁
//引入依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
<groupid>org.redisson</groupid>
<artifactid>redisson-spring-boot-starter</artifactid>
<version>3.10.6</version>
</dependency>
redisson支持单点、集群等模式,这里选择单点的。
- application.yml配置好redis的连接:
spring:
redis:
host: 127.0.0.1
port: 6379
password:
- 配置redisson的客户端bean
@configuration
public class redisconfig {
@value("${spring.redis.host}")
private string host;
@bean(name = {"redistemplate", "stringredistemplate"})
public stringredistemplate stringredistemplate(redisconnectionfactory factory) {
stringredistemplate redistemplate = new stringredistemplate();
redistemplate.setconnectionfactory(factory);
return redistemplate;
}
@bean
public redisson redisson() {
config config = new config();
config.usesingleserver().setaddress("redis://" + host + ":6379");
return (redisson) redisson.create(config);
}
}
- 加锁使用
private logger log = loggerfactory.getlogger(getclass());
@resource
private redisson redisson;
//加锁
public boolean lock(string key,long waittime,long leasetime){
boolean b = false;
try {
rlock rlock = redisson.getlock(key);
//说下参数 waittime:锁的存活时间 leasetime:锁的延长时间 后面的参数是单位
b = rlock.trylock(waittime,leasetime,timeunit.seconds);
} catch (exception e) {
log.error(e.getmessage(), e);
}
}
return b;
}
//释放锁
public void unlock(string key){
try {
rlock rlock = redisson.getlock(key);
if(null!=lock){
lock.unlock();
lock.forceunlock();
filelog.info("unlock succesed");
}
} catch (exception e) {
filelog.error(e.getmessage(), e);
}
}
- 带大家看下trylock方法的实现源码:
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {
long time = unit.tomillis(waittime);
long current = system.currenttimemillis();
long threadid = thread.currentthread().getid();
//尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
long ttl = tryacquire(leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return true;
}
//如果waittime已经超时了,就返回false
time -= system.currenttimemillis() - current;
if (time <= 0) {
acquirefailed(threadid);
return false;
}
current = system.currenttimemillis();
rfuture<redissonlockentry> subscribefuture = subscribe(threadid);
if (!await(subscribefuture, time, timeunit.milliseconds)) {
if (!subscribefuture.cancel(false)) {
subscribefuture.oncomplete((res, e) -> {
if (e == null) {
unsubscribe(subscribefuture, threadid);
}
});
}
acquirefailed(threadid);
return false;
}
try {
time -= system.currenttimemillis() - current;
if (time <= 0) {
acquirefailed(threadid);
return false;
}
//进入死循环,反复去调用tryacquire尝试获取锁,ttl为null时就是别的线程已经unlock了
while (true) {
long currenttime = system.currenttimemillis();
ttl = tryacquire(leasetime, unit, threadid);
// lock acquired
if (ttl == null) {
return true;
}
time -= system.currenttimemillis() - currenttime;
if (time <= 0) {
acquirefailed(threadid);
return false;
}
// waiting for message
currenttime = system.currenttimemillis();
if (ttl >= 0 && ttl < time) {
getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);
} else {
getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds);
}
time -= system.currenttimemillis() - currenttime;
if (time <= 0) {
acquirefailed(threadid);
return false;
}
}
} finally {
unsubscribe(subscribefuture, threadid);
}
// return get(trylockasync(waittime, leasetime, unit));
}
可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。
- 再看看tryacquire方法

- 这个方法的调用栈也是比较多,之后会进入下面这个方法

上面的lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:
- keys[1]代表你的key
- argv[1]代表你的key的存活时间,默认存活30秒
- argv[2]代表的是请求加锁的客户端id,后面的1则理解为加锁的次数,简单理解就是 如果该客户端多次对key加锁时,就会执行hincrby原子加1命令
第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key argv[2],1)加锁和设置redis call(pexpire key argv[1])存活时间;
当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的id,很明显不是;
则进入到最后的return返回该key的剩余存活时间
当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间
所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒
所以,总体流程应该是这样的。

总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论