登录
首页 >  Golang >  Go问答

并发处理程序阻塞

来源:stackoverflow

时间:2024-02-10 10:45:22 352浏览 收藏

golang学习网今天将给大家带来《并发处理程序阻塞》,感兴趣的朋友请继续看下去吧!以下内容将会涉及到等等知识点,如果你是正在学习Golang或者已经是大佬级别了,都非常欢迎也希望大家都能给我建议评论哈~希望能帮助到大家!

问题内容

我们发现一个 mqtt.messagehandler 无法正常工作。在处理程序中,我们将过滤即将到来的消息,然后将有效事件传递给一个函数进行处理。该函数的实现如下:

func processEvent(i models.Foo) (string, error) {
    var wg sync.WaitGroup
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)

    err := func1()
    if err != nil {
        return err
    }

    switch strings.ToUpper(i.Status) {
    case "OK":
        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask1()
            ch := done
            if err != nil {
                log.Error("%s", err.Error())
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask2()
            ch := done
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        result := "processed"
        count := 0
        for {
            select {
            case err := <-errc:
                close(quit)
                log.Info("event: %s, %s", "", err.Error())
                return "", err
            case <-done:
                count++
                if count == 4 { // why 4???
                    return result, nil
                }
            }
        }

        wg.Wait()

        if err != nil {
            log.Info("event: %s, %s", result, err.Error())
            return result, err
        }
        close(quit)
        close(errc)
        close(done)
        return result, nil
    default:
        return "", nil
    }

    return "", nil
}

我明白,它正在尝试同步 longtimetask1() 和 longtimetask2()。但对我来说理解起来相当复杂。 count 和 count == 4 的目的是什么?为什么最后关门了?代码提示无法访问 wg.wait()。 在这个功能运行良好之前。但最近 longtimetask1()longtimetask2() 可能会返回一些错误,这会破坏代码,该函数似乎被完全阻止。您能帮我理解代码并找到潜在的问题并重构这部分吗?


正确答案


查看 count,代码似乎期望从 done 通道接收四条消息。然而,这段代码最多可以从两个 goroutine 生成两条这样的消息,所以这是一个错误。

此外,如果任何 goroutine 返回错误,它不会写入 done 通道,所以这是另一个错误。

另一种写法可能是:

...
result := "processed"
for {
    select {
       case err := <-errc:
          close(quit) // tell the goroutines to terminate
          log.info("event: %s, %s", "", err.error())
          wg.wait() // wait for them to finish
          return "", err
  
       case <-done:
          count++
          if count == 2 {
              wg.wait()
              return result, nil
          }    
}

这正是 errgroup 包设计的那种 fork-and-join 并发性:

func processEvent(ctx context.Context, i models.Foo) (string, error) {
    err := func1()
    if err != nil {
        return "", err
    }

    g, ctx := errgroup.WithContext(ctx)

    if strings.ToUpper(i.Status) != "OK" {
        return "", nil
    }

    g.Go(func() error { return longTimeTask1(ctx) })
    g.Go(func() error { return longTimeTask2(ctx) })

    if err := g.Wait(); err != nil {
        log.Printf("event: %v", err)
        return "", err
    }
    return "processed", nil
}

https://play.golang.org/p/JNMKftQTLGs

今天关于《并发处理程序阻塞》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>