登录
首页 >  Golang >  Go问答

消息计数在golang生产者-消费者模式中的应用

来源:stackoverflow

时间:2024-03-14 13:36:19 177浏览 收藏

哈喽!大家好,很高兴又见面了,我是golang学习网的一名作者,今天由我给大家带来一篇《消息计数在golang生产者-消费者模式中的应用》,本文主要会讲到等等知识点,希望大家一起学习进步,也欢迎大家关注、点赞、收藏、转发! 下面就一起来看看吧!

问题内容

我用golang编写了生产者-消费者模式。读取多个csv文件并处理记录。我正在一次性读取csv文件的所有记录。

我想以总记录(包括所有 csv 文件)的 5% 为间隔记录处理完成百分比。例如,我有 3 个 csv 需要处理,每个 csv 有 20,30,50 行/记录(因此总共需要处理 100 条记录),希望在处理 5 条记录时记录进度。

2882​​55149137

正确答案


我使用了原子变量和计数器通道,其中消费者将在处理记录时推送计数,其他 goroutine 将从通道读取并计算总处理记录百分比。

var progressPercentageStep float64 = 5.0
var totalRecordsToProcess int32

func processData(inputCSVFiles []string) {
        producerCount := len(inputCSVFiles)
        consumerCount := producerCount
        link := make(chan []string, 100)
        counter := make(chan int, 100)
        defer close(counter)
        wp := &sync.WaitGroup{}
        wc := &sync.WaitGroup{}
    
        wp.Add(producerCount)
        wc.Add(consumerCount)
    
        for i := 0; i < producerCount; i++ {
            go produce(link, inputCSVFiles[i], wp)
        }

        go progressStats(counter)

        for i := 0; i < consumerCount; i++ {
            go consume(link, wc)
        }
        wp.Wait()
        close(link)
        wc.Wait()
        
    }
    
    func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {
        defer wg.Done()
        records := readCsvFile(filePath)
        atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))
        for _, record := range records {
            link <- record
        }
    }
    
    func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for record := range link {
            // process csv record
            counter <- 1
        }
    }
    
func progressStats(counter <-chan int) {
    var feedbackThreshold = progressPercentageStep
    for count := range counter {
        totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)
        donePercent := 100.0 * processed / totalRemaining
        // log progress
        if donePercent >= feedbackThreshold {
            log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)
            feedbackThreshold += progressPercentageStep
        }
    }
}

今天关于《消息计数在golang生产者-消费者模式中的应用》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>