grpc介绍(三)——流传输

定义

使用stream关键字修饰来表示流程传输。

  • 当该关键字修饰参数时,表示为客户端流式的gRPC接口;
// 普通RPC
rpc SimplePing(PingRequest) returns (PingReply);// 客户端流式RPC
rpc SimplePing(stream PingRequest) returns (PingReply);
  • 当该参数修饰返回值时,表示为服务端流式的gRPC接口;
// 服务端流式RPC
rpc SimplePing(PingRequest) returns (stream PingReply);
  • 当该参数修饰参数和返回值时,表示为双向流式的gRPC接口;
// 双向流式RPC
rpc SimplePing(stream PingRequest) returns (stream PingReply);

客户端流

1)修改.proto文件

// Product.protosyntax = "proto3";option go_package="../service";package service;message ProductRequest {int32 prod_id = 1;
}message ProductReponse {int32 prod_stock = 1;
}service ProdService {rpc GetProductStock(ProductRequest) returns(ProductReponse);// 增加客户端流定义方法rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductReponse);
}

2)修改接口实现文件

// product.gopackage serviceimport ("context""io""fmt"
)var ProductService = &productService{}type productService struct {
}// 接口实现
func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductReponse, error) {stock := p.GetStockById(request.ProdId)return &ProductReponse{ProdStock: stock}, nil
}// 新增客户端流实现接口
func (p *productService) UpdateProductStockClientStream(stream ProdService_UpdateProductStockClientStreamServer) error {count := 0for {recv, err := stream.Recv()if err != nil {if err == io.EOF {return nil}return err}fmt.Println("服务端接收到的流", recv.ProdId)count++if count > 10 {rsq := &ProductReponse{ProdStock: recv.ProdId}err := stream.SendAndClose(rsq)if err != nil {return err}return nil}}
}func (p *productService) GetStockById(id int32) int32 {return id
}

3)修改客户端文件

// grpc_client.gopackage mainimport ("google.golang.org/grpc""log""service""context""fmt""google.golang.org/grpc/credentials""crypto/tls""crypto/x509""io/ioutil""auth""time"
)func main() {// 添加公钥// creds, err0 := credentials.NewClientTLSFromFile("../cert/server.pem", "*.dong.com")// if err0 != nil {// 	log.Fatal("证书错误: ", err0)// }// 证书认证-双向认证// 从证书相关文件中读取解析信息, 得到证书公钥、密钥对cert, err0 := tls.LoadX509KeyPair("../cert/client.pem", "../cert/client.key")if err0 != nil {log.Fatal("证书读取失败", err0)}fmt.Println("证书读取成功")// 创建一个新的、空的CertPoolcertPool := x509.NewCertPool()ca, err1 := ioutil.ReadFile("../cert/ca.crt")if err1 != nil {log.Fatal("ca证书读取失败", err1)}fmt.Println("ca证书读取成功")// 尝试解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面使用certPool.AppendCertsFromPEM(ca)// 构建基于TLS的TransportCredentials选项creds := credentials.NewTLS(&tls.Config{// 设置证书链, 允许包含一个或多个Certificates: []tls.Certificate{cert},ServerName:   "*.dong.com",RootCAs:      certPool,})fmt.Println("设置TLS的TransportCredentials选项成功")token := &auth.Authentication{User:   "admin",Passwd: "admin@123",}// 创建连接conn, err := grpc.Dial(":8800", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))if err != nil {log.Fatal("服务端连接失败: ", err)}fmt.Println("证书认证通过")// 退出时关闭连接defer conn.Close()// 创建客户端实例productServiceClient := service.NewProdServiceClient(conn)// 方法请求// resq, err := productServiceClient.GetProductStock(context.Background(), &service.ProductRequest{ProdId: 233})// if err != nil {// 	log.Fatal("调用gRPC方法失败: ", err)// }// fmt.Println("调用gRPC方法成功, ProdStock = ", resq.ProdStock)// 获取流stream, err := productServiceClient.UpdateProductStockClientStream(context.Background())if err != nil {log.Fatal("获取流失败", err)}// 定义切片,设置请求rsq := make(chan struct{}, 1)go prodRequest(stream, rsq)// 等待数据接收select {case <-rsq:// 接收数据recv, err := stream.CloseAndRecv()if err != nil {log.Fatal(err)}stock := recv.ProdStockfmt.Println("客户端收到响应: ", stock)}
}// 请求接口
func prodRequest(stream service.ProdService_UpdateProductStockClientStreamClient, rsq chan struct{}) {count := 0for {request := &service.ProductRequest{ProdId: 100,}// 发送数据err := stream.Send(request)if err != nil {log.Fatal(err)}time.Sleep(time.Second)count++if count > 10 {rsq <- struct{}{}break}}
}

4)测试

