登录
首页 >  Golang >  Go问答

为 Golang GRPC 创建持续流(pubsub)服务器的指南

来源:stackoverflow

时间:2024-02-23 15:36:25 321浏览 收藏

在IT行业这个发展更新速度很快的行业,只有不停止的学习,才不会被行业所淘汰。如果你是Golang学习者,那么本文《为 Golang GRPC 创建持续流(pubsub)服务器的指南》就很适合你!本篇内容主要包括##content_title##,希望对大家的知识积累有所帮助,助力实战开发!

问题内容

我正在构建需要以发布/订阅方式向所有订阅的消费者发送事件的服务,例如。向所有当前连接的客户端发送一个事件。

我使用 protobuf 来实现以下原型定义:

service eventsservice {
  rpc listenforevents (agentprocess) returns (stream event) {}
}

服务器和客户端都是用 go 编写的。

我的问题是,当客户端启动连接时,流的寿命不长,例如。当服务器从 listenforevents 方法返回时:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
    //persist listener here so it can be used later when backend needs to send some messages to client

    return nil
}

然后客户端几乎立即收到 eof 错误,这意味着服务器可能关闭了连接。

如何才能让客户端长时间订阅服务器? 主要问题是,当客户端调用服务器上的 listenforevents 方法时,我可能没有任何内容可以发送给客户端,这就是为什么我希望此流长期存在稍后可以发送消息。


解决方案


当您从服务器函数返回时,流终止。相反,您应该以某种方式接收事件,并将它们发送到客户端而不从服务器返回。可能有很多方法可以做到这一点。下面是一种方法的草图。

这依赖于在单独的 goroutine 上运行的服务器连接。有一个 broadcast() 函数,它将向所有连接的客户端发送消息。它看起来像这样:

var allregisteredclients map[*pb.agentprocess]chan message
var clientslock sync.rwmutex{}

func broadcast(msg message) {
  clientslock.rlock()
  for _,x:=range allregisteredclients {
      x<-msg
  }
  clientslock.runlock()
}

然后,您的客户必须自行注册并处理消息:

func (e EventsService) ListenForEvents(process *pb.AgentProcess, listener pb.EventsService_ListenForEventsServer) error {
   clientsLock.Lock()
   ch:=make(chan Message)
   allRegisteredClients[process]=ch
   clientsLock.Unlock()

   for msg:=range ch {
       // send message
       // Deal with errors
       // Deal with client terminations
   }
   clientsLock.Lock()
   delete(allRegisteredClients,process)
   clientsLock.Unlock()
}

正如我所说,这只是这个想法的一个草图。

今天关于《为 Golang GRPC 创建持续流(pubsub)服务器的指南》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>