Golang sync包中errgroup的使用详解

目录
  • 1、初识 errgroup
  • 2、errgroup 源码解析
  • 3、errgroup 上下文取消
  • 4、总结

1、初识 errgroup

WaitGroup 主要用于控制任务组下的并发子任务。它的具体做法就是,子任务 goroutine 执行前通过 Add 方法添加任务数目,子任务 goroutine 结束时调用 Done 标记已完成任务数,主任务 goroutine 通过 Wait 方法等待所有的任务完成后才能执行后续逻辑。

package main

 import (
     "net/http"
     "sync"
 )

 func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.baidu.com/",
        "http://www.bokeyuan12111.com/",
    }
    for _, url := range urls {
        wg.Add(1)
        go func(url string) {
            defer wg.Done()
            resp, err := http.Get(url)
            if err != nil {
                return
            }
            resp.Body.Close()
        }(url)
    }
    wg.Wait()
}

在以上示例代码中,我们通过三个 goroutine 去并发的请求 url,直到所有的子任务 goroutine 均完成访问,主任务 goroutine 下的 wg.Wait 才会停止阻塞。

但在实际的项目代码中,子任务 goroutine 的执行并不总是顺风顺水,它们也许会产生 error。而 WaitGroup 并没有告诉我们在子 goroutine 发生错误时,如何将其抛给主任务 groutine。

这个时候可以考虑使用 errgroup

package main

 import (
     "fmt"
     "net/http"

     "golang.org/x/sync/errgroup"
 )

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.baidu.com/",
        "http://www.bokeyuan12111.com/",
    }
    g := new(errgroup.Group)
    for _, url := range urls {
        url := url
        g.Go(func() error {
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println(err)
                return err
            }
            fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
            return resp.Body.Close()
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println(err)
    } else {
        fmt.Println("All success!")
    }
}

结果如下:

