文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

gRPC入门指南之 双向流式RPC

2024-12-03 02:49

关注

本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。

你好,我是 Seekload!

前言

前一篇文章我们学习了客户端流式 RPC,客户端多次向服务端发送数据,发送结束之后,由服务端返回一个响应。与服务端流式 RPC类似,都只支持单项连续发送数据,今天我们要来学习双向流式 RPC 支持通信双方同时多次发送或接收数据。如下如所示:

新建并编译proto文件

新建 bidirectional_stream.proto 文件:

  1. syntax = "proto3"
  2.  
  3. package proto; 
  4.  
  5. // 定义流式请求信息 
  6. message StreamRequest{ 
  7.   // 参数类型 参数名称 标识号 
  8.   string data = 1; 
  9.  
  10. // 定义流响应信息 
  11. message StreamResponse{ 
  12.   int32 code = 1; 
  13.   string value = 2; 
  14.  
  15. // 定义我们的服务(可以定义多个服务,每个服务可以定义多个接口) 
  16. service StreamService{ 
  17.   // 双向流RPC,需要在请求、响应数据前加stream 
  18.   rpc Record(stream StreamRequest) returns (stream StreamResponse){}; 

双向流式 RPC,定义方法时需要在请求值和返回值之前加上 stream。

进入 bidirectional_stream.proto 所在的目录,使用如下命令编译文件

  1. protoc --go_out=plugins=grpc:. bidirectional_stream.proto 

执行完成之后会生成 bidirectional_stream.pb.go 文件。

创建server端

  1. package main 
  2.  
  3. import ( 
  4.  pb "go-grpc-example/4-bidirectional_stream_rpc/proto" 
  5.  "google.golang.org/grpc" 
  6.  "io" 
  7.  "log" 
  8.  "net" 
  9.  "strconv" 
  10.  "time" 
  11.  
  12. const ( 
  13.  Address string = ":8000" 
  14.  Network string = "tcp" 
  15.  
  16. // 定义我们的服务 
  17. type StreamService struct{} 
  18.  
  19. // 实现 Record() 方法 
  20. func (s *StreamService) Record(srv pb.StreamService_RecordServer) error { 
  21.  n := 1 
  22.  for { 
  23.   // 接收数据 
  24.   req, err := srv.Recv() 
  25.   if err == io.EOF { 
  26.    return nil 
  27.   } 
  28.   if err != nil { 
  29.    log.Fatalf("stream get from client err: %v", err) 
  30.    return err 
  31.   } 
  32.   // 发送数据 
  33.   err = srv.Send(&pb.StreamResponse{ 
  34.    Code:  int32(n), 
  35.    Value: "This is the " + strconv.Itoa(n) + " message"
  36.   }) 
  37.   if err != nil { 
  38.    log.Fatalf("stream send to client err: %v", err) 
  39.    return err 
  40.   } 
  41.   n++ 
  42.   log.Println("stream get from client: ", req.Data) 
  43.   time.Sleep(1 * time.Second
  44.  } 
  45.  return nil 
  46.  
  47. func main() { 
  48.  // 1.监听端口 
  49.  listener, err := net.Listen(Network, Address) 
  50.  if err != nil { 
  51.   log.Fatalf("listener err: %v", err) 
  52.  } 
  53.  log.Println(Address + " net.Listing..."
  54.  
  55.  // 2.实例化gRPC实例 
  56.  grpcServer := grpc.NewServer() 
  57.  
  58.  // 3.注册我们的服务 
  59.  pb.RegisterStreamServiceServer(grpcServer, &StreamService{}) 
  60.  
  61.  // 4.启动gRPC服务端 
  62.  err = grpcServer.Serve(listener) 
  63.  if err != nil { 
  64.   log.Fatalf("grpc server err: %v", err) 
  65.  } 

在实现的 Record() 方法中,for() 循环里面读取客户端发送的消息并返回一个响应数据。

运行服务端:

  1. go run server.go 
  2.  
  3. 输出: 
  4. :8000  net listening... 

创建client端

  1. package main 
  2.  
  3. import ( 
  4.  "context" 
  5.  pb "go-grpc-example/4-bidirectional_stream_rpc/proto" 
  6.  "google.golang.org/grpc" 
  7.  "io" 
  8.  "log" 
  9.  "strconv" 
  10.  "time" 
  11.  
  12. const Address = ":8000" 
  13.  
  14. func main() { 
  15.  // 1.连接服务端 
  16.  conn, err := grpc.Dial(Address, grpc.WithInsecure()) 
  17.  if err != nil { 
  18.   log.Fatalf("grpc conn err: %v", err) 
  19.  } 
  20.  defer conn.Close() 
  21.  
  22.  // 2.创建gRPC客户端 
  23.  grpcClient := pb.NewStreamServiceClient(conn) 
  24.  
  25.  // 3.调用 Record() 方法获取流 
  26.  stream, err := grpcClient.Record(context.Background()) 
  27.  if err != nil { 
  28.   log.Fatalf("call record err: %v", err) 
  29.  } 
  30.  
  31.  for i := 0; i < 5; i++ { 
  32.   // 4.发送数据 
  33.   err := stream.Send(&pb.StreamRequest{ 
  34.    Data: strconv.Itoa(i), 
  35.   }) 
  36.   if err != nil { 
  37.    log.Fatalf("stream send to server err: %v", err) 
  38.   } 
  39.   // 5.接收服务端发送过来的数据 
  40.   resp, err := stream.Recv() 
  41.   if err == io.EOF { 
  42.    break 
  43.   } 
  44.   if err != nil { 
  45.    log.Fatalf("stream get from server err: %v", err) 
  46.   } 
  47.   log.Printf("stream get from server,code:%v,value:%v", resp.GetCode(), resp.Value) 
  48.   time.Sleep(1 * time.Second
  49.  } 
  50.  // 6.关闭流 
  51.  err = stream.CloseSend() 
  52.  if err != nil { 
  53.   log.Fatalf("close stream err:%v", err) 
  54.  } 

客户端代码,在 for() 循环里面向服务端发送了 5 次消息,并接收服务端返回的数据,5次数据交互之后调用 CloseSend() 关闭流。

运行客户端:

  1. go run client.go 

客户端输出:

  1. stream get from server,code:1,value:This is the 1 message 
  2. stream get from server,code:2,value:This is the 2 message 
  3. stream get from server,code:3,value:This is the 3 message 
  4. stream get from server,code:4,value:This is the 4 message 
  5. stream get from server,code:5,value:This is the 5 message 

服务端输出:

  1. stream get from client:  0 
  2. stream get from client:  1 
  3. stream get from client:  2 
  4. stream get from client:  3 
  5. stream get from client:  4 

观察仔细的同学会注意到,客户端和服务端是交替输出的。

总结

 

这篇文章我们简单介绍了 gRPC 的双向流式 RPC,支持通信双方同时多次发送或接收数据。

 

来源:Golang来啦内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