为何数据被写入通道却未被接收者 goroutine 读取?
来源:stackoverflow
时间:2024-03-26 10:42:40 446浏览 收藏
在使用 Go 语言构建数据缓冲系统时,服务 A 无法将数据推送到服务 B 的通道中。服务 B 中的缓冲子协程未能进入接收数据的情况。分析代码后发现,问题出在服务 A 中,因为当将新通道提供给服务 B 时,接收器不是指针,导致服务 A 的副本中更改了通道,而这些副本在函数退出时被丢弃。通过将接收器更改为指针并使用正确的赋值,数据能够成功从服务 A 发送到服务 B 的通道。
我正在构建一个守护进程,并且有两个服务将相互发送数据。服务 a 产生数据,服务 b a 是数据缓冲区服务或类似队列。因此,从 main.go
文件中,服务 b 被实例化并启动。 start()
方法将执行 buffer()
函数作为 goroutine,因为该函数等待数据传递到通道,并且我不希望主进程停止等待 buffer
完成。然后服务a被实例化并启动。然后它也向服务 b“注册”。
我为服务 a 创建了一个名为 registerwithbufferservice
的方法,该方法创建两个新通道。它将把这些通道存储为它自己的属性,并将它们提供给服务 b。
func (s *servicea) registerwithbufferservice(bufservice *data.databuffer) error { newincomingchan := make(chan *data.dataframe, 1) newoutgoingchan := make(chan []byte, 1) s.incomingbuffchan = newincomingchan s.outgoingdatachannels = append(s.outgoingdatachannels, newoutgoingchan) bufservice.dataproviders[s.servicename()] = data.dataproviderinfo{ incomingchan: newoutgoingchan, //our outgoing channel is their incoming outgoingchan: newincomingchan, // our incoming channel is their outgoing } s.databufferservice = bufservice bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering s.logger.info().msg("registeration completed.") return nil }
buffer 本质上监听来自服务 a 的传入数据,使用 decode()
对其进行解码,然后将其添加到名为 buf
的切片中。如果切片的长度大于 bufferperiod
,则它将在传出通道中将切片中的第一项发送回服务 a。
func (b* databuffer) buffer(bufferperiod int) { for { select { case newprovider := <- b.newprovider: b.wg.add(1) /* newprovider is a string dataproviders is a map the value it returns is a struct containing the incoming and outgoing channels for this service */ p := b.dataproviders[newprovider] go func(prov string, in chan []byte, out chan *dataframe) { defer b.wg.done() var buf []*dataframe for { select { case rawdata := <-in: tmp := decode(rawdata) //custom decoding function. returns a *dataframe buf = append(buf, tmp) if len(buf) < bufferperiod { b.logger.info().msg("sending decoded data out.") out <- buf[0] buf = buf[1:] //pop } case <- b.quit: return } } }(newprovider, p.incomingchan, p.outgoingchan) } case <- b.quit: return } }
现在服务 a 有一个名为 record
的方法,该方法会定期将数据推送到其 outgoingdatachannels
属性中的所有通道。
func (s *servicea) record() error { ... if atomic.loadint32(&s.listeners) != 0 { s.logger.info().msg("sending raw data to data buffer") for _, outchan := range s.outgoingdatachannels { outchan <- databytes // the receiver (service b) is already listening and this doesn't hang } s.logger.info().msg("raw data sent and received") // the logger will output this so i know it's not hanging } }
问题是,服务 a 似乎使用 record
成功推送数据,但服务 b 从未进入 case rawdata := <-in:
情况(在 buffer
子协程中)。这是因为我有嵌套的 goroutine 吗?如果不清楚的话,当服务 b 启动时,它会调用 buffer
但因为否则它会挂起,所以我将对 buffer
的调用设为 goroutine。因此,当服务 a 调用 registerwithbufferservice
时,buffer
goroutine 创建一个 goroutine 来监听来自服务 b 的新数据,并在缓冲区填满后将其推送回服务 a。我希望我解释清楚了。
编辑 1 我制作了一个最小的、可重现的示例。
package main import ( "fmt" "sync" "sync/atomic" "time" ) var ( defaultbufferingperiod int = 3 defaultpollinginterval int64 = 10 ) type dataobject struct{ data string } type dataprovider interface { registerwithbufferservice(*databuffer) error servicename() string } type dataproviderinfo struct{ incomingchan chan *dataobject outgoingchan chan *dataobject } type databuffer struct{ running int32 //used atomically dataproviders map[string]dataproviderinfo quit chan struct{} newprovider chan string wg sync.waitgroup } func newdatabuffer() *databuffer{ var ( wg sync.waitgroup ) return &databuffer{ dataproviders: make(map[string]dataproviderinfo), quit: make(chan struct{}), newprovider: make(chan string), wg: wg, } } func (b *databuffer) start() error { if ok := atomic.compareandswapint32(&b.running, 0, 1); !ok { return fmt.errorf("could not start data buffer service.") } go b.buffer(defaultbufferingperiod) return nil } func (b *databuffer) stop() error { if ok := atomic.compareandswapint32(&b.running, 1, 0); !ok { return fmt.errorf("could not stop data buffer service.") } for _, p := range b.dataproviders { close(p.incomingchan) close(p.outgoingchan) } close(b.quit) b.wg.wait() return nil } // buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing dataframes func (b *databuffer) buffer(bufferperiod int) { for { select { case newprovider := <- b.newprovider: fmt.println("received new data provider.") if _, ok := b.dataproviders[newprovider]; ok { b.wg.add(1) p := b.dataproviders[newprovider] go func(prov string, in chan *dataobject, out chan *dataobject) { defer b.wg.done() var ( buf []*dataobject ) fmt.printf("waiting for data from: %s\n", prov) for { select { case indata := <-in: fmt.printf("received data from: %s\n", prov) buf = append(buf, indata) if len(buf) > bufferperiod { fmt.printf("queue is filled, sending data back to %s\n", prov) out <- buf[0] fmt.println("data sent") buf = buf[1:] //pop } case <- b.quit: return } } }(newprovider, p.incomingchan, p.outgoingchan) } case <- b.quit: return } } } type servicea struct{ active int32 // atomic stopping int32 // atomic recording int32 // atomic listeners int32 // atomic name string quitchan chan struct{} incomingbuffchan chan *dataobject outgoingbuffchans []chan *dataobject databufferservice *databuffer } // a compile time check to ensure servicea fully implements the dataprovider interface var _ dataprovider = (*servicea)(nil) func newservicea() (*servicea, error) { var newsliceoutchans []chan *dataobject return &servicea{ quitchan: make(chan struct{}), outgoingbuffchans: newsliceoutchans, name: "servicea", }, nil } // start starts the service. returns an error if any issues occur func (s *servicea) start() error { atomic.storeint32(&s.active, 1) return nil } // stop stops the service. returns an error if any issues occur func (s *servicea) stop() error { atomic.storeint32(&s.stopping, 1) close(s.quitchan) return nil } func (s *servicea) startrecording(pol_int int64) error { if ok := atomic.compareandswapint32(&s.recording, 0, 1); !ok { return fmt.errorf("could not start recording. data recording already started") } ticker := time.newticker(time.duration(pol_int) * time.second) go func() { for { select { case <-ticker.c: fmt.println("time to record...") err := s.record() if err != nil { return } case <-s.quitchan: ticker.stop() return } } }() return nil } func (s *servicea) record() error { current_time := time.now() ct := fmt.sprintf("%02d-%02d-%d", current_time.day(), current_time.month(), current_time.year()) dataobject := &dataobject{ data: ct, } if atomic.loadint32(&s.listeners) != 0 { fmt.println("sending data to data buffer...") for _, outchan := range s.outgoingbuffchans { outchan <- dataobject // the receivers should already be listening } fmt.println("data sent.") } return nil } // registerwithbufferservice satisfies the dataprovider interface. it provides the bufservice with new incoming and outgoing channels along with a polling interval func (s servicea) registerwithbufferservice(bufservice *databuffer) error { if _, ok := bufservice.dataproviders[s.servicename()]; ok { return fmt.errorf("%v data provider already registered with data buffer.", s.servicename()) } newincomingchan := make(chan *dataobject, 1) newoutgoingchan := make(chan *dataobject, 1) s.incomingbuffchan = newincomingchan s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan) bufservice.dataproviders[s.servicename()] = dataproviderinfo{ incomingchan: newoutgoingchan, //our outgoing channel is their incoming outgoingchan: newincomingchan, // our incoming channel is their outgoing } s.databufferservice = bufservice bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering return nil } // servicename satisfies the dataprovider interface. it returns the name of the service. func (s servicea) servicename() string { return s.name } func main() { var bufferedservices []dataprovider fmt.println("instantiating and starting data buffer service...") bufservice := newdatabuffer() err := bufservice.start() if err != nil { panic(fmt.sprintf("%v", err)) } defer bufservice.stop() fmt.println("data buffer service successfully started.") fmt.println("instantiating and starting service a...") servicea, err := newservicea() if err != nil { panic(fmt.sprintf("%v", err)) } bufferedservices = append(bufferedservices, *servicea) err = servicea.start() if err != nil { panic(fmt.sprintf("%v", err)) } defer servicea.stop() fmt.println("service a successfully started.") fmt.println("registering services with data buffer...") for _, s := range bufferedservices { _ = s.registerwithbufferservice(bufservice) // ignoring error msgs for base case } fmt.println("registration complete.") fmt.println("beginning recording...") _ = atomic.addint32(&servicea.listeners, 1) err = servicea.startrecording(defaultpollinginterval) if err != nil { panic(fmt.sprintf("%v", err)) } for { select { case rtd := <-servicea.incomingbuffchan: fmt.println(rtd) case <-servicea.quitchan: atomic.storeint32(&servicea.listeners, 0) bufservice.quit<-struct{}{} } } }
在 go 1.17 上运行。运行示例时,它应该每 10 秒打印以下内容:
Time to record... Sending data to Data buffer... Data sent.
但是数据缓冲区永远不会进入 indata := <-in
情况。
正确答案
为了诊断这个问题,我将 fmt.println("sending data to data buffer...")
更改为 fmt.println("sending data to data buffer...", s.outgoingbuffchans)
,输出为:
time to record... sending data to data buffer... []
所以您实际上并没有将数据发送到任何通道。原因是:
func (s servicea) registerwithbufferservice(bufservice *databuffer) error {
由于当您执行 s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan)
时,接收器不是指针,因此您将在 servicea
的副本中更改 s.outgoingbuffchans
,该副本在函数退出时将被丢弃。要修复此更改:
func (s servicea) registerwithbufferservice(bufservice *databuffer) error {
至
func (s *servicea) registerwithbufferservice(bufservice *databuffer) error {
和
bufferedservices = append(bufferedservices, *servicea)
至
bufferedservices = append(bufferedservices, servicea)
修改后的版本输出:
Time to record... Sending data to Data buffer... [0xc0000d8060] Data sent. Received data from: SERVICEA Time to record... Sending data to Data buffer... [0xc0000d8060] Data sent. Received data from: SERVICEA
因此,这解决了报告的问题(如果存在其他问题,我不会感到惊讶,但希望这能为您指明正确的方向)。我确实注意到您最初发布的代码确实使用了指针接收器,因此可能会遇到另一个问题(但在这种情况下很难对代码片段进行评论)。
终于介绍完啦!小伙伴们,这篇关于《为何数据被写入通道却未被接收者 goroutine 读取?》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!
-
502 收藏
-
502 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
139 收藏
-
204 收藏
-
325 收藏
-
477 收藏
-
486 收藏
-
439 收藏
-
357 收藏
-
352 收藏
-
101 收藏
-
440 收藏
-
212 收藏
-
143 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习