redis和redission分布式锁原理及区别
我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。
结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。
1、有的同伴想到了synchronized关键字锁
暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:
2、有的小伙伴可能想到了乐观锁
没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:
- 我们在车的表中添加一个字段:version(int类型)(建议使用这个名称,这样别人看到就会直觉这是乐观锁字段,也可以使用别的名称)
- 查询出该车的数据,数据中就有version字段,假如version=1
select * from u_car where car_id = 10;
- 修改该车的状态为锁定
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1
在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败
乐观锁不是本次的终点,但还是简单说下;
3、使用redis的分布式锁
public boolean lock(string key, v v, int expiretime){ //获取锁 //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有bug //后来redis将加锁和设置时间用同一个命令 //这里是重点,redis.setnx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了 boolean b = false; try { b = redis.setnx(key, v, expiretime); } catch (exception e) { log.error(e.getmessage(), e); } return b; } public boolean unlock(string key){ return redis.delete(key); } }
但是这样写还是存在bug的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setnx(key, v, expiretime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的bug;
实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置n秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;
4、使用redission的分布式锁
//引入依赖 <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.redisson</groupid> <artifactid>redisson-spring-boot-starter</artifactid> <version>3.10.6</version> </dependency>
redisson支持单点、集群等模式,这里选择单点的。
- application.yml配置好redis的连接:
spring: redis: host: 127.0.0.1 port: 6379 password:
- 配置redisson的客户端bean
@configuration public class redisconfig { @value("${spring.redis.host}") private string host; @bean(name = {"redistemplate", "stringredistemplate"}) public stringredistemplate stringredistemplate(redisconnectionfactory factory) { stringredistemplate redistemplate = new stringredistemplate(); redistemplate.setconnectionfactory(factory); return redistemplate; } @bean public redisson redisson() { config config = new config(); config.usesingleserver().setaddress("redis://" + host + ":6379"); return (redisson) redisson.create(config); } }
- 加锁使用
private logger log = loggerfactory.getlogger(getclass()); @resource private redisson redisson; //加锁 public boolean lock(string key,long waittime,long leasetime){ boolean b = false; try { rlock rlock = redisson.getlock(key); //说下参数 waittime:锁的存活时间 leasetime:锁的延长时间 后面的参数是单位 b = rlock.trylock(waittime,leasetime,timeunit.seconds); } catch (exception e) { log.error(e.getmessage(), e); } } return b; } //释放锁 public void unlock(string key){ try { rlock rlock = redisson.getlock(key); if(null!=lock){ lock.unlock(); lock.forceunlock(); filelog.info("unlock succesed"); } } catch (exception e) { filelog.error(e.getmessage(), e); } }
- 带大家看下trylock方法的实现源码:
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception { long time = unit.tomillis(waittime); long current = system.currenttimemillis(); long threadid = thread.currentthread().getid(); //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间 long ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } //如果waittime已经超时了,就返回false time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } current = system.currenttimemillis(); rfuture<redissonlockentry> subscribefuture = subscribe(threadid); if (!await(subscribefuture, time, timeunit.milliseconds)) { if (!subscribefuture.cancel(false)) { subscribefuture.oncomplete((res, e) -> { if (e == null) { unsubscribe(subscribefuture, threadid); } }); } acquirefailed(threadid); return false; } try { time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(threadid); return false; } //进入死循环,反复去调用tryacquire尝试获取锁,ttl为null时就是别的线程已经unlock了 while (true) { long currenttime = system.currenttimemillis(); ttl = tryacquire(leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } // waiting for message currenttime = system.currenttimemillis(); if (ttl >= 0 && ttl < time) { getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds); } else { getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds); } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(threadid); return false; } } } finally { unsubscribe(subscribefuture, threadid); } // return get(trylockasync(waittime, leasetime, unit)); }
可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。
- 再看看tryacquire方法
- 这个方法的调用栈也是比较多,之后会进入下面这个方法
上面的lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:
- keys[1]代表你的key
- argv[1]代表你的key的存活时间,默认存活30秒
- argv[2]代表的是请求加锁的客户端id,后面的1则理解为加锁的次数,简单理解就是 如果该客户端多次对key加锁时,就会执行hincrby原子加1命令
第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key argv[2],1)加锁和设置redis call(pexpire key argv[1])存活时间;
当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的id,很明显不是;
则进入到最后的return返回该key的剩余存活时间
当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间
所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒
所以,总体流程应该是这样的。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论