package reconnect

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
)

// GRPCEtcdClientConfig configures a gRPC client whose backend addresses are
// discovered through etcd.
type GRPCEtcdClientConfig struct {
	// EtcdEndpoints lists the etcd server addresses.
	EtcdEndpoints []string
	// EtcdDialTimeout is the etcd client dial timeout; zero selects 5s.
	EtcdDialTimeout time.Duration
	// ServiceName is the name the service is registered under in etcd. The
	// gRPC target is "etcd:///<ServiceName>/", so the resolver watches the
	// key prefix "/<ServiceName>/".
	ServiceName string
	// DialOptions are extra gRPC dial options. They are appended after the
	// built-in defaults (etcd resolver, insecure transport credentials,
	// round_robin load balancing), so later options can override those.
	DialOptions []grpc.DialOption
	// ReconnectConfig is passed to NewConnectionManager.
	ReconnectConfig Config
}

// GRPCEtcdClient is a gRPC client with etcd-based service discovery whose
// connection lifecycle is driven by a ConnectionManager.
type GRPCEtcdClient struct {
	config     GRPCEtcdClientConfig
	etcdClient *clientv3.Client
	conn       *grpc.ClientConn
	manager    *ConnectionManager
	mu         sync.RWMutex // guards etcdClient and conn
}

// grpcEtcdConnector is handed to NewConnectionManager as the connector that
// (re)creates the etcd client and gRPC connection of a GRPCEtcdClient.
type grpcEtcdConnector struct {
	client *GRPCEtcdClient
}

// Connect creates a new etcd client and a gRPC connection that resolves
// ServiceName through it, then waits (bounded by ctx and a 10s budget) for
// the connection to become Ready. On success the new clients replace any
// previously installed ones, which are closed. On failure everything created
// by this call is closed and the previously installed clients are untouched.
//
// The client mutex is held for the whole call, so Close, GetConn and
// HealthCheck wait for an in-progress Connect.
func (g *grpcEtcdConnector) Connect(ctx context.Context) error {
	g.client.mu.Lock()
	defer g.client.mu.Unlock()

	cfg := g.client.config

	etcdTimeout := cfg.EtcdDialTimeout
	if etcdTimeout == 0 {
		etcdTimeout = 5 * time.Second
	}
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   cfg.EtcdEndpoints,
		DialTimeout: etcdTimeout,
	})
	if err != nil {
		return err
	}

	// The builder is registered only for this ClientConn via WithResolvers.
	// Global resolver.Register is deliberately not used: it must only be
	// called at init time, is not thread-safe, and would rebind the "etcd"
	// scheme to an etcd client that a later reconnect closes.
	builder := &etcdResolverBuilder{cli: cli}
	opts := append([]grpc.DialOption{
		grpc.WithResolvers(builder),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
	}, cfg.DialOptions...)

	conn, err := grpc.NewClient("etcd:///"+cfg.ServiceName+"/", opts...)
	if err != nil {
		cli.Close()
		return err
	}
	if err := g.waitReady(ctx, conn); err != nil {
		conn.Close()
		cli.Close()
		return err
	}

	// Release clients from an earlier Connect that was not followed by Close;
	// otherwise they (and the resolver goroutine) would leak. The connection
	// is closed before its etcd client so its resolver stops first.
	if g.client.conn != nil {
		g.client.conn.Close()
	}
	if g.client.etcdClient != nil {
		g.client.etcdClient.Close()
	}
	g.client.etcdClient = cli
	g.client.conn = conn
	return nil
}

// waitReady starts connecting conn and blocks until it is Ready. It fails when
// the channel reaches TransientFailure or Shutdown first, or when ctx or the
// 10-second connect budget expires.
func (g *grpcEtcdConnector) waitReady(ctx context.Context, conn *grpc.ClientConn) error {
	const connectTimeout = 10 * time.Second

	ctx, cancel := context.WithTimeout(ctx, connectTimeout)
	defer cancel()

	conn.Connect()
	for {
		state := conn.GetState()
		switch state {
		case connectivity.Ready:
			return nil
		case connectivity.TransientFailure, connectivity.Shutdown:
			// Keep context.DeadlineExceeded in the chain for callers that test
			// with errors.Is, but report what actually happened.
			return fmt.Errorf("grpc connection to %q entered %s: %w", conn.Target(), state, context.DeadlineExceeded)
		}
		if !conn.WaitForStateChange(ctx, state) {
			return ctx.Err()
		}
	}
}

// Close closes and clears the installed gRPC connection and etcd client, if
// any. Teardown is best-effort: close errors are ignored and nil is returned.
func (g *grpcEtcdConnector) Close() error {
	g.client.mu.Lock()
	defer g.client.mu.Unlock()

	// The connection goes first so its resolver stops before the etcd client
	// it reads from is closed.
	if g.client.conn != nil {
		g.client.conn.Close()
		g.client.conn = nil
	}
	if g.client.etcdClient != nil {
		g.client.etcdClient.Close()
		g.client.etcdClient = nil
	}
	return nil
}

