gRPC 的截止时间与命名解析
文章目录
- 截止时间
- 简介
- 程序示例
- 命名解析器
- 简介
- 程序示例
截止时间
简介
在分布式计算中,截止时间(deadline)和超时时间(timeout)是两个常用的模式。超时时间可以指定客户端应用程序等待 RPC 完成的时间(之后会以错误结束),它通常会以持续时长的方式来指定,并且在每个客户端本地进行应用。
例如,一个请求可能会由多个下游 RPC 组成,它们会将多个服务链接在一起。因此,可以在每个服务调用上,针对每个 RPC 都指定超时时间。这意味着超时时间不能直接应用于请求的整个生命周期,这时需要使用截止时间。
截止时间以请求开始的绝对时间来表示(即使 API 将它们表示为持续时间偏移),并且应用于多个服务调用。发起请求的应用程序设置截止时间,整个请求链需要在截止时间之前进行响应。gRPC API 支持为 RPC使用截止时间,出于多种原因,在 gRPC 应用程序中使用截止时间始终是一种最佳实践。
由于 gRPC 通信是在网络上发生的,因此在 RPC 和响应之间会有延迟。另外,在一些特定的场景中,gRPC 服务本身可能要花费更多的时间来响应,这取决于服务的业务逻辑。如果客户端应用程序在开发时没有指定截止时间,那么它们会无限期地等待自己所发起的 RPC 请求的响应,而资源都会被正在处理的请求所占用。这会让服务和客户端都面临资源耗尽的风险,增加服务的延迟,甚至可能导致整个 gRPC 服务崩溃。
例如,在下图中 gRPC 客户端应用程序调用商品管理服务,而商品管理服务又调用库存服务。

