package reconnect

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// registryOpTimeout bounds every single etcd request (grant, put, revoke,
// delete) issued by EtcdRegistry so that an unreachable cluster cannot block
// the caller indefinitely.
const registryOpTimeout = 5 * time.Second

// EtcdRegistryConfig holds the settings for an etcd-backed service registry.
type EtcdRegistryConfig struct {
	// Endpoints is the list of etcd server addresses.
	Endpoints []string
	// DialTimeout is the timeout used when connecting to etcd.
	DialTimeout time.Duration
	// ReconnectConfig configures the reconnecting etcd client.
	ReconnectConfig Config
}

// EtcdRegistry registers a service key in etcd under a lease, keeps the lease
// alive, and re-registers the key once the keep-alive stream is lost.
type EtcdRegistry struct {
	config      EtcdRegistryConfig
	etcdClient  *EtcdClient
	cli         *clientv3.Client
	key         string
	val         string
	serviceName string
	ttl         int64
	leaseID     clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
	mu          sync.RWMutex // guards cli, key, val, serviceName, ttl, leaseID, keepAliveCh
	ctx         context.Context
	cancel      context.CancelFunc
}

// NewEtcdRegistry creates a registry backed by a reconnecting etcd client.
// It returns the error from NewEtcdClient when the client cannot be created.
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
	etcdClient, err := NewEtcdClient(EtcdClientConfig{
		Endpoints:       cfg.Endpoints,
		DialTimeout:     cfg.DialTimeout,
		ReconnectConfig: cfg.ReconnectConfig,
	})
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &EtcdRegistry{
		config:     cfg,
		etcdClient: etcdClient,
		cli:        etcdClient.GetClient(),
		ctx:        ctx,
		cancel:     cancel,
	}, nil
}

// Register stores addr under the key "/<serviceName>/<addr>" with a lease of
// ttl seconds and starts a background goroutine that watches the lease
// keep-alive stream. Each successful call starts its own watcher goroutine.
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
	r.mu.Lock()
	r.serviceName = serviceName
	r.ttl = ttl
	r.key = fmt.Sprintf("/%s/%s", serviceName, addr)
	r.val = addr
	r.mu.Unlock()

	return r.registerWithKV(ttl)
}

// registerWithKV grants a lease, writes the service key under it, starts the
// lease keep-alive and launches the watcher goroutine for that keep-alive
// stream. On any failure after the lease was granted, the lease is revoked
// (best effort) so that no orphaned lease or key is left behind.
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
	// Refuse to (re-)register once UnRegister has cancelled the registry.
	if err := r.ctx.Err(); err != nil {
		return fmt.Errorf("etcd registry is closed: %w", err)
	}

	r.mu.RLock()
	key, val := r.key, r.val
	r.mu.RUnlock()

	cli, err := r.activeClient()
	if err != nil {
		return err
	}

	cli, lease, err := r.grantLease(cli, ttl)
	if err != nil {
		return fmt.Errorf("failed to grant lease: %w", err)
	}

	putCtx, putCancel := r.opContext()
	_, err = cli.Put(putCtx, key, val, clientv3.WithLease(lease.ID))
	putCancel()
	if err != nil {
		r.revokeLease(cli, lease.ID)
		return fmt.Errorf("failed to put key: %w", err)
	}

	// The keep-alive must outlive this call: it is bound to the registry
	// context, which is only cancelled by UnRegister. Binding it to a
	// short-lived context would close the channel as soon as we return.
	keepAliveCh, err := cli.KeepAlive(r.ctx, lease.ID)
	if err != nil {
		r.revokeLease(cli, lease.ID)
		return fmt.Errorf("failed to start keep-alive: %w", err)
	}

	r.mu.Lock()
	r.leaseID = lease.ID
	r.keepAliveCh = keepAliveCh
	r.mu.Unlock()

	go r.watcher(keepAliveCh, ttl)

	log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
	return nil
}

// opContext returns a context for a single etcd request. It is bounded by
// registryOpTimeout and is also cancelled when the registry is shut down.
func (r *EtcdRegistry) opContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(r.ctx, registryOpTimeout)
}

// refreshClient re-reads the underlying client from the reconnecting wrapper,
// stores it and returns it. Without a wrapper the cached client is returned.
func (r *EtcdRegistry) refreshClient() *clientv3.Client {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.etcdClient != nil {
		r.cli = r.etcdClient.GetClient()
	}
	return r.cli
}

// activeClient returns the client to use for a registration attempt.
//
// A cached client is used as is while the wrapper reports StateConnected.
// If the wrapper is not connected, the cached reference may be stale, so it
// is refreshed from the wrapper. An error is returned when no client is
// available.
func (r *EtcdRegistry) activeClient() (*clientv3.Client, error) {
	r.mu.RLock()
	cli := r.cli
	r.mu.RUnlock()

	if r.etcdClient == nil {
		if cli == nil {
			return nil, fmt.Errorf("etcd client is nil")
		}
		return cli, nil
	}

	state := r.etcdClient.State()
	switch {
	case state == StateConnected && cli != nil:
		return cli, nil
	case state == StateConnected:
		if cli = r.refreshClient(); cli == nil {
			return nil, fmt.Errorf("etcd client is nil")
		}
		return cli, nil
	case cli == nil:
		return nil, fmt.Errorf("etcd client is not connected, current state: %s", state)
	}

	// Not connected but a cached client exists: it may point at a closed
	// connection, so pick up whatever the wrapper currently holds.
	if cli = r.refreshClient(); cli == nil {
		return nil, fmt.Errorf("etcd client is not connected, current state: %s", state)
	}
	return cli, nil
}

