我来详细讲解Spring Boot中实现分布式锁的几种方式,包括手写Redis锁、使用Redisson框架以及使用Spring Integration。
一、基于redis原生命令实现分布式锁
1. 基础版redis分布式锁
@component
public class redisdistributedlock {
@autowired
private stringredistemplate redistemplate;
private static final string lock_prefix = "distributed:lock:";
private static final long default_expire_time = 30000; // 30秒
private static final long default_wait_time = 10000; // 10秒
private static final long default_sleep_time = 100; // 100ms
/**
* 尝试获取分布式锁(简单版)
* @param lockkey 锁的key
* @param value 锁的值(通常用uuid)
* @param expiretime 锁的过期时间(ms)
* @return 是否获取成功
*/
public boolean trylock(string lockkey, string value, long expiretime) {
string key = lock_prefix + lockkey;
// 使用set命令,通过nx参数实现"不存在时设置",ex参数设置过期时间
boolean result = redistemplate.opsforvalue()
.setifabsent(key, value, expiretime, timeunit.milliseconds);
return boolean.true.equals(result);
}
/**
* 释放锁
* 需要确保是自己加的锁才能释放(防止释放别人的锁)
*/
public boolean releaselock(string lockkey, string value) {
string key = lock_prefix + lockkey;
string currentvalue = redistemplate.opsforvalue().get(key);
// 通过lua脚本确保原子性:判断值是否匹配,匹配则删除
string luascript =
"if redis.call('get', keys[1]) == argv[1] then " +
" return redis.call('del', keys[1]) " +
"else " +
" return 0 " +
"end";
defaultredisscript<long> redisscript = new defaultredisscript<>();
redisscript.setscripttext(luascript);
redisscript.setresulttype(long.class);
long result = redistemplate.execute(
redisscript,
collections.singletonlist(key),
value
);
return result != null && result == 1;
}
/**
* 获取锁(支持重试)
*/
public boolean lockwithretry(string lockkey, string value,
long expiretime, long waittime) {
long endtime = system.currenttimemillis() + waittime;
while (system.currenttimemillis() < endtime) {
if (trylock(lockkey, value, expiretime)) {
return true;
}
try {
thread.sleep(default_sleep_time);
} catch (interruptedexception e) {
thread.currentthread().interrupt();
break;
}
}
return false;
}
}
2. 可重入锁实现
/**
 * Reentrant Redis lock that stores {@code owner} and {@code count} in a Redis hash.
 *
 * <p>Re-entry by the same thread is tracked in a {@link ThreadLocal} map and never touches Redis;
 * only the first acquire and the last release of a thread execute a Lua script. The {@code count}
 * field in Redis therefore counts distinct threads that acquired with the same {@code clientId}.
 */
@Component
public class RedisReentrantLock {

    private static final String LOCK_PREFIX = "reentrant:lock:";

    /** TTL (ms) re-applied when a release leaves other holds of the same client in place. */
    private static final String REMAINING_HOLD_TTL_MILLIS = "30000";

    /** Per-thread hold counts, keyed by the un-prefixed lock key. */
    private static final ThreadLocal<Map<String, Integer>> LOCK_COUNT =
            ThreadLocal.withInitial(HashMap::new);

    // KEYS / ARGV are case-sensitive Lua globals provided by Redis.
    private static final String ACQUIRE_LUA =
            "if redis.call('exists', KEYS[1]) == 0 then "
                    + "  redis.call('hset', KEYS[1], 'owner', ARGV[1]) "
                    + "  redis.call('hset', KEYS[1], 'count', 1) "
                    + "  redis.call('pexpire', KEYS[1], ARGV[2]) "
                    + "  return 1 "
                    + "elseif redis.call('hget', KEYS[1], 'owner') == ARGV[1] then "
                    + "  local count = redis.call('hget', KEYS[1], 'count') "
                    + "  redis.call('hset', KEYS[1], 'count', tonumber(count) + 1) "
                    + "  redis.call('pexpire', KEYS[1], ARGV[2]) "
                    + "  return 1 "
                    + "else "
                    + "  return 0 "
                    + "end";

