背景
- 在 Go 里面实现超时控制通常需要单独起一个 goroutine;当有大量任务需要做超时控制时,就要起大量的 goroutine,这实际上是一种开销和负担!
- 有些时候需要注册一些 timer,也需要起大量的 goroutine 才能实现,比如异步定期刷新一个配置、异步监听某个事件,此时简单的做法就是使用大量的 goroutine + timer/sleep 来实现!
解决思路
多路复用。实际上 Go 底层的 timer 也是基于多路复用的思想实现的,但那是底层的 timer;我们需要解决的问题是 timer 过多的问题!
我们的思路是实现一个 timercontroller 来统一管理很多个 timer,并且把开销降到最低!因此使用一个 小顶堆 + timer 调度器 即可实现!
实现
小顶堆(最小堆)
使用go自带的 container/heap 实现 小顶堆
import (
"container/heap"
)
// heapitem is the element contract for heapqueue: an item knows how to order
// itself relative to another item and how to expose the value it carries.
type heapitem[T any] interface {
	// less reports whether the receiver must leave the queue before other.
	less(other heapitem[T]) bool
	// getvalue returns the payload that pop and peek hand back to the caller.
	getvalue() T
}

// itemHeap is the slice that container/heap operates on. It is modeled on the
// IntHeap example from the container/heap documentation. The method names are
// capitalised because heap.Interface requires exactly Len, Less, Swap, Push
// and Pop.
type itemHeap[T any] []heapitem[T]

func (h itemHeap[T]) Len() int           { return len(h) }
func (h itemHeap[T]) Less(i, j int) bool { return h[i].less(h[j]) }
func (h itemHeap[T]) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push appends x. It uses a pointer receiver because it changes the slice's
// length, not just its contents. Call heap.Push, not this method, to keep the
// heap ordered.
func (h *itemHeap[T]) Push(x any) {
	*h = append(*h, x.(heapitem[T]))
}

// Pop removes and returns the last element. container/heap has already moved
// the minimum there before calling it.
func (h *itemHeap[T]) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	old[n-1] = nil // drop the reference so the backing array does not retain the item
	*h = old[:n-1]
	return x
}

// heapqueue is a min-heap of heapitem values with a typed API.
// It is not safe for concurrent use.
type heapqueue[T any] struct {
	queue itemHeap[T]
}

// ptr returns the heap.Interface view of the underlying slice.
func (h *heapqueue[T]) ptr() *itemHeap[T] {
	return &h.queue
}

// newheapqueue builds a queue that initially holds items (possibly none).
// The returned queue is not safe for concurrent use.
func newheapqueue[T any](items ...heapitem[T]) *heapqueue[T] {
	queue := make(itemHeap[T], len(items))
	copy(queue, items)
	heap.Init(&queue)
	return &heapqueue[T]{queue: queue}
}

// push inserts item, keeping the heap ordered.
func (h *heapqueue[T]) push(item heapitem[T]) {
	heap.Push(h.ptr(), item)
}

// pop removes the smallest item and returns its value. The boolean is false,
// and the value is T's zero value, when the queue is empty.
func (h *heapqueue[T]) pop() (T, bool) {
	if h.len() == 0 {
		var zero T
		return zero, false
	}
	return heap.Pop(h.ptr()).(heapitem[T]).getvalue(), true
}

// peek returns the value of the smallest item without removing it. The boolean
// is false, and the value is T's zero value, when the queue is empty.
func (h *heapqueue[T]) peek() (T, bool) {
	if h.len() == 0 {
		var zero T
		return zero, false
	}
	return h.queue[0].getvalue(), true
}

// len returns the number of items currently queued.
func (h *heapqueue[T]) len() int {
	return len(h.queue)
}
调度器
type timer struct {
timeout time.time
name string
notifyfunc func()
}
func (t *timer) getcurtimeout() time.duration {
return t.timeout.sub(time.now())
}
// notify todo support async notify
func (t *timer) notify() {
if t.notifyfunc != nil {
t.notifyfunc()
}
}
func (t *timer) isexpired() bool {
return t.timeout.before(time.now())
}
func (t *timer) less(v heapitem[*timer]) bool {
return t.timeout.before(v.getvalue().timeout)
}
func (t *timer) getvalue() *timer {
return t
}
// timercontroller multiplexes any number of timer values onto one goroutine
// and one runtime timer: pending timers sit in a min-heap ordered by deadline,
// and the scheduler goroutine sleeps until the earliest one is due.
//
// Build it with newtimercontroller and call start once. After close, timers
// still pending in the heap or in the hand-off channel are dropped without
// being notified.
type timercontroller struct {
	timers    chan *timer        // hand-off queue from addtimer to the scheduler goroutine
	minheap   *heapqueue[*timer] // pending timers; touched only by the scheduler goroutine
	closeonce sync.Once          // makes close idempotent
	done      chan struct{}      // closed by close; tells producers and the scheduler to stop
}

