登录
首页 >  Golang >  Go问答

为何数据被写入通道却未被接收者 goroutine 读取?

来源:stackoverflow

时间:2024-03-26 10:42:40 446浏览 收藏

在使用 Go 语言构建数据缓冲系统时,服务 A 无法将数据推送到服务 B 的通道中。服务 B 中的缓冲子协程未能进入接收数据的情况。分析代码后发现,问题出在服务 A 中,因为当将新通道提供给服务 B 时,接收器不是指针,导致服务 A 的副本中更改了通道,而这些副本在函数退出时被丢弃。通过将接收器更改为指针并使用正确的赋值,数据能够成功从服务 A 发送到服务 B 的通道。

问题内容

我正在构建一个守护进程,并且有两个服务将相互发送数据。服务 a 产生数据,服务 b a 是数据缓冲区服务或类似队列。因此,从 main.go 文件中,服务 b 被实例化并启动。 start() 方法将执行 buffer() 函数作为 goroutine,因为该函数等待数据传递到通道,并且我不希望主进程停止等待 buffer 完成。然后服务a被实例化并启动。然后它也向服务 b“注册”。

我为服务 a 创建了一个名为 registerwithbufferservice 的方法,该方法创建两个新通道。它将把这些通道存储为它自己的属性,并将它们提供给服务 b。

func (s *servicea) registerwithbufferservice(bufservice *data.databuffer) error {
    newincomingchan := make(chan *data.dataframe, 1)
    newoutgoingchan := make(chan []byte, 1)
    s.incomingbuffchan = newincomingchan
    s.outgoingdatachannels = append(s.outgoingdatachannels, newoutgoingchan)
    bufservice.dataproviders[s.servicename()] = data.dataproviderinfo{
        incomingchan: newoutgoingchan, //our outgoing channel is their incoming
        outgoingchan: newincomingchan, // our incoming channel is their outgoing
    }
    s.databufferservice = bufservice
    bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering
    s.logger.info().msg("registeration completed.")
    return nil
}

buffer 本质上监听来自服务 a 的传入数据,使用 decode() 对其进行解码,然后将其添加到名为 buf 的切片中。如果切片的长度大于 bufferperiod,则它将在传出通道中将切片中的第一项发送回服务 a。

func (b* databuffer) buffer(bufferperiod int) {
    for {
        select {
        case newprovider := <- b.newprovider:
            b.wg.add(1)
            /*
            newprovider is a string
            dataproviders is a map the value it returns is a struct containing the incoming and 
            outgoing channels for this service
            */
            p := b.dataproviders[newprovider]
            go func(prov string, in chan []byte, out chan *dataframe) {
                defer b.wg.done()
                var buf []*dataframe
                for {
                    select {
                    case rawdata := <-in:
                        tmp := decode(rawdata) //custom decoding function. returns a *dataframe
                        buf = append(buf, tmp)
                        if len(buf) < bufferperiod {
                            b.logger.info().msg("sending decoded data out.")
                            out <- buf[0]
                            buf = buf[1:] //pop
                        }
                    case <- b.quit:
                        return
                    }
                }
            }(newprovider, p.incomingchan, p.outgoingchan)
        }
    case <- b.quit:
        return
    }
}

现在服务 a 有一个名为 record 的方法,该方法会定期将数据推送到其 outgoingdatachannels 属性中的所有通道。

func (s *servicea) record() error {
    ...
    if atomic.loadint32(&s.listeners) != 0 {
        s.logger.info().msg("sending raw data to data buffer")
        for _, outchan := range s.outgoingdatachannels {
            outchan <- databytes // the receiver (service b) is already listening and this doesn't hang
        }
        s.logger.info().msg("raw data sent and received") // the logger will output this so i know it's not hanging 
    }
}

问题是,服务 a 似乎使用 record 成功推送数据,但服务 b 从未进入 case rawdata := <-in: 情况(在 buffer 子协程中)。这是因为我有嵌套的 goroutine 吗?如果不清楚的话,当服务 b 启动时,它会调用 buffer 但因为否则它会挂起,所以我将对 buffer 的调用设为 goroutine。因此,当服务 a 调用 registerwithbufferservice 时,buffer goroutine 创建一个 goroutine 来监听来自服务 b 的新数据,并在缓冲区填满后将其推送回服务 a。我希望我解释清楚了。

