登录
首页 >  Golang >  Go教程

gRPC 流:最佳实践和性能见解

来源:dev.to

时间:2024-12-25 10:19:03 148浏览 收藏

大家好,我们又见面了啊~本文《gRPC 流:最佳实践和性能见解》的内容中将会涉及到等等。如果你正在学习Golang相关知识,欢迎关注我,以后会给大家带来更多Golang相关文章,希望我们能一起进步!下面就开始本文的正式内容~

gRPC 流:最佳实践和性能见解

介绍

grpc 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。

在本文中,我们将探讨如何正确使用 grpc 流。

先决条件

  • grpc基础知识
  • go 编程语言的基础知识(示例代码是用 go 编写的,但这个概念也可以应用于其他语言)
  • 代码示例可在 github 上获取

良好实践

让我们检查一下使用 grpc 流的良好实践:

使用一元请求进行一元请求

一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 grpc 服务定义:

service myservice {
  rpc getsomething (somethingrequest) returns (stream somethingresponse) {}
}

如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:

service myservice {
  rpc getsomething (somethingrequest) returns (somethingresponse) {}
}

通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。

比较一元请求和流请求的 golang 代码示例:

一元请求:

type somethingunary struct {
    pb.unimplementedsomethingunaryserver
}

func (s *somethingunary) getsomething(ctx context.context, req *pb.somethingrequest) (*pb.somethingresponse, error) {
    return &pb.somethingresponse{
        message: "hello " + req.name,
    }, nil
}

func testsomethingunary(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registersomethingunaryserver(s, &somethingunary{})
    })

    client := pb.newsomethingunaryclient(conn)

    response, err := client.getsomething(
        context.background(),
        &pb.somethingrequest{
            name: "test",
        },
    )
    if err != nil {
        t.fatalf("failed to get something: %v", err)
    }

    if response.message != "hello test" {
        t.errorf("unexpected response: %v", response.message)
    }
}

流式一元请求:

type somethingstream struct {
    pb.unimplementedsomethingstreamserver
}

func (s *somethingstream) getsomething(req *pb.somethingrequest, stream pb.somethingstream_getsomethingserver) error {
    if err := stream.send(&pb.somethingresponse{
        message: "hello " + req.name,
    }); err != nil {
        return err
    }

    return nil
}

func testsomethingstream(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registersomethingstreamserver(s, &somethingstream{})
    })

    client := pb.newsomethingstreamclient(conn)

    stream, err := client.getsomething(
        context.background(),
        &pb.somethingrequest{
            name: "test",
        },
    )
    if err != nil {
        t.fatalf("failed to get something stream: %v", err)
    }

    response, err := stream.recv()
    if err != nil {
        t.fatalf("failed to receive response: %v", err)
    }

    if response.message != "hello test" {
        t.errorf("unexpected response: %v", response.message)
    }
}

我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。

如果可以的话,一次发送多个文档

让我们比较一下这两个服务定义:

service bookstore {
  rpc listbooks(listbooksrequest) returns (stream book) {}
}

service bookstorebatch {
  rpc listbooks(listbooksrequest) returns (stream listbooksresponse) {}
}

message listbooksresponse {
  repeated book books = 1;
}

bookstore 一次流式传输一本书,而 bookstorebatch 同时流式传输多本书。

如果客户端需要列出所有书籍,使用bookstorebatch 效率更高
因为它减少了客户端和服务器之间的往返次数。

让我们看看 bookstore 和 bookstorebatch 的 golang 代码示例:

书店:

type bookstore struct {
    pb.unimplementedbookstoreserver
}

