当前位置: 代码网 > it编程>前端脚本>Golang > Go语言实现并发控制的常见方式详解

Go语言实现并发控制的常见方式详解

2024年05月15日 Golang 我要评论
一、channel并发控制1.1 channel切片控制携程执行通过创建一个切片channel 控制多个携程地并发执行,并收集携程执行获取的数据及错误信息type resultdto struct {

一、channel并发控制

1.1 channel切片控制携程执行

通过创建一个切片channel 控制多个携程地并发执行,并收集携程执行获取的数据及错误信息

type resultdto struct {
    err  error
    data interface{}
}
​
func main() {
    channel := make([]chan *resultdto, 10) 
    for i := 0; i < 10; i++ {
        channel[i] = make(chan *resultdto)
        temp := i
        go process(temp, channel[i])
    }
​
    for _, ch := range channel {
        fmt.println(<-ch)
    }
}
​
func process(i int, ch chan *resultdto) {
    // do some work...
    if i == 1 {
        ch <- &resultdto{err: errors.new("do work err")}
    } else {
        ch <- &resultdto{data: i}
    }
}

1.2 channel控制并发数量

通过带缓冲区的channel控制并发执行携程的数量 , 注意这里需要配合 sync.waitgroup 一起使用,不然当执行到i为7 8 9 时,子携程还没有执行完,主携程就退出了

func main() {
    wg := &sync.waitgroup{}
    ch := make(chan struct{}, 3)
    
    for i := 0; i < 10; i++ {
        ch <- struct{}{}
        wg.add(1)
        
        // 执行携程
        temp := i
        go process(wg, temp, ch)
        
    }
    
    wg.wait()
}
​
func process(wg *sync.waitgroup, i int, ch chan struct{}) {
    defer func() {
        <-ch
        wg.done()
    }()
    
    // do some work...
    time.sleep(1 * time.second)
    fmt.println(i)
}

二、waitgroup并发控制

2.1 waitgroup 控制协程并行

waitgroup是golang应用开发过程中经常使用的并发控制技术。

waitgroup,可理解为wait-goroutine-group,即等待一组goroutine结束。比如某个goroutine需要等待其他几个goroutine全部完成,那么使用waitgroup可以轻松实现。

func main() {
    wg := &sync.waitgroup{}
    for i := 0; i < 10; i++ {
        wg.add(1)
        temp := i
        go process(wg, temp)
    }
    wg.wait()
}
​
func process(wg *sync.waitgroup, i int) {
    defer func() {
        wg.done()
    }()
    // do some work...
    time.sleep(1 * time.second)
    fmt.println(i)
}

简单的说,上面程序中wg内部维护了一个计数器:

  • 启动goroutine前将计数器通过add(2)将计数器设置为待启动的goroutine个数。
  • 启动goroutine后,使用wait()方法阻塞自己,等待计数器变为0。
  • 每个goroutine执行结束通过done()方法将计数器减1。
  • 计数器变为0后,阻塞的goroutine被唤醒。

2.2 waitgroup封装通用函数

waitgroup控制并发执行,limit 并发上限,收集错误返回

func main() {
    funclist := []exefunc{
        func(ctx context.context) error {
            fmt.println("5 开始")
            time.sleep(5 * time.second)
            fmt.println("5 结束")
            return nil
        },
        func(ctx context.context) error {
            fmt.println("3 开始")
            time.sleep(3 * time.second)
            fmt.println("3 结束")
            return nil
        },
    }
    err := goexeall(context.background(), 2,  funclist...)
    if err != nil {
        fmt.println(err)
    }
}
​
type exefunc func(ctx context.context) error
​
// goexeall 并发执行所有,limit 为并发上限,收集所有错误返回
func goexeall(ctx context.context, limit int, fs ...exefunc) (errs []error) {
    wg := &sync.waitgroup{}
    ch := make(chan struct{}, limit)
    errch := make(chan error, len(fs))
    for _, f := range fs {
        ftmp := f
        wg.add(1)
        ch <- struct{}{}
        go func() {
            defer func() {
                if panicerr := recover(); panicerr != nil {
                    errch <- errors.new("execution panic:" + fmt.sprintf("%v", panicerr))
                }
                wg.done()
                <-ch
            }()
            if err := ftmp(ctx); err != nil {
                errch <- err
            }
        }()
    }
    wg.wait()
    close(errch)
    close(ch)
    for cherr := range errch {
        errs = append(errs, cherr)
    }
    return
}

三、context

golang context是golang应用开发常用的并发控制技术,它与waitgroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine。

3.1 context定义的接口

context实际上只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别可用于不同的场景。

type context interface {
    deadline() (deadline time.time, ok bool)
​
    done() <-chan struct{}
​
    err() error
​
    value(key interface{}) interface{}
}

deadline()

该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此时deadline为一个初始值的time.time值

done()

该方法返回一个channel,需要在select-case语句中使用,如”case <-context.done():”。

当context关闭后,done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;当context还未关闭时,done()返回nil。

err()

该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如deadline context,关闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同:

value()

有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息

3.2 context控制协程结束

func main() {
    wg := &sync.waitgroup{}
    ctx, cancelfunc := context.withcancel(context.background())
    for i := 0; i < 10; i++ {
        wg.add(1)
        temp := i
        go process(ctx, wg, temp)
    }
    time.sleep(5 * time.second)
    cancelfunc()
    wg.wait()
}
​
func process(ctx context.context, wg *sync.waitgroup, i int) {
    defer wg.done()
    ch := make(chan error)
    go dowork(ctx, ch, i)
    select {
    case <-ctx.done():
        fmt.println("cancelfunc")
        return
    case <-ch:
        return
    }
}
​
func dowork(ctx context.context, ch chan error, i int) {
    defer func() {
        ch <- nil
    }()
    time.sleep(time.duration(i) * time.second)
    fmt.println(i)
}

四、 errorgroup

可采用第三方库golang.org/x/sync/errgroup堆多个协助并发执行进行控制

4.1 errorgroup并发执行,limit 为并发上限,timeout超时

func main() {
    funclist := []exefunc{
        func(ctx context.context) error {
            fmt.println("5 开始")
            time.sleep(5 * time.second)
            fmt.println("5 结束")
            return nil
        },
        func(ctx context.context) error {
            fmt.println("3 开始")
            time.sleep(3 * time.second)
            fmt.println("3 结束")
            return nil
        },
    }
​
    err := goexe(context.background(), 2, 10*time.second, funclist...)
    if err != nil {
        fmt.println(err)
    }
}
​
type exefunc func(ctx context.context) error
​
// goexe 并发执行,limit 为并发上限,其中任意一个报错,其他中断,timeout为0不超时
func goexe(ctx context.context, limit int, timeout time.duration, fs ...exefunc) error {
    eg, ctx := errgroup.withcontext(ctx)
    eg.setlimit(limit)
    var timech <-chan time.time
    if timeout > 0 {
        timech = time.after(timeout)
    }
    for _, f := range fs {
        ftmp := f
        eg.go(func() (err error) {
            ch := make(chan error)
            defer close(ch)
            go doworkfunc(ctx, ch, ftmp)
            select {
            case <-ctx.done():
                return ctx.err()
            case <-timech:
                return errors.new("execution timeout")
            case err = <-ch:
                return err
            }
        })
    }
    if err := eg.wait(); err != nil {
        return err
    }
    return nil
}
​
func doworkfunc(ctx context.context, ch chan error, fs exefunc) {
    var err error
    defer func() {
        if panicerr := recover(); panicerr != nil {
            err = errors.new("execution panic:" + fmt.sprintf("%v", panicerr))
        }
        ch <- err
    }()
    err = fs(ctx)
    return
}

五、通用协程控制工具封装

import (
    "context"
    "errors"
    "fmt"
    "golang.org/x/sync/errgroup"
    "sync"
    "time"
)
​
​
// exefunc 要被执行的函数或方法
type exefunc func(ctx context.context) error
​
// seqexe 顺序执行,遇到错误就返回
func seqexe(ctx context.context, fs ...exefunc) error {
    for _, f := range fs {
        if err := f(ctx); err != nil {
            return err
        }
    }
    return nil
}
​
// goexe 并发执行,limit 为并发上限,其中任意一个报错,其他中断,timeout为0不超时
func goexe(ctx context.context, limit int, timeout time.duration, fs ...exefunc) error {
    eg, ctx := errgroup.withcontext(ctx)
    eg.setlimit(limit)
    var timech <-chan time.time
    if timeout > 0 {
        timech = time.after(timeout)
    }
    for _, f := range fs {
        ftmp := f
        eg.go(func() (err error) {
            ch := make(chan error)
            defer close(ch)
            go doworkfunc(ctx, ch, ftmp)
            select {
            case <-ctx.done():
                return ctx.err()
            case <-timech:
                return errors.new("execution timeout")
            case err = <-ch:
                return err
            }
        })
    }
    if err := eg.wait(); err != nil {
        return err
    }
    return nil
}
​
func doworkfunc(ctx context.context, ch chan error, fs exefunc) {
    var err error
    defer func() {
        if panicerr := recover(); panicerr != nil {
            err = errors.new("execution panic:" + fmt.sprintf("%v", panicerr))
        }
        ch <- err
    }()
    err = fs(ctx)
    return
}
​
// seqexeall 顺序执行所有,收集所有错误返回
func seqexeall(ctx context.context, fs ...exefunc) (errs []error) {
    for _, f := range fs {
        if err := f(ctx); err != nil {
            errs = append(errs, err)
        }
    }
    return errs
}
​
// goexeall 并发执行所有,limit 为并发上限,收集所有错误返回
func goexeall(ctx context.context, limit int, fs ...exefunc) (errs []error) {
    wg := &sync.waitgroup{}
    ch := make(chan struct{}, limit)
    errch := make(chan error, len(fs))
    for _, f := range fs {
        ftmp := f
        wg.add(1)
        ch <- struct{}{}
        go func() {
            defer func() {
                if panicerr := recover(); panicerr != nil {
                    errch <- errors.new("execution panic:" + fmt.sprintf("%v", panicerr))
                }
                wg.done()
                <-ch
            }()
            if err := ftmp(ctx); err != nil {
                errch <- err
            }
        }()
    }
    wg.wait()
    close(errch)
    close(ch)
    for cherr := range errch {
        errs = append(errs, cherr)
    }
    return
}

以上就是go语言实现并发控制的常见方式详解的详细内容,更多关于go并发控制的资料请关注代码网其它相关文章!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com