本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。
你好,我是 Seekload!
前言
前一篇文章我们学习了客户端流式 RPC,客户端多次向服务端发送数据,发送结束之后,由服务端返回一个响应。与服务端流式 RPC类似,都只支持单项连续发送数据,今天我们要来学习双向流式 RPC 支持通信双方同时多次发送或接收数据。如下如所示:
新建并编译proto文件
新建 bidirectional_stream.proto 文件:
- syntax = "proto3";
-
- package proto;
-
- // 定义流式请求信息
- message StreamRequest{
- // 参数类型 参数名称 标识号
- string data = 1;
- }
-
- // 定义流响应信息
- message StreamResponse{
- int32 code = 1;
- string value = 2;
- }
-
- // 定义我们的服务(可以定义多个服务,每个服务可以定义多个接口)
- service StreamService{
- // 双向流RPC,需要在请求、响应数据前加stream
- rpc Record(stream StreamRequest) returns (stream StreamResponse){};
- }
双向流式 RPC,定义方法时需要在请求值和返回值之前加上 stream。
进入 bidirectional_stream.proto 所在的目录,使用如下命令编译文件
- protoc --go_out=plugins=grpc:. bidirectional_stream.proto
执行完成之后会生成 bidirectional_stream.pb.go 文件。
创建server端
- package main
-
- import (
- pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
- "google.golang.org/grpc"
- "io"
- "log"
- "net"
- "strconv"
- "time"
- )
-
- const (
- Address string = ":8000"
- Network string = "tcp"
- )
-
- // 定义我们的服务
- type StreamService struct{}
-
- // 实现 Record() 方法
- func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
- n := 1
- for {
- // 接收数据
- req, err := srv.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- log.Fatalf("stream get from client err: %v", err)
- return err
- }
- // 发送数据
- err = srv.Send(&pb.StreamResponse{
- Code: int32(n),
- Value: "This is the " + strconv.Itoa(n) + " message",
- })
- if err != nil {
- log.Fatalf("stream send to client err: %v", err)
- return err
- }
- n++
- log.Println("stream get from client: ", req.Data)
- time.Sleep(1 * time.Second)
- }
- return nil
- }
-
- func main() {
- // 1.监听端口
- listener, err := net.Listen(Network, Address)
- if err != nil {
- log.Fatalf("listener err: %v", err)
- }
- log.Println(Address + " net.Listing...")
-
- // 2.实例化gRPC实例
- grpcServer := grpc.NewServer()
-
- // 3.注册我们的服务
- pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
-
- // 4.启动gRPC服务端
- err = grpcServer.Serve(listener)
- if err != nil {
- log.Fatalf("grpc server err: %v", err)
- }
- }
在实现的 Record() 方法中,for() 循环里面读取客户端发送的消息并返回一个响应数据。
运行服务端:
- go run server.go
-
- 输出:
- :8000 net listening...
创建client端
- package main
-
- import (
- "context"
- pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
- "google.golang.org/grpc"
- "io"
- "log"
- "strconv"
- "time"
- )
-
- const Address = ":8000"
-
- func main() {
- // 1.连接服务端
- conn, err := grpc.Dial(Address, grpc.WithInsecure())
- if err != nil {
- log.Fatalf("grpc conn err: %v", err)
- }
- defer conn.Close()
-
- // 2.创建gRPC客户端
- grpcClient := pb.NewStreamServiceClient(conn)
-
- // 3.调用 Record() 方法获取流
- stream, err := grpcClient.Record(context.Background())
- if err != nil {
- log.Fatalf("call record err: %v", err)
- }
-
- for i := 0; i < 5; i++ {
- // 4.发送数据
- err := stream.Send(&pb.StreamRequest{
- Data: strconv.Itoa(i),
- })
- if err != nil {
- log.Fatalf("stream send to server err: %v", err)
- }
- // 5.接收服务端发送过来的数据
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Fatalf("stream get from server err: %v", err)
- }
- log.Printf("stream get from server,code:%v,value:%v", resp.GetCode(), resp.Value)
- time.Sleep(1 * time.Second)
- }
- // 6.关闭流
- err = stream.CloseSend()
- if err != nil {
- log.Fatalf("close stream err:%v", err)
- }
- }
客户端代码,在 for() 循环里面向服务端发送了 5 次消息,并接收服务端返回的数据,5次数据交互之后调用 CloseSend() 关闭流。
运行客户端:
- go run client.go
客户端输出:
- stream get from server,code:1,value:This is the 1 message
- stream get from server,code:2,value:This is the 2 message
- stream get from server,code:3,value:This is the 3 message
- stream get from server,code:4,value:This is the 4 message
- stream get from server,code:5,value:This is the 5 message
服务端输出:
- stream get from client: 0
- stream get from client: 1
- stream get from client: 2
- stream get from client: 3
- stream get from client: 4
观察仔细的同学会注意到,客户端和服务端是交替输出的。
总结
这篇文章我们简单介绍了 gRPC 的双向流式 RPC,支持通信双方同时多次发送或接收数据。