grpc Go Client 源码分析

Go语言精选

共 12994字,需浏览 26分钟

 ·

2021-05-15 13:32

grpc client 代码非常简洁,分三步

1,获取连接

2,初始化客户端

3,发送请求

// Step 1: obtain a connection. Error handling is omitted in this excerpt;
// real code should check err before using conn.
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
defer conn.Close()

// Step 2: create the client stub on top of the connection.
c := pb.NewGreeterClient(conn)

// Step 3: send the request.
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})

首先看下发送请求

type GreeterClient interface {  // Sends a greeting  SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {  out := new(HelloReply)  err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)  if err != nil {    return nil, err  }  return out, nil}

调用了cc的Invoke方法

// greeterClient holds the connection interface that its RPC methods call
// into.
type greeterClient struct {
	cc grpc.ClientConnInterface
}

ClientConnInterface的定义如下

type ClientConnInterface interface {  // Invoke performs a unary RPC and returns after the response is received  // into reply.  Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error  // NewStream begins a streaming RPC.  NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)}

该接口定义在clientconn.go中,包含Invoke和NewStream两个方法


接着看下client初始化的代码

// NewGreeterClient wraps cc in a greeterClient and returns it as a
// GreeterClient. It only stores cc; nothing else happens here.
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
	client := &greeterClient{cc: cc}
	return client
}

仅仅把connection interface(即ClientConnInterface)传给了client


最后看下获取连接的实现

// Dial is DialContext called with a background context: it forwards target
// and opts unchanged and returns whatever DialContext returns.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	ctx := context.Background()
	return DialContext(ctx, target, opts...)
}

clientconn.go的Dial方法返回了ClientConn指针

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {  cc := &ClientConn{    target:            target,    csMgr:             &connectivityStateManager{},    conns:             make(map[*addrConn]struct{}),    dopts:             defaultDialOptions(),    blockingpicker:    newPickerWrapper(),    czData:            new(channelzData),    firstResolveEvent: grpcsync.NewEvent(),  }  ....cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)  ....  cc.balancerBuildOpts = balancer.BuildOptions{    DialCreds:        credsClone,    CredsBundle:      cc.dopts.copts.CredsBundle,    Dialer:           cc.dopts.copts.Dialer,    CustomUserAgent:  cc.dopts.copts.UserAgent,    ChannelzParentID: cc.channelzID,    Target:           cc.parsedTarget,  }  .....rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) }

其中ClientConn的结构体定义如下

type ClientConn struct {  ctx    context.Context  cancel context.CancelFunc
target string parsedTarget resolver.Target authority string dopts dialOptions csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
mu sync.RWMutex resolverWrapper *ccResolverWrapper sc *ServiceConfig conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string balancerWrapper *ccBalancerWrapper retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number czData *channelzData
lceMu sync.Mutex // protects lastConnectionError lastConnectionError error}

可以看出Dial仅仅做了connection的初始化


call.go里定义了Invoke方法

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {  // allow interceptor to see all applicable call options, which means those  // configured as defaults from dial option as well as per-call options  opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...)}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {  cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)  if err != nil {    return err  }  if err := cs.SendMsg(req); err != nil {    return err  }  return cs.RecvMsg(reply)}

里面分了三步:创建客户端流(newClientStream,其中会获取可用的连接)、发送请求(SendMsg)、获取结果(RecvMsg)

newClientStream 函数定义在stream.go文件里

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {  rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})    callHdr := &transport.CallHdr{    Host:           cc.authority,    Method:         method,    ContentSubtype: c.contentSubtype,  }  cs := &clientStream{    callHdr:      callHdr,    ctx:          ctx,    methodConfig: &mc,    opts:         opts,    callInfo:     c,    cc:           cc,    desc:         desc,    codec:        c.codec,    cp:           cp,    comp:         comp,    cancel:       cancel,    beginTime:    beginTime,    firstAttempt: true,    onCommit:     onCommit,  }  if err := cs.newAttemptLocked(sh, trInfo); err != nil {    cs.finish(err)    return nil, err  }}
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {  newAttempt := &csAttempt{    cs:           cs,    dc:           cs.cc.dopts.dc,    statsHandler: sh,    trInfo:       trInfo,  }  t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method) }

getTransport 定义在clientconn.go中

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {  t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{    Ctx:            ctx,    FullMethodName: method,  })  if err != nil {    return nil, nil, toRPCErr(err)  }  return t, done, nil}

pick函数定义在picker_wrapper.go

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {for{pickResult, err := p.Pick(info)if t, ok := acw.getAddrConn().getReadyTransport(); ok {}}}

在stream.go文件里定义了ClientStream的接口