    // Returns 1 = key deleted, 0 = count decremented (client still holds), -1 = not the owner.
    private static final String RELEASE_LUA =
            "if redis.call('hget', KEYS[1], 'owner') == ARGV[1] then "
                    + "  local current = redis.call('hget', KEYS[1], 'count') "
                    + "  if tonumber(current) > 1 then "
                    + "    redis.call('hset', KEYS[1], 'count', tonumber(current) - 1) "
                    + "    redis.call('pexpire', KEYS[1], ARGV[2]) "
                    + "    return 0 "
                    + "  else "
                    + "    redis.call('del', KEYS[1]) "
                    + "    return 1 "
                    + "  end "
                    + "else "
                    + "  return -1 "
                    + "end";

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Acquires the lock, or re-enters it if the calling thread already holds it.
     *
     * @param lockKey    business key of the lock
     * @param clientId   identity written to the {@code owner} field
     * @param expireTime TTL in milliseconds applied on a successful Redis acquire
     * @return {@code true} if the calling thread now holds the lock
     */
    public boolean tryReentrantLock(String lockKey, String clientId,
                                    long expireTime) {
        Map<String, Integer> holds = LOCK_COUNT.get();
        int held = holds.getOrDefault(lockKey, 0);
        if (held > 0) {
            // Re-entry is purely local; note that it does not extend the Redis TTL.
            holds.put(lockKey, held + 1);
            return true;
        }

        DefaultRedisScript<Long> script = new DefaultRedisScript<>(ACQUIRE_LUA, Long.class);
        Long result = redisTemplate.execute(
                script,
                Collections.singletonList(LOCK_PREFIX + lockKey),
                clientId,
                String.valueOf(expireTime));
        if (result != null && result == 1L) {
            holds.put(lockKey, 1);
            return true;
        }
        return false;
    }

    /**
     * Gives up one hold on the lock.
     *
     * <p>While the thread has nested holds only the local counter is decremented. On the last
     * hold the release script runs, and the local bookkeeping is cleared for every script outcome:
     * leaving a stale count behind when the lock was lost (expired, or taken by another client)
     * would make this thread treat every later acquire as a re-entry.
     *
     * @param lockKey  business key of the lock
     * @param clientId identity that was used to acquire
     * @return {@code true} if this thread's hold was released; {@code false} if the thread held
     *         nothing or Redis reports a different owner
     */
    public boolean releaseReentrantLock(String lockKey, String clientId) {
        Map<String, Integer> holds = LOCK_COUNT.get();
        int held = holds.getOrDefault(lockKey, 0);
        if (held <= 0) {
            return false;
        }
        if (held > 1) {
            holds.put(lockKey, held - 1);
            return true;
        }

        DefaultRedisScript<Long> script = new DefaultRedisScript<>(RELEASE_LUA, Long.class);
        Long result = redisTemplate.execute(
                script,
                Collections.singletonList(LOCK_PREFIX + lockKey),
                clientId,
                REMAINING_HOLD_TTL_MILLIS);
        if (result == null) {
            // No reply (e.g. pipelined connection): outcome unknown, keep state so a retry works.
            return false;
        }

        holds.remove(lockKey);
        if (holds.isEmpty()) {
            LOCK_COUNT.remove(); // avoid leaking the map on pooled threads
        }
        return result >= 0L;
    }
}
二、使用redisson实现分布式锁(推荐生产环境使用)
1. 添加依赖
<!-- pom.xml: Redisson Spring Boot starter. Maven element names are case-sensitive. -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.5</version>
</dependency>
2. 配置redisson
# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 3000
    # NOTE(review): nesting was reconstructed; the Redisson starter documents its inline
    # configuration under spring.redis.redisson.config. Confirm for the starter version in use.
    redisson:
      config: |
        singleServerConfig:
          address: "redis://${spring.redis.host}:${spring.redis.port}"
          database: ${spring.redis.database}
          connectionPoolSize: 64
          connectionMinimumIdleSize: 24
          idleConnectionTimeout: 10000
          connectTimeout: ${spring.redis.timeout}
          timeout: ${spring.redis.timeout}
          retryAttempts: 3
          retryInterval: 1500
