This article introduces gRPC's NameResolver with a simple usage example, then analyzes its core mechanism at the source-code level.

This version corrects the original article against grpc-go v1.45.0.

Original author: 意琦行

Original link: gRPC (Go) Tutorial (11): NameResolver in Practice and Internals | 指月小筑 | 意琦行's personal blog

1. Overview

For details, see the official documentation: Name Resolver.

The default name system in gRPC is DNS. In addition, the client provides a plugin mechanism for custom name systems.

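The gRPC NameResolver selects the resolver that matches the name system, uses it to resolve the server name supplied by the user, and returns a list of concrete addresses (IP + port).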

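For example, with the default DNS name system we only need to provide the server's domain name; the NameResolver resolves it through DNS and returns the corresponding list of IPs.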

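In practice, many custom gRPC name resolvers are built on etcd: they embed the etcd client SDK, which exchanges data with the etcd server over gRPC bidirectional streams. Servers register themselves by periodically reporting their service name and instance data to etcd, and clients implement service discovery by watching for changes to the instances under a given service name.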

2. Demo

Let's start with a demo that shows how to use a gRPC NameResolver.

2.1 Server

The server code is straightforward; there is nothing special to note.

package main

import (
    "context"
    "flag"
    "fmt"
    pb "golang_study/grpc_study/features/proto/echo"
    "google.golang.org/grpc"
    "log"
    "net"
)

var port = flag.Int("port", 50051, "port number")

type ecServer struct {
    pb.UnimplementedEchoServer
    addr string
}

func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
    return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
}

func main() {
    flag.Parse()
    address := fmt.Sprintf("127.0.0.1:%v", *port)

    listen, err := net.Listen("tcp", address)

    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    grpcServer := grpc.NewServer()

    pb.RegisterEchoServer(grpcServer, &ecServer{addr: address})
    log.Printf("serving on %s\n", address)
    if err = grpcServer.Serve(listen); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

2.2 Client

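The original author used grpc v1.35.0 with 17x as the custom scheme. That no longer works with the current grpc-go v1.45.0, because url.Parse does not recognize a scheme that starts with a digit.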

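The rule is stated in the Go standard library source, net/url/url.go (line 514 in the author's Go version; the exact line varies between releases), in the comment on getScheme: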

Scheme must be [a-zA-Z][a-zA-Z0-9+-.]*

The dial target has the form [scheme]://[authority]/endpoint

For example: custom:///resolver.custom.test.com (scheme custom, empty authority, endpoint resolver.custom.test.com)

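On the client side, note that the connection is dialed with our custom scheme rather than a built-in one such as dns, so a Resolver registered for this custom scheme is required to resolve the target.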

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    pb "golang_study/grpc_study/features/proto/echo"
    "google.golang.org/grpc"

    "google.golang.org/grpc/resolver"
)

const (
    myScheme      = "custom"
    myServiceName = "resolver.custom.test.com"

    backendAddr = "localhost:50051"
)

func callUnaryEcho(c pb.EchoClient, message string) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }
    fmt.Println(r.Message)
}

func makeRPCs(cc *grpc.ClientConn, n int) {
    hwc := pb.NewEchoClient(cc)
    for i := 0; i < n; i++ {
        callUnaryEcho(hwc, "this is examples/name_resolving")
    }
}

func main() {
    passthroughConn, err := grpc.Dial(
        // passthrough is also a built-in gRPC scheme
        fmt.Sprintf("passthrough:///%s", backendAddr), // Dial to "passthrough:///localhost:50051"
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer passthroughConn.Close()

    fmt.Printf("--- calling helloworld.Greeter/SayHello to \"passthrough:///%s\"\n", backendAddr)
    makeRPCs(passthroughConn, 10)

    fmt.Println()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    defer cancel()
    exampleConn, err := grpc.DialContext(
        ctx,
        fmt.Sprintf("%s:///%s", myScheme, myServiceName), // Dial to "custom:///resolver.custom.test.com"
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer exampleConn.Close()

    fmt.Printf("--- calling helloworld.Greeter/SayHello to \"%s:///%s\"\n", myScheme, myServiceName)
    makeRPCs(exampleConn, 10)
}

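The Resolver implementation follows. It lives in the same client main.go, which is why the client imports google.golang.org/grpc/resolver: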

// Following is an example name resolver. It includes a
// ResolverBuilder(https://godoc.org/google.golang.org/grpc/resolver#Builder)
// and a Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
//
// A ResolverBuilder is registered for a scheme (in this example, "custom" is
// the scheme). When a ClientConn is created for this scheme, the
// ResolverBuilder will be picked to build a Resolver. Note that a new Resolver
// is built for each ClientConn. The Resolver will watch the updates for the
// target, and send updates to the ClientConn.

// exampleResolverBuilder is a
// ResolverBuilder(https://godoc.org/google.golang.org/grpc/resolver#Builder).
type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    r := &exampleResolver{
        target: target,
        cc:     cc,
        addrsStore: map[string][]string{
            myServiceName: {backendAddr},
        },
    }
    r.ResolveNow(resolver.ResolveNowOptions{})
    return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return myScheme }

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {
    // Look up the address list for this endpoint directly in the map
    addrStrs := r.addrsStore[r.target.Endpoint]
    addrs := make([]resolver.Address, len(addrStrs))
    for i, s := range addrStrs {
        addrs[i] = resolver.Address{Addr: s}
    }
    r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func (*exampleResolver) Close() {}

func init() {
    // Register the example ResolverBuilder. This is usually done in a package's
    // init() function.
    resolver.Register(&exampleResolverBuilder{})
}

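A resolver consists of two parts, a ResolverBuilder and a Resolver.

They implement the Builder and Resolver interfaces (package google.golang.org/grpc/resolver) respectively: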

type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string
}


type Resolver interface {
    ResolveNow(ResolveNowOptions)
    Close()
}

The Resolver is the core of the whole mechanism: it resolves a service name into the corresponding instance addresses.

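The Builder follows the builder pattern: it is created and registered when the package is initialized, and it is responsible for constructing the custom Resolver. When the client dials a service with Dial, gRPC looks up the registered Builder for the target's scheme and calls its Build() method to construct the custom Resolver.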

2.3 Test

Start the server, then run the client in a separate terminal:

grpc_study/features/name_resolving/server$ go run main.go
2022/04/29 15:54:14 serving on 127.0.0.1:50051

grpc_study/features/name_resolving/client$ go run main.go
--- calling helloworld.Greeter/SayHello to "passthrough:///localhost:50051"
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)

--- calling helloworld.Greeter/SayHello to "custom:///resolver.custom.test.com"
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)
this is examples/name_resolving (from 127.0.0.1:50051)

Everything works, which shows that our custom Resolver runs correctly. Next, let's look at how the Resolver actually works in gRPC at the source-code level.

3. Source Code Analysis

The following analysis is based on grpc-go v1.45.0.

The client first calls grpc.Dial() to establish a connection, which simply delegates to DialContext().

// clientconn.go, line 103
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    return DialContext(context.Background(), target, opts...)
}

DialContext() does a lot of work; here we focus only on the Resolver-related code.

This part parses the target to obtain the scheme, then finds the resolverBuilder for that scheme:

// clientconn.go, line 1622
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
    channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)

    var rb resolver.Builder

    // First, parse the target
    parsedTarget, err := parseTarget(cc.target)
    if err != nil {
        channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
    } else {
        channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
        // Then look up the resolverBuilder for the scheme (dial options first, then the global registry)
        rb = cc.getResolver(parsedTarget.Scheme)
        if rb != nil {
            cc.parsedTarget = parsedTarget
            return rb, nil
        }
    }

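    // No resolverBuilder was found for the scheme (or the target failed to parse), so fall back to defaultScheme.
    // The default scheme is passthrough, which uses the endpoint of the parsed target directly as the address.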
    // We are here because the user's dial target did not contain a scheme or
    // specified an unregistered scheme. We should fallback to the default
    // scheme, except when a custom dialer is specified in which case, we should
    // always use passthrough scheme.
    defScheme := resolver.GetDefaultScheme()
    channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
    canonicalTarget := defScheme + ":///" + cc.target

    parsedTarget, err = parseTarget(canonicalTarget)
    if err != nil {
        channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
        return nil, err
    }
    channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
    rb = cc.getResolver(parsedTarget.Scheme)
    if rb == nil {
        return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
    }
    cc.parsedTarget = parsedTarget
    return rb, nil
}

The logic for looking up the resolver is as follows:

// clientconn.go, line 1601
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
    for _, rb := range cc.dopts.resolvers {
        if scheme == rb.Scheme() {
            return rb
        }
    }
    return resolver.Get(scheme)
}

// resolver.go, line 55
// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
    if b, ok := m[scheme]; ok {
        return b
    }
    return nil
}

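So the resolverBuilder is looked up first among the builders passed in through dial options (cc.dopts.resolvers), and otherwise in the global map m.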

So where do the resolverBuilders in map m come from?

They are registered by the init function in the client code. All resolverBuilders live in one global map whose key is the scheme and whose value is the corresponding resolverBuilder.

func init() {
    // Register the example ResolverBuilder. This is usually done in a package's
    // init() function.
    resolver.Register(&exampleResolverBuilder{})
}

// resolver.go, line 48
func Register(b Builder) {
    m[b.Scheme()] = b
}
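The lookup order (per-connection builders first, then the global map filled by Register) can be sketched like this. builder, named, register and getResolver are simplified, hypothetical stand-ins for the grpc-go code above; in grpc-go, per-connection builders are supplied with the grpc.WithResolvers dial option. resolver.Register is documented as not thread-safe and meant to be called only during initialization, which is why the demo calls it from init().

```go
package main

import "fmt"

// builder is a minimal stand-in for resolver.Builder; only Scheme() matters here.
type builder interface{ Scheme() string }

// named is a toy builder that records who registered it.
type named struct{ scheme, owner string }

func (n named) Scheme() string { return n.scheme }

// globalBuilders plays the role of the package-level map m in resolver.go.
var globalBuilders = map[string]builder{}

// register mirrors resolver.Register: the last registration for a scheme wins.
func register(b builder) { globalBuilders[b.Scheme()] = b }

// getResolver mirrors ClientConn.getResolver: per-connection builders win
// over the global map; an unknown scheme yields nil.
func getResolver(scheme string, perConn []builder) builder {
	for _, b := range perConn {
		if b.Scheme() == scheme {
			return b
		}
	}
	return globalBuilders[scheme]
}

func init() { register(named{scheme: "custom", owner: "global"}) }

func main() {
	fmt.Println(getResolver("custom", nil).(named).owner)                                    // global
	fmt.Println(getResolver("custom", []builder{named{"custom", "per-conn"}}).(named).owner) // per-conn
	fmt.Println(getResolver("unknown", nil) == nil)                                          // true
}
```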

Next, a Resolver instance is built from the resolverBuilder.

// clientconn.go, line 296 (inside DialContext)
// Build the resolver.
    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    if err != nil {
        return nil, fmt.Errorf("failed to build resolver: %v", err)
    }
    cc.mu.Lock()
    cc.resolverWrapper = rWrapper
    cc.mu.Unlock()      

// resolver_conn_wrapper.go, line 48
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    ccr := &ccResolverWrapper{
        cc:   cc,
        done: grpcsync.NewEvent(),
    }

    var credsClone credentials.TransportCredentials
    if creds := cc.dopts.copts.TransportCredentials; creds != nil {
        credsClone = creds.Clone()
    }
    rbo := resolver.BuildOptions{
        DisableServiceConfig: cc.dopts.disableServiceConfig,
        DialCreds:            credsClone,
        CredsBundle:          cc.dopts.copts.CredsBundle,
        Dialer:               cc.dopts.copts.Dialer,
    }

    var err error
    // We need to hold the lock here while we assign to the ccr.resolver field
    // to guard against a data race caused by the following code path,
    // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
    // accessing ccr.resolver which is being assigned here.
    ccr.resolverMu.Lock()
    defer ccr.resolverMu.Unlock()  
    // Call the resolverBuilder's Build method to construct the Resolver
    ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
    if err != nil {
        return nil, err
    }
    return ccr, nil
}

Next, let's see how a built-in ResolverBuilder implements Build, taking the DNS resolver builder (dnsBuilder) as an example:

// internal/resolver/dns/dns_resolver.go, line 118
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    // As before, first parse the target into a normalized host and port (defaultPort if none is given)
    host, port, err := parseTarget(target.Endpoint, defaultPort)
    if err != nil {
        return nil, err
    }


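    // Check whether host is an IP literal (formatIP also brackets IPv6 addresses).
    // If it is, call cc.UpdateState right away and return a no-op deadResolver, skipping DNS resolution.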
    // IP address.
    if ipAddr, ok := formatIP(host); ok {
        addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
        cc.UpdateState(resolver.State{Addresses: addr})
        return deadResolver{}, nil
    }

    // Otherwise host is a domain name and needs DNS resolution
    // DNS address (non-IP).
    ctx, cancel := context.WithCancel(context.Background())
    d := &dnsResolver{
        host:                 host,
        port:                 port,
        ctx:                  ctx,
        cancel:               cancel,
        cc:                   cc,
        rn:                   make(chan struct{}, 1),
        disableServiceConfig: opts.DisableServiceConfig,
    }

    // If the target has an Authority (a custom DNS server), resolve through it; otherwise use the default resolver
    if target.Authority == "" {
        d.resolver = defaultResolver
    } else {
        d.resolver, err = customAuthorityResolver(target.Authority)
        if err != nil {
            return nil, err
        }
    }

    d.wg.Add(1)
    // Start a separate goroutine (watcher) that performs the DNS lookups for this host and pushes the results
    go d.watcher()
    return d, nil
}
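The IP-literal short-circuit in Build can be sketched with net.SplitHostPort and net.ParseIP. This is an approximation of the DNS resolver's host/port parsing, assuming a default port of "443" (the value grpc-go's DNS resolver uses); splitHostPort and shortCircuit are hypothetical names, and for non-IP hosts the real code goes on to build a dnsResolver and start the watcher.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

const defaultPort = "443"

// splitHostPort approximates the DNS resolver's target parsing: a bare IP or
// host gets defPort, and an empty host (":port") means localhost.
func splitHostPort(target, defPort string) (string, string, error) {
	if target == "" {
		return "", "", errors.New("missing address")
	}
	if ip := net.ParseIP(target); ip != nil {
		return target, defPort, nil // bare IPv4/IPv6 literal, no port
	}
	if host, port, err := net.SplitHostPort(target); err == nil {
		if port == "" {
			return "", "", errors.New("missing port after port-separator colon")
		}
		if host == "" {
			host = "localhost"
		}
		return host, port, nil
	}
	if host, port, err := net.SplitHostPort(target + ":" + defPort); err == nil {
		return host, port, nil // "host" without a port
	}
	return "", "", fmt.Errorf("invalid target address %q", target)
}

// shortCircuit reports whether endpoint is an IP literal. If so, it returns
// the final address and no DNS lookup is needed; otherwise it returns the
// host that still has to be resolved.
func shortCircuit(endpoint string) (addr string, isIP bool, err error) {
	host, port, err := splitHostPort(endpoint, defaultPort)
	if err != nil {
		return "", false, err
	}
	if net.ParseIP(host) != nil {
		return net.JoinHostPort(host, port), true, nil // brackets IPv6
	}
	return host, false, nil
}

func main() {
	for _, ep := range []string{"127.0.0.1:50051", "::1", "[::1]:50051", "example.com", ":50051"} {
		addr, isIP, err := shortCircuit(ep)
		fmt.Printf("%-16s -> %q isIP=%v err=%v\n", ep, addr, isIP, err)
	}
}
```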

Next we need to follow the dnsResolver.watcher() method.

func (d *dnsResolver) watcher() {
    defer d.wg.Done()
    backoffIndex := 1
    for {  
        // Do a DNS lookup; on success, call UpdateState to push the new addresses
        state, err := d.lookup()
        if err != nil {
            // Report error to the underlying grpc.ClientConn.
            d.cc.ReportError(err)
        } else {
            err = d.cc.UpdateState(*state)
        }

        // A timer limits how often DNS lookups can happen
        var timer *time.Timer
        if err == nil {
            // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
            // to prevent constantly re-resolving.
            backoffIndex = 1
            timer = newTimerDNSResRate(minDNSResRate)
            select {
            case <-d.ctx.Done():
                timer.Stop()
                return
            case <-d.rn:
            }
        } else {
            // Poll on an error found in DNS Resolver or an error received from ClientConn.
            timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
            backoffIndex++
        }
        select {
        case <-d.ctx.Done():
            timer.Stop()
            return
        case <-timer.C:
        }
    }
}

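The value of err decides how the next lookup is delayed. On success, the watcher starts a minDNSResRate timer (30 seconds) and then blocks until a ResolveNow() call signals d.rn; after that it still waits for the timer to fire. So ResolveNow() does not bypass the rate limit: it requests a new lookup, but lookups happen at most once every 30 seconds, and without a ResolveNow() call no new lookup happens at all. On failure, the watcher does not wait for ResolveNow() and retries after an exponential backoff instead.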

We also need to follow d.cc.UpdateState to see how the update is applied. Here d.cc is the ccResolverWrapper that was passed to Build:

// resolver_conn_wrapper.go, line 94
// The underlying gRPC LB component creates a subConnection for each server instance and, according to the configured LB policy, picks a suitable subConnection to handle each RPC.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
    ccr.incomingMu.Lock()
    defer ccr.incomingMu.Unlock()
    if ccr.done.HasFired() {
        return nil
    }
    channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
    if channelz.IsOn() {
        ccr.addChannelzTraceEvent(s)
    }
    ccr.curState = s
    if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
        return balancer.ErrBadResolverState
    }
    return nil
}

4. Summary

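  1. When the client starts, it registers the custom resolver.

    • This is usually done in an init() function: construct the custom resolverBuilder and register it in gRPC's internal resolverBuilder table (in fact a global map whose key is the scheme name and whose value is the resolverBuilder).
  2. When the client starts, it builds a grpc.ClientConn singleton through a custom Dial() wrapper.

    • The custom Dial() takes the caller's and the callee's service names and, using the custom scheme, builds the service URI from them.
    • It then calls grpc.DialContext() with that URI to dial the service.
    • Inside grpc.DialContext(), the URI is parsed to determine the scheme, and the matching resolverBuilder is looked up in the table.
    • Once found, its Build() method is called to create the custom resolver, which typically starts a goroutine that keeps the callee's instance list up to date.
  3. The underlying gRPC LB library creates a subConnection for each instance and, according to the LB policy, picks a suitable subConnection for each RPC.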

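Looking back at the demo, the custom Resolver should now be easy to understand. Because it is only a demo, it is deliberately minimal: Build stores the addresses in a map, and ResolveNow reads them from the map to update the instance list. It does not even have a watcher.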

5. References

https://github.com/grpc/grpc-go

https://github.com/grpc/grpc/blob/master/doc/naming.md

https://blog.csdn.net/ra681t58cjxsgckj31/article/details/104079070