登录
首页 >  Golang >  Go问答

利用 goroutine 和上下文实现可取消的工作线程

来源:stackoverflow

时间:2024-02-13 08:36:24 135浏览 收藏

“纵有疾风来,人生不言弃”,这句话送给正在学习Golang的朋友们,也希望在阅读本文《利用 goroutine 和上下文实现可取消的工作线程》后,能够真的帮助到大家。我也会在后续的文章中,陆续更新Golang相关的技术文章,有好的建议欢迎大家在评论留言,非常感谢!

问题内容

我试图了解如何正确使用 goroutine 以及通道和上下文,以创建可取消的后台工作线程。

我熟悉使用显式调用时可以取消的上下文,将其附加到工作 goroutine 应该可以让我停止工作。

但我不知道如何使用它来实现这一目标。

下面的示例说明了一个从通道“urls”获取数据的工作 goroutine,并且它还带有可取消的上下文。

//worker.go
func worker(id int, client *http.client, urls chan string, ctx context.context, wg *sync.waitgroup) {
    fmt.printf("worker %d is starting\n", id)
    select {
    // placeholder for a channel writing the data from the url
    case url := <-urls:
        fmt.printf("worker :%d received url :%s\n", id, url)
    // checking if the process is cancelled
    case <-ctx.done():
        fmt.printf("worker :%d exitting..\n", id)
    }
    fmt.printf("worker :%d done..\n", id)
    wg.done()
}

这对我不起作用有两个原因,

  1. 对于无缓冲通道,在没有可供读取的 goroutine 的情况下对其进行写入将会阻塞它,因此一旦将更多数据添加到 urls 通道,发送方就会阻塞。
  2. 一旦两个通道中的任何一个返回,它就会立即返回。

我还尝试将选择包装在无限循环中,但在上下文引发错误后添加一个中断。

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    for {
        select {
        // placeholder for a channel writing the data from the URL
        case url := <-urls:
            fmt.Printf("Worker :%d received url :%s\n", id, url)
        // checking if the process is cancelled
        case <-ctx.Done():
            fmt.Printf("Worker :%d exitting..\n", id)
            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
        }
    }
    fmt.Printf("Worker :%d done..\n", id) // code is unreachable
    wg.Done()
}

实现这样的事情的正确方法是什么?

ps:任何有关设计此类工作流程的资源/参考文献也会很有帮助。


正确答案


您可以用 return 代替break,代码就可以工作。

但是,更好的方法可以是:

  1. 工作线程在 for / range 循环中消耗通道
  2. 生产者应负责检测取消并关闭通道。 for循环将在级联中停止

我专门为此制作了一个 go 包。您可以在这里找到它:https://github.com/MicahParks/ctxerrpool

以下是项目 readme.md 中的示例:

package main

import (
    "bytes"
    "context"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/MicahParks/ctxerrpool"
)

func main() {

    // Create an error handler that logs all errors.
    var errorHandler ctxerrpool.ErrorHandler
    errorHandler = func(pool ctxerrpool.Pool, err error) {
        log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
    }

    // Create a worker pool with 4 workers.
    pool := ctxerrpool.New(4, errorHandler)

    // Create some variables to inherit through a closure.
    httpClient := &http.Client{}
    u := "https://golang.org"
    logger := log.New(os.Stdout, "status codes: ", 0)

    // Create the worker function.
    var work ctxerrpool.Work
    work = func(ctx context.Context) (err error) {

        // Create the HTTP request.
        var req *http.Request
        if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
            return err
        }

        // Do the HTTP request.
        var resp *http.Response
        if resp, err = httpClient.Do(req); err != nil {
            return err
        }

        // Log the status code.
        logger.Println(resp.StatusCode)

        return nil
    }

    // Do the work 16 times.
    for i := 0; i < 16; i++ {

        // Create a context for the work.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // Send the work to the pool.
        pool.AddWorkItem(ctx, work)
    }

    // Wait for the pool to finish.
    pool.Wait()
}

到这里,我们也就讲完了《利用 goroutine 和上下文实现可取消的工作线程》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>