当前位置: 代码网 > it编程>数据库>Redis > redis和redission分布式锁原理及区别说明

redis和redission分布式锁原理及区别说明

2025年08月12日 Redis 我要评论
redis和redission分布式锁原理及区别我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。结合我

redis和redission分布式锁原理及区别

我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。

结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。

1、有的同伴想到了synchronized关键字锁

暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:

2、有的小伙伴可能想到了乐观锁

没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:

  • 我们在车的表中添加一个字段:version(int类型)(建议使用这个名称,这样别人看到就会直觉这是乐观锁字段,也可以使用别的名称)
  • 查询出该车的数据,数据中就有version字段,假如version=1
select * from u_car where car_id = 10;
  • 修改该车的状态为锁定
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1

在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败

乐观锁不是本次的终点,但还是简单说下;

3、使用redis的分布式锁

	public boolean lock(string key, v v, int expiretime){
           //获取锁
           //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有bug
           //后来redis将加锁和设置时间用同一个命令
           //这里是重点,redis.setnx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了
         boolean b = false;
         try {
         	b = redis.setnx(key, v, expiretime);
         } catch (exception e) {
        	log.error(e.getmessage(), e);
    	 }
    	 return b;
    }
    public boolean unlock(string key){
        return redis.delete(key);
    }
}

但是这样写还是存在bug的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setnx(key, v, expiretime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的bug;

实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置n秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;

4、使用redission的分布式锁

//引入依赖
<dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-redis</artifactid>
        </dependency>
        <dependency>
            <groupid>org.redisson</groupid>
            <artifactid>redisson-spring-boot-starter</artifactid>
            <version>3.10.6</version>
        </dependency>

redisson支持单点、集群等模式,这里选择单点的。

  • application.yml配置好redis的连接:
spring:  
    redis:
        host: 127.0.0.1
        port: 6379
        password: 
  • 配置redisson的客户端bean
@configuration
public class redisconfig {
    @value("${spring.redis.host}")
    private string host;
 
    @bean(name = {"redistemplate", "stringredistemplate"})
    public stringredistemplate stringredistemplate(redisconnectionfactory factory) {
        stringredistemplate redistemplate = new stringredistemplate();
        redistemplate.setconnectionfactory(factory);
        return redistemplate;
    }
 
    @bean
    public redisson redisson() {
        config config = new config();
        config.usesingleserver().setaddress("redis://" + host + ":6379");
        return (redisson) redisson.create(config);
    }
 
}
  • 加锁使用
private logger log = loggerfactory.getlogger(getclass());
@resource
private redisson redisson;
//加锁
public boolean lock(string key,long waittime,long leasetime){
	boolean  b = false;
	try {
        rlock rlock = redisson.getlock(key);
        //说下参数 waittime:锁的存活时间 leasetime:锁的延长时间 后面的参数是单位
        b = rlock.trylock(waittime,leasetime,timeunit.seconds);
      } catch (exception e) {
         log.error(e.getmessage(), e);
      } 
    }
    return b;
}
//释放锁
public void unlock(string key){
	try {
		rlock rlock = redisson.getlock(key);
		if(null!=lock){
			lock.unlock();
			lock.forceunlock();
			filelog.info("unlock succesed");
    	}
    } catch (exception e) {
        filelog.error(e.getmessage(), e);
    }
}
  • 带大家看下trylock方法的实现源码:
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {
        long time = unit.tomillis(waittime);
        long current = system.currenttimemillis();
        long threadid = thread.currentthread().getid();
        //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
        long ttl = tryacquire(leasetime, unit, threadid);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //如果waittime已经超时了,就返回false
        time -= system.currenttimemillis() - current;
        if (time <= 0) {
            acquirefailed(threadid);
            return false;
        }
        
        current = system.currenttimemillis();
        rfuture<redissonlockentry> subscribefuture = subscribe(threadid);
        if (!await(subscribefuture, time, timeunit.milliseconds)) {
            if (!subscribefuture.cancel(false)) {
                subscribefuture.oncomplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribefuture, threadid);
                    }
                });
            }
            acquirefailed(threadid);
            return false;
        }
 
        try {
            time -= system.currenttimemillis() - current;
            if (time <= 0) {
                acquirefailed(threadid);
                return false;
            }
            //进入死循环,反复去调用tryacquire尝试获取锁,ttl为null时就是别的线程已经unlock了
            while (true) {
                long currenttime = system.currenttimemillis();
                ttl = tryacquire(leasetime, unit, threadid);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= system.currenttimemillis() - currenttime;
                if (time <= 0) {
                    acquirefailed(threadid);
                    return false;
                }
 
                // waiting for message
                currenttime = system.currenttimemillis();
                if (ttl >= 0 && ttl < time) {
                    getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);
                } else {
                    getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds);
                }
 
                time -= system.currenttimemillis() - currenttime;
                if (time <= 0) {
                    acquirefailed(threadid);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribefuture, threadid);
        }
//        return get(trylockasync(waittime, leasetime, unit));
    }

可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。

  • 再看看tryacquire方法

  • 这个方法的调用栈也是比较多,之后会进入下面这个方法

上面的lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:

  • keys[1]代表你的key
  • argv[1]代表你的key的存活时间,默认存活30秒
  • argv[2]代表的是请求加锁的客户端id,后面的1则理解为加锁的次数,简单理解就是 如果该客户端多次对key加锁时,就会执行hincrby原子加1命令

第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key argv[2],1)加锁和设置redis call(pexpire key argv[1])存活时间;

当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的id,很明显不是;

则进入到最后的return返回该key的剩余存活时间

当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间

所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒

所以,总体流程应该是这样的。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com