How the Kratos RPC Service Works

First, the client issues an RPC call:

// SayHello say hello.
func (d *dao) SayHello(c context.Context, req *api.HelloReq) (resp *empty.Empty, err error) {
    if resp, err = d.demoClient.SayHello(c, req); err != nil {
        err = errors.Wrapf(err, "%v", req.Name)
    }
    return
}

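The generated client stub sends the request under the method's full name, and the RPC server looks up that name among its registered methods and calls the matching handler. The stub for SayHelloURL is shown here: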

type demoClient struct {
	cc *grpc.ClientConn
}
func (c *demoClient) SayHelloURL(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) {
	out := new(HelloResp)
	err := c.cc.Invoke(ctx, "/demo.service.v1.Demo/SayHelloURL", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
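
The lookup step can be sketched without the grpc module. The block below is a minimal illustration, not how grpc-go is implemented: registry, handlerFunc, splitFullMethod and dispatch are hypothetical names, and requests are plain strings rather than protobuf messages. It only shows the idea that a full method name such as /demo.service.v1.Demo/SayHelloURL is split into a service part and a method part, which are then used to find the registered handler.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// handlerFunc is a simplified stand-in for a generated gRPC handler.
type handlerFunc func(req string) (string, error)

// registry maps service name -> method name -> handler. It loosely mirrors
// the idea of a server indexing its registered service descriptors.
type registry map[string]map[string]handlerFunc

// splitFullMethod splits "/service/method" into its two parts.
func splitFullMethod(full string) (service, method string, err error) {
	full = strings.TrimPrefix(full, "/")
	i := strings.LastIndex(full, "/")
	if i <= 0 || i == len(full)-1 {
		return "", "", errors.New("malformed method name: " + full)
	}
	return full[:i], full[i+1:], nil
}

// dispatch looks up the handler for a full method name and invokes it.
func (r registry) dispatch(full, req string) (string, error) {
	service, method, err := splitFullMethod(full)
	if err != nil {
		return "", err
	}
	h, ok := r[service][method] // indexing a missing service yields a nil map, which is safe to read
	if !ok {
		return "", fmt.Errorf("unimplemented: %s", full)
	}
	return h(req)
}

func main() {
	r := registry{
		"demo.service.v1.Demo": {
			"SayHelloURL": func(name string) (string, error) { return "hello " + name, nil },
		},
	}
	out, err := r.dispatch("/demo.service.v1.Demo/SayHelloURL", "kratos")
	fmt.Println(out, err)
}
```

A request for a method that was never registered falls through to the "unimplemented" branch, which is roughly the situation a real server reports with an Unimplemented status.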

Next comes the initialization flow of the Kratos RPC service.

First, Warden registers the handlers with the RPC server:

var _Demo_serviceDesc = grpc.ServiceDesc{
	ServiceName: "demo.service.v1.Demo",
	HandlerType: (*DemoServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "Ping",
			Handler:    _Demo_Ping_Handler,
		},
		{
			MethodName: "SayHello",
			Handler:    _Demo_SayHello_Handler,
		},
		{
			MethodName: "SayHelloURL",
			Handler:    _Demo_SayHelloURL_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "api.proto",
}

Now look at the declarations around the server-side interceptor grpc.UnaryServerInterceptor:

// UnaryServerInfo consists of various information about a unary RPC on
// server side. All per-rpc information may be mutated by the interceptor.
type UnaryServerInfo struct {
	// Server is the service implementation the user provides. This is read-only.
	Server interface{}
	// FullMethod is the full RPC method string, i.e., /package.service/method.
	FullMethod string
}

// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
// execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
// the status message of the RPC.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
// of the service method implementation. It is the responsibility of the interceptor to invoke handler
// to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

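It looks simple. There are three parts:

  • A UnaryServerInfo struct that carries the Server and FullMethod fields. Server is the service implementation supplied by the user, and FullMethod is the full name of the requested method.
  • A UnaryHandler function type that carries the handler, which wraps the method generated from the service declaration in the proto file.
  • A UnaryServerInterceptor function type that intercepts the handler, so that code can be inserted before and after the handler runs.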
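
To see the "before and after" behaviour in isolation, here is a minimal sketch. The three type declarations are local stand-ins that copy the shape of the grpc declarations above so the example runs without the grpc module; traceInterceptor and demo are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring grpc.UnaryServerInfo, grpc.UnaryHandler and
// grpc.UnaryServerInterceptor. They are not the real grpc types.
type UnaryServerInfo struct {
	Server     interface{}
	FullMethod string
}

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error)

// traceInterceptor is a hypothetical interceptor that records one event
// before and one event after the wrapped handler runs.
func traceInterceptor(trace *[]string) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		*trace = append(*trace, "before "+info.FullMethod)
		resp, err := handler(ctx, req) // the interceptor is responsible for calling the handler
		*trace = append(*trace, "after "+info.FullMethod)
		return resp, err
	}
}

// demo runs one fake request through the interceptor and returns the
// recorded events together with the handler's result.
func demo() ([]string, interface{}, error) {
	var trace []string
	info := &UnaryServerInfo{FullMethod: "/demo.service.v1.Demo/SayHelloURL"}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		trace = append(trace, "handler")
		return "hello " + req.(string), nil
	}
	resp, err := traceInterceptor(&trace)(context.Background(), "kratos", info, handler)
	return trace, resp, err
}

func main() {
	trace, resp, err := demo()
	fmt.Println(trace, resp, err)
}
```

If the interceptor returned without calling handler, the service method would never run, which is how an interceptor can reject a request early (for example on a failed auth check).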

To make the interceptor's execution more concrete, consider the following code generated from the proto file:

func _Demo_SayHelloURL_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(HelloReq)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(DemoServer).SayHelloURL(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/demo.service.v1.Demo/SayHelloURL",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(DemoServer).SayHelloURL(ctx, req.(*HelloReq))
	}
	return interceptor(ctx, in, info, handler)
}

This _Demo_SayHelloURL_Handler function is the key. It is referenced from the grpc.ServiceDesc structure and registered inside gRPC; to see where, look for s.RegisterService(&_Demo_serviceDesc, srv) in the generated pb.go code.

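  • When the gRPC server receives a request, it first uses the requested method name to find the matching handler in the grpc.ServiceDesc registered on the server, for example _Demo_SayHelloURL_Handler, and executes it.
  • The execution of _Demo_SayHelloURL_Handler is shown in the code above. When interceptor is not nil, SayHelloURL is wrapped as a grpc.UnaryHandler and passed to the interceptor, which completes the execution of the UnaryServerInterceptor.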

As for how the interceptor seen inside _Demo_SayHelloURL_Handler gets injected into the gRPC server, look at this code:

// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt != nil {
			panic("The unary server interceptor was already set and may not be reset.")
		}
		o.unaryInt = i
	}
}
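
Because only one unary interceptor can be installed through this option, several interceptors have to be folded into one by the caller, as the comment above says. The following is a minimal sketch of such chaining, assuming local stand-in types in place of the real grpc ones; chain, tag and demo are hypothetical helpers, not functions from grpc-go or Kratos.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the grpc types, so the sketch has no external dependency.
type UnaryServerInfo struct{ FullMethod string }

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error)

// chain folds several interceptors into one. The first interceptor is the
// outermost: it runs first on the way in and last on the way out.
func chain(interceptors ...UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		// Wrap the real handler from the innermost interceptor outwards.
		for i := len(interceptors) - 1; i >= 0; i-- {
			next, ic := handler, interceptors[i]
			handler = func(ctx context.Context, req interface{}) (interface{}, error) {
				return ic(ctx, req, info, next)
			}
		}
		return handler(ctx, req)
	}
}

// tag returns an interceptor that records when it is entered and left.
func tag(name string, events *[]string) UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
		*events = append(*events, name+" in")
		resp, err := handler(ctx, req)
		*events = append(*events, name+" out")
		return resp, err
	}
}

// demo chains two interceptors around a fake handler and returns the
// order in which everything ran.
func demo() []string {
	var events []string
	one := chain(tag("A", &events), tag("B", &events))
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		events = append(events, "handler")
		return nil, nil
	}
	_, _ = one(context.Background(), nil, &UnaryServerInfo{FullMethod: "/demo.service.v1.Demo/Ping"}, handler)
	return events
}

func main() {
	fmt.Println(demo())
}
```

The single value returned by chain is what would be handed to an option like UnaryInterceptor. Newer grpc-go releases also provide grpc.ChainUnaryInterceptor for the same purpose, so a hand-written chain is mainly relevant for older versions such as the one quoted here.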

That completes the chain of server-side interceptor logic.

Back to the service declared earlier: gRPC registers this declared service:

var _Demo_serviceDesc = grpc.ServiceDesc{
	ServiceName: "demo.service.v1.Demo",
	HandlerType: (*DemoServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "Ping",
			Handler:    _Demo_Ping_Handler,
		},
		{
			MethodName: "SayHello",
			Handler:    _Demo_SayHello_Handler,
		},
		{
			MethodName: "SayHelloURL",
			Handler:    _Demo_SayHelloURL_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "api.proto",
}
func RegisterDemoServer(s *grpc.Server, srv DemoServer) {
	s.RegisterService(&_Demo_serviceDesc, srv)
}
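
The HandlerType field, (*DemoServer)(nil), gives the server a way to check at registration time that srv really implements the service interface. The sketch below illustrates that idea with reflection under simplifying assumptions: DemoServer is cut down to one method, ServiceDesc and server are stand-ins rather than the grpc types, and register returns an error where, as far as I know, grpc-go aborts the process instead.

```go
package main

import (
	"fmt"
	"reflect"
)

// DemoServer and ServiceDesc are trimmed-down stand-ins for the generated
// interface and for grpc.ServiceDesc.
type DemoServer interface {
	Ping() string
}

type ServiceDesc struct {
	ServiceName string
	HandlerType interface{} // a nil pointer to the service interface
}

// server is a hypothetical registry keyed by service name.
type server struct {
	services map[string]interface{}
}

// register is a simplified take on RegisterService: it verifies that impl
// satisfies the interface named by HandlerType, rejects duplicates, and
// then stores the implementation. impl must not be nil.
func (s *server) register(sd *ServiceDesc, impl interface{}) error {
	want := reflect.TypeOf(sd.HandlerType).Elem() // *DemoServer -> DemoServer
	if !reflect.TypeOf(impl).Implements(want) {
		return fmt.Errorf("%T does not implement %v", impl, want)
	}
	if _, dup := s.services[sd.ServiceName]; dup {
		return fmt.Errorf("duplicate service %q", sd.ServiceName)
	}
	s.services[sd.ServiceName] = impl
	return nil
}

type goodService struct{}

func (goodService) Ping() string { return "pong" }

type badService struct{} // has no Ping method

// demo registers a valid and an invalid implementation and returns both results.
func demo() (okErr, badErr error) {
	s := &server{services: map[string]interface{}{}}
	sd := &ServiceDesc{ServiceName: "demo.service.v1.Demo", HandlerType: (*DemoServer)(nil)}
	okErr = s.register(sd, goodService{})
	badErr = s.register(sd, badService{})
	return okErr, badErr
}

func main() {
	okErr, badErr := demo()
	fmt.Println(okErr, badErr)
}
```

In practice the compiler already enforces this for RegisterDemoServer, because its srv parameter is typed as DemoServer; the reflective check matters for the untyped RegisterService call underneath.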

One level up, in the project's initialization code:

server, err := grpc.New(serviceService)
if err != nil {
	cleanup5()
	cleanup4()
	cleanup3()
	cleanup2()
	cleanup()
	return nil, nil, err
}

The service is injected into New, so that the handlers hold a reference to it:

// New new a grpc server.
func New(svc pb.DemoServer) (ws *warden.Server, err error) {
	var (
		cfg warden.ServerConfig
		ct paladin.TOML
	)
	if err = paladin.Get("grpc.toml").Unmarshal(&ct); err != nil {
		return
	}
	if err = ct.Get("Server").UnmarshalTOML(&cfg); err != nil {
		return
	}
	ws = warden.NewServer(&cfg)
	pb.RegisterDemoServer(ws.Server(), svc)
	ws, err = ws.Start()
	return
}

Study hard and make progress every day.