登录
首页 >  Golang >  Go问答

任务的并发划分在 Goroutine 中进行

来源:stackoverflow

时间:2024-02-14 13:00:27 236浏览 收藏

大家好,我们又见面了啊~本文《任务的并发划分在 Goroutine 中进行》的内容中将会涉及到等等。如果你正在学习Golang相关知识,欢迎关注我,以后会给大家带来更多Golang相关文章,希望我们能一起进步!下面就开始本文的正式内容~

问题内容

这段代码的作用

代码从 postgresql 数据库获取数据。所有数据中只有两个字段(session 和 text)被添加到 task struct

我的数据库中只有 2 个(每个)提到的数据,这意味着执行 len(task) 时将返回 2 作为输出。

现在从这里开始就是问题所在:

我创建了一个 buffered 通道 ch,其长度等于任务结构的长度(在本例中为 2)。

我指定允许的最大工作线程数,这里是 20。

下面的代码所做的是,当我将任务发送到通道时,发送 task 结构 中的所有元素(此处为 2),并且 task 结构中的示例代码将打印所有元素两次(= task 结构的长度) 。示例显示在最后。

我需要这个程序做什么

例如通道中有100条数据 len(task) = 100。我想将这100个数据分成20个goroutine,每个goroutine处理5个数据(我不知道这是否可行,如果无效,请提供其他解决方案)。

因此,这 100 个数据将提供给 20 个工作人员,他们每人将接收 5 个数据并与他们一起运行任务,最后通道将关闭,仅此而已。

当数据库变得越来越大时,这将很有帮助,而且目前也是如此。

让 20 个 worker 每个执行任务更好,还是让 worker 数量等于通道中的数据数量更好?

var wg sync.waitgroup

type task struct {
    fetchedsession string
    fetchedtext    string
}

func fetchalldata() {

    var task []task

    //fetch session from db
    var sess []database.usersession
    database.db.find(&sess)
    //fetch commenttext from db
    var cmt []database.commentreq
    database.db.find(&cmt)

    if len(sess) == len(cmt) {
        for i := range sess {
            task = append(task, task{fetchedsession: sess[i].session, fetchedtext: cmt[i].commenttext})
        }
    }

    //making the task channel
    ch := make(chan []task, len(task))

    max_workers := 20

    wg.add(max_workers)

    for i := 0; i < max_workers; i++ {
        go func() {
            for {
                t, ok := <-ch
                if !ok {
                    wg.done()
                    return
                }
                dotasks(t)
            }
        }()
    }

    for i := 0; i < len(task); i++ {
        ch <- task
    }

    close(ch)
    wg.wait()
}

//since total number of data in database is 2 (rows)
//currently this function takes all data from the channel and runs twice
func dotasks(t []task) {

    //total tasks (data) = 100
    //if max workers = 20, then this function will run 5 times
    //each goroutine will get 4 tasks from the channel
    // get the fetchedsession and fetchedtask and do tasks

    fmt.println(t) // this prints all data twice

    //finish one task and continue with the second
}

示例:

Example Data:
Task{FetchedSession: "EncodedString",  FetchedText: "Hello"}
Task{FetchedSession: "ExampleString",  FetchedText: "Hi"}
//Output
EncodedString
Hello
ExampleString
Hi
EncodedString
Hello
ExampleString
Hi

正确答案


  • 更改任务渠道类型。
ch := make(chan task, len(task))

这意味着通道上传递的每个值都代表一个单个任务。

  • 简化渠道迭代
for i := 0; i < max_workers; i++ {
        go func() {
            defer wg.done()
            for t := range ch {
                dotask(t)
            }
        }()
    }

wg.done() 现在将在工作线程退出时运行。 range ch 将在通道关闭且所有任务消耗完毕后停止。

  • 更改“do”函数以匹配
func DoTask(t Task) {

关于如何选择worker数量:

Run some benchmarks 为您的 fetchalldata 函数,并尝试更改 max_workers (或将其作为参数传递)。最佳值将取决于任务以及运行该函数时的可用资源,这意味着您今天的计算机上的最佳值可能不是其他人的计算机或您明天的计算机上的最佳值。基准应该可以帮助您找到一个合适的近似范围来放置该值。

本篇关于《任务的并发划分在 Goroutine 中进行》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>