golang基于mutex实现可重入锁
为什么需要可重入锁
我们平时说的分布式锁,一般指的是在不同服务器上的多个线程中,只有一个线程能抢到一个锁,从而执行一个任务。而我们使用锁就是保证一个任务只能由一个线程来完成。所以我们一般是使用这样的三段式逻辑:
lock(); dojob(); unlock();
但是由于我们的系统都是分布式的,这个锁一般不会只放在某个进程中,我们会借用第三方存储,比如 redis 来做这种分布式锁。但是一旦借助了第三方存储,我们就必须面对这个问题:unlock是否能保证一定运行呢?
这个问题,我们面对的除了程序的bug之外,还有网络的不稳定,进程被杀死,服务器被down机等。我们是无法保证unlock一定被运行的。
那么我们就一般在lock的时候为这个锁加一个超时时间作为兜底。
lockbyexpire(duration); dojob(); unlock();
这个超时时间是为了一旦出现异常情况导致unlock没有被运行,这个锁在duration时间内也会被自动释放。这个在redis中我们一般就是使用set ex 来进行锁超时的设定。
但是有这个超时时间我们又遇上了问题,超时时间设置多久合适呢?当然要设置的比 dojob 消耗的时间更长,否则的话,在任务还没结束的时候,锁就被释放了,还是有可能导致并发任务的存在。
但是实际上,同样由于网络超时问题,系统运行状况问题等,我们是无法准确知道dojob这个函数要执行多久的。那么这时候怎么办呢?
有两个办法:
第一个方法,我们可以对dojob做一个超时设置。让dojob最多只能执行n秒,那么我的分布式锁的超时时长设置比n秒长就可以了。为一个任务设置超时时间在很多语言是可以做到的。比如golang 中的 timeoutcontext。
而第二种方法,就是我们先为锁设置一个比较小的超时时长,然后不断续期这个锁。对一个锁的不断需求,也可以理解为重新开始加锁,这种可以不断续期的锁,就叫做可重入锁。
除了主线程之外,可重入锁必然有一个另外的线程(或者携程)可以对这个锁进行续期,我们叫这个额外的程序叫做watchdog(看门狗)。
锁重入的定义
锁可重入也就是当前已经获取到锁的goroutine继续调用lock方法获取锁,go标准库中提供了sync.mutex实现了排他锁,但并不是可重入的,如果在代码中重入锁,也就是lock之后再次进行lock获取锁,则会被阻塞到第二次lock上,锁没有办法得到释放从而影响其它goroutine执行
// 例如 package main; import "sync" func reentryexample() { var c int64 var mu sync.mutex mu.lock() // 第一次加锁 // todo // mu.lock() // 第二次加锁,阻塞 c++; // todo ... }
重入锁的简单实现思路
- 拿到能够识别到当前协程的id,(通过堆栈信息获取到goroutine的id)
- 写一个结构体,实现locker接口
首先获取到goroutine的id
func goid() int { var buf [32]byte n := runtime.stack(buf[:],false) // 获取堆栈的信息 // string(buf[:n] /** goroutine 6 [running]: main.xxx */ // 拿到goroutine的id goidstr := strings.fields(strings.trimprefix(string(buf[:n]), "goroutine"))[0] goid, err := strconv.atoi(fieldid)// 转换为int return goid }
然后开始编写可重入锁的结构体
// reentrantmutex 可重入的互斥锁 type reentrantmutex struct { sync.mutex // 互斥锁 goid int64 // 用于保存goroutine的id recursion int64 // 锁重入的次数 } // lock 实现locker接口,用于加锁 func (r *reentrantmutex) lock() { gid := goid() if atomic.loadint64(&r.goid) == gid { // 看看是否已经加过锁了? atomic.addint64(&r.recursion, 1) // 如果之前加过锁,则重入的次数+1 return } r.mutex.lock() // 使用互斥锁上锁 atomic.storeint64(&r.goid, gid) // 使用原子操作保存goroutine的id atomic.storeint64(&r.recursion, 1) // 第一次加锁,因此重入的次数为一 } // unlock 实现了locker的接口,用于解锁 func (r *reentrantmutex) unlock() { gid := goid() if atomic.loadint64(&r.goid) != gid { // 看是否加过锁 panic("未加锁") // 没有加过锁,不存在解锁,直接panic } recursion := atomic.addint64(&r.recursion, -1) // 重入次数-1 if recursion != 0 { // 如果重入次数没有等于0(意味着还有锁没有释放) return } atomic.storeint64(&r.goid, -1) // 重入次数为0,则不存在锁没有释放,解锁 r.mutex.unlock() // 互斥锁解锁 }
测试用例
package main; func main() { var m reentrantmutex m.lock() m.lock() // 不会阻塞 fmt.println("1") // 正常打印1 m.unlock() m.unlock()// 解锁 }
其他方法实现golang可重入锁:
具体实现
在golang中,语言级别天生支持协程,所以这种可重入锁就非常容易实现:
// distributelockredis 基于redis的分布式可重入锁,自动续租 type distributelockredis struct { key string // 锁的key expire int64 // 锁超时时间 status bool // 上锁成功标识 cancelfun context.cancelfunc // 用于取消自动续租携程 redis redis.client // redis句柄 } // 创建可 func newdistributelockredis(key string, expire int64) *distributelockredis { return &distributelockredis{ key : key, expire : expire, } } // trylock 上锁 func (dl *distributelockredis) trylock() (err error) { if err = dl.lock(); err != nil { return err } ctx, cancelfun := context.withcancel(context.background()) dl.cancelfun = cancelfun dl.startwatchdog(ctx) // 创建守护协程,自动对锁进行续期 dl.status = true return nil } // competition 竞争锁 func (dl *distributelockredis) lock() error { if res, err := redis.string(dl.redis.do(context.background(), "set", dl.key, 1, "nx", "ex", dl.expire)); err != nil { return err } return nil } // guard 创建守护协程,自动续期 func (dl *distributelockredis) startwatchdog(ctx context.context) { safego(func() error { for { select { // unlock通知结束 case <-ctx.done(): return nil default: // 否则只要开始了,就自动重入(续租锁) if dl.status { if res, err := redis.int(dl.redis.do(context.background(), "expire", dl.key, dl.expire)); err != nil { return nil } // 续租时间为 expire/2 秒 time.sleep(time.duration(dl.expire/2) * time.second) } } } }) } // unlock 释放锁 func (dl *distributelockredis) unlock() (err error) { // 这个重入锁必须取消,放在第一个地方执行 if dl.cancelfun != nil { dl.cancelfun() // 释放成功,取消重入锁 } var res int if dl.status { if res, err = redis.int(dl.redis.do(context.background(), "del", dl.key)); err != nil { return fmt.errorf("释放锁失败") } if res == 1 { dl.status = false return nil } } return fmt.errorf("释放锁失败") }
这段代码的逻辑基本上都以注释的形式来写了。其中主要就在startwatchdog,对锁进行重新续期
ctx, cancelfun := context.withcancel(context.background()) dl.cancelfun = cancelfun dl.startwatchdog(ctx) // 创建守护协程,自动对锁进行续期 dl.status = true
首先创建一个cancelcontext,它的context函数cancelfunc是给unlock进行调用的。然后启动一个goroutine进程来循环续期。
这个新启动的goroutine在主goroutine处理结束,调用unlock的时候,才会结束,否则会在 过期时间/2 的时候,调用一次redis的expire命令来进行续期。
至于外部,在使用的时候如下
func foo() error { key := foo // 创建可重入的分布式锁 dl := newdistributelockredis(key, 10) // 争抢锁 err := dl.trylock() if err != nil { // 没有抢到锁 return err } // 抢到锁的记得释放锁 defer func() { dl.unlock() } // 做真正的任务 dojob() }
到此这篇关于golang基于mutex实现可重入锁的文章就介绍到这了,更多相关golang mutex可重入锁内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论