对 gRPC 的连接、数据传输和重连机制做一下知识整理,内容如下。

基础知识

grpc 是基于 http2 的。http2 作为应用层协议,相比 http1 能很好地支持多路复用,在一个连接上执行更多的数据传输任务,并且多个数据传输互不干扰。

1. 帧

http2的数据传输以帧为单位。帧的传输必须在流中进行

2. 流

在建立连接后,帧的传输是在流中进行的。一个流的建立、传输和关闭,类似于一次 http 请求,可以看做是一个完整的数据传输处理过程。

3. 简单理解

假设现有一个 go 程序,此程序中有 20 个 goroutine、10 个类型为 chan []byte 的 channel,每两个 goroutine 操作一个 channel,在 channel 中发送和接收数据。那么对应到 http2:go 程序可以看成是一个 http2 连接,10 个 channel 可以看成是这个连接中的 10 个流,每个流都有一个客户端和一个服务端(即 goroutine),goroutine 发送的 []byte 类型的数据可以看成 http2 的帧,帧里面存储的是需要传输的数据和协议信息。

连接和重连

连接示例:

// dial establishes a blocking client connection to addr, failing if the
// connection is not ready within five seconds.
func dial(addr string) (*grpc.ClientConn, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(), // plaintext transport; fine for an example, not for production
		grpc.WithBlock(),    // make DialContext wait until the connection is established
	}
	// The original discarded the CancelFunc (`ctx, _ :=`), which go vet flags
	// as lostcancel: the context's resources leak until the timeout fires.
	// Always defer cancel().
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	rpcConn, err := grpc.DialContext(ctx, addr, dialOpts...)
	return rpcConn, err
}

和 grpc 服务建立连接很简单,只需要调用函数 Dial 或者 DialContext 即可,Dial 实际也就是调用了 DialContext。

google.golang.org/grpc/clientconn.go


// Dial creates a client connection to the given target.
// NOTE(review): it simply delegates to DialContext with a background
// context, so any connect timeout must come from a DialOption.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a ClientConn to target. Excerpt from grpc-go;
// elided code is marked "...". By default it only records configuration
// and returns immediately — the actual connection happens asynchronously.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    ...
    
    // Pick the resolver used for load balancing; the default is the
    // passthrough resolver.
    if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// Fall back to the default scheme's resolver when the parsed
			// scheme is not registered.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
    ...

}

查看 DialContext 源码可知,grpc 的连接默认是异步处理的,即调用此方法时实际并没有执行连接操作,只是合并了给定的和默认的配置,然后返回一个 ClientConn 指针。如果在调用此方法时传入了 WithBlock 选项,则会在方法末尾一直阻塞,直到连接成功才返回。真正的连接操作,是在此方法配置好负载均衡解析器之后,调用 rb.Build 方法并传入 ccResolverWrapper 对象时开启的,如下代码:

google.golang.org/grpc/resolver_conn_wrapper.go


// newCCResolverWrapper instantiates the resolver selected by the user's
// dial options and runs its Build method, which starts name resolution.
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
	rb := cc.dopts.resolverBuilder
	if rb == nil {
		return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
	}

	ccr := &ccResolverWrapper{
		cc:     cc,
		addrCh: make(chan []resolver.Address, 1),
		scCh:   make(chan string, 1),
	}

	var err error
	// Build kicks off the resolver; for the default passthrough resolver
	// this ends up calling ccr.UpdateState to initialize load balancing.
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
	if err != nil {
		return nil, err
	}
	return ccr, nil
}

默认解析器 passthrough 的 Build 方法会生成一个对象并开启 start 方法,此方法会调用 ccResolverWrapper 实现的 UpdateState 方法初始化负载均衡。

google.golang.org/grpc/balancer_conn_wrappers.go

// newCCBalancerWrapper wraps the ClientConn with the chosen balancer:
// it starts the watcher goroutine that consumes state/address updates,
// then builds the balancer itself.
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
	ccb := &ccBalancerWrapper{
		cc:               cc,
		stateChangeQueue: newSCStateUpdateBuffer(),
		ccUpdateCh:       make(chan *balancer.ClientConnState, 1),
		done:             make(chan struct{}),
		subConns:         make(map[*acBalancerWrapper]struct{}),
	}
	// watcher runs until done; it updates the balancer state and triggers
	// the connection to the grpc server.
	go ccb.watcher()
	ccb.balancer = b.Build(ccb, bopts)
	return ccb
}

初始化负载均衡后会新建 goroutine 调用 watcher 方法,在此方法里会调用 ccBalancerWrapper.UpdateBalancerState 方法修改连接状态,以及调用 acBalancerWrapper.Connect 方法和 grpc 服务建立连接。