func (s *bookstore) listbooks(req *pb.listbooksrequest, stream pb.bookstore_listbooksserver) error {
    for _, b := range bookstoredata {
        if b.author == req.author {
            if err := stream.send(&pb.book{
                title:           b.title,
                author:          b.author,
                publicationyear: int32(b.publicationyear),
                genre:           b.genre,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}

func testbookstore_listbooks(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registerbookstoreserver(s, &bookstore{})
    })

    client := pb.newbookstoreclient(conn)

    stream, err := client.listbooks(
        context.background(),
        &pb.listbooksrequest{
            author: charlesdickens,
        },
    )
    if err != nil {
        t.fatalf("failed to list books: %v", err)
    }

    books := []*pb.book{}
    for {
        book, err := stream.recv()
        if err != nil {
            break
        }
        books = append(books, book)
    }

    if len(books) != charlesdickensbooks {
        t.errorf("unexpected number of books: %d", len(books))
    }
}

书店批次:

type bookstorebatch struct {
    pb.unimplementedbookstorebatchserver
}

func (s *bookstorebatch) listbooks(req *pb.listbooksrequest, stream pb.bookstorebatch_listbooksserver) error {
    const batchsize = 10
    books := make([]*pb.book, 0, batchsize)
    for _, b := range bookstoredata {
        if b.author == req.author {
            books = append(books, &pb.book{
                title:           b.title,
                author:          b.author,
                publicationyear: int32(b.publicationyear),
                genre:           b.genre,
            })

            if len(books) == batchsize {
                if err := stream.send(&pb.listbooksresponse{
                    books: books,
                }); err != nil {
                    return err
                }
                books = books[:0]
            }
        }
    }

    if len(books) > 0 {
        if err := stream.send(&pb.listbooksresponse{
            books: books,
        }); err != nil {
            return nil
        }
    }

    return nil
}

func testbookstorebatch_listbooks(t *testing.t) {
    conn := newserver(t, func(s grpc.serviceregistrar) {
        pb.registerbookstorebatchserver(s, &bookstorebatch{})
    })

    client := pb.newbookstorebatchclient(conn)

    stream, err := client.listbooks(
        context.background(),
        &pb.listbooksrequest{
            author: charlesdickens,
        },
    )
    if err != nil {
        t.fatalf("failed to list books: %v", err)
    }

    books := []*pb.book{}
    for {
        response, err := stream.recv()
        if err != nil {
            break
        }
        books = append(books, response.books...)
    }

    if len(books) != charlesdickensbooks {
        t.errorf("unexpected number of books: %d", len(books))
    }
}

从上面的代码中,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:

书店基准:

func benchmarkbookstore_listbooks(b *testing.b) {
    conn := newserver(b, func(s grpc.serviceregistrar) {
        pb.registerbookstoreserver(s, &bookstore{})
    })

    client := pb.newbookstoreclient(conn)

    var benchinnerbooks []*pb.book
    b.resettimer()
    for i := 0; i < b.n; i++ {
        stream, err := client.listbooks(
            context.background(),
            &pb.listbooksrequest{
                author: charlesdickens,
            },
        )
        if err != nil {
            b.fatalf("failed to list books: %v", err)
        }

        books := []*pb.book{}
        for {
            book, err := stream.recv()
            if err != nil {
                break
            }
            books = append(books, book)
        }

        benchinnerbooks = books
    }

    benchbooks = benchinnerbooks
}

bookstorebatch 基准:

func benchmarkbookstorebatch_listbooks(b *testing.b) {
    conn := newserver(b, func(s grpc.serviceregistrar) {
        pb.registerbookstorebatchserver(s, &bookstorebatch{})
    })

    client := pb.newbookstorebatchclient(conn)

    var benchinnerbooks []*pb.book
    b.resettimer()
    for i := 0; i < b.n; i++ {
        stream, err := client.listbooks(
            context.background(),
            &pb.listbooksrequest{
                author: charlesdickens,
            },
        )
        if err != nil {
            b.fatalf("failed to list books: %v", err)
        }

        books := []*pb.book{}
        for {
            response, err := stream.recv()
            if err != nil {
                break
            }
            books = append(books, response.books...)
        }

        benchinnerbooks = books
    }

    benchbooks = benchinnerbooks
}

基准测试结果:

benchmarkbookstore_listbooks
benchmarkbookstore_listbooks-12                      732           1647454 ns/op           85974 b/op       1989 allocs/op
benchmarkbookstorebatch_listbooks
benchmarkbookstorebatch_listbooks-12                1202            937491 ns/op           61098 b/op        853 allocs/op

多么大的进步啊! bookstorebatch 比 bookstore 快 1.75 倍。

但是为什么 bookstorebatch 比 bookstore 快?

服务器每次向客户端发送消息流.send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。

在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。

使用双向流来控制流量

书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。

双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。

如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。

让我们看一下下面的protobuf定义:

service sensor {
  rpc watch(stream watchrequest) returns (stream watchresponse) {}
}

message watchrequest {
  oneof request {
    watchcreaterequest create_request = 1;
    watchcancelrequest cancel_request = 2;
    watchnowrequest now_request = 3;
  }
}

message watchcreaterequest {
  // sensor_id contains the sensor id to watch.
  string sensor_id = 1;
}

message watchcancelrequest {
  // sensor_id contains the sensor id to cancel.
  string sensor_id = 1;
}

message watchnowrequest {
  // sensor_id contains the sensor id to get the current value.
  string sensor_id = 1;
}

message watchresponse {
  // sensor_id contains the sensor id for the current response.
  string sensor_id = 1;
  // created is true if the watch was created successfully.
  bool created = 2;
  // canceleted is true if the watch was canceled successfully or if the creation failed.
  bool canceleted = 3;
  // error contains the error message if something went wrong.
  string error = 4;
  // timestamp contains the timestamp of the value.
  google.protobuf.timestamp timestamp = 5;
  // value contains the value of the sensor.
  int32 value = 6;
}

请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。

传感器的 golang 代码将被忽略,但您可以在这里找到它

serverstream 包装流和传感器数据,使其更易于使用。

type serverstream struct {
    s           *sensorservice         // service
    stream      pb.sensor_watchserver  // stream
    sendch      chan *pb.watchresponse // control channel
    sensorch    chan sensordata        // data channel
    sensorwatch map[string]int         // map of sensor id to watch id
}

如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。

接收消息:

func (ss *serverstream) recvloop() error {
    defer ss.close()
    for {
        req, err := ss.stream.recv()
        if errors.is(err, io.eof) {
            return nil
        }
        if err != nil {
            return err
        }

        switch req := req.request.(type) {
        case *pb.watchrequest_createrequest:
            // ignore validation (check the full code)

            // create a channel to send data to the client
            id := sensor.watch(ss.sensorch)
            ss.sensorwatch[sensorid] = id

            // send created message
            ss.sendch <- &pb.watchresponse{
                sensorid: sensorid,
                created:  true,
            }

        case *pb.watchrequest_cancelrequest:
            // ignore validation (check the full code)

            // cancel the watch
            ss.s.sensors[sensorid].cancel(id)
            delete(ss.sensorwatch, sensorid)

            ss.sendch <- &pb.watchresponse{
                sensorid:   sensorid,
                canceleted: true,
            }

        case *pb.watchrequest_nowrequest:
            // ignore validation (check the full code)

            // send current value
            ss.sendch <- &pb.watchresponse{
                sensorid:  sensorid,
                timestamp: timestamppb.now(),
                value:     int32(sensor.read()),
            }
        }
    }
}

switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留r​​ecvloop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendloop
将从控制通道读取消息并将其发送到客户端。

发送消息:

func (ss *serverstream) sendloop() {
    for {
        select {
        case m, ok := <-ss.sendch:
            if !ok {
                return
            }

            // send message
            if err := ss.stream.send(m); err != nil {
                return
            }

        case data, ok := <-ss.sensorch:
            if !ok {
                return
            }

            // send data
            if err := ss.stream.send(&pb.watchresponse{
                sensorid:  data.id,
                timestamp: timestamppb.new(data.time),
                value:     int32(data.val),
            }); err != nil {
                return
            }

        case <-ss.stream.context().done():
            return
        }
    }
}

sendloop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。

最后,传感器服务的快乐路径测试:

func TestSensor(t *testing.T) {
    conn := newServer(t, func(s grpc.ServiceRegistrar) {
        pb.RegisterSensorServer(s, &sensorService{
            sensors: newSensors(),
        })
    })

    client := pb.NewSensorClient(conn)

    stream, err := client.Watch(context.Background())
    if err != nil {
        t.Fatalf("failed to watch: %v", err)
    }

    response := make(chan *pb.WatchResponse)
    // Go routine to read from the stream
    go func() {
        defer close(response)
        for {
            resp, err := stream.Recv()
            if errors.Is(err, io.EOF) {
                return
            }
            if err != nil {
                return
            }
            response <- resp
        }
    }()

    createRequest(t, stream, "temp")
    waitUntilCreated(t, response, "temp")
    waitForSensorData(t, response, "temp")

    createRequest(t, stream, "pres")
    waitUntilCreated(t, response, "pres")
    waitForSensorData(t, response, "pres")

    waitForSensorData(t, response, "temp")
    waitForSensorData(t, response, "pres")

    // invalid sensor
    createRequest(t, stream, "invalid")
    waitUntilCanceled(t, response, "invalid")

    nowRequest(t, stream, "light")
    waitForSensorData(t, response, "light")
    // Wait for 2 seconds to make sure we don't receive any data for light
    waitForNoSensorData(t, response, "light", 2*time.Second)

    cancelRequest(t, stream, "temp")
    waitUntilCanceled(t, response, "temp")

    waitForSensorData(t, response, "pres")
    // Wait for 2 seconds to make sure we don't receive any data for temp
    waitForNoSensorData(t, response, "temp", 2*time.Second)

    err = stream.CloseSend()
    if err != nil {
        t.Fatalf("failed to close send: %v", err)
    }
}

从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。

挑战自己

  • 使用 grpc 流实现聊天应用程序。
  • 修改传感器服务以一次发送多个值以节省往返次数。
  • 嗅探网络流量以查看一元请求和流式请求之间的区别。

结论

grpc 流是一种用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 grpc 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。

保持联系

如果您有任何问题或反馈,请随时在 linkedin 上与我联系。

到这里,我们也就讲完了《gRPC 流:最佳实践和性能见解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!

声明:本文转载于:dev.to 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>