// grpcEtcdHealthChecker is handed to NewConnectionManager as the health
// checker of a GRPCEtcdClient.
type grpcEtcdHealthChecker struct {
	client *GRPCEtcdClient
}

// HealthCheck reports nil when the current connection is Ready or Idle.
// Otherwise it asks the connection to connect, waits for one state change
// (bounded by ctx), and reports nil only if the new state is Ready or Idle.
// It returns context.Canceled when no connection is installed, ctx.Err() if
// ctx ends before the state changes, and context.DeadlineExceeded if the
// connection is still unusable after the change.
func (g *grpcEtcdHealthChecker) HealthCheck(ctx context.Context) error {
	g.client.mu.RLock()
	conn := g.client.conn
	g.client.mu.RUnlock()

	if conn == nil {
		return context.Canceled
	}

	state := conn.GetState()
	if state == connectivity.Ready || state == connectivity.Idle {
		return nil
	}

	conn.Connect()
	if !conn.WaitForStateChange(ctx, state) {
		return ctx.Err()
	}
	newState := conn.GetState()
	if newState != connectivity.Ready && newState != connectivity.Idle {
		return context.DeadlineExceeded
	}
	return nil
}

// NewGRPCClientWithEtcd builds a GRPCEtcdClient wired to a ConnectionManager
// and performs the initial connection via ConnectWithRetry, returning that
// call's error if it fails.
func NewGRPCClientWithEtcd(cfg GRPCEtcdClientConfig) (*GRPCEtcdClient, error) {
	client := &GRPCEtcdClient{
		config: cfg,
	}
	connector := &grpcEtcdConnector{client: client}
	checker := &grpcEtcdHealthChecker{client: client}
	client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)

	if err := client.manager.ConnectWithRetry(context.Background()); err != nil {
		return nil, err
	}
	return client, nil
}

// GetConn returns the current gRPC connection, or nil if none is installed.
// A reconnect replaces and closes the previous connection, so fetch the
// connection per use rather than caching it.
func (c *GRPCEtcdClient) GetConn() *grpc.ClientConn {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn
}
GetEtcdClient 获取 etcd 客户端 func (c *GRPCEtcdClient) GetEtcdClient() *clientv3.Client { c.mu.RLock() defer c.mu.RUnlock() return c.etcdClient } // State 获取连接状态 func (c *GRPCEtcdClient) State() ConnectionState { return c.manager.State() } // Close 关闭客户端 func (c *GRPCEtcdClient) Close() error { return c.manager.Close() } // TriggerReconnect 手动触发重连 func (c *GRPCEtcdClient) TriggerReconnect() { c.manager.TriggerReconnect() } // ================== etcd resolver 实现 ================== // etcdResolverBuilder 实现 resolver.Builder type etcdResolverBuilder struct { cli *clientv3.Client } func (b *etcdResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r := &etcdResolver{ cli: b.cli, cc: cc, target: target, } go r.watch(target.Endpoint()) return r, nil } func (b *etcdResolverBuilder) Scheme() string { return "etcd" } // etcdResolver 实现 resolver.Resolver type etcdResolver struct { cli *clientv3.Client cc resolver.ClientConn target resolver.Target } func (r *etcdResolver) watch(keyPrefix string) { // 首次获取服务列表 resp, err := r.cli.Get(context.Background(), "/"+keyPrefix, clientv3.WithPrefix()) if err != nil { log.Printf("[etcdResolver] 获取服务列表失败: %v", err) return } addrs := make([]resolver.Address, 0, len(resp.Kvs)) for _, kv := range resp.Kvs { addrs = append(addrs, resolver.Address{Addr: string(kv.Value)}) } r.cc.UpdateState(resolver.State{Addresses: addrs}) // 监听变化 watchCh := r.cli.Watch(context.Background(), "/"+keyPrefix, clientv3.WithPrefix()) for wresp := range watchCh { for _, ev := range wresp.Events { switch ev.Type { case clientv3.EventTypePut: addrs = append(addrs, resolver.Address{Addr: string(ev.Kv.Value)}) case clientv3.EventTypeDelete: newAddrs := make([]resolver.Address, 0) for _, addr := range addrs { if addr.Addr != string(ev.Kv.Value) { newAddrs = append(newAddrs, addr) } } addrs = newAddrs } r.cc.UpdateState(resolver.State{Addresses: addrs}) } } } func (r *etcdResolver) ResolveNow(opts 
resolver.ResolveNowOptions) {} func (r *etcdResolver) Close() {}