google.golang.org/grpc/pickfirst.go
// HandleResolvedAddrs receives the resolved backend addresses. On the
// first call it creates a SubConn and connects; on later calls it just
// refreshes the SubConn's address list and reconnects.
func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
	if err != nil {
		if grpclog.V(2) {
			grpclog.Infof("pickfirstBalancer: HandleResolvedAddrs called with error %v", err)
		}
		return
	}
	if b.sc == nil {
		b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
		if err != nil {
			//TODO(yuxuanli): why not change the cc state to Idle?
			if grpclog.V(2) {
				grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
			}
			return
		}
		// Publish the Idle state before connecting, then kick off the
		// actual dial on the new SubConn.
		b.cc.UpdateBalancerState(connectivity.Idle, &picker{sc: b.sc})
		b.sc.Connect()
	} else {
		b.sc.UpdateAddresses(addrs)
		b.sc.Connect()
	}
}

建立连接前会再次修改连接状态,随后开启 goroutine 运行 resetTransport 方法。

google.golang.org/grpc/clientconn.go


// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	// Only an Idle addrConn may start connecting; any other state means a
	// connect attempt is already in flight or finished.
	if ac.state != connectivity.Idle {
		ac.mu.Unlock()
		return nil
	}
	// Update connectivity state within the lock to prevent subsequent or
	// concurrent calls from resetting the transport more than once.
	ac.updateConnectivityState(connectivity.Connecting)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}

resetTransport 方法是个死循环,会在程序刚启动、以及运行时服务断开连接需要重连时,执行循环体内容调用 tryAllAddrs 方法;连接成功后会阻塞在循环底部。

google.golang.org/grpc/clientconn.go


// resetTransport is the reconnect loop: each iteration tries every known
// address, then blocks at the bottom of the loop until the transport
// signals that a new connection is needed.
// NOTE(review): excerpt from grpc-go with elisions marked "···";
// dialDuration, hcancel and part of the error path live in elided code,
// so this fragment is not self-contained.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			// On every retry after the first attempt, ask the resolver to
			// re-resolve in case the backends changed.
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
        ···
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

		}

		// Block here while the connection is healthy; reconnect fires when
		// the transport closes or receives a GOAWAY.
		<-reconnect.Done()
		hcancel()
        ···
	}
}

tryAllAddrs调用addrConn.createTransport方法完成连接

google.golang.org/grpc/clientconn.go


// createTransport dials a single address and waits for the server's
// HTTP/2 preface. It returns the new transport plus a "reconnect" event
// that fires (via onGoAway/onClose) when the connection must be redone.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	prefaceReceived := make(chan struct{})
	onCloseCalled := make(chan struct{})
	reconnect := grpcsync.NewEvent()

	target := transport.TargetInfo{
		Addr:      addr.Addr,
		Metadata:  addr.Metadata,
		Authority: ac.cc.authority,
	}

	// once guards the Ready->Connecting transition so that whichever of
	// onGoAway/onClose runs first performs it exactly once.
	once := sync.Once{}
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting)
			}
		})
		ac.mu.Unlock()
		reconnect.Fire()
	}

	onClose := func() {
		ac.mu.Lock()
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting)
			}
		})
		ac.mu.Unlock()
		close(onCloseCalled)
		reconnect.Fire()
	}

	onPrefaceReceipt := func() {
		close(prefaceReceived)
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
		return nil, nil, err
	}

	// Wait for the server preface, a close, or the connect deadline —
	// whichever happens first.
	select {
	case <-time.After(connectDeadline.Sub(time.Now())):
		// We didn't get the preface in time.
		newTr.Close()
		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
		return nil, nil, errors.New("timed out waiting for server handshake")
	case <-prefaceReceived:
		// We got the preface - huzzah! things are good.
	case <-onCloseCalled:
		// The transport has already closed - noop.
		return nil, nil, errors.New("connection closed")
		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
	}
	return newTr, reconnect, nil
}

如果连接建立成功则程序正常执行,连接失败则报错误信息。能看到这里注册了两个回调方法:一个是 onGoAway,当服务端下发 GOAWAY 帧时执行此方法(服务端会不再接受新流,但会完成之前已创建的流,参见文末关于 GOAWAY 帧的介绍);一个是 onClose,当连接关闭时执行此方法。两个方法都会修改连接的状态,并且调用 grpcsync.Event 的 Fire 方法 close 掉内部的通知 channel,此 channel 用于通知 resetTransport 方法:此连接需要重新连接。

数据传输

使用protoc命令可以把proto文件生成类似的代码,下方为一元请求的代码

// GetFirst is protoc-generated unary RPC stub code: Invoke creates a
// stream, sends the request, and blocks until the response arrives.
func (c *T1ServiceClient) GetFirst(ctx context.Context, in *T1Request, opts ...grpc.CallOption) (*Response, error) {
	out := new(Response)
	err := c.cc.Invoke(ctx, "/t1.t1Service/GetFirst", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

其中 Invoke 方法调用 newClientStream 创建一个流,并且在发送数据后阻塞等待接收数据响应。

资料

http2笔记之流和多路复用

关于GOAWAY帧的介绍