背景
- 在 Go 里实现超时控制,通常需要额外起一个 goroutine;当有大量任务需要做超时控制时,就要起大量的 goroutine,这实际上是一种开销和负担!
- 有些时候需要注册一些 timer,同样要起大量的 goroutine 才能实现,比如异步定期刷新一个配置、异步监听某些变化,此时最简单的做法就是使用大量的 goroutine + timer/sleep!
解决思路
多路复用。实际上 Go 底层的 timer 也是用多路复用的思想实现的,但它是底层的 timer,我们需要解决的问题是 timer 过多的问题!
我们的思路是实现一个 timercontroller 来统一管理大量的 timer,并且把开销降到最低!因此使用一个 小顶堆 + timer 调度器即可实现!
实现
小顶堆(最小堆)
使用go自带的 container/heap
实现 小顶堆
import (
	"container/heap"
)

// heapitem is an element that can be stored in a heapqueue.
//
// less reports whether the receiver must be popped before other, and
// getvalue returns the payload that the queue hands back to its callers.
type heapitem[T any] interface {
	less(other heapitem[T]) bool
	getvalue() T
}

// itemheap adapts a slice of heapitem to heap.Interface. It is modelled after
// the IntHeap example in the container/heap documentation and is only meant
// to be used through heapqueue.
type itemheap[T any] []heapitem[T]

// Len implements heap.Interface.
func (h itemheap[T]) Len() int { return len(h) }

// Less implements heap.Interface by delegating to the items' own ordering.
func (h itemheap[T]) Less(i, j int) bool { return h[i].less(h[j]) }

// Swap implements heap.Interface.
func (h itemheap[T]) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push implements heap.Interface. Push and Pop use pointer receivers because
// they modify the slice's length, not just its contents.
func (h *itemheap[T]) Push(x any) {
	*h = append(*h, x.(heapitem[T]))
}

// Pop implements heap.Interface; it removes and returns the last element.
func (h *itemheap[T]) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// item reachable.
	old[n-1] = nil
	*h = old[:n-1]
	return x
}

// heapqueue is a min-heap of heapitem values ordered by heapitem.less.
// It is not safe for concurrent use.
type heapqueue[T any] struct {
	queue itemheap[T]
}

// ptr returns the underlying heap in the form container/heap expects.
func (h *heapqueue[T]) ptr() *itemheap[T] {
	return &h.queue
}

// newheapqueue builds a heapqueue that initially holds items. The returned
// queue is not safe for concurrent use.
func newheapqueue[T any](items ...heapitem[T]) *heapqueue[T] {
	queue := make(itemheap[T], len(items))
	copy(queue, items)
	heap.Init(&queue)
	return &heapqueue[T]{queue: queue}
}

// push adds item to the queue.
func (h *heapqueue[T]) push(item heapitem[T]) {
	heap.Push(h.ptr(), item)
}

// pop removes the smallest item and returns its value. The boolean is false,
// and the value is the zero value of T, when the queue is empty.
func (h *heapqueue[T]) pop() (T, bool) {
	if h.ptr().Len() == 0 {
		var zero T
		return zero, false
	}
	return heap.Pop(h.ptr()).(heapitem[T]).getvalue(), true
}

// peek returns the value of the smallest item without removing it. The
// boolean is false, and the value is the zero value of T, when the queue is
// empty.
func (h *heapqueue[T]) peek() (T, bool) {
	if h.ptr().Len() == 0 {
		var zero T
		return zero, false
	}
	return h.queue[0].getvalue(), true
}

// len returns the number of items currently in the queue.
func (h *heapqueue[T]) len() int {
	return h.ptr().Len()
}
调度器
type timer struct { timeout time.time name string notifyfunc func() } func (t *timer) getcurtimeout() time.duration { return t.timeout.sub(time.now()) } // notify todo support async notify func (t *timer) notify() { if t.notifyfunc != nil { t.notifyfunc() } } func (t *timer) isexpired() bool { return t.timeout.before(time.now()) } func (t *timer) less(v heapitem[*timer]) bool { return t.timeout.before(v.getvalue().timeout) } func (t *timer) getvalue() *timer { return t } type timercontroller struct { timers chan *timer minheap *heapqueue[*timer] closeonce sync.once close chan struct{} } func (t *timercontroller) addtimer(timer *timer) bool { if timer == nil { return false } select { case <-t.close: return false default: t.timers <- timer return true } } func (t *timercontroller) close() { t.closeonce.do(func() { close(t.close) }) } func newtimercontroller(buffersize int) *timercontroller { return &timercontroller{ timers: make(chan *timer, buffersize), minheap: newheapqueue[*timer](), close: make(chan struct{}), } } func (t *timercontroller) start() { go t._start() } func (t *timercontroller) _start() { const defaulttimeout = time.hour * 24 var ( curmintimer *timer timeout = time.newtimer(defaulttimeout) ) for { select { case <-t.close: close(t.timers) timeout.stop() return case timer := <-t.timers: t.minheap.push(timer) curmintimer, _ = t.minheap.peek() timeout.reset(curmintimer.getcurtimeout()) //fmt.printf("timeout.reset-1 name: %s, timeout: %s\n", curmintimer.name, curmintimer.getcurtimeout()) case <-timeout.c: if curmintimer != nil { curmintimer.notify() curmintimer = nil t.minheap.pop() } curmintimer, _ = t.minheap.peek() if curmintimer == nil { timeout.reset(defaulttimeout) continue } timeout.reset(curmintimer.getcurtimeout()) //fmt.printf("timeout.reset-2 name: %s, timeout: %s\n", curmintimer.name, curmintimer.getcurtimeout()) } } }
测试
func testtimercontroller(t *testing.t) { controller := newtimercontroller(1024) controller.start() defer controller.close() now := time.now() arrs := make([]string, 0) newtimer := func(num int) *timer { return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: strconv.itoa(num), notifyfunc: func() { arrs = append(arrs, strconv.itoa(num)) }} } // 这里乱序的注册了8个timer controller.addtimer(newtimer(5)) controller.addtimer(newtimer(6)) controller.addtimer(newtimer(3)) controller.addtimer(newtimer(4)) controller.addtimer(newtimer(7)) controller.addtimer(newtimer(8)) controller.addtimer(newtimer(1)) controller.addtimer(newtimer(2)) time.sleep(time.second * 1) t.logf("%#v\n", arrs) // 最终我们可以获取到 顺序执行的! assert.equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"}) } func testtimercontroller_stable(t *testing.t) { controller := newtimercontroller(1024) controller.start() defer controller.close() now := time.now() arrs := make(map[string]bool, 0) newtimer := func(num int, name string) *timer { return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: name, notifyfunc: func() { arrs[name] = true }} } // 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致 controller.addtimer(newtimer(2, "1")) controller.addtimer(newtimer(2, "2")) controller.addtimer(newtimer(2, "3")) controller.addtimer(newtimer(2, "4")) controller.addtimer(newtimer(2, "5")) time.sleep(time.second * 1) t.logf("%#v\n", arrs) assert.equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true}) }
以上就是go使用timercontroller解决timer过多的问题的详细内容,更多关于go timercontroller解决timer过多的资料请关注代码网其它相关文章!
发表评论