// addtimer hands timer to the scheduler. It returns false when timer is nil or
// the controller has been closed, and true once the timer has been queued.
//
// When the hand-off channel is full the call blocks until there is room or the
// controller is closed.
func (t *timercontroller) addtimer(timer *timer) bool {
	if timer == nil {
		return false
	}
	// Check for shutdown first so a closed controller rejects deterministically
	// even when the channel still has room.
	select {
	case <-t.done:
		return false
	default:
	}
	// Waiting on done as well keeps a producer from blocking forever on a full
	// channel after the scheduler has exited.
	select {
	case <-t.done:
		return false
	case t.timers <- timer:
		return true
	}
}

// close stops the scheduler goroutine and makes later addtimer calls return
// false. It is safe to call more than once and from several goroutines.
func (t *timercontroller) close() {
	t.closeonce.Do(func() { close(t.done) })
}

// newtimercontroller returns a controller whose hand-off channel can buffer
// buffersize timers. Negative sizes are treated as zero (unbuffered). The
// scheduler does not run until start is called.
func newtimercontroller(buffersize int) *timercontroller {
	if buffersize < 0 {
		buffersize = 0
	}
	return &timercontroller{
		timers:  make(chan *timer, buffersize),
		minheap: newheapqueue[*timer](),
		done:    make(chan struct{}),
	}
}

// start launches the scheduler goroutine, which runs until close is called.
// Call it once: the heap is not safe for concurrent use, so a second scheduler
// would race with the first.
func (t *timercontroller) start() {
	go t._start()
}

// _start is the scheduler loop. It wakes when a timer is added, when the
// earliest deadline is reached, or when the controller is closed.
//
// Callbacks run synchronously on this goroutine, so a slow callback delays
// every later timer.
func (t *timercontroller) _start() {
	// With nothing queued the runtime timer is parked this far ahead; waking up
	// with an empty heap is harmless.
	const idleTimeout = 24 * time.Hour

	wake := time.NewTimer(idleTimeout)
	defer wake.Stop()

	for {
		select {
		case <-t.done:
			// t.timers is deliberately left open: producers may still be sending,
			// and a send on a closed channel panics.
			return
		case tm := <-t.timers:
			t.minheap.push(tm)
			t.drainQueued()
		case <-wake.C:
		}
		t.fireDue()
		t.rearm(wake, idleTimeout)
	}
}

// drainQueued moves the timers that are already buffered in the hand-off
// channel into the heap, so a burst of additions costs one re-arm instead of
// one per timer. It is bounded by the buffer length seen on entry, so steady
// producers cannot starve firing.
func (t *timercontroller) drainQueued() {
	for n := len(t.timers); n > 0; n-- {
		select {
		case tm := <-t.timers:
			t.minheap.push(tm)
		default:
			return
		}
	}
}

// fireDue pops and notifies every timer whose deadline has been reached,
// earliest first. Because it checks each deadline, a spurious or stale wake-up
// never fires a timer early.
func (t *timercontroller) fireDue() {
	for {
		head, ok := t.minheap.peek()
		if !ok || head.getcurtimeout() > 0 {
			return
		}
		t.minheap.pop()
		head.notify()
	}
}

// rearm points wake at the earliest pending deadline, or at idle when the heap
// is empty.
func (t *timercontroller) rearm(wake *time.Timer, idle time.Duration) {
	next := idle
	if head, ok := t.minheap.peek(); ok {
		next = head.getcurtimeout()
	}
	// Stop and drain before Reset: on runtimes older than Go 1.23 a tick that
	// already fired would otherwise stay in the channel and be read as the new
	// deadline.
	if !wake.Stop() {
		select {
		case <-wake.C:
		default:
		}
	}
	wake.Reset(next)
}
测试
func testtimercontroller(t *testing.t) {
controller := newtimercontroller(1024)
controller.start()
defer controller.close()
now := time.now()
arrs := make([]string, 0)
newtimer := func(num int) *timer {
return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: strconv.itoa(num), notifyfunc: func() {
arrs = append(arrs, strconv.itoa(num))
}}
}
// 这里乱序的注册了8个timer
controller.addtimer(newtimer(5))
controller.addtimer(newtimer(6))
controller.addtimer(newtimer(3))
controller.addtimer(newtimer(4))
controller.addtimer(newtimer(7))
controller.addtimer(newtimer(8))
controller.addtimer(newtimer(1))
controller.addtimer(newtimer(2))
time.sleep(time.second * 1)
t.logf("%#v\n", arrs)
// 最终我们可以获取到 顺序执行的!
assert.equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"})
}
func testtimercontroller_stable(t *testing.t) {
controller := newtimercontroller(1024)
controller.start()
defer controller.close()
now := time.now()
arrs := make(map[string]bool, 0)
newtimer := func(num int, name string) *timer {
return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: name, notifyfunc: func() {
arrs[name] = true
}}
}
// 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致
controller.addtimer(newtimer(2, "1"))
controller.addtimer(newtimer(2, "2"))
controller.addtimer(newtimer(2, "3"))
controller.addtimer(newtimer(2, "4"))
controller.addtimer(newtimer(2, "5"))
time.sleep(time.second * 1)
t.logf("%#v\n", arrs)
assert.equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true})
}
以上就是go使用timercontroller解决timer过多的问题的详细内容,更多关于go timercontroller解决timer过多的资料请关注代码网其它相关文章!
发表评论