首先,客户端发起RPC调用请求
// SayHello forwards req to the demo RPC client and returns its response.
// When the remote call fails, the error is wrapped with the requested name
// before being handed back to the caller.
func (d *dao) SayHello(c context.Context, req *api.HelloReq) (resp *empty.Empty, err error) {
	resp, err = d.demoClient.SayHello(c, req)
	if err == nil {
		return resp, nil
	}
	// Attach the request name so the failure can be traced back to its input.
	return resp, errors.Wrapf(err, "%v", req.Name)
}
RPC服务将会在注册方法中寻找对应方法然后调用
// demoClient is the client-side stub whose methods issue RPCs through the
// wrapped gRPC connection.
type demoClient struct {
// cc is the connection on which the stub's methods call Invoke.
cc *grpc.ClientConn
}
// SayHelloURL invokes the unary SayHelloURL method of demo.service.v1.Demo
// over the client connection, sending in and decoding the reply into a fresh
// HelloResp. It returns a nil response together with the error when the
// invocation fails.
func (c *demoClient) SayHelloURL(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) {
	const fullMethod = "/demo.service.v1.Demo/SayHelloURL"

	reply := &HelloResp{}
	if invokeErr := c.cc.Invoke(ctx, fullMethod, in, reply, opts...); invokeErr != nil {
		return nil, invokeErr
	}
	return reply, nil
}
接下来是Kratos的RPC服务初始化流程
首先Warden将各种Handler注册进RPC服务中
// _Demo_serviceDesc is the gRPC service descriptor for demo.service.v1.Demo.
// It pairs each unary method name with the generated handler function that
// serves it.
var _Demo_serviceDesc = grpc.ServiceDesc{
ServiceName: "demo.service.v1.Demo",
// A typed nil pointer to the DemoServer interface, carrying only type information.
HandlerType: (*DemoServer)(nil),
// Unary methods: method name -> generated handler.
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Demo_Ping_Handler,
},
{
MethodName: "SayHello",
Handler: _Demo_SayHello_Handler,
},
{
MethodName: "SayHelloURL",
Handler: _Demo_SayHelloURL_Handler,
},
},
// No streaming methods are declared for this service.
Streams: []grpc.StreamDesc{},
// NOTE(review): presumably the name of the .proto file this descriptor was generated from — confirm.
Metadata: "api.proto",
}
让我们看一下服务端拦截器grpc.UnaryServerInterceptor的声明
// UnaryServerInfo consists of various information about a unary RPC on
// server side. All per-rpc information may be mutated by the interceptor.
// A pointer to it is passed to a UnaryServerInterceptor as its info argument.
type UnaryServerInfo struct {
// Server is the service implementation the user provides. This is read-only.
Server interface{}
// FullMethod is the full RPC method string, i.e., /package.service/method.
FullMethod string
}
// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
// execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
// the status message of the RPC.
//
// It takes the call context and the request value, and returns the response value
// together with any error.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
// of the service method implementation. It is the responsibility of the interceptor to invoke handler
// to complete the RPC.
//
// Parameters: ctx is the call context, req is the request value, info describes the RPC
// being served, and handler is the function the interceptor must call to run the service
// method. The returned resp and err are the outcome of the intercepted call.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
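看起来很简单,包括:一个UnaryServerInfo结构体,用于传递Server和FullMethod两个字段(Server是用户提供的服务实现,FullMethod是RPC方法的全名);一个UnaryHandler函数类型,用于传递实际完成RPC的Handler;一个UnaryServerInterceptor函数类型,用于拦截Handler的执行,并由它负责调用Handler来完成RPC。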
为了更形象的说明拦截器的执行过程,请看基于proto生成的以下代码:
// _Demo_SayHelloURL_Handler serves the SayHelloURL unary RPC. It decodes the
// incoming message into a HelloReq via dec, then either calls the DemoServer
// implementation directly (no interceptor installed) or hands the call to the
// interceptor along with a handler closure that performs the real invocation.
func _Demo_SayHelloURL_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	request := &HelloReq{}
	if decodeErr := dec(request); decodeErr != nil {
		return nil, decodeErr
	}

	if interceptor != nil {
		// The closure is what the interceptor calls to reach the service method.
		invoke := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(DemoServer).SayHelloURL(ctx, req.(*HelloReq))
		}
		rpcInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: "/demo.service.v1.Demo/SayHelloURL",
		}
		return interceptor(ctx, request, rpcInfo, invoke)
	}

	// Without an interceptor the service method is called directly.
	return srv.(DemoServer).SayHelloURL(ctx, request)
}
这个_Demo_SayHelloURL_Handler方法是关键,该方法会被包装为grpc.ServiceDesc结构,被注册到gRPC内部,具体可在生成的pb.go代码内查找s.RegisterService(&_Demo_serviceDesc, srv)。
那么,_Demo_SayHelloURL_Handler内的interceptor是如何注入到gRPC server内的呢?请看下面这段代码:
// UnaryInterceptor builds a ServerOption that installs i as the server's
// unary interceptor. At most one unary interceptor may be installed; applying
// the option when one is already present panics. Callers that need several
// interceptors must chain them into a single one themselves.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
	return func(o *options) {
		if o.unaryInt == nil {
			o.unaryInt = i
			return
		}
		// A second installation is treated as a programming error.
		panic("The unary server interceptor was already set and may not be reset.")
	}
}
如此,完整的服务端拦截器逻辑就串联完成了。

回到之前声明的服务,gRPC将会注册这个声明的服务:
// _Demo_serviceDesc is the gRPC service descriptor for demo.service.v1.Demo.
// RegisterDemoServer passes a pointer to it to RegisterService, binding each
// listed method name to its generated handler.
var _Demo_serviceDesc = grpc.ServiceDesc{
ServiceName: "demo.service.v1.Demo",
// A typed nil pointer to the DemoServer interface, carrying only type information.
HandlerType: (*DemoServer)(nil),
// Unary methods: method name -> generated handler.
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Demo_Ping_Handler,
},
{
MethodName: "SayHello",
Handler: _Demo_SayHello_Handler,
},
{
MethodName: "SayHelloURL",
Handler: _Demo_SayHelloURL_Handler,
},
},
// No streaming methods are declared for this service.
Streams: []grpc.StreamDesc{},
// NOTE(review): presumably the name of the .proto file this descriptor was generated from — confirm.
Metadata: "api.proto",
}
// RegisterDemoServer registers srv on the gRPC server s as the implementation
// of the service described by _Demo_serviceDesc.
func RegisterDemoServer(s *grpc.Server, srv DemoServer) {
	desc := &_Demo_serviceDesc
	s.RegisterService(desc, srv)
}
再往上一层,在项目业务的初始化代码中:
// Build the gRPC server from the service instance.
server, err := grpc.New(serviceService)
if err != nil {
// On failure, run the cleanup functions from the highest-numbered down to
// the first, then report the error.
// NOTE(review): presumably these release dependencies created earlier in the enclosing function, newest first — confirm.
cleanup5()
cleanup4()
cleanup3()
cleanup2()
cleanup()
return nil, nil, err
}
将service注入到New方法中,使得Handler去引用service
// New creates and starts a warden gRPC server that serves svc.
//
// The server configuration is read from the "Server" section of grpc.toml via
// paladin. Any error from loading or decoding that configuration is returned
// with a nil server. Otherwise svc is registered on the new server and the
// result of starting it is returned.
func New(svc pb.DemoServer) (ws *warden.Server, err error) {
	var ct paladin.TOML
	if err = paladin.Get("grpc.toml").Unmarshal(&ct); err != nil {
		return nil, err
	}

	var cfg warden.ServerConfig
	if err = ct.Get("Server").UnmarshalTOML(&cfg); err != nil {
		return nil, err
	}

	ws = warden.NewServer(&cfg)
	// Register the service before the server starts.
	pb.RegisterDemoServer(ws.Server(), svc)
	return ws.Start()
}
好好学习,天天向上