type ClientStream interface {// Header returns the header metadata received from the server if there  // is any. It blocks if the metadata is not ready to read.  Header() (metadata.MD, error)  // Trailer returns the trailer metadata from the server, if there is any.  // It must only be called after stream.CloseAndRecv has returned, or  // stream.Recv has returned a non-nil error (including io.EOF).  Trailer() metadata.MD  // CloseSend closes the send direction of the stream. It closes the stream  // when non-nil error is met. It is also not safe to call CloseSend  // concurrently with SendMsg.  CloseSend() error  // Context returns the context for this stream.  //  // It should not be called until after Header or RecvMsg has returned. Once  // called, subsequent client-side retries are disabled.  Context() context.Context  // SendMsg is generally called by generated code. On error, SendMsg aborts  // the stream. If the error was generated by the client, the status is  // returned directly; otherwise, io.EOF is returned and the status of  // the stream may be discovered using RecvMsg.  //  // SendMsg blocks until:  //   - There is sufficient flow control to schedule m with the transport, or  //   - The stream is done, or  //   - The stream breaks.  //  // SendMsg does not wait until the message is received by the server. An  // untimely stream closure may result in lost messages. To ensure delivery,  // users should ensure the RPC completed successfully using RecvMsg.  //  // It is safe to have a goroutine calling SendMsg and another goroutine  // calling RecvMsg on the same stream at the same time, but it is not safe  // to call SendMsg on the same stream in different goroutines. It is also  // not safe to call CloseSend concurrently with SendMsg.  SendMsg(m interface{}) error  // RecvMsg blocks until it receives a message into m or the stream is  // done. It returns io.EOF when the stream completes successfully. 
On  // any other error, the stream is aborted and the error contains the RPC  // status.  //  // It is safe to have a goroutine calling SendMsg and another goroutine  // calling RecvMsg on the same stream at the same time, but it is not  // safe to call RecvMsg on the same stream in different goroutines.  RecvMsg(m interface{}) error}

clientStream结构体实现了上述接口

type clientStream struct {  callHdr  *transport.CallHdr  opts     []CallOption  callInfo *callInfo  cc       *ClientConn  desc     *StreamDesc
codec baseCodec cp Compressor comp encoding.Compressor
cancel context.CancelFunc // cancels all attempts
sentLast bool // sent an end stream beginTime time.Time
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog *binarylog.MethodLogger // Binary logger, can be nil. // serverHeaderBinlogged is a boolean for whether server header has been // logged. Server header will be logged when the first time one of those // happens: stream.Header(), stream.Recv(). // // It's only read and used by Recv() and Header(), so it doesn't need to be // synchronized. serverHeaderBinlogged bool
mu sync.Mutex firstAttempt bool // if true, transparent retry is valid numRetries int // exclusive of transparent retry attempt(s) numRetriesSincePushback int // retries since pushback; to reset backoff finished bool // TODO: replace with atomic cmpxchg or sync.Once? // attempt is the active client stream attempt. // The only place where it is written is the newAttemptLocked method and this method never writes nil. // So, attempt can be nil only inside newClientStream function when clientStream is first created. // One of the first things done after clientStream's creation, is to call newAttemptLocked which either // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked, // then newClientStream calls finish on the clientStream and returns. So, finish method is the only // place where we need to check if the attempt is nil. attempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. committed bool // active attempt committed for retry? onCommit func() buffer []func(a *csAttempt) error // operations to replay on retry bufferSize int // current size of buffer}

实现了SendMsg和RecvMsg两个方法

func (cs *clientStream) SendMsg(m interface{}) (err error) {hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)  op := func(a *csAttempt) error {    err := a.sendMsg(m, hdr, payload, data)    // nil out the message and uncomp when replaying; they are only needed for    // stats which is disabled for subsequent attempts.    m, data = nil, nil    return err  }}

func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {  data, err = encode(codec, m)  compData, err := compress(data, cp, comp)  hdr, payload = msgHeader(data, compData)  }

实现了数据的编码压缩

紧接着发送数据

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {  if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {  }  if a.statsHandler != nil {    a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))  }  }

最后是接收消息

// RecvMsg receives one message into m by running the attempt's recvMsg
// through withRetry, passing cs.commitAttemptLocked as the second argument.
//
// This is an abridged excerpt: the definition of recvInfo and the handling
// and return of err are not shown.
func (cs *clientStream) RecvMsg(m interface{}) error {
	// ...
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	// ...
}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{      Client:   true,      RecvTime: time.Now(),      Payload:  m,      // TODO truncate large payload.      Data:       payInfo.uncompressedBytes,      WireLength: payInfo.wireLength + headerLen,      Length:     len(payInfo.uncompressedBytes),    })err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)    }
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {  d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)  if err != nil {    return err  }  if err := c.Unmarshal(d, m); err != nil {    return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)  }  if payInfo != nil {    payInfo.uncompressedBytes = d  }  return nil}


推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

浏览 9
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报