当前位置: 代码网 > it编程>前端脚本>Golang > Go使用TimerController解决timer过多的问题

Go使用TimerController解决timer过多的问题

2025年02月14日 Golang 我要评论
背景在go里面我们实现超时需要起一个goroutine才能实现,但是当我有大量的任务需要做超时控制就需要起大量的goroutine,实际上是一种开销和负担!有些时候需要注册一些timer也是有需要起大

背景

  • 在go里面我们实现超时需要起一个goroutine才能实现,但是当我有大量的任务需要做超时控制就需要起大量的goroutine,实际上是一种开销和负担!
  • 有些时候需要注册一些timer也是有需要起大量的 goroutine才能实现,比如我要异步定期刷新一个配置,异步的监听啥东西,此时简单做法就是使用大量的 goroutine + timer/sleep实现!

解决思路

多路复用,实际上go底层也是一种多路复用的思想去实现的timer,但是它是底层的timer,我们需要解决的问题就过多的timer问题!

我们的思路是实现一个 timercontroller 可以帮助我们管理很多个timer,并且可以开销做到最低!因此使用一个 小顶堆 + timer调度器即可实现!

实现

小顶堆(最小堆)

使用go自带的 container/heap 实现 小顶堆

import (
    "container/heap"
)

type heapitem[t any] interface {
    less(heapitem[t]) bool
    getvalue() t
}

// 参考 intheap
type heapqueue[t any] []heapitem[t]

func (h heapqueue[t]) len() int           { return len(h) }
func (h heapqueue[t]) less(i, j int) bool { return h[i].less(h[j]) }
func (h heapqueue[t]) swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *heapqueue[t]) push(x any) {
    // push and pop use pointer receivers because they modify the slice's length,
    // not just its contents.
    *h = append(*h, x.(heapitem[t]))
}

func (h *heapqueue[t]) pop() any {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

type heapqueue[t any] struct {
    queue heapqueue[t]
}

func (h *heapqueue[t]) ptr() *heapqueue[t] {
    return &h.queue
}

// newheapqueue 非并发安全
func newheapqueue[t any](items ...heapitem[t]) *heapqueue[t] {
    queue := make(heapqueue[t], len(items))
    for index, item := range items {
       queue[index] = item
    }
    heap.init(&queue)
    return &heapqueue[t]{queue: queue}
}

func  (h *heapqueue[t])  push(item heapitem[t]) { 
    heap.push(h.ptr(), item)
}

func  (h *heapqueue[t])  pop() (t, bool ) { 
    if h.ptr().len() == 0 {
       var nil t
       return nil, false
    }
    return heap.pop(h.ptr()).(heapitem[t]).getvalue(), true
}

// peek 方法用于返回堆顶元素而不移除它
func  (h *heapqueue[t])  peek() (t, bool ) { 
    if h.ptr().len() > 0 {
       return h.queue[0].getvalue(), true
    }
    var nil t
    return nil, false
}

func (h *heapqueue[t]) len() int {
    return h.ptr().len()
}

调度器

type timer struct {
    timeout    time.time
    name       string
    notifyfunc func()
}

func (t *timer) getcurtimeout() time.duration {
    return t.timeout.sub(time.now())
}

// notify todo support async notify
func (t *timer) notify() {
    if t.notifyfunc != nil {
       t.notifyfunc()
    }
}

func (t *timer) isexpired() bool {
    return t.timeout.before(time.now())
}

func (t *timer) less(v heapitem[*timer]) bool {
    return t.timeout.before(v.getvalue().timeout)
}

func (t *timer) getvalue() *timer {
    return t
}

type timercontroller struct {
    timers  chan *timer
    minheap *heapqueue[*timer]

    closeonce sync.once
    close     chan struct{}
}

func (t *timercontroller) addtimer(timer *timer) bool {
    if timer == nil {
       return false
    }
    select {
    case <-t.close:
       return false
    default:
       t.timers <- timer
       return true
    }
}

func (t *timercontroller) close() {
    t.closeonce.do(func() { close(t.close) })
}

func newtimercontroller(buffersize int) *timercontroller {
    return &timercontroller{
       timers:  make(chan *timer, buffersize),
       minheap: newheapqueue[*timer](),
       close:   make(chan struct{}),
    }
}

func (t *timercontroller) start() {
    go t._start()
}
func (t *timercontroller) _start() {
    const defaulttimeout = time.hour * 24

    var (
       curmintimer *timer
       timeout     = time.newtimer(defaulttimeout)
    )
    for {
       select {
       case <-t.close:
          close(t.timers)
          timeout.stop()
          return
       case timer := <-t.timers:
          t.minheap.push(timer)
          curmintimer, _ = t.minheap.peek()
          timeout.reset(curmintimer.getcurtimeout())
          //fmt.printf("timeout.reset-1 name: %s, timeout: %s\n", curmintimer.name, curmintimer.getcurtimeout())
       case <-timeout.c:
          if curmintimer != nil {
             curmintimer.notify()
             curmintimer = nil
             t.minheap.pop()
          }
          curmintimer, _ = t.minheap.peek()
          if curmintimer == nil {
             timeout.reset(defaulttimeout)
             continue
          }
          timeout.reset(curmintimer.getcurtimeout())
          //fmt.printf("timeout.reset-2 name: %s, timeout: %s\n", curmintimer.name, curmintimer.getcurtimeout())
       }
    }
}

测试

func testtimercontroller(t *testing.t) {
    controller := newtimercontroller(1024)
    controller.start()
    defer controller.close()
    now := time.now()
    arrs := make([]string, 0)
    newtimer := func(num int) *timer {
       return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: strconv.itoa(num), notifyfunc: func() {
          arrs = append(arrs, strconv.itoa(num))
       }}
    }
    // 这里乱序的注册了8个timer
    controller.addtimer(newtimer(5))
    controller.addtimer(newtimer(6))
    controller.addtimer(newtimer(3))
    controller.addtimer(newtimer(4))
    controller.addtimer(newtimer(7))
    controller.addtimer(newtimer(8))
    controller.addtimer(newtimer(1))
    controller.addtimer(newtimer(2))

    time.sleep(time.second * 1)
    t.logf("%#v\n", arrs)
    // 最终我们可以获取到 顺序执行的!
    assert.equal(t, arrs, []string{"1", "2", "3", "4", "5", "6", "7", "8"})
}




func testtimercontroller_stable(t *testing.t) {
    controller := newtimercontroller(1024)
    controller.start()
    defer controller.close()
    now := time.now()
    arrs := make(map[string]bool, 0)
    newtimer := func(num int, name string) *timer {
       return &timer{timeout: now.add(time.duration(num) * time.millisecond), name: name, notifyfunc: func() {
          arrs[name] = true
       }}
    }
    // 我们重复注册了相同实现执行的 timer,那么预期是每次执行的结果和注册顺序一致
    controller.addtimer(newtimer(2, "1"))
    controller.addtimer(newtimer(2, "2"))
    controller.addtimer(newtimer(2, "3"))
    controller.addtimer(newtimer(2, "4"))
    controller.addtimer(newtimer(2, "5"))

    time.sleep(time.second * 1)
    t.logf("%#v\n", arrs)
    assert.equal(t, arrs, map[string]bool{"1": true, "2": true, "3": true, "4": true, "5": true})
}

以上就是go使用timercontroller解决timer过多的问题的详细内容,更多关于go timercontroller解决timer过多的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com