登录
首页 >  Golang >  Go问答

使用 goroutines 来处理函数中的递归调用,以及在所有工作 goroutine 完成后继续调用者的常规方法

来源:stackoverflow

时间:2024-02-12 10:48:23 343浏览 收藏

各位小伙伴们,大家好呀!看看今天我又给各位带来了什么文章?本文标题《使用 goroutines 来处理函数中的递归调用,以及在所有工作 goroutine 完成后继续调用者的常规方法》,很明显是关于Golang的文章哈哈哈,其中内容主要会涉及到等等,如果能帮到你,觉得很不错的话,欢迎各位多多点评和分享!

问题内容

我正在利用 goroutine 在 go 中实现一种(某种)组合回溯算法。我的问题可以表示为一棵具有一定程度/分布的树,我想访问每个叶子并根据所采取的路径计算结果。在给定的级别上,我想生成 goroutine 来同时处理子问题,即,如果我有一棵 3 级的树,并且我想在 2 级之后启动并发,我会生成 3*3=9 个 goroutine 来继续处理同时解决子问题。

func main() {
    cres := make(chan string, 100)
    res := []string{}
    numlevels := 5
    spread := 3
    startconcurrencyatlevel := 2
    ntree("", numlevels, spread, startconcurrencyatlevel, cres)
    for {
        select {
        case r := <-cres:
            res = append(res, r)
        case <-time.after(10 * time.second):
            fmt.println("caculation timed out")
            fmt.println(len(res), math.pow(float64(spread), float64(numlevels)))
            return
        }
    }
}

func ntree(path string, maxlevels int, spread int, startconcurrencyatlevel int, cres chan string) {
    if len(path) == maxlevels {
        // some longer running task here associated with the found path, also using a lookup table
        // real problem actually returns not the path but the result if it satisfies some condition
        cres <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextpath := path + fmt.sprint(i)
        if len(path) == startconcurrencyatlevel {
            go ntree(nextpath, maxlevels, spread, startconcurrencyatlevel, cres)
        } else {
            ntree(nextpath, maxlevels, spread, startconcurrencyatlevel, cres)
        }
    }
}

上面的代码可以工作,但是我依赖于 for select 语句超时。我正在寻找一种方法,在所有 goroutine 完成后立即继续 main(),即所有子问题都已处理完毕。

我已经想出了两种可能的(不受欢迎/不优雅的)解决方案:

  • 使用互斥锁保护的结果映射+等待组而不是基于通道的方法应该可以解决问题,但我很好奇是否有一个简洁的通道解决方案。

  • 使用退出通道(int 类型)。每次生成 goroutine 时,退出通道都会获得 +1 int,每次在叶子中完成计算时,它都会获得 -1 int 并且调用者将值相加。请参阅以下代码片段,但这不是一个好的解决方案,因为它(相当明显)遇到了我不想处理的计时问题。例如,如果第一个 goroutine 在另一个 goroutine 生成之前完成,它就会提前退出。

for {
        select {
        case q := <-cRunningRoutines:
            runningRoutines += q
            if runningRoutines == 0 {
                fmt.Println("Calculation complete")
                return res
            }
        // ...same cases as above
    }

演示:https://go.dev/play/p/9jzecvl8clj

以下问题:

  1. 从作为 goroutine 启动的函数对其自身进行递归调用是一种有效的方法吗?
  2. 在所有生成的 goroutine 完成之前从 cres 读取结果的惯用方法是什么?我在某处读到,计算完成后应该关闭通道,但我就是不知道如何在这种情况下集成它。

很高兴有任何想法,谢谢!


正确答案


正如 torek 提到的,我派生了一个匿名函数,在 waitgroup 完成等待后关闭通道。还需要一些逻辑,仅在 goroutine 生成级别的递归返回后调用生成的 goroutine 的 wg.done()

总的来说,我认为这是一个有用的习语(如果我错了,请纠正我:))

演示:https://go.dev/play/p/bQjHENsZL25

