前言
读写锁的好处就是能帮助客户读到的数据一定是最新的,写锁是排他锁,而读锁是一个共享锁,如果写锁一直存在,那么读取数据就要一直等待,直到写入数据完成才能看到,保证了数据的一致性
一、为什么使用lua
lua脚本是高并发、高性能的必备脚本语言, 大部分的开源框架(如:redission)中的分布式锁组件,都是用纯lua脚本实现的。
那么,为什么要使用lua语言来实现分布式锁呢?我们从一个案例看起:
所以,只有确保判断锁和删除锁是一步操作时,才能避免上面的问题,才能确保原子性。
其实很简单,首先获取锁对应的value值,检查是否与requestid相等,如果相等则删除锁(解锁)。虽然看似做了两件事,但是却只有一个完整的原子操作。
第一行代码,我们写了一个简单的 lua 脚本代码; 第二行代码,我们将lua代码传到 edis.eval()方法里,并使参数 keys[1] 赋值为 lockkey,argv[1] 赋值为 requestid,eval() 方法是将lua代码交给 redis 服务端执行。
二、执行流程
加锁和删除锁的操作,使用纯 lua 进行封装,保障其执行时候的原子性。
基于纯lua脚本实现分布式锁的执行流程,大致如下:
三、代码详解
lua\lock.lua
-- keys = [lock_key, lock_intent] -- argv = [lock_id, ttl] local t = redis.call('type', keys[1])["ok"] if t == "string" then return redis.call('pttl', keys[1]) end if redis.call("exists", keys[2]) == 1 then return redis.call('pttl', keys[2]) end redis.call('sadd', keys[1], argv[1]) redis.call('pexpire', keys[1], argv[2]) return nil
-- keys = [lock_key, lock_intent]
和-- argv = [lock_id, ttl, enable_lock_intent]
if not redis.call("set", keys[1], argv[1], "px", argv[2], "nx") then
行首使用了redis.call
函数调用,将 lock_key 和 lock_id 存储到 redis 中,并设置过期时间为 ttl。如果设置失败,则进入条件内部。- 在条件内部,判断 enable_lock_intent 的值。如果为 1,则执行
redis.call("set", keys[2], 1, "px", argv[2])
,将 lock_intent 键设置为 1,并设置与 lock_key 相同的过期时间。这是为了表示锁被占用的意图。 - 返回
redis.call("pttl", keys[1])
,即 lock_key 的剩余过期时间,以毫秒为单位。这是为了告知调用方锁已被占用,返回锁的剩余过期时间。 - 若上述条件都不满足,则执行
redis.call("del", keys[2])
,删除 lock_intent 键。 - 返回
nil
,表示锁已成功获取。
它首先尝试通过 set 命令将 lock_key 存储到 redis 中,如果设置失败,则表示锁已被其他进程占用,返回锁的剩余过期时间。如果设置成功,则删除 lock_intent 键,表示锁已成功获取
lua\refresh.lua
-- keys = [lock_key] -- argv = [lock_id, ttl] local t = redis.call('type', keys[1])["ok"] if (t == "string" and redis.call('get', keys[1]) ~= argv[1]) or (t == "set" and redis.call('sismember', keys[1], argv[1]) == 0) or (t == "none") then return 0 end return redis.call('pexpire', keys[1], argv[2])
- 延长锁的时间
lua\rlock.lua
-- keys = [lock_key, lock_intent] -- argv = [lock_id, ttl] local t = redis.call('type', keys[1])["ok"] if t == "string" then return redis.call('pttl', keys[1]) end if redis.call("exists", keys[2]) == 1 then return redis.call('pttl', keys[2]) end redis.call('sadd', keys[1], argv[1]) redis.call('pexpire', keys[1], argv[2]) return nil
local t = redis.call('type', keys[1])["ok"]
通过type
命令获取键的类型,并将结果存储在变量t
中。使用条件逻辑判断锁的状态:
- 如果
t
是字符串,则返回pttl
命令的结果,即锁的剩余过期时间。 - 如果
lock_intent
键存在,则返回pttl
命令的结果,即锁占用意图的剩余过期时间。
- 如果
由于以上条件都不满足,即锁未被占用,将锁 id (
argv[1]
) 添加到lock_key
集合中。使用
pexpire
命令设置lock_key
的过期时间为argv[2]
(以毫秒为单位)。返回
nil
,表示锁已成功获取。
lua\unlock.lua
-- keys = [lock_key] -- argv = [lock_id] local t = redis.call('type', keys[1])["ok"] if t == "string" and redis.call('get', keys[1]) == argv[1] then return redis.call('del', keys[1]) elseif t == "set" and redis.call('sismember', keys[1], argv[1]) == 1 then redis.call('srem', keys[1], argv[1]) if redis.call('scard', keys[1]) == 0 then return redis.call('del', keys[1]) end end return 1
- 检查指定键的类型,如果是字符串并且键的值等于给定的
argv
值,则删除该键。 - 如果指定键的类型是集合,并且集合中包含给定的
argv
值,则将该值从集合中移除。随后,如果集合中不再包含任何元素,则删除该键。
写优先还是读优先?
写锁会阻塞读锁,所以是写优先
写锁是如何阻塞写锁的?
如果当前的写锁已经被占用,其他写锁的获取请求会被阻塞,因为在释放锁的逻辑中,会先判断锁的类型,如果是写锁,则会判断当前锁的值是否符合预期,从而判断能否删除该锁。
读锁与读锁之间互斥吗?
对于读锁而言,多个读锁之间是可以并发持有的,因此读锁之间默认是不会互斥的,可以同时执行读操作。
写锁会有被饿死的情况吗?
写优先锁可以保证写线程不会饿死,但是如果一直有写线程获取写锁,读线程也会被「饿死」。
既然不管优先读锁还是写锁,对方可能会出现饿死问题,那么我们就不偏袒任何一方,搞个「公平读写锁」。
公平读写锁比较简单的一种方式是:用队列把获取锁的线程排队,不管是写线程还是读线程都按照先进先出的原则加锁即可,这样读线程仍然可以并发,也不会出现「饥饿」的现象。
抽象lock类
import ( "context" "errors" "time" "github.com/redis/go-redis/v9" ) var _ context.context = (*lock)(nil) // lock represents a lock with context. type lock struct { redis redis.scripter id string ttl time.duration key string log logfunc ctx context.context cancel context.cancelfunc } // id returns the id value set by the lock. func (l *lock) id() string { return l.id } // key returns the key value set by the lock. func (l *lock) key() string { return l.key } func (l *lock) deadline() (deadline time.time, ok bool) { return l.ctx.deadline() } func (l *lock) done() <-chan struct{} { return l.ctx.done() } func (l *lock) err() error { return l.ctx.err() } func (l *lock) value(key any) any { return l.ctx.value(key) } // unlock unlocks. func (l *lock) unlock() { l.cancel() _, err := scriptunlock.run(context.background(), l.redis, []string{l.key}, l.id).result() if err != nil { l.log("[error] unlock %q %s: %v", l.key, l.id, err) } } func (l *lock) refreshttl(left time.time) { defer l.cancel() refresh := l.updatettl() for { diff := time.since(left) select { case <-l.ctx.done(): return case <-time.after(-diff): // cant refresh return case <-time.after(refresh): status, err := scriptrefresh.run(l.ctx, l.redis, []string{l.key}, l.id, l.ttl.milliseconds()).int() if err != nil { if errors.is(err, context.canceled) { return } refresh = refreshtimeout l.log("[error] refresh key %q %s: %v", l.key, l.id, err) continue } left = l.leftttl() refresh = l.updatettl() if status == 0 { l.log("[error] refresh key %q %s already expired", l.key, l.id) return } } } } func (l *lock) leftttl() time.time { return time.now().add(l.ttl) } func (l *lock) updatettl() time.duration { return l.ttl / 2 }
id()
:返回锁的id。key()
:返回锁的键名。deadline()
:返回锁的截止时间和标志,如果没有设置则返回零值。done()
:返回一个通道,在锁的上下文被取消或者锁过期后会被关闭。err()
:返回锁的错误状态。value(key any) any
:返回一个键关联的值,用于传递上下文相关的数据。unlock()
:解锁操作,会取消锁的上下文,并调用redis的脚本解锁操作。refreshttl(left time.time)
:刷新锁的过期时间,定期更新redis中锁的过期时间,直到锁的上下文被取消、锁过期或无法继续刷新为止。leftttl()
:返回锁的剩余过期时间。updatettl()
:更新刷新锁的间隔时间。每次减少一半
为什么需要为什么l.ttl / 2
这是为了实现锁的自动续约。通过定期刷新锁的过期时间,可以确保锁在使用过程中不会过期而被意外释放。
这种做法可以在以下情况下带来一些好处:
- 减少锁的续约操作对redis的压力:由于续约操作是相对较昂贵的,通过将过期时间缩短为原来的一半,可以降低续约的频率,从而减少对redis的请求,减少了网络和计算资源的消耗。
- 避免长时间持有锁带来的问题:如果某个持有锁的进程/线程发生故障或延迟,导致无法及时释放锁,那么其他进程可能会长时间等待获取该锁,造成资源浪费。通过定期刷新锁的过期时间,可以在锁即将过期之前及时释放锁,降低该问题的风险。
options
package redismutex import ( "context" "log" "os" "sync" "time" ) const ( lenbytesid = 16 refreshtimeout = time.millisecond * 500 defaultkeyttl = time.second * 4 ) var ( globalmx sync.rwmutex globallog = func() logfunc { l := log.new(os.stderr, "redismutex: ", log.lstdflags) return func(format string, v ...any) { l.printf(format, v...) } }() ) // logfunc type is an adapter to allow the use of ordinary functions as logfunc. type logfunc func(format string, v ...any) // noplog logger does nothing var noplog = logfunc(func(string, ...any) {}) // setlog sets the logger. func setlog(l logfunc) { globalmx.lock() defer globalmx.unlock() if l != nil { globallog = l } } // mutexoption is the option for the mutex. type mutexoption func(*mutexoptions) type mutexoptions struct { name string ttl time.duration lockintent bool log logfunc } // withttl sets the ttl of the mutex. func withttl(ttl time.duration) mutexoption { return func(o *mutexoptions) { if ttl >= time.second*2 { o.ttl = ttl } } } // withlockintent sets the lock intent. func withlockintent() mutexoption { return func(o *mutexoptions) { o.lockintent = true } } // lockoption is the option for the lock. type lockoption func(*lockoptions) type lockoptions struct { ctx context.context key string lockintentkey string enablelockintent int ttl time.duration log logfunc } func newlockoptions(m mutexoptions, opt ...lockoption) lockoptions { opts := lockoptions{ ctx: context.background(), key: m.name, enablelockintent: booltoint(m.lockintent), ttl: m.ttl, log: m.log, } for _, o := range opt { o(&opts) } opts.lockintentkey = lockintentkey(opts.key) return opts } // withkey sets the key of the lock. func withkey(key string) lockoption { return func(o *lockoptions) { if key != "" { o.key += ":" + key } } } // withcontext sets the context of the lock. func withcontext(ctx context.context) lockoption { return func(o *lockoptions) { if ctx != nil { o.ctx = ctx } } } func booltoint(b bool) int { if b { return 1 } return 0 } func lockintentkey(key string) string { return key + ":lock-intent" }
setlog(l logfunc)
:设置日志记录器。withttl(ttl time.duration)
:设置互斥锁的生存时间(ttl)选项。withlockintent()
:设置锁意图选项。newlockoptions(m mutexoptions, opt ...lockoption)
:创建锁的选项。withkey(key string)
:设置锁的键选项。withcontext(ctx context.context)
:设置锁的上下文选项。lockintentkey(key string)
:为给定的锁键生成锁意图键。
可以通过设置选项来控制互斥锁的行为和属性,如生存时间、锁意图、上下文等。还提供了一些实用函数和类型,用于管理互斥锁和生成选项
redismutex
// package redismutex provides a distributed rw mutex. package redismutex import ( "context" "crypto/rand" "embed" "encoding/hex" "errors" "sync" "time" "github.com/redis/go-redis/v9" ) var errlock = errors.new("redismutex: lock not obtained") var ( //go:embed lua lua embed.fs scriptrlock *redis.script scriptlock *redis.script scriptrefresh *redis.script scriptunlock *redis.script ) func init() { scriptrlock = redis.newscript(mustreadfile("rlock.lua")) scriptlock = redis.newscript(mustreadfile("lock.lua")) scriptrefresh = redis.newscript(mustreadfile("refresh.lua")) scriptunlock = redis.newscript(mustreadfile("unlock.lua")) } // a rwmutex is a distributed mutual exclusion lock. type rwmutex struct { redis redis.scripter opts mutexoptions id struct { sync.mutex buf []byte } } // newmutex creates a new distributed mutex. func newmutex(rc redis.scripter, name string, opt ...mutexoption) *rwmutex { globalmx.rlock() defer globalmx.runlock() opts := mutexoptions{ name: name, ttl: defaultkeyttl, log: globallog, } for _, o := range opt { o(&opts) } rw := &rwmutex{ redis: rc, opts: opts, } rw.id.buf = make([]byte, lenbytesid) return rw } // tryrlock tries to lock for reading and reports whether it succeeded. func (m *rwmutex) tryrlock(opt ...lockoption) (*lock, bool) { opts := newlockoptions(m.opts, opt...) ctx, _, err := m.rlock(opts) if err != nil { if !errors.is(err, errlock) { m.opts.log("[error] try-read-lock key %q: %v", opts.key, err) } return nil, false } return ctx, true } // rlock locks for reading. func (m *rwmutex) rlock(opt ...lockoption) (*lock, bool) { opts := newlockoptions(m.opts, opt...) ctx, ttl, err := m.rlock(opts) if err == nil { return ctx, true } if !errors.is(err, errlock) { m.opts.log("[error] read-lock key %q: %v", opts.key, err) return nil, false } for { select { case <-opts.ctx.done(): m.opts.log("[error] read-lock key %q: %v", opts.key, opts.ctx.err()) return nil, false case <-time.after(ttl): ctx, ttl, err = m.rlock(opts) if err == nil { return ctx, true } if !errors.is(err, errlock) { m.opts.log("[error] read-lock key %q: %v", opts.key, err) return nil, false } continue } } } // trylock tries to lock for writing and reports whether it succeeded. func (m *rwmutex) trylock(opt ...lockoption) (*lock, bool) { opts := newlockoptions(m.opts, opt...) opts.enablelockintent = 0 // force disable lock intent ctx, _, err := m.lock(opts) if err != nil { if !errors.is(err, errlock) { m.opts.log("[error] try-lock key %q: %v", opts.key, err) } return nil, false } return ctx, true } // lock locks for writing. func (m *rwmutex) lock(opt ...lockoption) (*lock, bool) { opts := newlockoptions(m.opts, opt...) ctx, ttl, err := m.lock(opts) if err == nil { return ctx, true } if !errors.is(err, errlock) { m.opts.log("[error] lock key %q: %v", opts.key, err) return nil, false } for { select { case <-opts.ctx.done(): m.opts.log("[error] lock key %q: %v", opts.key, opts.ctx.err()) return nil, false case <-time.after(ttl): ctx, ttl, err = m.lock(opts) if err == nil { return ctx, true } if !errors.is(err, errlock) { m.opts.log("[error] lock key %q: %v", opts.key, err) return nil, false } continue } } } func (m *rwmutex) lock(opts lockoptions) (*lock, time.duration, error) { id, err := m.randomid() if err != nil { return nil, 0, err } pttl, err := scriptlock.run(opts.ctx, m.redis, []string{opts.key, opts.lockintentkey}, id, opts.ttl.milliseconds(), opts.enablelockintent).result() leftttl := time.now().add(opts.ttl) if err == nil { return nil, time.duration(pttl.(int64)) * time.millisecond, errlock } if err != redis.nil { return nil, 0, err } ctx, cancel := context.withcancel(opts.ctx) lock := &lock{ redis: m.redis, id: id, ttl: opts.ttl, key: opts.key, log: opts.log, ctx: ctx, cancel: cancel, } go lock.refreshttl(leftttl) return lock, 0, nil } func (m *rwmutex) rlock(opts lockoptions) (*lock, time.duration, error) { id, err := m.randomid() if err != nil { return nil, 0, err } pttl, err := scriptrlock.run(opts.ctx, m.redis, []string{opts.key, opts.lockintentkey}, id, opts.ttl.milliseconds()).result() leftttl := time.now().add(opts.ttl) if err == nil { return nil, time.duration(pttl.(int64)) * time.millisecond, errlock } if err != redis.nil { return nil, 0, err } ctx, cancel := context.withcancel(opts.ctx) lock := &lock{ redis: m.redis, id: id, ttl: opts.ttl, key: opts.key, log: opts.log, ctx: ctx, cancel: cancel, } go lock.refreshttl(leftttl) return lock, 0, nil } // randomid generates a random hex string with 16 bytes. func (m *rwmutex) randomid() (string, error) { m.id.lock() defer m.id.unlock() _, err := rand.read(m.id.buf) if err != nil { return "", err } return hex.encodetostring(m.id.buf), nil } func mustreadfile(filename string) string { b, err := lua.readfile("lua/" + filename) if err != nil { panic(err) } return string(b) }
- 通过
newmutex
函数创建一个新的分布式互斥锁。该函数接受 redis 客户端、锁的名称和一系列选项作为参数,返回一个 rwmutex 结构体实例。 - 通过
rlock
和lock
方法来获取读锁和写锁。如果无法立即获取锁,则会阻塞等待,直到获取成功或者上下文取消。 - 通过
tryrlock
和trylock
方法来尝试获取读锁和写锁,如果无法立即获取锁则立即返回失败,不会阻塞。 - 该包实现了一个
lock
结构体,包含了锁相关的信息和操作方法,比如刷新锁的过期时间。 - 使用
redis.script
来执行 lua 脚本,通过 redis 客户端执行相应的 redis 命令。 - 使用了
crypto/rand
包来生成随机的锁标识符。 - 最终的
mustreadfile
函数用于读取嵌入的 lua 脚本文件。
测试用例
package redismutex import ( "context" "errors" "log" "strings" "testing" "time" "github.com/redis/go-redis/v9" ) func init() { setlog(func(format string, a ...any) { if strings.hasprefix(format, "[error]") { log.fatalf(format, a...) } }) } func testmutex(t *testing.t) { t.parallel() const lockkey = "mutex" rc := redis.newclient(redisopts()) prep(t, rc, lockkey) mx := newmutex(rc, lockkey) lock, ok := mx.lock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() assertttl(t, rc, lockkey, defaultkeyttl) // try again _, ok = mx.trylock() if exp, got := false, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } _, ok = mx.tryrlock() if exp, got := false, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } // manually unlock lock.unlock() // lock again lock, ok = mx.lock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() } func testrwmutex(t *testing.t) { t.parallel() const lockkey = "rw_mutex" rc := redis.newclient(redisopts()) prep(t, rc, lockkey) mx := newmutex(rc, lockkey) lock, ok := mx.rlock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() assertttl(t, rc, lockkey, defaultkeyttl) // try again _, ok = mx.trylock() if exp, got := false, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } // try rlock rlock, ok := mx.tryrlock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } rlock.unlock() // manually unlock lock.unlock() // lock again lock, ok = mx.lock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() } func testrwmutex_lockintent(t *testing.t) { t.parallel() const lockkey = "lock_intent_mutex" rc := redis.newclient(redisopts()) prep(t, rc, lockkey) mx := newmutex(rc, lockkey, withlockintent()) lock, ok := mx.rlock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() // mark lock intent _, _, err := mx.lock(newlockoptions(mx.opts)) if exp, got := errlock, err; !errors.is(got, exp) { t.fatalf("exp %v, got %v", exp, got) } // try rlock _, ok = mx.tryrlock() if exp, got := false, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } // manually unlock lock.unlock() // lock write lock, ok = mx.lock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } lock.unlock() // remove lock intent // lock again lock, ok = mx.rlock() if exp, got := true, ok; exp != got { t.fatalf("exp %v, got %v", exp, got) } defer lock.unlock() } func testrwmutex_id(t *testing.t) { t.parallel() rw := &rwmutex{} rw.id.buf = make([]byte, lenbytesid) id, _ := rw.randomid() if exp, got := 32, len(id); exp != got { t.fatalf("exp %v, got %v", exp, got) } } func prep(t *testing.t, rc *redis.client, key string) { t.cleanup(func() { for _, v := range []string{key, lockintentkey(key)} { if err := rc.del(context.background(), v).err(); err != nil { t.fatal(err) } } if err := rc.close(); err != nil { t.fatal(err) } }) } func assertttl(t *testing.t, rc *redis.client, key string, exp time.duration) { t.helper() got, err := rc.ttl(context.background(), key).result() if exp, got := (any)(nil), err; exp != got { t.fatalf("exp %v, got %v", exp, got) } delta := got - exp if delta < 0 { delta = 1 - delta } if delta > time.second { t.fatalf("exp ~%v, got %v", exp, got) } } func redisopts() *redis.options { return &redis.options{ network: "tcp", addr: "0.0.0.0:6379", db: 9, } }
以上就是利用redis lua实现高效读写锁的代码实例的详细内容,更多关于redis lua读写锁的资料请关注代码网其它相关文章!
发表评论