redisson 分布式锁原理
1. 工具类
package com.meta.mall.common.utils; import lombok.extern.slf4j.slf4j; import org.redisson.api.rlock; import org.redisson.api.redissonclient; import org.springframework.stereotype.component; import javax.annotation.resource; import java.util.concurrent.timeunit; /** * redisson 分布式工具类 * * @author gaoyang * @date 2022-05-14 08:58 */ @slf4j @component public class redissonutils { @resource private redissonclient redissonclient; /** * 加锁 * * @param lockkey */ public void lock(string lockkey) { rlock lock = redissonclient.getlock(lockkey); lock.lock(); } /** * 带过期时间的锁 * * @param lockkey key * @param leasetime 上锁后自动释放锁时间 */ public void lock(string lockkey, long leasetime) { rlock lock = redissonclient.getlock(lockkey); lock.lock(leasetime, timeunit.seconds); } /** * 带超时时间的锁 * * @param lockkey key * @param leasetime 上锁后自动释放锁时间 * @param unit 时间单位 */ public void lock(string lockkey, long leasetime, timeunit unit) { rlock lock = redissonclient.getlock(lockkey); lock.lock(leasetime, unit); } /** * 尝试获取锁 * * @param lockkey key * @return */ public boolean trylock(string lockkey) { rlock lock = redissonclient.getlock(lockkey); return lock.trylock(); } /** * 尝试获取锁 * * @param lockkey key * @param waittime 最多等待时间 * @param leasetime 上锁后自动释放锁时间 * @return boolean */ public boolean trylock(string lockkey, long waittime, long leasetime) { rlock lock = redissonclient.getlock(lockkey); try { return lock.trylock(waittime, leasetime, timeunit.seconds); } catch (interruptedexception e) { log.error("redissonutils - trylock异常", e); } return false; } /** * 尝试获取锁 * * @param lockkey key * @param waittime 最多等待时间 * @param leasetime 上锁后自动释放锁时间 * @param unit 时间单位 * @return boolean */ public boolean trylock(string lockkey, long waittime, long leasetime, timeunit unit) { rlock lock = redissonclient.getlock(lockkey); try { return lock.trylock(waittime, leasetime, unit); } catch (interruptedexception e) { log.error("redissonutils - trylock异常", e); } return false; } /** * 释放锁 * * @param lockkey key */ public void unlock(string lockkey) { rlock lock = redissonclient.getlock(lockkey); lock.unlock(); } /** * 是否存在锁 * * @param lockkey key * @return */ public boolean islocked(string lockkey) { rlock lock = redissonclient.getlock(lockkey); return lock.islocked(); } }
2. lock和trylock的区别
1.返回值
- lock 是 void;
- trylock 是 boolean。
2.时机
- lock 一直等锁释放;
- trylock 获取到锁返回true,获取不到锁并直接返回false。
lock拿不到锁会一直等待。trylock是去尝试,拿不到就返回false,拿到返回true。
trylock是可以被打断的,被中断的,lock是不可以。
3. 源码分析
3.1 lock
private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception { // 获取当前线程 id long threadid = thread.currentthread().getid(); // 获取锁,正常获取锁则ttl为null,竞争锁时返回锁的过期时间 long ttl = tryacquire(-1, leasetime, unit, threadid); // lock acquired if (ttl == null) { return; } // 订阅锁释放事件 // 如果当前线程通过 redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争 rfuture<redissonlockentry> future = subscribe(threadid); if (interruptibly) { commandexecutor.syncsubscriptioninterrupted(future); } else { commandexecutor.syncsubscription(future); } try { while (true) { // 循环重试获取锁,直至重新获取锁成功才跳出循环 // 此种做法阻塞进程,一直处于等待锁手动释放或者超时才继续线程 ttl = tryacquire(-1, leasetime, unit, threadid); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { future.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds); } catch (interruptedexception e) { if (interruptibly) { throw e; } future.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds); } } else { if (interruptibly) { future.getnow().getlatch().acquire(); } else { future.getnow().getlatch().acquireuninterruptibly(); } } } } finally { // 最后释放订阅事件 unsubscribe(future, threadid); } // get(lockasync(leasetime, unit)); }
<t> rfuture<t> trylockinnerasync(long waittime, long leasetime, timeunit unit, long threadid, redisstrictcommand<t> command) { return evalwriteasync(getrawname(), longcodec.instance, command, "if (redis.call('exists', keys[1]) == 0) then " + "redis.call('hincrby', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', keys[1], argv[2]) == 1) then " + "redis.call('hincrby', keys[1], argv[2], 1); " + "redis.call('pexpire', keys[1], argv[1]); " + "return nil; " + "end; " + "return redis.call('pttl', keys[1]);", collections.singletonlist(getrawname()), unit.tomillis(leasetime), getlockname(threadid)); }
此段脚本为一段lua脚本:
key[1]: 为你加锁的lock值
argv[2]: 为线程id
argv[1]: 为设置的过期时间
第一个if:
- 判断是否存在设置lock的key是否存在,不存在则利用redis的hash结构设置一个hash,值为1,并设置过期时间,后续返回锁。
第二个if:
- 判断是否存在设置lock的key是否存在,存在此线程的hash,则为这个锁的重入次数加1(将hash值+1),并重新设置过期时间,后续返回锁。
最后返回:
- 这个最后返回不是说最后结果返回,是代表以上两个if都没有进入,则代表处于竞争锁的情况,后续返回竞争锁的过期时间。
3.2 trylock
trylock具有返回值,true或者false,表示是否成功获取锁。
trylock前期获取锁逻辑基本与lock一致,主要是后续获取锁失败的处理逻辑与lock不一致。
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception { long time = unit.tomillis(waittime); long current = system.currenttimemillis(); long threadid = thread.currentthread().getid(); long ttl = tryacquire(waittime, leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } // 获取锁失败后,中途trylock会一直判断中间操作耗时是否已经消耗锁的过期时间,如果消耗完则返回false time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(waittime, unit, threadid); return false; } current = system.currenttimemillis(); // 订阅锁释放事件 // 如果当前线程通过 redis 的 channel 订阅锁的释放事件获取得知已经被释放,则会发消息通知待等待的线程进行竞争. rfuture<redissonlockentry> subscribefuture = subscribe(threadid); // 将订阅阻塞,阻塞时间设置为我们调用trylock设置的最大等待时间,超过时间则返回false if (!subscribefuture.await(time, timeunit.milliseconds)) { if (!subscribefuture.cancel(false)) { subscribefuture.oncomplete((res, e) -> { if (e == null) { unsubscribe(subscribefuture, threadid); } }); } acquirefailed(waittime, unit, threadid); return false; } try { time -= system.currenttimemillis() - current; if (time <= 0) { acquirefailed(waittime, unit, threadid); return false; } // 循环获取锁,但由于上面有最大等待时间限制,基本会在上面返回false while (true) { long currenttime = system.currenttimemillis(); ttl = tryacquire(waittime, leasetime, unit, threadid); // lock acquired if (ttl == null) { return true; } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(waittime, unit, threadid); return false; } // waiting for message currenttime = system.currenttimemillis(); if (ttl >= 0 && ttl < time) { subscribefuture.getnow().getlatch().tryacquire(ttl, timeunit.milliseconds); } else { subscribefuture.getnow().getlatch().tryacquire(time, timeunit.milliseconds); } time -= system.currenttimemillis() - currenttime; if (time <= 0) { acquirefailed(waittime, unit, threadid); return false; } } } finally { unsubscribe(subscribefuture, threadid); } // return get(trylockasync(waittime, leasetime, unit)); }
应尽量使用trylock,且携带参数,因为可设置最大等待时间以及可及时获取加锁返回值,后续可做一些其他加锁失败的业务
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
发表评论