3. redisson分布式锁服务
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

/**
 * Thin facade over {@link RedissonClient} exposing the common lock flavours by key.
 *
 * <p>All {@code try*} methods return {@code false} (with the interrupt flag restored) when the
 * calling thread is interrupted while waiting.
 */
@Component
public class RedissonDistributedLock {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Tries to acquire the standard reentrant lock.
     *
     * @param lockKey   name of the lock
     * @param waitTime  maximum time to wait for the lock
     * @param leaseTime time after which the lock is released automatically
     * @param unit      unit of both durations
     * @return {@code true} if the lock was acquired
     */
    public boolean tryReentrantLock(String lockKey, long waitTime,
                                    long leaseTime, TimeUnit unit) {
        return tryAcquire(redissonClient.getLock(lockKey), waitTime, leaseTime, unit);
    }

    /**
     * Tries to acquire the fair lock registered under {@code lockKey}.
     *
     * @return {@code true} if the lock was acquired
     */
    public boolean tryFairLock(String lockKey, long waitTime,
                               long leaseTime, TimeUnit unit) {
        return tryAcquire(redissonClient.getFairLock(lockKey), waitTime, leaseTime, unit);
    }

    /**
     * Tries to acquire all of the given locks as one unit.
     *
     * <p>NOTE(review): the multi-lock handle is not returned, and {@link #unlock(String)} only
     * checks the plain lock for a single key; callers must release each key themselves.
     *
     * @param lockKeys names of the locks to combine
     * @return {@code true} if every lock was acquired
     */
    public boolean tryMultiLock(String[] lockKeys, long waitTime,
                                long leaseTime, TimeUnit unit) {
        RLock[] locks = new RLock[lockKeys.length];
        for (int i = 0; i < lockKeys.length; i++) {
            locks[i] = redissonClient.getLock(lockKeys[i]);
        }
        return tryAcquire(redissonClient.getMultiLock(locks), waitTime, leaseTime, unit);
    }

    /**
     * Placeholder for RedLock: with a single {@link RedissonClient} this acquires an ordinary
     * reentrant lock and is equivalent to {@link #tryReentrantLock}. A real RedLock needs locks
     * from several independent Redis instances.
     *
     * @return {@code true} if the lock was acquired
     */
    public boolean tryRedLock(String lockKey, long waitTime,
                              long leaseTime, TimeUnit unit) {
        return tryAcquire(redissonClient.getLock(lockKey), waitTime, leaseTime, unit);
    }

    /**
     * Releases the plain lock for {@code lockKey} if the calling thread holds it; otherwise does
     * nothing.
     */
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        // Being held by this thread already implies "locked"; no separate isLocked() call needed.
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    /** Releases the lock regardless of which thread or client holds it. */
    public void forceUnlock(String lockKey) {
        redissonClient.getLock(lockKey).forceUnlock();
    }

    /**
     * Blocks until the lock is acquired without an explicit lease time, so Redisson's watchdog
     * keeps extending the lease (30 s by default) while the holder is alive.
     */
    public void lockWithWatchdog(String lockKey) {
        redissonClient.getLock(lockKey).lock();
    }