// grantLease requests a lease of ttl seconds. If the first attempt fails, the
// client reference is refreshed from the wrapper and the grant is retried
// once. It returns the client that produced the lease so that follow-up
// requests use the same connection.
func (r *EtcdRegistry) grantLease(cli *clientv3.Client, ttl int64) (*clientv3.Client, *clientv3.LeaseGrantResponse, error) {
	ctx, cancel := r.opContext()
	resp, err := cli.Grant(ctx, ttl)
	cancel()
	if err == nil {
		return cli, resp, nil
	}
	if r.etcdClient == nil {
		return nil, nil, err
	}

	fresh := r.refreshClient()
	if fresh == nil {
		return nil, nil, err
	}

	ctx, cancel = r.opContext()
	defer cancel()
	resp, err = fresh.Grant(ctx, ttl)
	if err != nil {
		return nil, nil, err
	}
	return fresh, resp, nil
}

// revokeLease revokes a lease on a best-effort basis. It deliberately uses a
// context independent of the registry context so that cleanup still runs
// after the registry has been cancelled. Failures are only logged because the
// lease expires on its own after its TTL.
func (r *EtcdRegistry) revokeLease(cli *clientv3.Client, id clientv3.LeaseID) {
	ctx, cancel := context.WithTimeout(context.Background(), registryOpTimeout)
	defer cancel()
	if _, err := cli.Revoke(ctx, id); err != nil {
		log.Printf("[EtcdRegistry] failed to revoke lease %x: %v", id, err)
	}
}

// sleep waits for d or until the registry context is cancelled. It reports
// whether the full duration elapsed (false means the registry was shut down).
func (r *EtcdRegistry) sleep(d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-r.ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// watcher drains the keep-alive channel ch of one registration. When the
// channel is closed it tries to re-register the service and then exits; a
// successful re-registration starts a fresh watcher for the new channel.
// The channel is passed explicitly so the goroutine never reads the shared
// keepAliveCh field without holding the lock.
func (r *EtcdRegistry) watcher(ch <-chan *clientv3.LeaseKeepAliveResponse, ttl int64) {
	for {
		select {
		case <-r.ctx.Done():
			log.Println("[EtcdRegistry] context done, watcher exiting.")
			return
		case _, ok := <-ch:
			if ok {
				// Lease renewed successfully; nothing else to do.
				continue
			}
			log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
			r.recoverRegistration(ttl)
			return
		}
	}
}

// recoverRegistration re-registers the service after the keep-alive stream
// was lost. It first polls the connection state for up to one minute and
// re-registers as soon as the client reports StateConnected. If that window
// passes, it keeps retrying with exponential backoff (2s doubling up to 30s).
// It returns after a successful registration or when the registry context is
// cancelled.
func (r *EtcdRegistry) recoverRegistration(ttl int64) {
	const (
		maxWait      = 60 * time.Second
		pollInterval = 500 * time.Millisecond
		settleDelay  = 100 * time.Millisecond
		initialDelay = 2 * time.Second
		maxDelay     = 30 * time.Second
	)

	// Phase 1: wait for the connection to come back instead of hammering etcd.
	for waited := time.Duration(0); waited < maxWait; waited += pollInterval {
		if r.ctx.Err() != nil {
			return
		}

		if r.etcdClient != nil && r.etcdClient.State() == StateConnected {
			r.refreshClient()

			// Give the freshly restored connection a moment to settle.
			if !r.sleep(settleDelay) {
				return
			}

			err := r.registerWithKV(ttl)
			if err == nil {
				log.Println("[EtcdRegistry] 服务重新注册成功")
				return
			}
			log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
		}

		if !r.sleep(pollInterval) {
			return
		}
	}

	// Phase 2: the connection did not recover in time; retry with backoff.
	log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
	delay := initialDelay
	for {
		if r.ctx.Err() != nil {
			return
		}

		if r.etcdClient != nil && r.etcdClient.State() == StateConnected {
			r.refreshClient()
		}

		err := r.registerWithKV(ttl)
		if err == nil {
			log.Println("[EtcdRegistry] 服务重新注册成功")
			return
		}

		log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
		if !r.sleep(delay) {
			return
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// UnRegister stops the watcher and the keep-alive, revokes the current lease,
// deletes the service key and closes the underlying etcd client. The etcd
// requests are bounded by registryOpTimeout and their failures are logged,
// not returned.
func (r *EtcdRegistry) UnRegister() {
	r.cancel() // stops the watcher and the lease keep-alive

	r.mu.RLock()
	cli := r.cli
	leaseID := r.leaseID
	key := r.key
	r.mu.RUnlock()

	if cli != nil {
		// The registry context is already cancelled, so cleanup needs its
		// own context; the timeout keeps shutdown from hanging when etcd is
		// unreachable.
		ctx, cancel := context.WithTimeout(context.Background(), registryOpTimeout)
		defer cancel()

		failed := false
		if leaseID != clientv3.NoLease {
			if _, err := cli.Revoke(ctx, leaseID); err != nil {
				failed = true
				log.Printf("[EtcdRegistry] failed to revoke lease %x: %v", leaseID, err)
			}
		}
		if key != "" {
			if _, err := cli.Delete(ctx, key); err != nil {
				failed = true
				log.Printf("[EtcdRegistry] failed to delete key %s: %v", key, err)
			}
		}
		if !failed {
			log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
		}
	}

	if r.etcdClient != nil {
		r.etcdClient.Close()
	}
}

// State returns the connection state reported by the underlying etcd client,
// or StateDisconnected when there is no client.
func (r *EtcdRegistry) State() ConnectionState {
	if r.etcdClient != nil {
		return r.etcdClient.State()
	}
	return StateDisconnected
}