get [http://www.baidu.com/] success: [200]
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
Get "http://www.golang.org/": dial tcp 142.251.42.241:80: i/o timeout
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host

可以看到,执行获取www.bokeyuan12111.com和www.golang.org两个 url 的子 groutine 均发生了错误,在主任务 goroutine 中成功捕获到了第一个错误信息。

除了 拥有 WaitGroup 的控制能力 和 错误传播 的功能之外,errgroup 还有最重要的 context 反向传播机制,我们来看一下它的设计。

2、errgroup 源码解析

errgroup 的设计非常精练,全部代码如下

type Group struct {
     cancel func()

     wg sync.WaitGroup

     errOnce sync.Once
     err     error
 }

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

func (g *Group) Go(f func() error) {
    g.wg.Add(1)

    go func() {
        defer g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

可以看到,errgroup 的实现依靠于结构体 Group,它通过封装 sync.WaitGroup,继承了 WaitGroup 的特性,在 Go() 方法中新起一个子任务 goroutine,并在 Wait() 方法中通过 sync.WaitGroup 的 Wait 进行阻塞等待。

同时 Group 利用 sync.Once 保证了它有且仅会保留第一个子 goroutine 错误。

最后,Group 通过嵌入 context.WithCancel 方法产生的 cancel 函数,能够在子 goroutine 发生错误时,及时通过调用 cancle 函数,将 Context 的取消信号及时传播出去。当然,这一特性需要用户代码的配合。

3、errgroup 上下文取消

在 errgroup 的文档(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文档的 pipeline( https://blog.golang.org/pipelines) ,实现了一个任务组 goroutine 中上下文取消(Context cancelation)演示的示例。但该 Demo 的前提知识略多,本文这里基于其思想,提供一个易于理解的使用示例。

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {

    g, ctx := errgroup.WithContext(context.Background())
    dataChan := make(chan int, 20)

    // 数据生产端任务子 goroutine
    g.Go(func() error {
        defer close(dataChan)
        for i := 1; ; i++ {
            if i == 10 {
                return fmt.Errorf("data 10 is wrong")
            }
            dataChan <- i
            fmt.Println(fmt.Sprintf("sending %d", i))
        }
    })

    // 数据消费端任务子 goroutine
    for i := 0; i < 3; i++ {
        g.Go(func() error {
            for j := 1; ; j++ {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case number := <-dataChan:
                    fmt.Println(fmt.Sprintf("receiving %d", number))
                }
            }
        })
    }

    // 主任务 goroutine 等待 pipeline 结束数据流
    err := g.Wait()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("main goroutine done!")
}

在以上示例中,我们模拟了一个数据传送管道。在数据的生产与消费任务集中,有四个子任务 goroutine:一个生产数据的 goroutine,三个消费数据的 goroutine。当数据生产方存在错误数据时(数据等于 10 ),我们停止数据的生产与消费,并将错误抛出,回到 main goroutine 的执行逻辑中。

可以看到,因为 errgroup 中的 Context cancle 函数的嵌入,我们在子任务 goroutine 中也能反向控制任务上下文。

程序的某一次运行,输出结果如下:

sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 1
receiving 3
receiving 2
receiving 4
data 10 is wrong
main goroutine done!

4、总结

errgroup 是 Go 官方的并发原语补充库,相对于标准库中提供的原语而言,显得没那么核心。这里总结一下 errgroup 的特性。

  • 继承了 WaitGroup 的功能
  • 错误传播:能够返回任务组中发生的第一个错误,但有且仅能返回该错误
  • context 信号传播:如果子任务 goroutine 中有循环逻辑,则可以添加 ctx.Done 逻辑,此时通过 context 的取消信号,提前结束子任务执行。

到此这篇关于Golang sync包中errgroup的使用详解的文章就介绍到这了,更多相关Golang errgroup内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang基于errgroup实现并发调用的方法

    目录 串行调用 基于sync.WaitGroup实现简单的并发调用 基于errgroup.Group实现并发调用 总结 串行调用 在用go编写web/rpc服务器的时候,经常会出现需要对下游多 个/组 服务调用rpc(或者其他比较耗时的操作)的情况.按照自然的写法,比如对下游有ABC三个调用,串行顺着写,就总共要花费TimeA+TimeB+TimeC的时间: func Handler(ctx context.Context) { var a, b, c respType a = A(ctx) b

  • Golang errgroup 设计及实现原理解析

    目录 开篇 errgroup 源码拆解 Group WithContext Wait Go SetLimit TryGo 使用方法 结束语 开篇 继上次学习了信号量 semaphore 扩展库的设计思路和实现之后,今天我们继续来看 golang.org/x/sync 包下的另一个经常被 Golang 开发者使用的大杀器:errgroup. 业务研发中我们经常会遇到需要调用多个下游的场景,比如加载一个商品的详情页,你可能需要访问商品服务,库存服务,券服务,用户服务等,才能从各个数据源获取到所需要的

  • Go并发编程中sync/errGroup的使用

    目录 一.序 二.errGroup 2.1 函数签名 三.源码 3.1 Group 3.2 WaitContext 3.3 Go 3.4 Wait 四. 案例 五. 参考 一.序 这一篇算是并发编程的一个补充,起因是当前有个项目,大概の 需求是,根据kafka的分区(partition)数,创建同等数量的 消费者( goroutine)从不同的分区中消费者消费数据,但是总有某种原因导致,某一个分区消费者创建失败,但是其他分区消费者创建失败. 最初的逻辑是,忽略分区失败的逻辑,将成功创建的分区消费

  • GO中sync包自由控制并发示例详解

    目录 资源竞争 sync.Mutex sync.RWMutex sync.WaitGroup sync.Once sync.Cond 资源竞争 channel 常用于并发通信,要保证并发安全,主要使用互斥锁.在并发的过程中,当一个内存被多个 goroutine 同时访问时,就会产生资源竞争的情况.这块内存也可以称为共享资源. 并发时对于共享资源必然会出现抢占资源的情况,如果是对某资源的统计,很可能就会导致结果错误.为保证只有一个协程拿到资源并操作它,可以引入互斥锁 sync.Mutex. syn

  • Linux下Docker CE使用从包中安装的方式详解

    使用从包中安装的方式,在Linux上安装Docker CE 1.查看Linux系统信息nuame -a 我的是Debian, amd64 2.查看Linux 系统发行版的名称 lsb_release -cs 我的是stretch 3.进入到下载包页面https://download.docker.com/linux/ 第一步获取的Debian,点击进入debian>dists 进入了这个连接地址 https://download.docker.com/linux/debian/dists/ 第二

  • Golang import本地包和导入问题相关详解

    1 本地包声明 包是Go程序的基本单位,所以每个Go程序源代码的开始都是一个包声明: package pkgName 这就是包声明,pkgName 告诉编译器,当前文件属于哪个包.一个包可以对应多个*.go源文件,标记它们属于同一包的唯一依据就是这个package声明,也就是说:无论多少个源文件,只要它们开头的package包相同,那么它们就属于同一个包,在编译后就只会生成一个.a文件,并且存放在$GOPATH/pkg文件夹下. 示例: (1) 我们在$GOPATH/目录下,创建如下结构的文件夹

  • 基于Spring中各个jar包的作用及依赖(详解)

    先附spring各版本jar包下载链接http://repo.spring.io/release/org/springframework/spring/ spring.jar 是包含有完整发布模块的单个jar 包.但是不包括mock.jar, aspects.jar, spring-portlet.jar, and spring-hibernate2.jar 示例图片为Spring-2.5.6.jar的包目录 下面讲解各个jar包的作用: 1.org.springframework.aop或sp

  • Golang拾遗之实现一个不可复制类型详解

    目录 如何复制一个对象 为什么要禁止复制 运行时检测实现禁止复制 初步尝试 更好的实现 性能 优点和缺点 静态检测实现禁止复制 利用Locker接口不可复制实现静态检测 优点和缺点 更进一步 利用package和interface进行封装 优点和缺点 总结 如何复制一个对象 不考虑IDE提供的代码分析和go vet之类的静态分析工具,golang里几乎所有的类型都能被复制. // 基本标量类型和指针 var i int = 1 iCopy := i str := "string" st

  • go语言context包功能及操作使用详解

    目录 Context包到底是干嘛用的? context原理 什么时候应该使用 Context? 如何创建 Context? 主协程通知有子协程,子协程又有多个子协程 context核心接口 emptyCtx结构体 Backgroud TODO valueCtx结构体 WithValue向context添加值 Value向context取值 示例 WithCancel可取消的context cancelCtx结构体 WithDeadline-超时取消context WithTimeout-超时取消

  • Golang配置解析神器go viper使用详解

    目录 前言 viper简介 功能 viper配置优先级 安装viper 支持哪些文件格式 key大小写问题 使用指南 如何访问viper的功能 配置默认值 读取配置文件 写配置文件 WriteConfig SafeWriteConfig WriteConfigAs SafeWriteConfigAs 监听配置文件 从io.Reader读取配置 显示设置配置项 注册和使用别名 读取环境变量 与命令行参数搭配使用 pflag 扩展其他flag 远程key/value存储支持 访问配置 直接访问 序列

  • Golang实现快速求幂的方法详解

    今天讲个有趣的算法:如何快速求nm,其中n和m都是整数. 为方便起见,此处假设m>=0,对于m< 0的情况,求出n|m|后再取倒数即可. 另外此处暂不考虑结果越界的情况(超过 int64 范围). 当然不能用编程语言的内置函数,我们只能用加减乘除来实现. n的m次方的数学含义是:m个n相乘:n*n*n...*n,也就是说最简单的方式是执行 m 次乘法. 直接用乘法实现的问题是性能不高,其时间复杂度是 O(m),比如 329要执行29次乘法,而乘法运算是相对比较重的,我们看看能否采用什么方法将时

  • Golang基础教程之字符串string实例详解

    目录 1. string的定义 2.string不可变 3.使用string给另一个string赋值 4.string重新赋值 补充:字符串拼接 总结 1. string的定义 Golang中的string的定义在reflect包下的value.go中,定义如下: StringHeader 是字符串的运行时表示,其中包含了两个字段,分别是指向数据数组的指针和数组的长度. // StringHeader is the runtime representation of a string. // I

  • Golang 实现 RTP音视频传输示例详解

    目录 引言 RTP 数据包头部字段 Golang 的相关实现 结尾 引言 在 Coding 之前我们先来简单介绍一下 RTP(Real-time Transport Protocol), 正如它的名字所说,用于互联网的实时传输协议,通过 IP 网络传输音频和视频的网络协议. 由音视频传输工作小组开发,1996 年首次发布,并提出了以下使用设想. 简单的多播音频会议 使用 IP 的多播服务进行语音通信.通过某种分配机制,获取多播组地址和端口对.一个端口用于音频数据的,另一个用于控制(RTCP)包,

随机推荐

其他