    /** Shared acquire path: maps an interrupt to {@code false} and restores the interrupt flag. */
    private boolean tryAcquire(RLock lock, long waitTime, long leaseTime, TimeUnit unit) {
        try {
            return lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
4. 使用aop简化分布式锁使用
/**
 * Marks a method whose execution must be guarded by a distributed lock.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {

    /** Lock key: a literal, or a SpEL expression referring to method arguments. */
    String key();

    /** Kind of lock to use; reentrant by default. */
    LockType lockType() default LockType.REENTRANT;

    /** Maximum time to wait for the lock, in {@link #timeUnit()} (5 seconds by default). */
    long waitTime() default 5;

    /** Lease time in {@link #timeUnit()}; {@code -1} means rely on watchdog auto-renewal. */
    long leaseTime() default -1;

    /** Unit for {@link #waitTime()} and {@link #leaseTime()}. */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /** Supported lock flavours. */
    enum LockType {
        REENTRANT, // reentrant lock
        FAIR,      // fair lock
        READ,      // read lock
        WRITE,     // write lock
        MULTI,     // multi-lock
        RED        // red lock
    }
}
@aspect
@component
@slf4j
public class distributedlockaspect {
@autowired
private redissonclient redissonclient;
@autowired
private redissondistributedlock redissondistributedlock;
@around("@annotation(distributedlock)")
public object around(proceedingjoinpoint joinpoint,
distributedlock distributedlock) throws throwable {
string lockkey = parsekey(distributedlock.key(), joinpoint);
rlock lock = getlock(lockkey, distributedlock.locktype());
boolean locked = false;
try {
// 尝试获取锁
if (distributedlock.leasetime() == -1) {
// 使用看门狗自动续期
lock.lock();
} else {
locked = lock.trylock(
distributedlock.waittime(),
distributedlock.leasetime(),
distributedlock.timeunit()
);
}
if (!locked && distributedlock.leasetime() != -1) {
throw new runtimeexception("获取分布式锁失败: " + lockkey);
}
log.info("获取分布式锁成功: {}", lockkey);
return joinpoint.proceed();
} catch (interruptedexception e) {
thread.currentthread().interrupt();
throw new runtimeexception("获取分布式锁被中断", e);
} finally {
if (lock.islocked() && lock.isheldbycurrentthread()) {
lock.unlock();
log.info("释放分布式锁: {}", lockkey);
}
}
}
private string parsekey(string keyspel, proceedingjoinpoint joinpoint) {
methodsignature signature = (methodsignature) joinpoint.getsignature();
method method = signature.getmethod();
// 解析spel表达式
if (keyspel.startswith("#")) {
expressionparser parser = new spelexpressionparser();
expression expression = parser.parseexpression(keyspel);
evaluationcontext context = new standardevaluationcontext();
context.setvariable("methodname", method.getname());
// 设置参数
object[] args = joinpoint.getargs();
parameter[] parameters = method.getparameters();
for (int i = 0; i < parameters.length; i++) {
context.setvariable(parameters[i].getname(), args[i]);
}
return expression.getvalue(context, string.class);
}
return keyspel;
}
private rlock getlock(string lockkey, distributedlock.locktype locktype) {
switch (locktype) {
case fair:
return redissonclient.getfairlock(lockkey);
case read:
return redissonclient.getreadwritelock(lockkey).readlock();
case write:
return redissonclient.getreadwritelock(lockkey).writelock();
case multi:
case red:
// 简化处理,实际使用需要多个实例
return redissonclient.getlock(lockkey);
case reentrant:
default:
return redissonclient.getlock(lockkey);
}
}
}
三、使用spring integration实现分布式锁
1. 添加依赖
<!-- Spring Integration core starter plus its Redis module (provides RedisLockRegistry). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>
2. 配置redis锁注册表
/**
 * Registers a Redis-backed Spring Integration lock registry.
 */
@Configuration
public class RedisLockConfiguration {

    /**
     * Creates the registry with key prefix {@code distributed-lock} and a 10-second expiry.
     *
     * @param connectionFactory Redis connection factory supplied by Spring
     */
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
        return new RedisLockRegistry(connectionFactory, "distributed-lock", 10000L);
    }

    /** Exposes the same registry instance under the generic {@link LockRegistry} type. */
    @Bean
    public LockRegistry lockRegistry(RedisLockRegistry redisLockRegistry) {
        return redisLockRegistry;
    }
}
3. 使用spring integration锁
/**
 * Example service that serialises order processing per order id through a {@link LockRegistry}.
 */
@Service
@Slf4j
public class OrderService {