先后启动服务端和客户端,打印如下:

服务端:

在这里插入图片描述

客户端:

在这里插入图片描述

服务端流

1)修改.proto文件

//新增服务端流定义方法syntax = "proto3";option go_package="../service";package service;message ProductRequest {int32 prod_id = 1;
}message ProductReponse {int32 prod_stock = 1;
}service ProdService {rpc GetProductStock(ProductRequest) returns(ProductReponse);// 客户端流定义方法rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductReponse);// 服务端流定义方法rpc GetProductStockServerStream(ProductRequest) returns(stream ProductReponse); 
}

2)修改接口实现文件

// product.gopackage serviceimport ("context""io""fmt""time"
)var ProductService = &productService{}type productService struct {
}// 接口实现
func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductReponse, error) {stock := p.GetStockById(request.ProdId)return &ProductReponse{ProdStock: stock}, nil
}// 新增客户端接口实现
func (p *productService) UpdateProductStockClientStream(stream ProdService_UpdateProductStockClientStreamServer) error {count := 0for {recv, err := stream.Recv()if err != nil {if err == io.EOF {return nil}return err}fmt.Println("服务端接收到的流", recv.ProdId)count++if count > 10 {rsq := &ProductReponse{ProdStock: recv.ProdId}err := stream.SendAndClose(rsq)if err != nil {return err}return nil}}
}// 新增服务端接口实现
func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProdService_GetProductStockServerStreamServer) error {count := 0for {rsp := &ProductReponse{ProdStock: request.ProdId}err := stream.Send(rsp)if err != nil {return err}time.Sleep(time.Second)count++if count > 10 {return nil}}
}func (p *productService) GetStockById(id int32) int32 {return id
}

3)修改客户端文件