编辑 1 我制作了一个最小的、可重现的示例。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    defaultbufferingperiod int = 3
    defaultpollinginterval int64 = 10
)

type dataobject struct{
    data    string
}

type dataprovider interface {
    registerwithbufferservice(*databuffer) error
    servicename() string
}

type dataproviderinfo struct{
    incomingchan    chan *dataobject
    outgoingchan    chan *dataobject
}

type databuffer struct{
    running         int32 //used atomically
    dataproviders   map[string]dataproviderinfo
    quit            chan struct{}
    newprovider     chan string
    wg              sync.waitgroup
}

func newdatabuffer() *databuffer{
    var (
        wg sync.waitgroup
    )
    return &databuffer{
        dataproviders: make(map[string]dataproviderinfo),
        quit: make(chan struct{}),
        newprovider: make(chan string),
        wg: wg,
    }
}

func (b *databuffer) start() error {
    if ok := atomic.compareandswapint32(&b.running, 0, 1); !ok {
        return fmt.errorf("could not start data buffer service.")
    }
    go b.buffer(defaultbufferingperiod)
    return nil
}

func (b *databuffer) stop() error {
    if ok := atomic.compareandswapint32(&b.running, 1, 0); !ok {
        return fmt.errorf("could not stop data buffer service.")
    }
    for _, p := range b.dataproviders {
        close(p.incomingchan)
        close(p.outgoingchan)
    }
    close(b.quit)
    b.wg.wait()
    return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing dataframes
func (b *databuffer) buffer(bufferperiod int) {
    for {
        select {
        case newprovider := <- b.newprovider:
            fmt.println("received new data provider.")
            if _, ok := b.dataproviders[newprovider]; ok { 
                b.wg.add(1)
                p := b.dataproviders[newprovider]
                go func(prov string, in chan *dataobject, out chan *dataobject) {
                    defer b.wg.done()
                    var (
                        buf []*dataobject
                    )
                    fmt.printf("waiting for data from: %s\n", prov)
                    for {
                        select {
                        case indata := <-in:
                            fmt.printf("received data from: %s\n", prov)
                            buf = append(buf, indata)
                            if len(buf) > bufferperiod {
                                fmt.printf("queue is filled, sending data back to %s\n", prov)
                                out <- buf[0]
                                fmt.println("data sent")
                                buf = buf[1:] //pop
                            }
                        case <- b.quit:
                            return
                        }
                    }
                }(newprovider, p.incomingchan, p.outgoingchan)
            }
        case <- b.quit:
            return
        }
    }
}

type servicea struct{
    active                  int32 // atomic
    stopping                int32 // atomic
    recording               int32 // atomic
    listeners               int32 // atomic
    name                    string
    quitchan                chan struct{}
    incomingbuffchan        chan *dataobject
    outgoingbuffchans       []chan *dataobject
    databufferservice       *databuffer
}

// a compile time check to ensure servicea fully implements the dataprovider interface
var _ dataprovider = (*servicea)(nil)

func newservicea() (*servicea, error) {
    var newsliceoutchans []chan *dataobject
    return &servicea{
        quitchan:  make(chan struct{}),
        outgoingbuffchans: newsliceoutchans,
        name:   "servicea",
    }, nil
}

// start starts the service. returns an error if any issues occur
func (s *servicea) start() error {
    atomic.storeint32(&s.active, 1)
    return nil
}

// stop stops the service. returns an error if any issues occur
func (s *servicea) stop() error {
    atomic.storeint32(&s.stopping, 1)
    close(s.quitchan)
    return nil
}

func (s *servicea) startrecording(pol_int int64) error {
    if ok := atomic.compareandswapint32(&s.recording, 0, 1); !ok {
        return fmt.errorf("could not start recording. data recording already started")
    }
    ticker := time.newticker(time.duration(pol_int) * time.second)
    go func() {
        for {
            select {
            case <-ticker.c:
                fmt.println("time to record...")
                err := s.record()
                if err != nil {
                    return
                }
            case <-s.quitchan:
                ticker.stop()
                return
            }
        }
    }()
    return nil
}

func (s *servicea) record() error {
    current_time := time.now()
    ct := fmt.sprintf("%02d-%02d-%d", current_time.day(), current_time.month(), current_time.year())
    dataobject := &dataobject{
        data: ct,
    }
    if atomic.loadint32(&s.listeners) != 0 {
        fmt.println("sending data to data buffer...")
        for _, outchan := range s.outgoingbuffchans {
            outchan <- dataobject // the receivers should already be listening
        }
        fmt.println("data sent.")
    }
    return nil
}

// registerwithbufferservice satisfies the dataprovider interface. it provides the bufservice with new incoming and outgoing channels along with a polling interval
func (s servicea) registerwithbufferservice(bufservice *databuffer) error {
    if _, ok := bufservice.dataproviders[s.servicename()]; ok {
        return fmt.errorf("%v data provider already registered with data buffer.", s.servicename())
    }
    newincomingchan := make(chan *dataobject, 1)
    newoutgoingchan := make(chan *dataobject, 1)
    s.incomingbuffchan = newincomingchan
    s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan)
    bufservice.dataproviders[s.servicename()] = dataproviderinfo{
        incomingchan: newoutgoingchan, //our outgoing channel is their incoming
        outgoingchan: newincomingchan, // our incoming channel is their outgoing
    }
    s.databufferservice = bufservice
    bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering
    return nil
}