-
客户端应用程序的截止时间设置为 50 毫秒(截止时间 = 当前时间 + 偏移量)。
-
客户端和 ProductMgt 服务之间的网络延迟为 0 毫秒,ProductMgt 服务的处理延迟为 20 毫秒。
-
商品管理服务(ProductMgt 服务)必须将截止时间的偏移量设置为 30 毫秒。因为库存服务(Inventory 服务)需要 30 毫秒来响应,所以截止时间的事件会在两个客户端上发生(ProductMgt 调用 Inventory 服务和客户端应用程序)。
-
ProductMgt 服务的业务逻辑将延迟时间增加了 20 毫秒。随后,ProductMgt 服务的调用逻辑触发了超出截止时间的场景,并且传播回客户端应用程序。因此,在使用截止时间时,要明确它们适用于所有服务场景。
在 Go 语言中,设置 gRPC 应用程序的截止时间是通过调用 context 包的 context.WithDeadline() 函数设置。context 包通常用来向下传递通用的数据,使其能够在整个下游操作中使用,当 gRPC 客户端应用程序发起调用时,客户端的 gRPC 库就会创建所需的 gRPC 头信息,用来表述客户端应用程序和服务器端应用程序之间的截止时间。当 RPC 发送之后,客户端应用程序会在截止时间所声明的时间范围内等待,如果在该时间内 RPC 没有返回,那么该 RPC 会以 DEADLINE_EXCEEDED 错误的形式终止。
程序示例
(1)在任意目录下,分别创建 server 和 client 目录存放服务端和客户端文件,proto 目录用于编写 IDL 的 deadline.proto 文件,cert 目录存放证书文件,具体的目录结构如下所示:
Deadline
├── client
│ ├── cert
│ └── proto
│ └── deadline.proto
└── server├── cert└── proto└── deadline.proto
deadline.proto 文件的具体内容如下所示:
syntax = "proto3"; // 版本声明,使用 Protocol Buffers v3 版本option go_package = "../proto"; // 指定生成的 Go 代码在项目中的导入路径package deadline; // 包名// 定义服务
service Greeter {// SayHello 方法rpc SayHello (HelloRequest) returns (HelloResponse) {}
}// 请求消息
message HelloRequest {string name = 1;
}// 响应消息
message HelloResponse {string reply = 1;
}
(2)移动以下相应的证书文件到 cert 文件夹下,进入 proto 目录生成 gRPC 源代码程序,在 proto 目录下执行以下的命令:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative *.proto
正确生成后的目录结构如下所示:
Deadline
├── client
│ ├── cert
│ │ ├── ca.crt
│ │ ├── server.key
│ │ └── server.pem
│ └── proto
│ ├── deadline_grpc.pb.go
│ ├── deadline.pb.go
│ └── deadline.proto
└── server├── cert│ ├── ca.crt│ ├── server.key│ └── server.pem└── proto├── deadline_grpc.pb.go├── deadline.pb.go└── deadline.proto
(3)在 server 目录下初始化项目( go mod init server ),编写 Server 端程序重写定义的方法,该程序的具体代码如下:
package mainimport ("fmt""net"pb "server/proto""golang.org/x/net/context""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/credentials" // 引入grpc认证包"log""google.golang.org/grpc/metadata" // 引入grpc meta包
)const (// Address gRPC 服务地址Address = "127.0.0.1:50052"
)// 定义 helloService 并实现约定的接口
type helloService struct{pb.UnimplementedGreeterServer
}// SayHello 实现Hello服务接口
func (h *helloService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {// 解析 metadata 中的信息并验证md, ok := metadata.FromIncomingContext(ctx)if !ok {return nil, grpc.Errorf(codes.Unauthenticated, "无 Token 认证信息")}var (appid stringappkey string)if val, ok := md["appid"]; ok {appid = val[0]}if val, ok := md["appkey"]; ok {appkey = val[0]}if appid != "101010" || appkey != "i am key" {return nil, grpc.Errorf(codes.Unauthenticated, "Token 认证信息无效: appid=%s, appkey=%s", appid, appkey)}resp := new(pb.HelloReply)resp.Message = fmt.Sprintf("Hello %s.\nToken info: appid=%s,appkey=%s", in.Name, appid, appkey)return resp, nil
}func main() {listen, err := net.Listen("tcp", Address)if err != nil {log.Fatalf("failed to listen: %v", err)}// TLS 认证creds, err := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key")if err != nil {log.Fatalf("Failed to generate credentials %v", err)}// 实例化 grpc Server , 并开启 TLS 认证s := grpc.NewServer(grpc.Creds(creds))// 注册 HelloServicepb.RegisterGreeterServer(s, &helloService{})log.Println("Listen on " + Address + " with TLS + Token")s.Serve(listen)
}
(4)在 client 目录下,编写 Client 端程序调用服务,该程序的具体代码如下:
package mainimport (pb "client/proto" // 引入 proto 包"golang.org/x/net/context""google.golang.org/grpc""google.golang.org/grpc/credentials" // 引入 grpc 认证包"log""time""google.golang.org/grpc/status"
)const (// Address gRPC 服务地址Address = "127.0.0.1:50052"// OpenTLS 是否开启 TLS 认证OpenTLS = true
)// customCredential 自定义认证
type customCredential struct{}// GetRequestMetadata 实现自定义认证接口
func (c customCredential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {return map[string]string{"appid": "101010","appkey": "i am key",}, nil
}// RequireTransportSecurity 自定义认证是否开启 TLS
func (c customCredential) RequireTransportSecurity() bool {return OpenTLS
}func main() {var err errorvar opts []grpc.DialOptionif OpenTLS {// TLS 连接creds, err := credentials.NewClientTLSFromFile("cert/server.pem", "*.cqupthao.com")if err != nil {log.Fatalf("Failed to create TLS credentials %v", err)}opts = append(opts, grpc.WithTransportCredentials(creds))} else {opts = append(opts, grpc.WithInsecure())}// 使用自定义认证opts = append(opts, grpc.WithPerRPCCredentials(new(customCredential)))conn, err := grpc.Dial(Address, opts...)if err != nil {log.Fatalln(err)}defer conn.Close()// 初始化客户端client := pb.NewGreeterClient(conn)// 调用方法clientDeadline := time.Now().Add(time.Duration(2 * time.Second))ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)defer cancel()// 设置延迟时间// time.Sleep(3 * time.Second)req := &pb.HelloRequest{Name: "gRPC"}res, err := client.SayHello(ctx, req)if err != nil {got := status.Code(err)log.Println(err)log.Fatalln("Error occured: %v ", got)}log.Println(res.Message)
}
执行 Server 端和 Client 端的程序,输出如下的结果:
2023/02/27 19:32:43 Hello gRPC.
Token info: appid=101010,appkey=i am key
若调用服务超过截止时间,则输出如下的结果:
2023/02/27 19:33:52 rpc error: code = DeadlineExceeded desc = context deadline exceeded
2023/02/27 19:33:52 Error occured: %v DeadlineExceeded
exit status 1
命名解析器
简介
命名解析器(name resolver)接受一个服务的名称并返回后端 IP 的列表,可以看作是一个 map[service-name][]backend-ip ,它接收一个服务名称并返回后端的 IP 列表,gRPC 应用程序中根据目标字符串中的 scheme 选择名称解析器。
- DNS 解析器
gRPC 应用程序中默认使用的名称解析器是 DNS ,即在 gRPC 客户端执行 grpc.Dial() 时提供域名,默认会将 DNS 解析出对应的 IP 列表返回,使用默认的 DNS 解析器的名称语法为:dns:[//authority/]host[:port] ,例如以下的代码:
conn, err := grpc.Dial("dns:///localhost:8972",grpc.WithTransportCredentials(insecure.NewCredentials()),
)
- consul reslover
社区里有对应不同注册中心的 resolver ,例如下面使用 consul 作为注册中心的示例,其中使用了第三方的 grpc-consul-resolver 库作为 consul resolver :
package mainimport _ "github.com/mbobakov/grpc-consul-resolver"// ...conn, err := grpc.Dial(// consul 服务"consul://192.168.1.11:8500/hello?wait=14s",grpc.WithTransportCredentials(insecure.NewCredentials()),)
- 自定义解析器
除了使用内置和社区提供的名称解析器,还可以自定义一套自己的名称解析器,实现方式如以下关键的程序代码:
(1)核心接口
//该接口实时监听指定目标的状态,并及时更新配置
type Resolver interface {// ResolveNow will be called by gRPC to try to resolve the target name// again. It's just a hint, resolver can ignore this if it's not necessary.// It could be called multiple times concurrently.ResolveNow(ResolveNowOptions)// Close closes the resolver.Close()
}
// 建立 scheme 与 service.name 之间的关系;并绑定到客户端连接上
type Builder interface {Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)// 返回命名解析所支持的 scheme 信息 Scheme() string
}
(2)常量定义
const (exampleScheme = "example"exampleServiceName = "resolver.example.grpc.io"backendAddr = "localhost:50051"
)
// 最终的命名解析地址为:example:///resolver.example.grpc.io ;后端 node 一个节点:localhost:50051
(3)自定义 Resolver
// 命名解析器的结构
type exampleResolver struct {target resolver.Targetcc resolver.ClientConnaddrsStore map[string][]string
}func (r *exampleResolver) start() {addrStrs := r.addrsStore[r.target.Endpoint]addrs := make([]resolver.Address, len(addrStrs))for i, s := range addrStrs {addrs[i] = resolver.Address{Addr: s}}r.cc.UpdateState(resolver.State{Addresses: addrs})
}func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}func (*exampleResolver) Close()
(4)自定义 Builder
// 命名解析器构建器
type exampleResolverBuilder struct{}func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {// // 创建解析 lb.example.grpc.io 的示例解析器r := &exampleResolver{target: target,cc: cc,addrsStore: map[string][]string{exampleServiceName: {backendAddr}, // 将 lb.example.grpc.io 解析为 localhost:50051 和 localhost:50052},}r.start()return r, nil
}// 为 example 模式创建的解析器
func (*exampleResolverBuilder) Scheme() string { return exampleScheme
}
(5)加载命名服务
func init() {// 这一步非常关键,否则就会出现解析不了的情况,错误信息如下// Unavailable desc = connection error: desc = "transport: Error while dialing dial tcp: lookup tcpresolver.example.grpc.io: nodename nor servname provided, or not known"resolver.Register(&exampleResolverBuilder{})
}
基于这个命名解析器实现,可以为所选的任意服务注册中心实现解析器( Consul、etcd 和 Zookeeper 等)。
程序示例
(1)在任意目录下,分别创建 server 和 client 目录存放服务端和客户端文件,proto 目录用于编写 IDL 的 nameresoler.proto 文件,cert 目录存放证书文件,具体的目录结构如下所示:
NameResoler
├── client
│ └── proto
│ └── nameresoler.proto
└── server└── proto└── nameresoler.proto
nameresoler.proto 文件的具体内容如下所示:
syntax = "proto3"; // 版本声明,使用 Protocol Buffers v3 版option go_package = "../proto"; // 指定生成的 Go 代码在项目中的导入路径package nameresoler; // 包名// 定义服务
service Greeter {// SayHello 方法rpc SayHello (HelloRequest) returns (HelloResponse) {}
}// 请求消息
message HelloRequest {string message = 1;
}// 响应消息
message HelloResponse {string message = 1;
}
(2)进入 proto 目录生成 gRPC 源代码程序,在 proto 目录下执行以下的命令:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative *.proto
正确生成后的目录结构如下所示:
NameResoler
├── client
│ └── proto
│ ├── nameresoler_grpc.pb.go
│ ├── nameresoler.pb.go
│ └── nameresoler.proto
└── server└── proto├── nameresoler_grpc.pb.go├── nameresoler.pb.go└── nameresoler.proto
(3)在 server 目录下初始化项目( go mod init server ),编写 Server 端程序重写定义的方法,该程序的具体代码如下:
package mainimport ("context""fmt""log"pb "server/proto""net""google.golang.org/grpc"
)const addr = "localhost:50051"type server struct {pb.UnimplementedGreeterServeraddr string
}func (s *server) SayHello(ctx context.Context, req *pb.HelloRequest) (*pb.HelloResponse, error) {return &pb.HelloResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
}func main() {// 监听本地端口lis, err := net.Listen("tcp", addr)if err != nil {log.Printf("failed to listen: %v", err)}s := grpc.NewServer() // 创建gRPC服务器pb.RegisterGreeterServer(s, &server{addr: addr}) // 在gRPC服务端注册服务log.Println("Serving on %s ", addr)// 启动服务err = s.Serve(lis)if err != nil {log.Printf("failed to serve: %v", err)}
}
(4)在 client 目录下,编写 Client 端程序调用服务,该程序的具体代码如下:
package mainimport ("context""fmt""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "client/proto""google.golang.org/grpc/resolver"
)const (exampleScheme = "example"exampleServiceName = "resolver.example.grpc.io"backendAddr = "localhost:50051"
)func callUnarySayHello(c pb.GreeterClient, message string) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()r, err := c.SayHello(ctx, &pb.HelloRequest{Message: message})if err != nil {log.Fatalf("could not greet: %v", err)}fmt.Println(r.Message)
}func makeRPCs(cc *grpc.ClientConn, n int) {hwc := pb.NewGreeterClient(cc)for i := 0; i < n; i++ {callUnarySayHello(hwc, "this is examples/name_resolving")}
}func main() {passthroughConn, err := grpc.Dial(fmt.Sprintf("passthrough:///%s", backendAddr), // Dial to "passthrough:///localhost:50051"grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {log.Fatalf("did not connect: %v", err)}defer passthroughConn.Close()fmt.Printf("--- calling helloworld.Greeter/SayHello to \"passthrough:///%s\"\n", backendAddr)makeRPCs(passthroughConn, 3)fmt.Println()exampleConn, err := grpc.Dial(fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName), // Dial to "example:///resolver.example.grpc.io"grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {log.Fatalf("did not connect: %v", err)}defer exampleConn.Close()fmt.Printf("--- calling helloworld.Greeter/SayHello to \"%s:///%s\"\n", exampleScheme, exampleServiceName)makeRPCs(exampleConn, 3)
}type exampleResolverBuilder struct{}func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {r := &exampleResolver{target: target,cc: cc,addrsStore: map[string][]string{exampleServiceName: {backendAddr},},}r.start()return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {target resolver.Targetcc resolver.ClientConnaddrsStore map[string][]string
}func (r *exampleResolver) start() {addrStrs := r.addrsStore[r.target.Endpoint()]addrs := make([]resolver.Address, len(addrStrs))for i, s := range addrStrs {addrs[i] = resolver.Address{Addr: s}}r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close() {}func init() {// Register the example ResolverBuilder. This is usually done in a package's// init() function.resolver.Register(&exampleResolverBuilder{})
}
执行 Server 端和 Client 端的程序,输出如下的结果:
--- calling helloworld.Greeter/SayHello to "passthrough:///localhost:50051"
this is examples/name_resolving (from localhost:50051)
this is examples/name_resolving (from localhost:50051)
this is examples/name_resolving (from localhost:50051)--- calling helloworld.Greeter/SayHello to "example:///resolver.example.grpc.io"
this is examples/name_resolving (from localhost:50051)
this is examples/name_resolving (from localhost:50051)
this is examples/name_resolving (from localhost:50051)
-
参考链接:gRPC 教程
-
参考链接:gRPC 官网
-
参考书籍:《gRPC与云原生应用开发:以Go和Java为例》([斯里兰卡] 卡山 • 因德拉西里 丹尼什 • 库鲁普 著)
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