package mainimport ("google.golang.org/grpc""log""service""context""fmt""google.golang.org/grpc/credentials""crypto/tls""crypto/x509""io/ioutil""io""auth""time"
)func main() {// 添加公钥// creds, err0 := credentials.NewClientTLSFromFile("../cert/server.pem", "*.dong.com")// if err0 != nil {// 	log.Fatal("证书错误: ", err0)// }// 证书认证-双向认证// 从证书相关文件中读取解析信息, 得到证书公钥、密钥对cert, err0 := tls.LoadX509KeyPair("../cert/client.pem", "../cert/client.key")if err0 != nil {log.Fatal("证书读取失败", err0)}fmt.Println("证书读取成功")// 创建一个新的、空的CertPoolcertPool := x509.NewCertPool()ca, err1 := ioutil.ReadFile("../cert/ca.crt")if err1 != nil {log.Fatal("ca证书读取失败", err1)}fmt.Println("ca证书读取成功")// 尝试解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面使用certPool.AppendCertsFromPEM(ca)// 构建基于TLS的TransportCredentials选项creds := credentials.NewTLS(&tls.Config{// 设置证书链, 允许包含一个或多个Certificates: []tls.Certificate{cert},ServerName:   "*.dong.com",RootCAs:      certPool,})fmt.Println("设置TLS的TransportCredentials选项成功")token := &auth.Authentication{User:   "admin",Passwd: "admin@123",}// 创建连接conn, err := grpc.Dial(":8800", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))if err != nil {log.Fatal("服务端连接失败: ", err)}fmt.Println("证书认证通过")// 退出时关闭连接defer conn.Close()// 创建客户端实例productServiceClient := service.NewProdServiceClient(conn)// 方法请求// resq, err := productServiceClient.GetProductStock(context.Background(), &service.ProductRequest{ProdId: 233})// if err != nil {// 	log.Fatal("调用gRPC方法失败: ", err)// }// fmt.Println("调用gRPC方法成功, ProdStock = ", resq.ProdStock)// 获取流// stream, err := productServiceClient.UpdateProductStockClientStream(context.Background())// if err != nil {// 	log.Fatal("获取流失败", err)// }// rsq := make(chan struct{}, 1)// go prodRequest(stream, rsq)// select {// case <-rsq:// 	recv, err := stream.CloseAndRecv()// 	if err != nil {// 		log.Fatal(err)// 	}// 	stock := recv.ProdStock// 	fmt.Println("客户端收到响应: ", stock)// }request := &service.ProductRequest{ProdId: 100,}// 调用服务端接口获取流stream, err := productServiceClient.GetProductStockServerStream(context.Background(), request)if err != nil {log.Fatal("获取流失败")}// 循环获取流for {recv, err := stream.Recv()if err != nil {// 流数据接收完成标志if err == io.EOF {fmt.Println("客户端接收数据完成")stream.CloseSend()break}log.Fatal(err)}fmt.Println("客户端收到的流", recv.ProdStock)}
}func prodRequest(stream service.ProdService_UpdateProductStockClientStreamClient, rsq chan struct{}) {count := 0for {request := &service.ProductRequest{ProdId: 100,}err := stream.Send(request)if err != nil {log.Fatal(err)}time.Sleep(time.Second)count++if count > 10 {rsq <- struct{}{}break}}
}

4)测试

先后启动服务端和客户端,打印如下:

服务端:

在这里插入图片描述

客户端

在这里插入图片描述

双向流

1)修改.proto文件

// 说明: 新增双向流定义方法syntax = "proto3";option go_package="../service";package service;message ProductRequest {int32 prod_id = 1;
}message ProductReponse {int32 prod_stock = 1;
}service ProdService {rpc GetProductStock(ProductRequest) returns(ProductReponse);// 客户端流定义方法rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductReponse);// 服务端流定义方法rpc GetProductStockServerStream(ProductRequest) returns(stream ProductReponse); // 双向流定义方法rpc SayHelloStream(stream ProductRequest) returns(stream ProductReponse); 
}

2)修改接口实现文件

package serviceimport ("context""io""fmt""time"
)var ProductService = &productService{}type productService struct {
}// 接口实现
func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductReponse, error) {stock := p.GetStockById(request.ProdId)return &ProductReponse{ProdStock: stock}, nil
}func (p *productService) UpdateProductStockClientStream(stream ProdService_UpdateProductStockClientStreamServer) error {count := 0for {recv, err := stream.Recv()if err != nil {if err == io.EOF {return nil}return err}fmt.Println("服务端接收到的流", recv.ProdId)count++if count > 10 {rsq := &ProductReponse{ProdStock: recv.ProdId}err := stream.SendAndClose(rsq)if err != nil {return err}return nil}}
}func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProdService_GetProductStockServerStreamServer) error {count := 0for {rsp := &ProductReponse{ProdStock: request.ProdId}err := stream.Send(rsp)if err != nil {return err}time.Sleep(time.Second)count++if count > 10 {return nil}}
}func (p *productService) SayHelloStream(stream ProdService_SayHelloStreamServer) error {for {// 接收消息recv, err := stream.Recv()if err != nil {return nil}fmt.Println("服务端接收客户端的消息", recv.ProdId)time.Sleep(time.Second)// 发送消息rsp := &ProductReponse{ProdStock: recv.ProdId}err = stream.Send(rsp)if err != nil {return err}}
}func (p *productService) GetStockById(id int32) int32 {return id
}

3)修改客户端文件