// servicename satisfies the dataprovider interface. it returns the name of the service.
func (s servicea) servicename() string {
    return s.name
}

func main() {
    var bufferedservices []dataprovider
    fmt.println("instantiating and starting data buffer service...")
    bufservice := newdatabuffer()
    err := bufservice.start()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    defer bufservice.stop()
    fmt.println("data buffer service successfully started.")

    fmt.println("instantiating and starting service a...")
    servicea, err := newservicea()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    bufferedservices = append(bufferedservices, *servicea)
    err = servicea.start()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    defer servicea.stop()
    fmt.println("service a successfully started.")

    fmt.println("registering services with data buffer...")
    for _, s := range bufferedservices {
        _ = s.registerwithbufferservice(bufservice) // ignoring error msgs for base case
    }
    fmt.println("registration complete.")

    fmt.println("beginning recording...")
    _ = atomic.addint32(&servicea.listeners, 1)
    err = servicea.startrecording(defaultpollinginterval)
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    for {
        select {
        case rtd := <-servicea.incomingbuffchan:
            fmt.println(rtd)
        case <-servicea.quitchan:
            atomic.storeint32(&servicea.listeners, 0)
            bufservice.quit<-struct{}{}
        }
    }
}

在 go 1.17 上运行。运行示例时,它应该每 10 秒打印以下内容:

Time to record...
Sending data to Data buffer...
Data sent.

但是数据缓冲区永远不会进入 indata := <-in 情况。


正确答案


为了诊断这个问题,我将 fmt.println("sending data to data buffer...") 更改为 fmt.println("sending data to data buffer...", s.outgoingbuffchans) ,输出为:

time to record...
sending data to data buffer... []

所以您实际上并没有将数据发送到任何通道。原因是:

func (s servicea) registerwithbufferservice(bufservice *databuffer) error {

由于当您执行 s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan) 时,接收器不是指针,因此您将在 servicea 的副本中更改 s.outgoingbuffchans ,该副本在函数退出时将被丢弃。要修复此更改:

func (s servicea) registerwithbufferservice(bufservice *databuffer) error {

func (s *servicea) registerwithbufferservice(bufservice *databuffer) error {

bufferedservices = append(bufferedservices, *servicea)

bufferedservices = append(bufferedservices, servicea)

修改后的版本输出:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA

因此,这解决了报告的问题(如果存在其他问题,我不会感到惊讶,但希望这能为您指明正确的方向)。我确实注意到您最初发布的代码确实使用了指针接收器,因此可能会遇到另一个问题(但在这种情况下很难对代码片段进行评论)。

终于介绍完啦!小伙伴们,这篇关于《为何数据被写入通道却未被接收者 goroutine 读取?》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>