gRPC 流:最佳实践和性能见解
来源:dev.to
时间:2024-12-25 10:19:03 148浏览 收藏
大家好,我们又见面了啊~本文《gRPC 流:最佳实践和性能见解》的内容中将会涉及到等等。如果你正在学习Golang相关知识,欢迎关注我,以后会给大家带来更多Golang相关文章,希望我们能一起进步!下面就开始本文的正式内容~
介绍
grpc 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。
在本文中,我们将探讨如何正确使用 grpc 流。
先决条件
- grpc基础知识
- go 编程语言的基础知识(示例代码是用 go 编写的,但这个概念也可以应用于其他语言)
- 代码示例可在 github 上获取
良好实践
让我们检查一下使用 grpc 流的良好实践:
使用一元请求进行一元请求
一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 grpc 服务定义:
service myservice { rpc getsomething (somethingrequest) returns (stream somethingresponse) {} }
如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:
service myservice { rpc getsomething (somethingrequest) returns (somethingresponse) {} }
通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。
比较一元请求和流请求的 golang 代码示例:
一元请求:
type somethingunary struct { pb.unimplementedsomethingunaryserver } func (s *somethingunary) getsomething(ctx context.context, req *pb.somethingrequest) (*pb.somethingresponse, error) { return &pb.somethingresponse{ message: "hello " + req.name, }, nil } func testsomethingunary(t *testing.t) { conn := newserver(t, func(s grpc.serviceregistrar) { pb.registersomethingunaryserver(s, &somethingunary{}) }) client := pb.newsomethingunaryclient(conn) response, err := client.getsomething( context.background(), &pb.somethingrequest{ name: "test", }, ) if err != nil { t.fatalf("failed to get something: %v", err) } if response.message != "hello test" { t.errorf("unexpected response: %v", response.message) } }
流式一元请求:
type somethingstream struct { pb.unimplementedsomethingstreamserver } func (s *somethingstream) getsomething(req *pb.somethingrequest, stream pb.somethingstream_getsomethingserver) error { if err := stream.send(&pb.somethingresponse{ message: "hello " + req.name, }); err != nil { return err } return nil } func testsomethingstream(t *testing.t) { conn := newserver(t, func(s grpc.serviceregistrar) { pb.registersomethingstreamserver(s, &somethingstream{}) }) client := pb.newsomethingstreamclient(conn) stream, err := client.getsomething( context.background(), &pb.somethingrequest{ name: "test", }, ) if err != nil { t.fatalf("failed to get something stream: %v", err) } response, err := stream.recv() if err != nil { t.fatalf("failed to receive response: %v", err) } if response.message != "hello test" { t.errorf("unexpected response: %v", response.message) } }
我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。
如果可以的话,一次发送多个文档
让我们比较一下这两个服务定义:
service bookstore { rpc listbooks(listbooksrequest) returns (stream book) {} } service bookstorebatch { rpc listbooks(listbooksrequest) returns (stream listbooksresponse) {} } message listbooksresponse { repeated book books = 1; }
bookstore 一次流式传输一本书,而 bookstorebatch 同时流式传输多本书。
如果客户端需要列出所有书籍,使用bookstorebatch 效率更高
因为它减少了客户端和服务器之间的往返次数。
让我们看看 bookstore 和 bookstorebatch 的 golang 代码示例:
书店:
type bookstore struct { pb.unimplementedbookstoreserver } func (s *bookstore) listbooks(req *pb.listbooksrequest, stream pb.bookstore_listbooksserver) error { for _, b := range bookstoredata { if b.author == req.author { if err := stream.send(&pb.book{ title: b.title, author: b.author, publicationyear: int32(b.publicationyear), genre: b.genre, }); err != nil { return err } } } return nil } func testbookstore_listbooks(t *testing.t) { conn := newserver(t, func(s grpc.serviceregistrar) { pb.registerbookstoreserver(s, &bookstore{}) }) client := pb.newbookstoreclient(conn) stream, err := client.listbooks( context.background(), &pb.listbooksrequest{ author: charlesdickens, }, ) if err != nil { t.fatalf("failed to list books: %v", err) } books := []*pb.book{} for { book, err := stream.recv() if err != nil { break } books = append(books, book) } if len(books) != charlesdickensbooks { t.errorf("unexpected number of books: %d", len(books)) } }
书店批次:
type bookstorebatch struct { pb.unimplementedbookstorebatchserver } func (s *bookstorebatch) listbooks(req *pb.listbooksrequest, stream pb.bookstorebatch_listbooksserver) error { const batchsize = 10 books := make([]*pb.book, 0, batchsize) for _, b := range bookstoredata { if b.author == req.author { books = append(books, &pb.book{ title: b.title, author: b.author, publicationyear: int32(b.publicationyear), genre: b.genre, }) if len(books) == batchsize { if err := stream.send(&pb.listbooksresponse{ books: books, }); err != nil { return err } books = books[:0] } } } if len(books) > 0 { if err := stream.send(&pb.listbooksresponse{ books: books, }); err != nil { return nil } } return nil } func testbookstorebatch_listbooks(t *testing.t) { conn := newserver(t, func(s grpc.serviceregistrar) { pb.registerbookstorebatchserver(s, &bookstorebatch{}) }) client := pb.newbookstorebatchclient(conn) stream, err := client.listbooks( context.background(), &pb.listbooksrequest{ author: charlesdickens, }, ) if err != nil { t.fatalf("failed to list books: %v", err) } books := []*pb.book{} for { response, err := stream.recv() if err != nil { break } books = append(books, response.books...) } if len(books) != charlesdickensbooks { t.errorf("unexpected number of books: %d", len(books)) } }
从上面的代码中,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:
书店基准:
func benchmarkbookstore_listbooks(b *testing.b) { conn := newserver(b, func(s grpc.serviceregistrar) { pb.registerbookstoreserver(s, &bookstore{}) }) client := pb.newbookstoreclient(conn) var benchinnerbooks []*pb.book b.resettimer() for i := 0; i < b.n; i++ { stream, err := client.listbooks( context.background(), &pb.listbooksrequest{ author: charlesdickens, }, ) if err != nil { b.fatalf("failed to list books: %v", err) } books := []*pb.book{} for { book, err := stream.recv() if err != nil { break } books = append(books, book) } benchinnerbooks = books } benchbooks = benchinnerbooks }
bookstorebatch 基准:
func benchmarkbookstorebatch_listbooks(b *testing.b) { conn := newserver(b, func(s grpc.serviceregistrar) { pb.registerbookstorebatchserver(s, &bookstorebatch{}) }) client := pb.newbookstorebatchclient(conn) var benchinnerbooks []*pb.book b.resettimer() for i := 0; i < b.n; i++ { stream, err := client.listbooks( context.background(), &pb.listbooksrequest{ author: charlesdickens, }, ) if err != nil { b.fatalf("failed to list books: %v", err) } books := []*pb.book{} for { response, err := stream.recv() if err != nil { break } books = append(books, response.books...) } benchinnerbooks = books } benchbooks = benchinnerbooks }
基准测试结果:
benchmarkbookstore_listbooks benchmarkbookstore_listbooks-12 732 1647454 ns/op 85974 b/op 1989 allocs/op benchmarkbookstorebatch_listbooks benchmarkbookstorebatch_listbooks-12 1202 937491 ns/op 61098 b/op 853 allocs/op
多么大的进步啊! bookstorebatch 比 bookstore 快 1.75 倍。
但是为什么 bookstorebatch 比 bookstore 快?
服务器每次向客户端发送消息流.send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。
在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。
使用双向流来控制流量
书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。
双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。
如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。
让我们看一下下面的protobuf定义:
service sensor { rpc watch(stream watchrequest) returns (stream watchresponse) {} } message watchrequest { oneof request { watchcreaterequest create_request = 1; watchcancelrequest cancel_request = 2; watchnowrequest now_request = 3; } } message watchcreaterequest { // sensor_id contains the sensor id to watch. string sensor_id = 1; } message watchcancelrequest { // sensor_id contains the sensor id to cancel. string sensor_id = 1; } message watchnowrequest { // sensor_id contains the sensor id to get the current value. string sensor_id = 1; } message watchresponse { // sensor_id contains the sensor id for the current response. string sensor_id = 1; // created is true if the watch was created successfully. bool created = 2; // canceleted is true if the watch was canceled successfully or if the creation failed. bool canceleted = 3; // error contains the error message if something went wrong. string error = 4; // timestamp contains the timestamp of the value. google.protobuf.timestamp timestamp = 5; // value contains the value of the sensor. int32 value = 6; }
请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。
传感器的 golang 代码将被忽略,但您可以在这里找到它
serverstream 包装流和传感器数据,使其更易于使用。
type serverstream struct { s *sensorservice // service stream pb.sensor_watchserver // stream sendch chan *pb.watchresponse // control channel sensorch chan sensordata // data channel sensorwatch map[string]int // map of sensor id to watch id }
如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。
接收消息:
func (ss *serverstream) recvloop() error { defer ss.close() for { req, err := ss.stream.recv() if errors.is(err, io.eof) { return nil } if err != nil { return err } switch req := req.request.(type) { case *pb.watchrequest_createrequest: // ignore validation (check the full code) // create a channel to send data to the client id := sensor.watch(ss.sensorch) ss.sensorwatch[sensorid] = id // send created message ss.sendch <- &pb.watchresponse{ sensorid: sensorid, created: true, } case *pb.watchrequest_cancelrequest: // ignore validation (check the full code) // cancel the watch ss.s.sensors[sensorid].cancel(id) delete(ss.sensorwatch, sensorid) ss.sendch <- &pb.watchresponse{ sensorid: sensorid, canceleted: true, } case *pb.watchrequest_nowrequest: // ignore validation (check the full code) // send current value ss.sendch <- &pb.watchresponse{ sensorid: sensorid, timestamp: timestamppb.now(), value: int32(sensor.read()), } } } }
switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留recvloop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendloop
将从控制通道读取消息并将其发送到客户端。
发送消息:
func (ss *serverstream) sendloop() { for { select { case m, ok := <-ss.sendch: if !ok { return } // send message if err := ss.stream.send(m); err != nil { return } case data, ok := <-ss.sensorch: if !ok { return } // send data if err := ss.stream.send(&pb.watchresponse{ sensorid: data.id, timestamp: timestamppb.new(data.time), value: int32(data.val), }); err != nil { return } case <-ss.stream.context().done(): return } } }
sendloop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。
最后,传感器服务的快乐路径测试:
func TestSensor(t *testing.T) { conn := newServer(t, func(s grpc.ServiceRegistrar) { pb.RegisterSensorServer(s, &sensorService{ sensors: newSensors(), }) }) client := pb.NewSensorClient(conn) stream, err := client.Watch(context.Background()) if err != nil { t.Fatalf("failed to watch: %v", err) } response := make(chan *pb.WatchResponse) // Go routine to read from the stream go func() { defer close(response) for { resp, err := stream.Recv() if errors.Is(err, io.EOF) { return } if err != nil { return } response <- resp } }() createRequest(t, stream, "temp") waitUntilCreated(t, response, "temp") waitForSensorData(t, response, "temp") createRequest(t, stream, "pres") waitUntilCreated(t, response, "pres") waitForSensorData(t, response, "pres") waitForSensorData(t, response, "temp") waitForSensorData(t, response, "pres") // invalid sensor createRequest(t, stream, "invalid") waitUntilCanceled(t, response, "invalid") nowRequest(t, stream, "light") waitForSensorData(t, response, "light") // Wait for 2 seconds to make sure we don't receive any data for light waitForNoSensorData(t, response, "light", 2*time.Second) cancelRequest(t, stream, "temp") waitUntilCanceled(t, response, "temp") waitForSensorData(t, response, "pres") // Wait for 2 seconds to make sure we don't receive any data for temp waitForNoSensorData(t, response, "temp", 2*time.Second) err = stream.CloseSend() if err != nil { t.Fatalf("failed to close send: %v", err) } }
从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。
挑战自己
- 使用 grpc 流实现聊天应用程序。
- 修改传感器服务以一次发送多个值以节省往返次数。
- 嗅探网络流量以查看一元请求和流式请求之间的区别。
结论
grpc 流是一种用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 grpc 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。
保持联系
如果您有任何问题或反馈,请随时在 linkedin 上与我联系。
到这里,我们也就讲完了《gRPC 流:最佳实践和性能见解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!
-
505 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
501 收藏
-
438 收藏
-
280 收藏
-
181 收藏
-
371 收藏
-
236 收藏
-
416 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 立即学习 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 立即学习 507次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 立即学习 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 立即学习 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 立即学习 484次学习