grpc Go Client 源码分析
grpc client 代码非常简洁,分三步
1,获取连接
2,初始化客户端
3,发送请求
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
defer conn.Close()
c := pb.NewGreeterClient(conn)
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
首先看下发送请求
type GreeterClient interface {
// Sends a greeting
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
调用了cc的Invoke方法
type greeterClient struct {
cc grpc.ClientConnInterface
}
ClientConnInterface的定义如下
type ClientConnInterface interface {
// Invoke performs a unary RPC and returns after the response is received
// into reply.
Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
// NewStream begins a streaming RPC.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
接口包含了两个方法Invoke和NewStream定义在clientconn.go中
接着看下client初始化的代码
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
仅仅把connet interface传给了client
最后看下获取连接的实现
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
clientconn.go的Dial方法返回了ClientConn指针
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
....
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
....
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
.....
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
}
其中ClientConn的结构体定义如下
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
parsedTarget resolver.Target
authority string
dopts dialOptions
csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
conns map[*addrConn]struct{}
Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
可以看出Dial仅仅做了connection的初始化
call.go里定义了Invoke方法
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
里面分了三步,建立连接,发送请求,获取结果
newClientStream 函数定义在stream.go文件里
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
}
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
onCommit: onCommit,
}
if err := cs.newAttemptLocked(sh, trInfo); err != nil {
cs.finish(err)
return nil, err
}
}
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
newAttempt := &csAttempt{
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
}
getTransport 定义在clientconn.go中
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
if err != nil {
return nil, nil, toRPCErr(err)
}
return t, done, nil
}
pick函数定义在picker_warper.go
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
for{
pickResult, err := p.Pick(info)
if t, ok := acw.getAddrConn().getReadyTransport(); ok {}
}
}
在stream.go文件里定义了ClientStream的接口
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
}
clientstream实现了上述接口
type clientStream struct {
callHdr *transport.CallHdr
opts []CallOption
callInfo *callInfo
cc *ClientConn
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
cancel context.CancelFunc // cancels all attempts
sentLast bool // sent an end stream
beginTime time.Time
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog *binarylog.MethodLogger // Binary logger, can be nil.
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
//
// It's only read and used by Recv() and Header(), so it doesn't need to be
// synchronized.
serverHeaderBinlogged bool
mu sync.Mutex
firstAttempt bool // if true, transparent retry is valid
numRetries int // exclusive of transparent retry attempt(s)
numRetriesSincePushback int // retries since pushback; to reset backoff
finished bool // TODO: replace with atomic cmpxchg or sync.Once?
// attempt is the active client stream attempt.
// The only place where it is written is the newAttemptLocked method and this method never writes nil.
// So, attempt can be nil only inside newClientStream function when clientStream is first created.
// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
}
实现了SendMsg和RecvMsg两个方法
func (cs *clientStream) SendMsg(m interface{}) (err error) {
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
}
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
data, err = encode(codec, m)
compData, err := compress(data, cp, comp)
hdr, payload = msgHeader(data, compData)
}
实现了数据的编码压缩
紧接着发送数据
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
}
}
最后是接受消息
func (cs *clientStream) RecvMsg(m interface{}) error {
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength + headerLen,
Length: len(payInfo.uncompressedBytes),
})
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
}
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
if err != nil {
return err
}
if err := c.Unmarshal(d, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
if payInfo != nil {
payInfo.uncompressedBytes = d
}
return nil
}
推荐阅读
评论