上篇介绍 go 的 gui 库 fyne 时,提到 fyne 的数据绑定用到了监听器模式。本文就展开说下我对 go 中监听器模式的理解和应用吧。
监听器模式简介
监听器模式,或称观察者模式,它主要涉及两个组件:主题(subject)和监听器(listener)。
subject 负责维护一系列的监听器,在所观测主题状态变化,将这个事件通知给所有注册的监听器。我统一将其定义为注册中心 registry。而监听器 listener 则是实现了特定接口的对象,用于响应事件消息,执行处理逻辑。
对具体应用而言,通常还会分出一个 watcher
或者 monitor
用于检测变化并推送给 registry
。从而实现将检测目标从系统解耦,无视监控组件类别。
这个模式在组件之间建立一种松散耦合的关系。将特定事件通知到关心它的其他组件,无需它们直接相互引用。看起来这个不也是发布-订阅模式吗?差不多一个意思。
之前工作中,用它最多的是配置的热更新场景,这篇文章也会简单介绍基于它的 etcd 配置热更新。
go 实现监听器模式
如何用 go 实现监听模式?我将定义两个新类型分别是注册中心(registry)和监听器接口(listener)。
首先是 listener
,它是一个接口,用于实现事件的响应逻辑。
type listener interface { ontrigger() }
先将其定义为一个接口,它的实现类型要求支持 ontrigger
方法,会在事件发生时被执行。
type registry struct { listeners []listener } func (r *registry) register(l listener) { r.listeners = append(r.listeners, l) } func (r *registry) notifyall() { for _, listener := range r.listeners { listener.ontrigger(key, value) } }
registry
是所有监听器的注册地,当特定事件发生,我们通过 registry.notifyall
将事件传递给所有 listener
。
我们实现一个简单的案例,当监听到某个事件发生,打印 "a specified event accured"。
为了模拟效果,本案例没有 watcher,直接通过主函数调用 notifyall
模拟触发事件。
为了打印事件消息,我们实现 listener
接口,创建新类型 eventprinter,如下所示:
type eventprinter struct { } func (printer *eventprinter) ontrigger() { fmt.println("a specified event accured!") }
写个主函数触发下事件,测试看看是否符合预期,代码如下所示:
func main() { r := ®istry{} r.reigster(&eventprinter{}) // 模拟接收到消息,触发事件通知 r.notifyall() }
执行测试,内容如下所示:
$ go run main.go a specified event occurred
如果希望自定义处理函数,只需让 listener
支持自定义事件回调函数即可。
修改代码如下所示:
type eventhandler struct { callback func() } func neweventhandler(callback func()) *eventhandler { return &eventhandler{callback: callback} } func (e *eventhandler) ontrigger() { e.callback() }
我们注册一个 eventhandler
到 registry
,主函数代码:
func main() { r := ®istry{} r.reigster(&eventprinter{}) r.reigster(neweventhandler(func() { fmt.println("custom print: a specified event occurred!") })) r.notifyall() }
测试执行:
$ go run main.go a specified event occurred! custom print: a specified event occurred!
基于 go channel 实现并发处理
前面的示例中 notifyall
是通过 for 循环依次调用 listener.ontrigger
将消息发送给 listener
,处理效率低下。
如何加速呢?
最直接的方法是通过 goroutine
运行 listener.ontrigger
方法。
func (r *registry) notifyall() { for _, listener := range r.listeners { go listener.ontrigger() } }
还有一种方法,通过 channel 传递事件消息,这样每个 listener
有独立的 goroutine 监听和处理。
如下是 listener
的实现代码:
type listener struct { eventchannel chan struct{} callback func() } func newlistener(callback func()) *listener { return &listener{ eventchannel: make(chan struct{}, 1), // 带缓冲的 channel,防止阻塞 callback: callback, } } func (l *listener) start() { go func() { for range l.eventchannel { l.callback() } }() }
这里 listener
的事件处理函数在单独的 goroutine 中运行。而相应的 registry 实现也需要修改,代码变更如下所示:
type registry struct { listeners []*listener } func (r *registry) register(listener *listener) { r.listeners = append(r.listeners, listener) listener.start() // 启动监听器的 goroutine } func (r *registry) notifyall(message string) { for _, listener := range r.listeners { listener.eventchannel <- struct{}{} // 发送事件到监听器 } } func (r *registry) close() { for _, listener := range r.listeners { close(listener.eventchannel) // 关闭 channel,停止监听器 goroutine } }
整体上的变化不大,在 listner.register
方法中启动 listener
事件处理 goroutine 等待事件消息。
实际案例:etcd 配置热更新
让我们实践一个具体的应用场景:实现配置的动态更新以及组件的自动重连机制。
我们将针对包括 mysql、redis 在内的各种组件,实现它们在配置变更时能够自动重连。这些组件的配置信息将以 json 格式存储于 etcd 的多个键(key)中。
假设,配置结构如下所示:
type mysqlconfig struct { host string port int user string password string } type redisconfig struct { host string port int }
这些配置被保存在 etcd 中,我们要实时监控配置的变化并据此更新配置和执行重连操作。
示例用法如下所示:
registry.register("/config/mysql", func(data) { // unmarshal data // reconnect mysql })
让我们基于监听器模式简单设计一个模块,实现 etcd 热更新:
- 每个监听器可以订阅特定的 key 或 key 前缀的更新事件。
- 使用
channel
通知配置变更,触发对应的监听回调。
这个示例,函数回调和轮询其实已经满足需求,此处只是为了演示,而是否使用 channel 要具体分析。
我们这个设计要涉及到三个部分。分别是 watcher、listener 和 registry。
- watcher 责监听 etcd 中的 key 变更事件。
- listener 定义了当特定 key 发生变化时需要执行的回调逻辑。
- registry 管理所有 listener,将 etcd 变更事件分发给对应 listener。
先定义 event
类型,一个简单的结构体,表示 etcd 中 key 的变更事件:
type event struct { key string value string }
listener
listener
实现如下所示:
type listener struct { eventchannel chan *event callback func(*event) } func newlistner(callback func(*event)) *listener { l := &listener{ eventchannel: make(chan *event), callback: callback, } return l } func (l *listener) start() { go func() { for event := range l.eventchannel { l.callback(event) } }() }
基本之前的没太大差别,从 eventchannel
中拿到事件消息,调用回调函数。
实现 registry
registry
负责维护 listener
的注册,并在接收到 key 变更事件时通知相关的 listener
:
type registry struct { listeners map[string][]*listener } func newregistry() *registry { return ®istry{ listeners: make(map[string][]*listener), } } func (r *registry) register(key string, listener *listener) { r.listeners[key] = append(r.listeners[key], listener) listener.start() } func (r *registry) notify(event *event) { if listeners, ok := r.listeners[event.key]; ok { for _, listener := range listeners { listener.eventchannel <- event } } }
注册 listener
到 registry
中,通过 map
将 key
与 listener
关联起来。
实现 watcher
watcher
负责从 etcd 订阅 key 的变更事件,并将这些事件发送到 registry
的 eventchannel
上:
func watchetcdkeys(client *clientv3.client, registry *registry, watchkeys ...string) { for _, key := range watchkeys { go func(key string) { watchchan := client.watch(context.background(), key, clientv3.withprefix()) for wresp := range watchchan { for _, ev := range wresp.events { event := &event{ key: string(ev.kv.key), value: string(ev.kv.value), } registry.notify(event) } } }(key) } }
使用示例
让我们实际在 main 函数上使用一下,观察行为是否正常。
func main() { client, err := clientv3.new(clientv3.config{ endpoints: []string{"localhost:2379"}, }) if err != nil { log.fatal(err) } defer client.close() registry := newregistry() // 注册监听器 registry.register("/config/mysql", newlistener(func(event * event) { fmt.println(event) // 执行数据重连之类的操作 })) // 开始监听 etcd key 变更 watchetcdkeys(client, registry, "/config/") time.sleep(10 * time.minute) }
这个示例创建了一个 etcd 客户端,初始化了一个 registry
,并为特定的 key 注册了一个 listener
。然后,通过 watchetcdkeys
函数开始监听 /config/
前缀下的所有 key 的变更。
这种设计支持对特定 key 或 key 前缀的监听。当相关 key 变更时,通过 channel
通知 listener
,而收到更新事件后的具体操作。视场景而定,这里是执行重连操作。
特别说明,示例仅作为概念验证,实际应用中需要更多的错误处理和优化。
到此这篇关于详解go语言中的监视器模式与配置热更新的文章就介绍到这了,更多相关go监视器模式与配置热更新内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论