分布式锁在集群的架构中发挥着重要的作用。以下有主要的使用场景
1.在秒杀、抢购等高并发场景下,多个用户同时下单同一商品,可能导致库存超卖。
2.支付、转账等金融操作需保证同一账户的资金变动是串行执行的。
3.分布式环境下,多个节点可能同时触发同一任务(如定时报表生成)。
4.用户因网络延迟重复提交表单,可能导致数据重复插入。
自定义分布式锁
获取锁
比如一下一个场景,需要对订单号为 order-88888944010的订单进行扣款处理,因为后端是多节点的,防止出现用户重复点击导致扣款请求到不用的集群节点,所以需要同时只有一个节点处理该订单。
public static async task<(bool success, string lockvalue)> lockasync(string cachekey, int timeoutseconds = 5)
{
var lockkey = getlockkey(cachekey);
var lockvalue = guid.newguid().tostring();
var timeoutmilliseconds = timeoutseconds * 1000;
var expiration = timespan.frommilliseconds(timeoutmilliseconds);
bool flag = await _redisdb.stringsetasync(lockkey, lockvalue, expiration, when.notexists);
return (flag, flag ? lockvalue : string.empty);
}
public static string getlockkey(string cachekey)
{
return $"myapplication:locker:{cachekey}";
}
上述代码是在请求时将订单号作为redis key的一部分存储到redis中,并且生成了一个随机的lockvalue作为值。只有当redis中不存在该key的时候才能够成功设置,即为获取到该订单的分布式锁了。
await lockasync("order-88888944010",30); //获取锁,并且设置超时时间为30秒释放锁
public static async task<bool> unlockasync(string cachekey, string lockvalue)
{
var lockkey = getlockkey(cachekey);
var script = @"local invalue = @value
local currvalue = redis.call('get',@key)
if(invalue==currvalue) then redis.call('del',@key)
return 1
else
return 0
end";
var parameters = new { key = lockkey, value = lockvalue };
var prepared = luascript.prepare(script);
var result = (int)await _redisdb.scriptevaluateasync(prepared, parameters);
return result == 1;
}
释放锁采用了lua脚本先判断lockvalue是否是同一个处理节点发过来的删除请求,即判断加锁和释放锁是同一个来源。
用lua脚本而不是直接使用api执行删除的原因:
1.a获取锁后因gc停顿或网络延迟导致锁过期,此时客户端b获取了锁。若a恢复后直接调用del,会错误删除b持有的锁。
2.脚本在redis中单线程执行,确保get和del之间不会被其他命令打断。
自动续期
一些比较耗时的任务,可能在指定的超时时间内无法完成业务处理,需要存在自动续期的机制。
/// <summary>
/// 自动续期
/// </summary>
/// <param name="redisdb"></param>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="milliseconds">续期的时间</param>
/// <returns></returns>
public async static task delay(idatabase redisdb, string key, string value, int milliseconds)
{
if (!autodelayhandler.instance.containskey(key))
return;
var script = @"local val = redis.call('get', @key)
if val==@value then
redis.call('pexpire', @key, @milliseconds)
return 1
end
return 0";
object parameters = new { key, value, milliseconds };
var prepared = luascript.prepare(script);
var result = await redisdb.scriptevaluateasync(prepared, parameters, commandflags.none);
if ((int)result == 0)
{
autodelayhandler.instance.closetask(key);
}
return;
}
保存自动续期任务的处理器
public class autodelayhandler
{
private static readonly lazy<autodelayhandler> lazy = new lazy<autodelayhandler>(() => new autodelayhandler());
private static concurrentdictionary<string, (task, cancellationtokensource)> _tasks = new concurrentdictionary<string, (task, cancellationtokensource)>();
public static autodelayhandler instance => lazy.value;
/// <summary>
/// 任务令牌添加到集合中
/// </summary>
/// <param name="key"></param>
/// <param name="task"></param>
/// <returns></returns>
public bool tryadd(string key, task task, cancellationtokensource token)
{
if (_tasks.tryadd(key, (task, token)))
{
task.start();
return true;
}
else
{
return false;
}
}
public void closetask(string key)
{
if (_tasks.containskey(key))
{
if (_tasks.tryremove(key, out (task, cancellationtokensource) item))
{
item.item2?.cancel();
item.item1?.dispose();
}
}
}
public bool containskey(string key)
{
return _tasks.containskey(key);
}
}
在申请带有自动续期的分布式锁的完整代码
/// <summary>
/// 获取锁
/// </summary>
/// <param name="cachekey"></param>
/// <param name="timeoutseconds">超时时间</param>
/// <param name="autodelay">是否自动续期</param>
/// <returns></returns>
public static async task<(bool success, string lockvalue)> lockasync(string cachekey, int timeoutseconds = 5, bool autodelay = false)
{
var lockkey = getlockkey(cachekey);
var lockvalue = guid.newguid().tostring();
var timeoutmilliseconds = timeoutseconds * 1000;
var expiration = timespan.frommilliseconds(timeoutmilliseconds);
bool flag = await _redisdb.stringsetasync(lockkey, lockvalue, expiration, when.notexists);
if (flag && autodelay)
{
//需要自动续期,创建后台任务
cancellationtokensource cancellationtokensource = new cancellationtokensource();
var autodelaytask = new task(async () =>
{
while (!cancellationtokensource.iscancellationrequested)
{
await task.delay(timeoutmilliseconds / 2);
await delay(lockkey, lockvalue, timeoutmilliseconds);
}
}, cancellationtokensource.token);
var result = autodelayhandler.instance.tryadd(lockkey, autodelaytask, cancellationtokensource);
if (!result)
{
autodelaytask.dispose();
await unlockasync(cachekey, lockvalue);
return (false, string.empty);
}
}
return (flag, flag ? lockvalue : string.empty);
}
redis的过期时间精度约为1秒,且过期检查是周期性执行的(默认每秒10次)。选择ttl/2的间隔能:
确保在redis下一次过期检查前完成续期。
兼容redis的主从同步延迟(通常<1秒)
stackexchange.redis分布式锁
获取锁
string lockkey = "order:88888944010:lock"; string lockvalue = guid.newguid().tostring(); // 唯一标识锁持有者 timespan expiry = timespan.fromseconds(10); // 锁自动过期时间 // 尝试获取锁(原子操作) bool lockacquired = db.locktake(lockkey, lockvalue, expiry);
释放锁
bool released = await releaselockasync(db, lockkey, lockvalue);
自动续期
同样需要自己实现
到此这篇关于c#使用stackexchange.redis实现分布式锁的两种方式介绍的文章就介绍到这了,更多相关c# stackexchange.redis实现分布式锁内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论