登录
首页 >  Golang >  Go问答

将流式传输消息发送到连接的所有客户端的 GRPC golang 服务中

来源:stackoverflow

时间:2024-02-16 19:09:19 251浏览 收藏

亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《将流式传输消息发送到连接的所有客户端的 GRPC golang 服务中》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下,希望所有认真读完的童鞋们,都有实质性的提高。

问题内容

我有一个带有这样的 .proto 文件的 grpc 服务器..

syntax = "proto3";

package chatserver;

message fromclient {

    string name = 1;
    string body = 2;
}

message fromserver {

    string name = 1;
    string body = 2; 
}

service services {

    rpc chatservice(stream fromclient) returns (stream fromserver){};
}

server.go 代码如下所示..

package chatserver

import (
    "fmt"
    "log"
    "math/rand"
    "sync"
    "time"
)

type messageUnit struct {
    ClientName        string
    MessageBody       string
    MessageUniqueCode int
    ClientUniqueCode  int
}

type messageHandle struct {
    MQue []messageUnit
    mu   sync.Mutex
}

var messageHandleObject = messageHandle{}

type ChatServer struct {
}

//define ChatService
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {

    clientUniqueCode := rand.Intn(1e6)
    errch := make(chan error)
    println(csi.Context())

    // receive messages - init a go routine
    go receiveFromStream(csi, clientUniqueCode, errch)

    // send messages - init a go routine
    go sendToStream(csi, clientUniqueCode, errch)

    return <-errch

}

//receive messages
func receiveFromStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {

    //implement a loop
    for {
        mssg, err := csi_.Recv()
        if err != nil {
            log.Printf("Error in receiving message from client :: %v", err)
            errch_ <- err
        } else {

            messageHandleObject.mu.Lock()

            messageHandleObject.MQue = append(messageHandleObject.MQue, messageUnit{
                ClientName:        mssg.Name,
                MessageBody:       mssg.Body,
                MessageUniqueCode: rand.Intn(1e8),
                ClientUniqueCode:  clientUniqueCode_,
            })

            log.Printf("%v", messageHandleObject.MQue[len(messageHandleObject.MQue)-1])

            messageHandleObject.mu.Unlock()

        }
    }
}

//send message
func sendToStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {

    //implement a loop
    for {

        //loop through messages in MQue
        for {

            time.Sleep(500 * time.Millisecond)

            messageHandleObject.mu.Lock()

            if len(messageHandleObject.MQue) == 0 {
                messageHandleObject.mu.Unlock()
                break
            }

            senderUniqueCode := messageHandleObject.MQue[0].ClientUniqueCode
            senderName4Client := messageHandleObject.MQue[0].ClientName
            message4Client := messageHandleObject.MQue[0].MessageBody

            messageHandleObject.mu.Unlock()

            //send message to designated client (do not send to the same client)
            if senderUniqueCode != clientUniqueCode_ {

                err := csi_.Send(&FromServer{Name: senderName4Client, Body: message4Client})

                if err != nil {
                    errch_ <- err
                }

                messageHandleObject.mu.Lock()

                if len(messageHandleObject.MQue) > 1 {
                    messageHandleObject.MQue = messageHandleObject.MQue[1:] // delete the message at index 0 after sending to receiver
                    fmt.Println("message greater then 1")
                } else {
                    messageHandleObject.MQue = []messageUnit{}
                }

                messageHandleObject.mu.Unlock()

            }

        }

        time.Sleep(100 * time.Millisecond)
    }
}

现在,例如三个客户端连接到服务器。如果一个客户端发送消息,则该消息不会被转发到其他两个客户端。它仅发送给其他两个客户端之一。我可以以某种方式将该消息广播给所有其他客户端吗?或者我可以根据某个客户端 id 指定哪个客户端将接收消息?


正确答案


您必须保留一个包含所有连接流的全局数据结构并循环遍历它们。 您还需要为每一个生成一个唯一的标识符以便区分。 我没有看到任何其他方法。

以上就是《将流式传输消息发送到连接的所有客户端的 GRPC golang 服务中》的详细内容,更多关于的资料请关注golang学习网公众号!

声明:本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>