前言
在日常开发中,我们经常会遇到这样的场景:
- 有一堆任务要跑(比如:发请求、处理数据、爬虫等)
- 不希望一次性全部跑完,担心打爆服务端或者被封
- 想要设置并发数、限速,还能控制任务重试、失败记录
那么,能不能用 go 实现一个“轻量级的并发任务调度器”?——答案是:当然可以!
今天我们就来用 go 从零实现一个可配置的任务调度器,支持:
- 最大并发数控制(worker pool)
- 每秒请求速率限制(rate limit)
- 简单的失败重试机制
- 支持结果收集与错误输出
效果展示
你可以像这样调用我们的调度器:
scheduler := newscheduler(5, 10) // 并发 5,速率限制每秒 10 次 for i := 0; i < 100; i++ { task := newtask(func() error { // 模拟网络请求或业务逻辑 fmt.println("正在处理任务:", i) time.sleep(300 * time.millisecond) return nil }) scheduler.submit(task) } scheduler.wait() fmt.println("全部任务完成")
核心组件设计
1. 任务(task)
我们将每个任务抽象为一个结构体:
type task struct { fn func() error retry int }
2. 调度器(scheduler)
负责维护任务队列、worker、速率限制器:
type scheduler struct { tasks chan *task wg sync.waitgroup ratelimiter <-chan time.time }
实现代码
下面是完整实现(可以直接复制使用):
type task struct { fn func() error retry int } func newtask(fn func() error) *task { return &task{fn: fn, retry: 3} } type scheduler struct { tasks chan *task wg sync.waitgroup ratelimiter <-chan time.time } func newscheduler(concurrency int, ratepersecond int) *scheduler { s := &scheduler{ tasks: make(chan *task, 100), ratelimiter: time.tick(time.second / time.duration(ratepersecond)), } for i := 0; i < concurrency; i++ { go s.worker() } return s } func (s *scheduler) submit(task *task) { s.wg.add(1) s.tasks <- task } func (s *scheduler) worker() { for task := range s.tasks { <-s.ratelimiter // 限速 err := task.fn() if err != nil && task.retry > 0 { fmt.println("任务失败,重试中...") task.retry-- s.submit(task) } else if err != nil { fmt.println("任务最终失败:", err) } s.wg.done() } } func (s *scheduler) wait() { s.wg.wait() close(s.tasks) }
实战应用场景
- 网络爬虫限速抓取
- 批量发送邮件/sms/请求,防止接口限流
- 云服务任务调度、批量自动化操作
- 异步数据采集和聚合
总结
go 的并发模型非常适合处理“海量任务 + 控制速率 + 错误重试”的需求。本篇实现的调度器非常轻量,适合作为基础组件集成到你自己的系统中。
如果你有更多需求,比如:
- 增加失败回调
- 支持超时控制
- 任务优先级
- 后台监控 dashboard
到此这篇关于go实现一个轻量级并发任务调度器(支持限速)的文章就介绍到这了,更多相关go 并发任务调度器内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论