package mainimport ("google.golang.org/grpc""log""service""context""fmt""google.golang.org/grpc/credentials""crypto/tls""crypto/x509""io/ioutil"//	"io""auth""time"
)func main() {// 添加公钥// creds, err0 := credentials.NewClientTLSFromFile("../cert/server.pem", "*.dong.com")// if err0 != nil {// 	log.Fatal("证书错误: ", err0)// }// 证书认证-双向认证// 从证书相关文件中读取解析信息, 得到证书公钥、密钥对cert, err0 := tls.LoadX509KeyPair("../cert/client.pem", "../cert/client.key")if err0 != nil {log.Fatal("证书读取失败", err0)}fmt.Println("证书读取成功")// 创建一个新的、空的CertPoolcertPool := x509.NewCertPool()ca, err1 := ioutil.ReadFile("../cert/ca.crt")if err1 != nil {log.Fatal("ca证书读取失败", err1)}fmt.Println("ca证书读取成功")// 尝试解析所传入的PEM编码的证书,如果解析成功会将其加到CertPool中,便于后面使用certPool.AppendCertsFromPEM(ca)// 构建基于TLS的TransportCredentials选项creds := credentials.NewTLS(&tls.Config{// 设置证书链, 允许包含一个或多个Certificates: []tls.Certificate{cert},ServerName:   "*.dong.com",RootCAs:      certPool,})fmt.Println("设置TLS的TransportCredentials选项成功")token := &auth.Authentication{User:   "admin",Passwd: "admin@123",}// 创建连接conn, err := grpc.Dial(":8800", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token))if err != nil {log.Fatal("服务端连接失败: ", err)}fmt.Println("证书认证通过")// 退出时关闭连接defer conn.Close()// 创建客户端实例productServiceClient := service.NewProdServiceClient(conn)// 方法请求// resq, err := productServiceClient.GetProductStock(context.Background(), &service.ProductRequest{ProdId: 233})// if err != nil {// 	log.Fatal("调用gRPC方法失败: ", err)// }// fmt.Println("调用gRPC方法成功, ProdStock = ", resq.ProdStock)// 获取流// stream, err := productServiceClient.UpdateProductStockClientStream(context.Background())// if err != nil {// 	log.Fatal("获取流失败", err)// }// rsq := make(chan struct{}, 1)// go prodRequest(stream, rsq)// select {// case <-rsq:// 	recv, err := stream.CloseAndRecv()// 	if err != nil {// 		log.Fatal(err)// 	}// 	stock := recv.ProdStock// 	fmt.Println("客户端收到响应: ", stock)// }// request := &service.ProductRequest{// 	ProdId: 100,// }// stream, err := productServiceClient.GetProductStockServerStream(context.Background(), request)// if err != nil {// 	log.Fatal("获取流失败")// }// for {// 	recv, err := stream.Recv()// 	if err != nil {// 		if err == io.EOF {// 			fmt.Println("客户端接收数据完成")// 			stream.CloseSend()// 			break// 		}// 		log.Fatal(err)// 	}// 	fmt.Println("客户端收到的流", recv.ProdStock)// }// 获取双向流stream, err := productServiceClient.SayHelloStream(context.Background())for {// 发送消息request := &service.ProductRequest{ProdId: 100,}err = stream.Send(request)if err != nil {log.Fatal(err)}time.Sleep(time.Second)recv, err := stream.Recv()if err != nil {log.Fatal(err)}fmt.Println("客户端接收服务端的消息", recv.ProdStock)}
}func prodRequest(stream service.ProdService_UpdateProductStockClientStreamClient, rsq chan struct{}) {count := 0for {request := &service.ProductRequest{ProdId: 100,}err := stream.Send(request)if err != nil {log.Fatal(err)}time.Sleep(time.Second)count++if count > 10 {rsq <- struct{}{}break}}
}

4)测试

先后启动服务端和客户端,打印如下:

服务端:

在这里插入图片描述

客户端:

在这里插入图片描述

类似于心跳的功能。

部分参考:
HTTP/2介绍:https://blog.csdn.net/qq_34827674/article/details/115188949


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部