登录
推荐 文章 Go 技术 课程 下载 专题 AI
首页 >  Golang >  Go教程

Go errgroup 并发任务完整流程:错误取消、SetLimit 限流和结果收集

来源:17golang原创

时间:2026-06-16 09:50:50 301浏览 收藏

Go 里启动 goroutine 很容易,但把一组并发任务管稳并不简单。比如一个接口要同时查询用户、订单、库存、推荐数据,只要其中一个任务失败,其他任务是否继续跑?并发数是否会打满下游?结果切片是否安全?这些问题如果靠手写 channel 和 WaitGroup,很快会变复杂。

这篇文章按完整工作流整理 errgroup 的使用方式:从 errgroup.WithContext 创建任务组,到错误取消、SetLimit 控制并发、结果收集和 Wait 统一检查。重点是把并发任务做成可取消、可限流、可验证的流程。

目录
  • 目标和边界:errgroup 解决的是一组任务的生命周期
  • 先说结论:任务要统一启动、统一取消、统一收口
  • 全流程总览:请求到任务组再到结果汇总
  • 阶段 1:用 WithContext 建立任务组
  • 阶段 2:让错误触发取消信号
  • 阶段 3:用 SetLimit 控制并发上限
  • 阶段 4:安全收集结果并做 Wait 检查
  • 我的推荐流程
  • 容易踩坑
  • 速查表

目标和边界:errgroup 解决的是一组任务的生命周期

先把边界定清楚。errgroup 适合管理一组“同进同退”的并发任务:批量请求下游、并发处理文件、并行拉取多个分片、同时检查多份资源。它比裸 sync.WaitGroup 多了错误返回和 context 取消这两件关键能力。

如果任务之间完全独立,失败也不影响其他任务,那可以只用普通任务队列。如果任务之间需要统一收口,并且希望任一任务失败后尽快停止其他任务,errgroup.WithContext 更合适。

先说结论:任务要统一启动、统一取消、统一收口

推荐流程是:用 errgroup.WithContext 创建任务组;每个任务都接收派生出来的 ctx;任一任务返回错误后,其他任务能感知取消;最后用 g.Wait() 做统一检查。任务数量较多时,再用 g.SetLimit(n) 限制并发上限。

这个流程的好处是边界明确:任务在哪里启动、错误在哪里返回、取消在哪里生效、结果在哪里汇总,都有固定位置。后续维护时不会散落在多个 goroutine 里。

全流程总览:请求到任务组再到结果汇总

下面这张图展示主流程:请求进入 Go 服务后创建 errgroup,多个任务并发运行;任一任务返回错误时触发取消;最终在 Wait 处统一检查并汇总结果。

Go errgroup 从接收请求、并发任务、错误取消到结果汇总的完整流程图

阶段 目标 关键动作 检查点
阶段 1 建立任务组 使用 errgroup.WithContext 所有任务使用同一个派生 ctx
阶段 2 错误可取消 任务返回 error,其他任务监听 ctx 失败后不会继续做无效工作
阶段 3 控制压力 使用 SetLimit 设置并发上限 不会打爆下游
阶段 4 统一收口 Wait 后处理错误和结果 结果切片安全,错误路径清晰

阶段 1:用 WithContext 建立任务组

先看基础结构。假设要并发查询一批商品详情,每个任务都使用同一个派生 ctx