    @Autowired
    private LockRegistry lockRegistry;

    /**
     * Processes an order while holding the lock {@code order:<orderId>}.
     *
     * @param orderId id of the order to process
     * @throws RuntimeException if the lock is not obtained within 5 seconds or the thread is
     *                          interrupted while waiting
     */
    public void createOrder(String orderId) {
        Lock lock = lockRegistry.obtain("order:" + orderId);
        boolean locked = false;
        try {
            // Wait at most 5 seconds for the lock.
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            if (!locked) {
                throw new RuntimeException("系统繁忙,请稍后重试");
            }
            processOrder(orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("订单处理被中断", e);
        } finally {
            // Only unlock when this call actually acquired the lock.
            if (locked) {
                lock.unlock();
            }
        }
    }

    /** Order-processing logic (placeholder: only logs the order id). */
    private void processOrder(String orderId) {
        log.info("处理订单: {}", orderId);
    }
}
四、实际应用示例
1. 商品秒杀场景
/**
 * Flash-sale example: stock is kept in Redis and every deduction for a product runs under a
 * per-product distributed lock so that stock cannot be oversold.
 */
@Service
@Slf4j
public class SeckillService {

    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String SECKILL_PREFIX = "seckill:product:";
    private static final String LOCK_PREFIX = "seckill:lock:";

