登录
首页 >  Golang >  Go教程

Golang中实现fan-infan-out模式的方法

时间:2026-02-09 18:08:34 238浏览 收藏

从现在开始,努力学习吧!本文《Golang fan-in fan-out模式实现方法》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

fan-in fan-out是一种Go并发模式,先将任务分发给多个goroutine并行执行(fan-out),再从多个通道收集结果合并为单一输出(fan-in),提升处理效率。

如何在Golang中实现fan-in fan-out模式_Golang fan-in fan-out模式实践方法汇总

在Go语言中,fan-in fan-out 是一种常见的并发设计模式,用于提升程序的处理效率。它通过多个goroutine并行处理任务(fan-out),再将结果汇总到一个通道中(fan-in),特别适合I/O密集型或可并行计算的场景。

什么是fan-in fan-out?

该模式包含两个阶段:

  • Fan-out:将输入数据分发给多个工作goroutine,并发执行任务。
  • Fan-in:从多个输出通道收集结果,合并到一个通道中供后续处理。

这种结构能有效利用多核资源,提高吞吐量,同时保持代码简洁。

使用无缓冲通道实现基础fan-out

假设我们要处理一批URL请求,可以启动多个worker并发执行HTTP调用。

func fetch(url string, ch chan<- string) {
    resp, _ := http.Get(url)
    ch <- fmt.Sprintf("fetched %s: %d", url, resp.StatusCode)
}
<p>func main() {
urls := []string{"<a target='_blank'  href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXpusdyfpJl5fdnHe6nQhNCyr5q5fZqrq32ev4CNYH99emi_t9FojnaA3bJme86Fzc9th7qYnK-slJ6ys4acfpCGar7d0XU' rel='nofollow'>http://example.com</a>", "<a target='_blank'  href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXpusdyfppZkcNPGe4bZkZWdrYHffZqrq36fv42JZH19i6S0t7ijgZx8mL2hft2Ht8tuh62FqrBkfa6ya41hibOTorS0o3U' rel='nofollow'>http://google.com</a>", "<a target='_blank'  href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXpusdyfppWKidTIi3rZkZWdrYHffZqrq36fv42JZH19i6S0t7ijgZx8mL2hft2Ht8tuh62FqrBkfa6ya41hibOTorS0o3U' rel='nofollow'>http://github.com</a>"}
resultCh := make(chan string, len(urls))</p><pre class="brush:php;toolbar:false;">// Fan-out: 每个URL启动一个goroutine
for _, url := range urls {
    go fetch(url, resultCh)
}

// 收集所有结果
for i := 0; i &lt; len(urls); i++ {
    fmt.Println(&lt;-resultCh)
}

}

这种方式简单直接,但无法动态控制worker数量,容易导致资源耗尽。

带worker池的可控fan-out

更实用的做法是固定worker数量,从任务通道读取输入。

func worker(tasks <-chan int, results chan<- int, id int) {
    for num := range tasks {
        time.Sleep(time.Millisecond * 100) // 模拟耗时操作
        results <- num * num
        fmt.Printf("worker %d processed %d\n", id, num)
    }
}
<p>func main() {
tasks := make(chan int, 10)
results := make(chan int, 10)</p><pre class="brush:php;toolbar:false;">// 启动3个worker
for i := 0; i &lt; 3; i++ {
    go worker(tasks, results, i)
}

// 发送任务
for i := 1; i &lt;= 5; i++ {
    tasks &lt;- i
}
close(tasks)

// 收集结果
for i := 0; i &lt; 5; i++ {
    fmt.Println("result:", &lt;-results)
}

}

通过限制worker数,避免系统过载,适用于高并发任务调度。

实现fan-in合并多个输出通道

当每个worker有自己的输出通道时,需要fan-in函数统一收集。

func fanIn(channels ...<-chan string) <-chan string {
    out := make(chan string)
    for _, ch := range channels {
        go func(c <-chan string) {
            for val := range c {
                out <- val
            }
        }(ch)
    }
<pre class="brush:php;toolbar:false;">// 所有goroutine启动后关闭out(注意:此处简化处理)
go func() {
    for _, ch := range channels {
        for range ch {}
    }
    close(out)
}()

return out

}

更安全的方式是使用wg sync.WaitGroup等待所有worker完成后再关闭通道。

完整fan-in fan-out示例

结合以上思路,构建一个完整的流程:

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}
<p>func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}</p><p>func main() {
// Fan-out: 分发任务
nums := generate(1, 2, 3, 4, 5)</p><pre class="brush:php;toolbar:false;">// 多个worker并行处理
c1 := square(nums)
c2 := square(nums)

// Fan-in: 合并结果
merged := merge(c1, c2)

// 输出结果
for v := range merged {
    fmt.Println(v)
}

}

// merge函数合并多个通道 func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int)

output := func(c &lt;-chan int) {
    for n := range c {
        out &lt;- n
    }
    wg.Done()
}

wg.Add(len(cs))
for _, c := range cs {
    go output(c)
}

go func() {
    wg.Wait()
    close(out)
}()
return out

}

这个例子展示了典型的管道模式,数据流清晰,易于扩展和测试。

基本上就这些。fan-in fan-out的核心在于合理划分任务与结果收集,配合channel和goroutine实现高效并发。实际应用中可根据业务需求调整worker数量、缓冲大小和错误处理机制。

今天关于《Golang中实现fan-infan-out模式的方法》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

前往漫画官网入口并下载 ➜
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>