func LoadProducts(ctx context.Context, ids []int64) ([]Product, error) {
    g, ctx := errgroup.WithContext(ctx)

    products := make([]Product, len(ids))

    for i, id := range ids {
        i, id := i, id
        g.Go(func() error {
            item, err := queryProduct(ctx, id)
            if err != nil {
                return err
            }
            products[i] = item
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return products, nil
}

这里有两个检查点。第一,循环变量要在循环体里重新绑定,避免闭包拿到错误的值。第二,下游函数必须接收 ctx,否则取消信号无法传进去。

阶段 2:让错误触发取消信号

errgroup.WithContext 的关键价值是:当任一任务返回非空错误,派生出的 ctx 会被取消。其他任务如果正在请求下游、读取数据或等待资源,就能尽快停止。

func queryProduct(ctx context.Context, id int64) (Product, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, productURL(id), nil)
    if err != nil {
        return Product{}, err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return Product{}, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return Product{}, fmt.Errorf("status %d", resp.StatusCode)
    }

    return decodeProduct(resp.Body)
}

如果任务内部不检查 ctx,错误取消就只是表面存在。所有耗时操作都要尽量使用支持 context 的 API,或者在循环中主动检查 ctx.Done()

阶段 3:用 SetLimit 控制并发上限

如果任务数量很大,不能把所有 goroutine 一次性放出去。SetLimit 可以限制同一时间运行的任务数量。

Go errgroup 使用 SetLimit 控制并发上限、互斥保护结果收集和 Wait 检查流程图

func LoadProductsLimited(ctx context.Context, ids []int64) ([]Product, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(5)

    products := make([]Product, len(ids))

    for i, id := range ids {
        i, id := i, id
        g.Go(func() error {
            item, err := queryProduct(ctx, id)
            if err != nil {
                return err
            }
            products[i] = item
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return products, nil
}

这里的检查点是并发上限不能拍脑袋。要根据下游 QPS、接口耗时、机器资源和调用方超时时间一起定。上限太小会慢,上限太大容易把下游压垮。

阶段 4:安全收集结果并做 Wait 检查

如果结果按输入下标写入切片,通常不需要额外锁,因为每个 goroutine 写不同位置。但如果结果是 append 到同一个切片,就要加互斥保护。

func SearchAll(ctx context.Context, words []string) ([]Result, error) {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(3)

    var mu sync.Mutex
    results := make([]Result, 0, len(words))

    for _, word := range words {
        word := word
        g.Go(func() error {
            res, err := searchOne(ctx, word)
            if err != nil {
                return err
            }

            mu.Lock()
            results = append(results, res)
            mu.Unlock()
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

最终只在 g.Wait() 后返回结果。不要在任务还没结束时提前读取共享结果,否则很容易得到半成品。

我的推荐流程

  1. 先确认这组任务是否需要同进同退。
  2. 需要统一取消时,使用 errgroup.WithContext
  3. 所有下游调用都传入派生 ctx
  4. 循环中启动任务时重新绑定循环变量。
  5. 任务数量较多时用 SetLimit 设并发上限。
  6. 按下标写结果时保持位置稳定;append 共享切片时加锁。
  7. 统一在 g.Wait() 后处理错误和返回结果。

容易踩坑

  • 只用 WaitGroup,不处理错误:某个任务失败后,其他任务仍然继续跑,调用方也拿不到清晰错误。
  • 下游不接收 ctx:错误取消无法传递,慢请求仍然占用资源。
  • 循环变量没有重新绑定:goroutine 可能拿到错误的下标或 ID。
  • 无限制启动任务:批量任务多时会压垮下游或触发本机资源瓶颈。
  • append 共享结果不加锁:会产生数据竞争,结果不稳定。

速查表

需求 推荐做法 检查点
一组任务统一收口 errgroup.WithContext 所有任务使用派生 ctx
任一任务失败后停止其他任务 任务返回 error 下游 API 支持 context
限制同时运行任务数 SetLimit 上限符合下游承载能力
按输入顺序返回结果 按下标写入切片 循环变量已重新绑定
动态追加结果 互斥保护 append 无数据竞争

总结

errgroup 不是简单替代 sync.WaitGroup,它更适合管理一组有关联的并发任务。它把错误返回、取消信号和统一等待放到一套流程里,能减少手写并发控制的复杂度。

落地时关注四个点:任务是否需要同进同退、下游是否接收 ctx、并发上限是否合理、结果收集是否安全。把这些边界定清楚,Go 并发代码会更稳,也更容易排查。

声明:本文转载于:17golang原创 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>