    /**
     * Places a flash-sale order, managing the lock by hand.
     *
     * @param productId product being purchased
     * @param quantity  number of units requested
     * @param userId    buyer
     * @return {@code true} when the order was created
     * @throws RuntimeException if the lock is not obtained within 100 ms, stock is insufficient,
     *                          or the thread is interrupted
     */
    public boolean seckillOrder(Long productId, Integer quantity, Long userId) {
        RLock lock = redissonClient.getLock(LOCK_PREFIX + productId);
        try {
            // Wait up to 100 ms; the lock is released automatically after 3 s.
            if (!lock.tryLock(100, 3000, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("抢购太火爆,请重试");
            }
            return deductStockAndCreateOrder(productId, quantity, userId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("系统异常", e);
        } finally {
            // The 3 s lease may have expired already; only unlock a lock this thread still holds.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Same operation as {@link #seckillOrder}, with locking delegated to the
     * {@code @DistributedLock} aspect. The SpEL variable must match the parameter name.
     */
    @DistributedLock(
        key = "'seckill:lock:' + #productId",
        waitTime = 1,
        leaseTime = 3,
        timeUnit = TimeUnit.SECONDS
    )
    public boolean seckillOrderWithAnnotation(Long productId, Integer quantity, Long userId) {
        return deductStockAndCreateOrder(productId, quantity, userId);
    }

    /**
     * Checks stock, decrements it and creates the order. Callers must already hold the
     * product lock.
     */
    private boolean deductStockAndCreateOrder(Long productId, Integer quantity, Long userId) {
        String stockKey = SECKILL_PREFIX + productId + ":stock";

        // A missing stock key comes back as null; parse only after the null check, because
        // Integer.valueOf(null) would throw NumberFormatException instead of "out of stock".
        String rawStock = redisTemplate.opsForValue().get(stockKey);
        if (rawStock == null || Integer.parseInt(rawStock) < quantity) {
            throw new RuntimeException("库存不足");
        }

        Long newStock = redisTemplate.opsForValue().decrement(stockKey, quantity);
        if (newStock < 0) {
            // Went negative after all: put the units back before failing.
            redisTemplate.opsForValue().increment(stockKey, quantity);
            throw new RuntimeException("库存不足");
        }

        createOrder(productId, quantity, userId);
        return true;
    }

    /** Order-creation logic (placeholder: only logs the purchase). */
    private void createOrder(Long productId, Integer quantity, Long userId) {
        log.info("用户{}抢购商品{},数量{}", userId, productId, quantity);
    }
}
2. 定时任务防重复执行
/**
 * Scheduled jobs guarded by a distributed lock so that only one node runs a job at a time.
 */
@Component
@Slf4j
public class ScheduledTasks {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Data-sync job fired every 5 minutes. A node that cannot get the lock immediately skips
     * this run instead of waiting.
     */
    @Scheduled(cron = "0 */5 * * * ?")
    public void syncDataTask() {
        String lockKey = "task:sync:data";
        RLock lock = redissonClient.getLock(lockKey);

        // Non-blocking attempt: return straight away if another node holds the lock.
        boolean locked = lock.tryLock();
        if (!locked) {
            log.info("其他节点正在执行数据同步任务");
            return;
        }
        try {
            log.info("开始执行数据同步任务");
            syncData();
            log.info("数据同步任务完成");
        } finally {
            lock.unlock();
        }
    }

    /** Data-sync logic (placeholder: sleeps 5 s to simulate work). */
    private void syncData() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
五、配置和最佳实践
1. redisson配置类
/**
 * Programmatic Redisson configuration for a single Redis node.
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String redisHost;
    @Value("${spring.redis.port}")
    private String redisPort;
    @Value("${spring.redis.password:}")
    private String password;
    @Value("${spring.redis.database:0}")
    private int database;

    /**
     * Builds the {@link RedissonClient}; {@code shutdown} is invoked when the context closes.
     *
     * @return client connected to {@code redis://<host>:<port>}
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = new Config();

        // Single-server mode.
        config.useSingleServer()
            .setAddress(String.format("redis://%s:%s", redisHost, redisPort))
            .setDatabase(database)
            // A blank password is passed as null so that no AUTH is attempted.
            .setPassword(StringUtils.hasText(password) ? password : null)
            .setConnectionPoolSize(64)
            .setConnectionMinimumIdleSize(10)
            .setIdleConnectionTimeout(10000)
            .setConnectTimeout(3000)
            .setTimeout(3000)
            .setRetryAttempts(3)
            .setRetryInterval(1500)
            .setPingConnectionInterval(30000)
            .setKeepAlive(true);

        // Lock watchdog timeout in milliseconds.
        config.setLockWatchdogTimeout(30000L);
        return Redisson.create(config);
    }
}
2. 分布式锁工具类
/**
 * Helper that runs a callback while holding a Redisson lock and always releases it afterwards.
 */
@Component
@Slf4j
public class DistributedLockUtil {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Runs {@code supplier} under the lock and returns its result.
     *
     * @param lockKey   name of the lock
     * @param waitTime  maximum time to wait for the lock
     * @param leaseTime time after which the lock is released automatically
     * @param unit      unit of both durations
     * @param supplier  work to perform while the lock is held
     * @return the value produced by {@code supplier}
     * @throws DistributedLockException if the lock is not obtained in time or the thread is
     *                                  interrupted while waiting
     */
    public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime,
                                 TimeUnit unit, Supplier<T> supplier) {
        RLock lock = redissonClient.getLock(lockKey);
        boolean locked = false;
        try {
            locked = lock.tryLock(waitTime, leaseTime, unit);
            if (!locked) {
                throw new DistributedLockException("获取分布式锁失败: " + lockKey);
            }
            return supplier.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DistributedLockException("获取分布式锁被中断", e);
        } finally {
            // The lease may have expired while the supplier ran; only unlock a lock still held.
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Variant of {@link #executeWithLock(String, long, long, TimeUnit, Supplier)} for work that
     * returns nothing.
     */
    public void executeWithLock(String lockKey, long waitTime, long leaseTime,
                                TimeUnit unit, Runnable runnable) {
        executeWithLock(lockKey, waitTime, leaseTime, unit, () -> {
            runnable.run();
            return null;
        });
    }

    /**
     * Fail-fast variant: returns an empty {@link Optional} instead of throwing when a
     * {@link DistributedLockException} occurs. An empty result is also returned when the
     * supplier itself yields {@code null}.
     */
    public <T> Optional<T> tryExecuteWithLock(String lockKey, long waitTime,
                                              long leaseTime, TimeUnit unit,
                                              Supplier<T> supplier) {
        try {
            return Optional.ofNullable(
                executeWithLock(lockKey, waitTime, leaseTime, unit, supplier)
            );
        } catch (DistributedLockException e) {
            log.warn("获取锁失败,跳过执行: {}", lockKey);
            return Optional.empty();
        }
    }

    /** Raised when a lock cannot be obtained or the wait for it is interrupted. */
    public static class DistributedLockException extends RuntimeException {
        public DistributedLockException(String message) {
            super(message);
        }

        public DistributedLockException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
六、测试分布式锁
/**
 * Integration tests for the lock helpers; they need a Spring context and a reachable Redis.
 */
@SpringBootTest
@Slf4j
class DistributedLockTest {

    @Autowired
    private RedissonDistributedLock redissonDistributedLock;
    @Autowired
    private DistributedLockUtil distributedLockUtil;

    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Ten threads compete for one lock (2 s wait, 5 s lease); each holder works for 100 ms, so
     * every thread is expected to get its turn and bump the counter exactly once.
     */
    @Test
    void testConcurrentLock() throws InterruptedException {
        int threadCount = 10;
        String lockKey = "test:concurrent:lock";
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    boolean locked = redissonDistributedLock
                        .tryReentrantLock(lockKey, 2, 5, TimeUnit.SECONDS);
                    if (locked) {
                        try {
                            // Simulated business work.
                            Thread.sleep(100);
                            int value = counter.incrementAndGet();
                            log.info("线程 {} 获取锁成功,计数: {}",
                                Thread.currentThread().getName(), value);
                        } finally {
                            redissonDistributedLock.unlock(lockKey);
                        }
                    } else {
                        log.warn("线程 {} 获取锁失败", Thread.currentThread().getName());
                    }
                } catch (Exception e) {
                    log.error("线程执行异常", e);
                } finally {
                    // Count down in every case so the main thread cannot hang.
                    latch.countDown();
                }
            });
        }

        latch.await();
        executor.shutdown();
        assertEquals(threadCount, counter.get());
    }

    /** The utility returns the supplier's value when the lock is free. */
    @Test
    void testLockWithUtil() {
        String lockKey = "test:util:lock";
        String result = distributedLockUtil.executeWithLock(
            lockKey,
            2,
            5,
            TimeUnit.SECONDS,
            () -> {
                // Business logic.
                return "success";
            }
        );
        assertEquals("success", result);
    }
}
总结与建议
1.选择方案
- 简单场景:使用Spring Integration的RedisLockRegistry
- 生产环境:推荐使用Redisson,功能最全,稳定性最好
- 特殊需求:需要精细控制时,可以使用原生redis命令自定义
2.最佳实践
- 锁的key要有业务含义,如 order:create:{orderId}
- 一定要设置合理的过期时间,防止死锁
- 释放锁时要检查是否当前线程持有
- 使用lua脚本保证原子性
- 考虑锁的可重入性
- 生产环境使用redis集群或哨兵模式
3.注意事项
- 避免锁粒度过大,影响并发性能
- 避免锁持有时间过长
- 实现锁的自动续期(看门狗机制)
- 考虑锁等待超时和快速失败
- 添加监控和告警机制
4.常见问题解决
- 锁提前过期:使用Redisson的看门狗机制
- 锁误删:每个锁设置唯一value,释放时验证
- 锁不可重入:使用Redisson或实现可重入逻辑
- Redis集群故障:使用RedLock算法(多个Redis实例)
这样实现的分布式锁既安全又可靠,可以满足大多数业务场景的需求。
以上就是基于springboot实现分布式锁的三种方法的详细内容,更多关于springboot分布式锁的资料请关注代码网其它相关文章!
发表评论