登录
首页 >  Golang >  Go问答

Goroutines 使用通道同步的问题

来源:stackoverflow

时间:2024-04-16 16:36:34 212浏览 收藏

最近发现不少小伙伴都对Golang很感兴趣,所以今天继续给大家介绍Golang相关的知识,本文《Goroutines 使用通道同步的问题》主要内容涉及到等等知识点,希望能帮到你!当然如果阅读本文时存在不同想法,可以在评论中表达,但是请勿使用过激的措辞~

问题内容

我正在使用下面的代码来同步 goroutine。最近在调查一个错误时,我发现下面的代码并不总是有效。大约五分之一的失败。频道 quit 在我的 out 频道之前获取消息。我能够在本地(不是在 go-演示中)和 k8s 环境中一致地重现此问题。作为解决方法,我现在使用 sync.map 进行同步。

有办法修复下面的代码吗?

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    //test setup
    filePaths := []string{
        path.Join(os.TempDir(), fmt.Sprint("f1-", time.Now().Nanosecond())),
        path.Join(os.TempDir(), fmt.Sprint("f2-", time.Now().Nanosecond())),
        path.Join(os.TempDir(), fmt.Sprint("f3-", time.Now().Nanosecond())),
        path.Join(os.TempDir(), fmt.Sprint("f4-", time.Now().Nanosecond())),
        path.Join(os.TempDir(), fmt.Sprint("f5-", time.Now().Nanosecond())),
        path.Join(os.TempDir(), fmt.Sprint("f6-", time.Now().Nanosecond())),
    }
    for _, filePath := range filePaths {
        f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
        if err != nil {
            log.Fatal(err)
        }
        _, err = f.WriteString("There are many variations of passages of Lorem Ipsum available, but the majority have suffered alteration in some form, by injected humour, or randomised words which don't look even slightly believable. If you are going to use a passage of Lorem Ipsum, you need to be sure there isn't anything embarrassing hidden in the middle of text. All the Lorem Ipsum generators on the Internet tend to repeat predefined chunks as necessary, making this the first true generator on the Internet. It uses a dictionary of over 200 Latin words, combined with a handful of model sentence structures, to generate Lorem Ipsum which looks reasonable. The generated Lorem Ipsum is therefore always free from repetition, injected humour, or non-characteristic words etc.")
        if err != nil {
            log.Fatal(err)
        }
        err = f.Close()
        if err != nil {
            log.Fatal(err)
        }
    }
    for  {
        responses, err := getContents(filePaths)
        if err != nil {
            log.Fatal(err)
        }
        if len(responses) != len(filePaths) {
            log.Fatalf("Responses Does not Match, need %d Got %d",len(filePaths), len(responses))
        }
        time.Sleep(1 * time.Second)
    }

}


func  getContents(fileNames []string) ([][]byte, error) {
    wg := sync.WaitGroup{}
    var responseBytes [][]byte
    out := make(chan []byte, 10)
    defer close(out)
    quit := make(chan int)
    var opsFileRead uint64
    var opsChannelGot uint64
    defer close(quit)
    go func() {
        for {
            select {
            case bts := <-out:
                if len(bts) > 0 {
                    atomic.AddUint64(&opsChannelGot, 1)
                    responseBytes = append(responseBytes, bts)
                }
                break
            case <-quit:
                fmt.Printf("I quit, i read %d, i got %d\n", opsFileRead, opsChannelGot)
                return
            }
        }
    }()
    for _, fileName := range fileNames {
        wg.Add(1)
        go func(fName string, out chan []byte, wg *sync.WaitGroup) {
            defer wg.Done()
            data, err := ioutil.ReadFile(fName)
            if err != nil {
                log.Fatal(err)
            }
            out <- data
            atomic.AddUint64(&opsFileRead, 1)
        }(fileName, out, &wg)
    }
    wg.Wait()
    quit <- 1
    return responseBytes, nil
}

正确答案


当收到退出时,输出通道可以包含值。通过创建一个无缓冲的通道来修复:

out := make(chan []byte)

这可以确保在退出之前收到从工作人员发送的值:

  • 无缓冲通道上的发送/接收发生在调用 wg.done() 之前
  • wg.done() 的所有调用都发生在 wg.wait() 返回之前
  • wg.wait() 在将值发送到 quit 之前返回

因此,在将值发送到 quit 之前,先从 out 接收值。

另一种方法是关闭 out 通道,向结果收集器发出信号:工作人员已完成:

func getContents(fileNames []string) ([][]byte, error) {
    wg := sync.WaitGroup{}
    var responseBytes [][]byte
    out := make(chan []byte)
    var opsFileRead uint64
    var opsChannelGot uint64

    for _, fileName := range fileNames {
        wg.Add(1)
        go func(fName string, out chan []byte, wg *sync.WaitGroup) {
            defer wg.Done()
            data, err := ioutil.ReadFile(fName)
            if err != nil {
                log.Fatal(err)
            }
            out <- data
            atomic.AddUint64(&opsFileRead, 1)
        }(fileName, out, &wg)
    }

    // Close out after workers are done.
    go func() {
        wg.Wait()
        close(out)
    }()

    // Loop over outputs until done.
    for bts := range out {
        if len(bts) > 0 {
            atomic.AddUint64(&opsChannelGot, 1)
            responseBytes = append(responseBytes, bts)
        }
    }

    fmt.Printf("I quit, i read %d, i got %d\n", opsFileRead, opsChannelGot)

    return responseBytes, nil
}

文中关于的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《Goroutines 使用通道同步的问题》文章吧,也可关注golang学习网公众号了解相关技术文章。

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>