1.需求
我们公司想实现一个简单的分布式锁,用于服务启动初始化执行init方法的时候,只执行一次,避免重复执行加载缓存规则的代码,还有预防高并发流程发起部分,产品超发,多发问题。所以结合网上信息,自己简单实现了一个redis分布式锁,可以进行单次资源锁定,排队锁定(没有实现权重,按照时间长短争夺锁信息),还有锁定业务未完成,需要延期锁等简单方法,死锁则是设置过期时间即可。期间主要用到的技术为redis,延时线程池,lua脚本,比较简单,此处记录一下,方便下次学习查看。
2.具体实现
整体配置相对简单,主要是编写redisutil工具类,实现redis的简单操作,编写分布式锁类simpledistributelock,主要内容都在此锁的实现类中,simpledistributelock实现类主要实现方法如下:
- 1.一次抢夺加锁方法 trylock
- 2.连续排队加锁方法trycontinuelock,此方法中间有调用线程等待thread.sleep方法防止防止stackoverflow异常,比较耗费资源,后续应该需要优化处理
- 3.重入锁tryreentrantlock,一个资源调用过程中,处于加锁状态仍然可以再次加锁,重新刷新其过期时间
- 4.刷新锁过期时间方法resetlockexpire
- 5.释放锁方法,注意,释放过程中需要传入加锁的value信息,以免高并发情况下多线程锁信息被其他线程释放锁操作误删
2.1 redis基本操作工具类redisutil
package cn.git.redis; import cn.hutool.core.util.idutil; import cn.hutool.core.util.objectutil; import cn.hutool.core.util.strutil; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.dao.dataaccessexception; import org.springframework.data.geo.*; import org.springframework.data.redis.connection.redisconnection; import org.springframework.data.redis.connection.redisgeocommands; import org.springframework.data.redis.core.rediscallback; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.core.script.redisscript; import org.springframework.util.collectionutils; import java.nio.charset.standardcharsets; import java.util.list; import java.util.map; import java.util.set; import java.util.concurrent.timeunit; /** * @program: bank-credit-sy * @description: 封装redis的工具类 * @author: lixuchun * @create: 2021-01-23 11:53 */ public class redisutil { /** * 模糊查询匹配 */ private static final string fuzzy_enquiry_key = "*"; @autowired @qualifier("redistemplate") private redistemplate<string, object> redistemplate; public void setredistemplate(redistemplate<string, object> redistemplate) { this.redistemplate = redistemplate; } /** * 指定缓存失效时间 * * @param key 键 * @param time 时间(秒) * @return */ public boolean expire(string key, long time) { try { if (time > 0) { redistemplate.expire(key, time, timeunit.seconds); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 根据key 获取过期时间 * * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getexpire(string key) { return redistemplate.getexpire(key, timeunit.seconds); } /** * 判断key是否存在 * * @param key 键 * @return true 存在 false不存在 */ public boolean haskey(string key) { try { return redistemplate.haskey(key); } catch (exception e) { return false; } } /** * 删除缓存 * * @param key 可以传一个值 或多个 */ @suppresswarnings("unchecked") public void del(string... key) { if (key != null && key.length > 0) { if (key.length == 1) { redistemplate.delete(key[0]); } else { redistemplate.delete(collectionutils.arraytolist(key)); } } } /** * 普通缓存获取 * * @param key 键 * @return 值 */ public object get(string key) { return key == null ? null : redistemplate.opsforvalue().get(key); } /** * 普通缓存放入 * * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(string key, object value) { try { redistemplate.opsforvalue().set(key, value); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 普通缓存放入并设置时间 * * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(string key, object value, long time) { try { if (time > 0) { redistemplate.opsforvalue().set(key, value, time, timeunit.seconds); } else { set(key, value); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 如果不存在,则设置对应key,value 键值对,并且设置过期时间 * @param key 锁key * @param value 锁值 * @param time 时间单位second * @return 设定结果 */ /** public boolean setnxex(string key, string value, long time) { boolean setresult = (boolean) redistemplate.execute((rediscallback) connection -> { redisstringcommands.setoption setoption = redisstringcommands.setoption.ifabsent(); // 设置过期时间 expiration expiration = expiration.seconds(time); // 执行setnx操作 boolean result = connection.set(key.getbytes(standardcharsets.utf_8), value.getbytes(standardcharsets.utf_8), expiration, setoption); return result; }); return setresult; } **/ /** * 如果不存在,则设置对应key,value 键值对,并且设置过期时间 * @param key 锁key * @param value 锁值 * @param time 时间单位second * @return 设定结果 */ public boolean setnxex(string key, string value, long time) { return redistemplate.opsforvalue().setifabsent(key, value, time, timeunit.seconds); } /** * 递增 * * @param key 键 * @return */ public long incr(string key, long delta) { if (delta < 0) { throw new runtimeexception("递增因子必须大于0"); } return redistemplate.opsforvalue().increment(key, delta); } /** * 递减 * * @param key 键 * @return */ public long decr(string key, long delta) { if (delta < 0) { throw new runtimeexception("递减因子必须大于0"); } return redistemplate.opsforvalue().increment(key, -delta); } /** * hashget * * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public object hget(string key, string item) { return redistemplate.opsforhash().get(key, item); } /** * 获取hashkey对应的所有键值 * * @param key 键 * @return 对应的多个键值 */ public map<object, object> hmget(string key) { return redistemplate.opsforhash().entries(key); } /** * 获取hashkey对应的所有键值 * * @param key 键 * @return 对应的多个键值 */ public list<object> hmget(string key, list<object> itemlist) { return redistemplate.opsforhash().multiget(key, itemlist); } /** * 获取key对应的hashkey值 * * @param key 键 * @param hashkey 键 * @return 对应的键值 */ public object hmget(string key, string hashkey) { return redistemplate.opsforhash().get(key, hashkey); } /** * hashset * * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(string key, map<string, object> map) { try { redistemplate.opsforhash().putall(key, map); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * hashset 并设置时间 * * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(string key, map<object, object> map, long time) { try { redistemplate.opsforhash().putall(key, map); if (time > 0) { expire(key, time); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(string key, string item, object value) { try { redistemplate.opsforhash().put(key, item, value); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(string key, string item, object value, long time) { try { redistemplate.opsforhash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(string key, object... item) { redistemplate.opsforhash().delete(key, item); } /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hhaskey(string key, string item) { return redistemplate.opsforhash().haskey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * * @param key 键 * @param item 项 * @param by 要增加几(大于0) * @return */ public double hincr(string key, string item, double by) { return redistemplate.opsforhash().increment(key, item, by); } /** * hash递减 * * @param key 键 * @param item 项 * @param by 要减少记(小于0) * @return */ public double hdecr(string key, string item, double by) { return redistemplate.opsforhash().increment(key, item, -by); } /** * 根据key获取set中的所有值 * * @param key 键 * @return */ public set<object> sget(string key) { try { return redistemplate.opsforset().members(key); } catch (exception e) { e.printstacktrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean shaskey(string key, object value) { try { return redistemplate.opsforset().ismember(key, value); } catch (exception e) { e.printstacktrace(); return false; } } /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sset(string key, object... values) { try { return redistemplate.opsforset().add(key, values); } catch (exception e) { e.printstacktrace(); return 0; } } /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long ssetandtime(string key, long time, object... values) { try { long count = redistemplate.opsforset().add(key, values); if (time > 0) { expire(key, time); } return count; } catch (exception e) { e.printstacktrace(); return 0; } } /** * 获取set缓存的长度 * * @param key 键 * @return */ public long sgetsetsize(string key) { try { return redistemplate.opsforset().size(key); } catch (exception e) { e.printstacktrace(); return 0; } } /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setremove(string key, object... values) { try { long count = redistemplate.opsforset().remove(key, values); return count; } catch (exception e) { e.printstacktrace(); return 0; } } /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public list<object> lget(string key, long start, long end) { try { return redistemplate.opsforlist().range(key, start, end); } catch (exception e) { e.printstacktrace(); return null; } } /** * 获取list缓存的长度 * * @param key 键 * @return */ public long lgetlistsize(string key) { try { return redistemplate.opsforlist().size(key); } catch (exception e) { e.printstacktrace(); return 0; } } /** * 通过索引 获取list中的值 * * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 * @return */ public object lgetindex(string key, long index) { try { return redistemplate.opsforlist().index(key, index); } catch (exception e) { e.printstacktrace(); return null; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */ public boolean lset(string key, object value) { try { redistemplate.opsforlist().rightpush(key, value); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lset(string key, object value, long time) { try { redistemplate.opsforlist().rightpush(key, value); if (time > 0) { expire(key, time); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @return */ public boolean lset(string key, list<object> value) { try { redistemplate.opsforlist().rightpushall(key, value); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lset(string key, list<object> value, long time) { try { redistemplate.opsforlist().rightpushall(key, value); if (time > 0) { expire(key, time); } return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lupdateindex(string key, long index, object value) { try { redistemplate.opsforlist().set(key, index, value); return true; } catch (exception e) { e.printstacktrace(); return false; } } /** * 移除n个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lremove(string key, long count, object value) { try { long remove = redistemplate.opsforlist().remove(key, count, value); return remove; } catch (exception e) { e.printstacktrace(); return 0; } } public void testadd(double x, double y, string accountid) { long addednum = redistemplate.opsforgeo() .add("citygeokey", new point(x, y), accountid); system.out.println(addednum); } public long addgeopoin() { point point = new point(123.05778991994906, 41.188314667658965); long addednum = redistemplate.opsforgeo().geoadd("citygeokey", point, 3); return addednum; } public void testnearbyplace() { distance distance = new distance(100, metrics.kilometers); redisgeocommands.georadiuscommandargs args = redisgeocommands .georadiuscommandargs .newgeoradiusargs() .includedistance() .includecoordinates() .sortascending() .limit(5); georesults<redisgeocommands.geolocation<object>> results = redistemplate.opsforgeo() .radius("citygeokey", "北京", distance, args); system.out.println(results); } public georesults<redisgeocommands.geolocation<object>> testgeonearbyxy(double x, double y) { distance distance = new distance(100, metrics.kilometers); circle circle = new circle(x, y, metrics.kilometers.getmultiplier()); redisgeocommands.georadiuscommandargs args = redisgeocommands .georadiuscommandargs .newgeoradiusargs() .includedistance() .includecoordinates() .sortascending(); georesults<redisgeocommands.geolocation<object>> results = redistemplate.opsforgeo() .radius("citygeokey", circle, distance, args); system.err.println(results); return results; } /** * @description: 执行lua脚本,只对key进行操作 * @param: [redisscript, keys] * @return: java.lang.long * @date: 2021/2/21 15:00 */ public long executelua(redisscript<long> redisscript, list keys) { return redistemplate.execute(redisscript, keys); } /** * @description: 执行lua脚本,只对key进行操作 * @param: [redisscript, keys, value] * @return: java.lang.long * @date: 2021/2/21 15:00 */ public long executeluacustom(redisscript<long> redisscript, list keys, object ...value) { return redistemplate.execute(redisscript, keys, value); } /** * @description: 执行lua脚本,只对key进行操作 * @param: [redisscript, keys, value] * @return: java.lang.long * @date: 2021/2/21 15:00 */ public boolean executebooleanluacustom(redisscript<boolean> redisscript, list keys, object ...value) { return redistemplate.execute(redisscript, keys, value); } /** * 时间窗口限流 * @param key key * @param timewindow 时间窗口 * @return */ public integer rangebyscore(string key, integer timewindow) { // 获取当前时间戳 long currenttime = system.currenttimemillis(); set<object> rangeset = redistemplate.opsforzset().rangebyscore(key, currenttime - timewindow, currenttime); if (objectutil.isnotnull(rangeset)) { return rangeset.size(); } else { return 0; } } /** * 新增zset * @param key */ public string addzset(string key) { string value = idutil.simpleuuid(); long currenttime = system.currenttimemillis(); redistemplate.opsforzset().add(key, value, currenttime); return value; } /** * 删除zset * @param key */ public void removezset(string key, string value) { // 参数存在校验 if (objectutil.isnotnull(redistemplate.opsforzset().score(key, value))) { redistemplate.opsforzset().remove(key, value); } } /** * 通过前缀key值获取所有key内容(hash) * @param keyprefix 前缀key * @param fieldarray 查询对象列信息 */ public list<object> getprefixkeys(string keyprefix, byte[][] fieldarray) { if (strutil.isblank(keyprefix)) { return null; } keyprefix = keyprefix.concat(fuzzy_enquiry_key); // 所有完整key值 set<string> keyset = redistemplate.keys(keyprefix); list<object> objectlist = redistemplate.executepipelined(new rediscallback<object>() { /** * gets called by {@link redistemplate} with an active redis connection. does not need to care about activating or * closing the connection or handling exceptions. * * @param connection active redis connection * @return a result object or {@code null} if none * @throws dataaccessexception */ @override public object doinredis(redisconnection connection) throws dataaccessexception { for (string key : keyset) { connection.hmget(key.getbytes(standardcharsets.utf_8), fieldarray); } return null; } }); return objectlist; } }
2.2 simpledistributelock实现
具体锁以及解锁业务实现类,具体如下所示
package cn.git.common.lock; import cn.git.common.exception.serviceexception; import cn.git.redis.redisutil; import cn.hutool.core.util.idutil; import cn.hutool.core.util.objectutil; import cn.hutool.core.util.strutil; import lombok.extern.slf4j.slf4j; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.script.defaultredisscript; import org.springframework.data.redis.core.script.redisscript; import org.springframework.stereotype.component; import java.util.collections; import java.util.concurrent.copyonwritearrayset; import java.util.concurrent.executors; import java.util.concurrent.scheduledexecutorservice; import java.util.concurrent.timeunit; /** * 简单分布式锁 * 可以实现锁的重入,锁自动延期 * @program: bank-credit-sy * @author: lixuchun * @create: 2022-04-25 */ @slf4j @component public class simpledistributelock { /** * 活跃的锁集合 */ private volatile static copyonwritearrayset active_key_set = new copyonwritearrayset(); /** * 定时线程池,续期使用 */ private static scheduledexecutorservice executor_service = executors.newscheduledthreadpool(5); /** * 解锁脚本, 脚本参数 keys[1]: 传入的key, argv[1]: 传入的value * // 如果没有key,直接返回1 * if redis.call('exists',keys[1]) == 0 then * return 1 * else * // 如果key存在,并且value与传入的value相等,删除key,返回1,如果值不等,返回0 * if redis.call('get',keys[1]) == argv[1] then * return redis.call('del',keys[1]) * else * return 0 * end * end */ private static final string unlock_script = "if redis.call('exists',keys[1]) == 0 then return 1 else if redis.call('get',keys[1]) == argv[1] then return redis.call('del',keys[1]) else return 0 end end"; /** * lua脚本参数介绍 keys[1]:传入的key argv[1]:传入的value argv[2]:传入的过期时间 * // 如果成功设置keys,value值,然后设定过期时间,直接返回1 * if redis.call('setnx', keys[1], argv[1]) == 1 then * redis.call('expire', keys[1], tonumber(argv[2])) * return 1 * else * // 如果key存在,并且value值相等,则重置过期时间,直接返回1,值不等则返回0 * if redis.call('get', keys[1]) == argv[1] then * redis.call('expire', keys[1], tonumber(argv[2])) * return 1 * else * return 0 * end * end */ private static final string reentrant_lock_lua = "if redis.call('setnx', keys[1], argv[1]) == 1 then redis.call('expire', keys[1], tonumber(argv[2])) return 1 else if redis.call('get', keys[1]) == argv[1] then redis.call('expire', keys[1], tonumber(argv[2])) return 1 else return 0 end end"; /** * 续期脚本 * // 如果key存在,并且value值相等,则重置过期时间,直接返回1,值不等则返回0 * if redis.call('exists',keys[1]) == 1 and redis.call('get',keys[1]) == argv[1] then * redis.call('expire',keys[1],tonumber(argv[2])) * return 1 * else * return 0 * end */ public static final string expire_lua = "if redis.call('exists',keys[1]) == 1 and redis.call('get',keys[1]) == argv[1] then redis.call('expire',keys[1], tonumber(argv[2])) return 1 else return 0 end"; /** * 释放锁失败标识 */ private static final long release_ok_flag = 0l; /** * 最大重试时间间隔,单位毫秒 */ private static final int max_retry_delay_ms = 2000; @autowired private redisutil redisutil; /** * 加锁方法 * @param locktypeenum 锁信息 * @param customkey 自定义锁定key * @return true 成功,false 失败 */ public string trylock(locktypeenum locktypeenum, string customkey) { // 锁对应值信息 string lockvalue = idutil.simpleuuid(); // 对自定义key进行加锁操作,value值与key值相同 boolean result = redisutil.setnxex(locktypeenum.getlocktype().concat(strutil.colon).concat(customkey), lockvalue, locktypeenum.getexpiretime().intvalue()); if (result) { log.info("[{}]加锁成功!", locktypeenum.getlocktype().concat(strutil.colon).concat(customkey)); return lockvalue; } return null; } /** * 进行加锁,加锁失败,再次进行加锁直到加锁成功 * @param locktypeenum 分布式锁类型设定enum * @param customkey 自定义key * @return */ public string trycontinuelock(locktypeenum locktypeenum, string customkey) { // 锁对应值信息 string lockvalue = idutil.simpleuuid(); // 设置最大重试次数 int maxretries = 10; // 初始重试间隔,可调整 int retryintervalms = 100; for (int attempt = 1; attempt <= maxretries; attempt++) { // 对自定义key进行加锁操作,value值与key值相同 boolean result = redisutil.setnxex(locktypeenum.getlocktype().concat(strutil.colon).concat(customkey), lockvalue, locktypeenum.getexpiretime().intvalue()); if (result) { log.info("[{}] 加锁成功!", locktypeenum.getlocktype().concat(strutil.colon).concat(customkey)); return lockvalue; } /** * 如果未能获取锁,计算下一次重试间隔(可使用指数退避策略), max_retry_delay_ms 为最大重试间隔 * 这行代码用于计算下一次重试前的等待间隔(delay)。这里采用了指数退避策略,这是一种常用的重试间隔设计方法,旨在随着重试次数的增加逐步增大等待间隔,同时限制其增长上限。 * 1. (1 << (attempt - 1)):这是一个二进制左移运算,相当于将 1 左移 attempt - 1 位。对于整数 attempt,该表达式的结果等于 2^(attempt - 1)。随着 attempt 增加,结果值按指数级增长(1, 2, 4, 8, ...),符合指数退避策略的要求。 * 2. * retryintervalms:将上述结果乘以基础重试间隔 retryintervalms,得到实际的等待时间(单位为毫秒)。 * 3. math.min(..., max_retry_delay_ms):确保计算出的 delay 值不超过预设的最大重试间隔 max_retry_delay_ms。这样做可以防止在极端情况下因等待时间过长而导致系统响应缓慢或其他问题。 */ int delay = math.min((1 << (attempt - 1)) * retryintervalms, max_retry_delay_ms); /** * 使用 try-catch 块包裹线程休眠操作,以处理可能抛出的 interruptedexception 异常。 * 1. thread.sleep(delay):让当前线程进入休眠状态,暂停执行指定的 delay 时间(之前计算得出的重试间隔)。在此期间,线程不会消耗 cpu 资源,有助于减轻系统压力。 * 2. catch (interruptedexception e):捕获在休眠过程中被中断时抛出的 interruptedexception。线程中断通常用于请求线程提前结束其当前任务或进入某个特定状态。 * 3. thread.currentthread().interrupt();:当捕获到 interruptedexception 时,恢复线程的中断状态。这是因为在处理中断时,thread.sleep() 方法会清除中断状态。通过重新设置中断状态,通知后续代码(如其他 catch 子句或 finally 子句)或外部代码当前线程已被中断。 * 4. throw new runtimeexception(e);:将捕获到的 interruptedexception 包装成一个新的 runtimeexception 并抛出。这样做是为了向上层代码传递中断信号,并保留原始异常堆栈信息以供调试。根据具体应用需求,可以选择抛出自定义interruptedexception`。 */ try { thread.sleep(delay); } catch (interruptedexception e) { // 保持中断状态 thread.currentthread().interrupt(); throw new runtimeexception(e); } } throw new serviceexception("failed to acquire lock after " + maxretries + " attempts"); } /** * 重入锁 * @param locktypeenum 锁定类型 * @param value 锁定值,一般为线程id或者uuid * @param customkey 自定义key * @return */ public boolean tryreentrantlock(locktypeenum locktypeenum, string value, string customkey) { // 设置释放锁定key,value值 string lockkey = locktypeenum.getlocktype().concat(strutil.colon).concat(customkey); // 设置重入锁脚本信息 defaultredisscript<boolean> defaultredisscript = new defaultredisscript<>(); // boolean 对应 lua脚本返回的0,1 defaultredisscript.setresulttype(boolean.class); // 设置重入锁脚本信息 defaultredisscript.setscripttext(reentrant_lock_lua); // 进行重入锁执行 boolean executeresult = redisutil.executebooleanluacustom(defaultredisscript, collections.singletonlist(lockkey), value, locktypeenum.getexpiretime().intvalue()); if (executeresult) { // 设置当前key为激活状态 active_key_set.add(lockkey); // 设置定时任务,进行续期操作 resetlockexpire(locktypeenum, customkey, value, locktypeenum.getexpiretime()); } return executeresult; } /** * 进行续期操作 * @param locktypeenum 锁定类型 * @param customkey 自定义key * @param value 锁定值,一般为线程id或者uuid * @param expiretime 过期时间 单位秒, */ public void resetlockexpire(locktypeenum locktypeenum, string customkey, string value, long expiretime) { // 续期的key信息 string resetkey = locktypeenum.getlocktype().concat(strutil.colon).concat(customkey); // 校验当前key是否还在执行过程中 if (!active_key_set.contains(resetkey)) { return; } // 时间设定延迟执行时间delay,默认续期时间是过期时间的1/3,在获取锁之后每expiretime/3时间进行一次续期操作 long delay = expiretime <= 3 ? 1 : expiretime / 3; executor_service.schedule(() -> { log.info("自定义key[{}],对应值[{}]开始执行续期操作!", resetkey, value); // 执行续期操作,如果续期成功则再次添加续期任务,如果续期成功,进行下一次定时任务续期 defaultredisscript<boolean> defaultredisscript = new defaultredisscript<>(); // boolean 对应 lua脚本返回的0,1 defaultredisscript.setresulttype(boolean.class); // 设置重入锁脚本信息 defaultredisscript.setscripttext(expire_lua); // 进行重入锁执行 boolean executelua = redisutil.executebooleanluacustom(defaultredisscript, collections.singletonlist(resetkey), value, locktypeenum.getexpiretime().intvalue()); if (executelua) { log.info("执行key[{}],value[{}]续期成功,进行下一次续期操作", resetkey, value); resetlockexpire(locktypeenum, customkey, value, expiretime); } else { // 续期失败处理,移除活跃key信息 active_key_set.remove(resetkey); } }, delay, timeunit.seconds); } /** * 解锁操作 * @param locktypeenum 锁定类型 * @param customkey 自定义key * @param releasevalue 释放value * @return true 成功,false 失败 */ public boolean releaselock(locktypeenum locktypeenum, string customkey, string releasevalue) { // 各个模块服务启动时间差,预留5秒等待时间,防止重调用 if (objectutil.isnotnull(locktypeenum.getlockedwaittimemiles())) { try { thread.sleep(locktypeenum.getlockedwaittimemiles()); } catch (interruptedexception e) { e.printstacktrace(); } } // 设置释放锁定key,value值 string releasekey = locktypeenum.getlocktype().concat(strutil.colon).concat(customkey); // 释放锁定资源 redisscript<long> longdefaultredisscript = new defaultredisscript<>(unlock_script, long.class); long result = redisutil.executeluacustom(longdefaultredisscript, collections.singletonlist(releasekey), releasevalue); // 根据返回结果判断是否成功成功匹配并删除 redis 键值对,若果结果不为空和0,则验证通过 if (objectutil.isnotnull(result) && result != release_ok_flag) { // 当前key释放成功,从活跃生效keyset中移除 active_key_set.remove(releasekey); return true; } return false; } }
注意,lua脚本执行过程中有时候会有执行失败情况,这些情况下异常信息很难捕捉,所以可以在lua脚本中设置日志打印,但是需要注意,需要配置redis配置文件,打开日志信息,此处以重入锁为例子,具体配置以及脚本信息如下:
- 1.redis配置日志级别,日志存储位置信息
# 日志级别,可以设置为 debug、verbose、notice、warning,默认为 notice loglevel notice # 日志文件路径 logfile "/path/to/redis-server.log"
- 2.配置lua脚本信息
local function log(level, message) redis.log(level, "[distributed_lock]: " .. message) end if redis.call('setnx', keys[1], argv[1]) == 1 then log(redis.log_notice, "successfully acquired lock with key: " .. keys[1]) local expire_result = redis.call('expire', keys[1], tonumber(argv[2])) if expire_result == 1 then log(redis.log_notice, "set expiration of " .. argv[2] .. " seconds on lock.") else log(redis.log_warning, "failed to set expiration on lock with key: " .. keys[1]) end return 1 else local current_value = redis.call('get', keys[1]) if current_value == argv[1] then log(redis.log_notice, "lock already held by this client; renewing expiration.") local expire_result = redis.call('expire', keys[1], tonumber(argv[2])) if expire_result == 1 then log(redis.log_notice, "renewed expiration of " .. argv[2] .. " seconds on lock.") else log(redis.log_warning, "failed to renew expiration on lock with key: " .. keys[1]) end return 1 else log(redis.log_debug, "lock is held by another client; not acquiring.") return 0 end end
2.3 锁枚举类实现
此处使用base_product_test_lock作为测试的锁类型
package cn.git.common.lock; import lombok.getter; /** * 分布式锁类型设定enum * @program: bank-credit-sy * @author: lixuchun * @create: 2022-04-25 */ @getter public enum locktypeenum { /** * 分布式锁类型详情 */ distribute_task_lock("distribute_task_lock", 120l, "xxljob初始化分布式锁", 5000l), cache_init_lock("cache_init_lock", 120l, "缓存平台初始化缓存信息分布式锁", 5000l), rule_init_lock("rule_init_lock", 120l, "规则引擎规则加载初始化", 5000l), sequence_lock("sequence_lock", 120l, "序列信息月末初始化!", 5000l), uaa_online_number_lock("uaa_online_lock", 20l, "登录模块刷新在线人数", 5000l), base_server_idempotence("base_idempotence_lock", 15l, "基础业务幂等性校验"), work_flow_web_service_lock("work_flow_web_service_lock", 15l, "流程webservice服务可用ip地址获取锁", 5000l), base_product_test_lock("base_product_test_lock", 10l, "产品测试分布式锁", null), ; /** * 锁类型 */ private string locktype; /** * 即过期时间,单位为second */ private long expiretime; /** * 枷锁成功后,默认等待时间,时间应小于过期时间,单位毫秒 */ private long lockedwaittimemiles; /** * 描述信息 */ private string lockdesc; /** * 构造方法 * @param locktype 类型 * @param locktime 锁定时间 * @param lockdesc 锁描述 */ locktypeenum(string locktype, long locktime, string lockdesc) { this.lockdesc = lockdesc; this.expiretime = locktime; this.locktype = locktype; } /** * 构造方法 * @param locktype 类型 * @param locktime 锁定时间 * @param lockdesc 锁描述 * @param lockedwaittimemiles 锁失效时间 */ locktypeenum(string locktype, long locktime, string lockdesc, long lockedwaittimemiles) { this.lockdesc = lockdesc; this.expiretime = locktime; this.locktype = locktype; this.lockedwaittimemiles = lockedwaittimemiles; } }
3. 测试
测试分为两部分,模拟多线程清库存产品,10个产品,1000个线程进行争夺,具体实现如下
3.1 测试代码部分
package cn.git.foreign; import cn.git.api.client.esbcommonclient; import cn.git.api.dto.p043001009dto; import cn.git.common.lock.locktypeenum; import cn.git.common.lock.simpledistributelock; import cn.git.foreign.dto.querycreditdto; import cn.git.foreign.manage.foreigncreditcheckapiimpl; import cn.hutool.core.util.idutil; import cn.hutool.core.util.strutil; import com.alibaba.fastjson.jsonobject; import lombok.allargsconstructor; import lombok.data; import lombok.noargsconstructor; import lombok.extern.slf4j.slf4j; import org.junit.test; import org.junit.runner.runwith; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.test.context.springboottest; import org.springframework.test.context.junit4.springrunner; import java.util.concurrent.countdownlatch; import java.util.concurrent.linkedblockingqueue; import java.util.concurrent.threadpoolexecutor; import java.util.concurrent.timeunit; /** * @description: 分布式锁测试类 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-04-07 08:03:23 */ @slf4j @runwith(springrunner.class) @springboottest(classes = foreignapplication.class) public class distributionlocktest { @autowired private simpledistributelock distributelock; /** * 产品信息 */ private product product = new product("0001", 10, 0, "iphone"); /** * @description: 产品信息 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-04-03 */ @data @noargsconstructor @allargsconstructor public static class product { /** * id */ private string id; /** * 库存 */ private integer stock; /** * 已售 */ private integer sold; /** * 名称 */ private string name; } /** * 释放锁 */ @test public void releaselock() { distributelock.releaselock(locktypeenum.base_product_test_lock, "0001", "xxxx"); } /** * 分布式锁模拟测试 */ @test public void testlock() throws interruptedexception { // 20核心线程,最大线程也是100,非核心线程空闲等待时间10秒,队列最大1000 threadpoolexecutor executor = new threadpoolexecutor(100, 100, 10, timeunit.seconds, new linkedblockingqueue<>(10000)); // 模拟1000个请求 countdownlatch countdownlatch = new countdownlatch(1000); // 模拟10000个人抢10个商品 for (int i = 0; i < 1000; i++) { executor.execute(() -> { // 加锁 // soldbylock(); // 不加锁扣减库存 normalsold(); countdownlatch.countdown(); }); } countdownlatch.await(); executor.shutdown(); // 输出产品信息 system.out.println(jsonobject.tojsonstring(product)); } /** * 加锁减库存 */ public void soldbylock() { // 设置加锁value信息 string lockvalue = idutil.simpleuuid(); try { boolean islocked = distributelock.tryreentrantlock(locktypeenum.base_product_test_lock, lockvalue, product.getid()); if (islocked) { // 加锁成功,开始减库存信息 if (product.getstock() > 0) { product.setstock(product.getstock() - 1); product.setsold(product.getsold() + 1); system.out.println(strutil.format("减库存成功,剩余库存[{}]", product.getstock())); } else { system.out.println("库存不足"); } } // 暂停1000毫秒,模拟业务处理 try { thread.sleep(1000); } catch (interruptedexception e) { throw new runtimeexception(e); } } catch (exception e) { e.printstacktrace(); } finally { distributelock.releaselock(locktypeenum.base_product_test_lock, product.getid(), lockvalue); } } /** * 不加锁减库存 */ public void normalsold() { // 获取线程id long id = thread.currentthread().getid(); // 暂停1000毫秒,模拟业务处理 try { thread.sleep(1000); } catch (interruptedexception e) { throw new runtimeexception(e); } // 开始库存计算 if (product.getstock() > 0) { product.setstock(product.getstock() - 1); product.setsold(product.getsold() + 1); system.out.println(strutil.format("线程[{}]减库存成功,剩余库存[{}]", id, product.getstock())); } else { system.out.println("库存不足"); } } }
3.2 无锁库存处理情况
无锁情况下,发生产品超发情况,卖出11个产品,具体如下图
3.3 加锁处理情况
多次实验,没有发生产品超发情况,具体测试结果如下:
4. 其他实现
还可以使用redisson客户端进行分布式锁实现,这样更加简单安全,其有自己的看门狗机制,续期加锁解锁都更加方便,简单操作过程实例代码如下
import org.redisson.redisson; import org.redisson.api.rlock; import org.redisson.api.redissonclient; import org.redisson.config.config; import java.util.concurrent.timeunit; /** * @description: redisson客户端实现 * @program: bank-credit-sy * @author: lixuchun * @create: 2022-07-12 09:03:23 */ public class redissonlockexample { public static void main(string[] args) { // 配置redisson客户端 config config = new config(); config.usesingleserver().setaddress("redis://localhost:6379"); redissonclient redisson = redisson.create(config); // 获取锁对象 rlock lock = redisson.getlock("mylock"); try { // 尝试获取锁,最多等待100秒,锁定之后10秒自动释放 // 锁定之后会自动续期10秒 if (lock.trylock(100, 10, timeunit.seconds)) { try { // 处理业务逻辑 } finally { // 释放锁 lock.unlock(); } } } catch (interruptedexception e) { // 处理中断异常 thread.currentthread().interrupt(); } finally { // 关闭redisson客户端 redisson.shutdown(); } } }
其中锁定api是否开启看门狗,整理如下
// 开始拿锁,失败阻塞重试 rlock lock = redissonclient.getlock("guodong"); // 具有watch dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s lock.lock(); // 尝试拿锁10s后停止重试,返回false 具有watch dog 自动延期机制 默认续30s boolean res1 = lock.trylock(10, timeunit.seconds); // 尝试拿锁10s后,没有watch dog lock.lock(10, timeunit.seconds); // 没有watch dog ,10s后自动释放 lock.lock(10, timeunit.seconds); // 尝试拿锁100s后停止重试,返回false 没有watch dog ,10s后自动释放 boolean res2 = lock.trylock(100, 10, timeunit.seconds);
到此这篇关于redis分布式锁实现示例的文章就介绍到这了,更多相关redis分布式锁内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论