func main() {
    cRes := make(chan string, 100)
    numLevels := 3
    spread := 3
    startConcurrencyAtLevel := 2
    var wg sync.WaitGroup
    nTree("", numLevels, spread, startConcurrencyAtLevel, cRes, &wg)

    go func() {
        // time.Sleep(1 * time.Second) // edit: code should work without this initial sleep
        wg.Wait()
        close(cRes)
    }()

    for r := range cRes {
        fmt.Println(r)
    }

    fmt.Println("Done!")
}

func nTree(path string, maxLevels int, spread int, startConcurrencyAtLevel int, cRes chan string, wg *sync.WaitGroup) {
    if len(path) == maxLevels {
        // some longer running task here associated with the found path
        cRes <- path
        return
    }
    for i := 1; i <= spread; i++ {
        nextPath := path + fmt.Sprint(i)
        if len(path) == startConcurrencyAtLevel {
            go nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        } else {
            nTree(nextPath, maxLevels, spread, startConcurrencyAtLevel, cRes, wg)
        }
    }
}

阅读描述和代码片段,我无法准确理解您想要实现的目标,但我对我每天使用的渠道有一些提示和模式,并认为很有帮助。

  • context 包对于以安全的方式管理 goroutines 的状态非常有帮助。在您的示例中, time.after 用于结束主程序,但在非主函数中,它可能会泄漏 goroutine:如果您使用 context.context 并将其传递到 goroutuines 中(通常传递函数的第一个参数)您将能够控制下游调用的取消。 This explains it briefly

  • 在生成消息并在通道中发送消息的函数中创建通道(并返回它们)是常见的做法。相同的函数应该负责关闭通道,例如,在完成写入时使用 defer close(channel) 。 这很方便,因为当通道被缓冲时,即使通道中仍然有数据,也可以关闭通道:go 运行时实际上会等到所有消息都被轮询后再关闭。对于无缓冲的通道,在通道的读取器准备好轮询之前,该函数将无法通过通道发送消息,因此无法退出。 This is an example (without recursion)。 在此示例中,我们可以在缓冲或未缓冲通道时 关闭 通道,因为发送将阻塞,直到主 goroutine 中通道上的 for := range 从中读取数据。 This is a variant 同样的原理,以通道作为参数传递。

  • 我们可以将 sync.waitgroup 与通道一起使用,以发出各个 goroutine 的完成信号,并让“编排”goroutine 知道通道可以关闭,因为所有消息生产者已将数据发送到通道中。与第 1 点相同的注意事项适用于 close 操作。 This is an example 显示了 waitgroup 和通道外部关闭器的使用。

  • 频道可以有方向!请注意,在示例中,我在将箭头传入/外部函数时添加/删除了通道旁边的箭头(例如 <-chan stringchan<- string)。这告诉编译器只能在该函数的范围内分别读取或写入通道。 这在两个方面有帮助:

    1. 编译器将生成更高效的代码,因为具有方向的通道将具有单个锁而不是 2 个锁。
    2. 函数的签名描述了它是否仅使用通道进行写入(可能还有 close())或读取:请记住,从带有 range 的通道读取数据会在通道关闭时自动停止迭代。
  • 您可以构建通道的通道:make(chan chan string) 是构建处理管道的有效(且有用)构造。 它的一个常见用法是扇入 goroutine,它收集一系列通道生成 goroutine 的多个输出。 This is an example 如何使用它们。

本质上,回答您最初的问题:

如果您确实需要递归,最好将其与并发代码分开处理:创建一个专用函数,以递归方式将数据发送到通道中,并协调调用者中通道的关闭.

一个很好的参考是Go Concurrency Patterns: Pipelines and cancellation:这是一篇相当旧的帖子(在标准库中存在 context 包之前),我认为 Parallel digestion 是您正在寻找解决原始问题的内容。

本篇关于《使用 goroutines 来处理函数中的递归调用,以及在所有工作 goroutine 完成后继续调用者的常规方法》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>