diff --git a/etcd_registry.go b/etcd_registry.go index 6332907..c7b3507 100644 --- a/etcd_registry.go +++ b/etcd_registry.go @@ -81,9 +81,14 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error { val := r.val r.mu.RUnlock() + // 如果 cli 为 nil 或连接已关闭,尝试重新获取客户端 if cli == nil { - // 尝试重新获取客户端 if r.etcdClient != nil { + // 检查连接状态 + state := r.etcdClient.State() + if state != StateConnected { + return fmt.Errorf("etcd client is not connected, current state: %s", state) + } r.mu.Lock() r.cli = r.etcdClient.GetClient() cli = r.cli @@ -92,25 +97,63 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error { if cli == nil { return fmt.Errorf("etcd client is nil") } + } else { + // 即使 cli 不为 nil,也要检查连接状态,因为可能连接已断开但 cli 引用还在 + if r.etcdClient != nil { + state := r.etcdClient.State() + if state != StateConnected { + // 连接已断开,尝试更新 cli 引用 + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + cli = r.cli + r.mu.Unlock() + if cli == nil { + return fmt.Errorf("etcd client is not connected, current state: %s", state) + } + } + } } - resp, err := cli.Grant(context.Background(), ttl) + // 使用带超时的 context 来避免长时间阻塞 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := cli.Grant(ctx, ttl) if err != nil { - return err + // 如果 Grant 失败,可能是连接已关闭,尝试更新 client 引用 + if r.etcdClient != nil { + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + cli = r.cli + r.mu.Unlock() + if cli != nil { + // 使用新的 client 重试 + ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + resp, err = cli.Grant(ctx2, ttl) + } + } + if err != nil { + return fmt.Errorf("failed to grant lease: %w", err) + } } r.mu.Lock() r.leaseID = resp.ID r.mu.Unlock() - _, err = cli.Put(context.Background(), key, val, clientv3.WithLease(resp.ID)) + ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel3() + _, err = cli.Put(ctx3, key, val, clientv3.WithLease(resp.ID)) if err != nil { - return err + return fmt.Errorf("failed to put key: %w", err) } - keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID) + ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel4() + keepAliveCh, err := cli.KeepAlive(ctx4, resp.ID) if err != nil { - return err + return fmt.Errorf("failed to start keep-alive: %w", err) } r.mu.Lock() @@ -131,17 +174,49 @@ func (r *EtcdRegistry) watcher(ttl int64) { return case ka, ok := <-r.keepAliveCh: if !ok { - log.Println("[EtcdRegistry] keep-alive channel closed, attempting to re-register...") + log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...") - // 更新 cli 引用 - if r.etcdClient != nil { - r.mu.Lock() - r.cli = r.etcdClient.GetClient() - r.mu.Unlock() + // 等待连接恢复,而不是立即重试 + maxWaitTime := 60 * time.Second + checkInterval := 500 * time.Millisecond + waited := time.Duration(0) + + for waited < maxWaitTime { + select { + case <-r.ctx.Done(): + return + default: + } + + // 检查连接状态 + if r.etcdClient != nil { + state := r.etcdClient.State() + if state == StateConnected { + // 连接已恢复,更新 cli 引用 + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + r.mu.Unlock() + + // 等待一小段时间确保连接稳定 + time.Sleep(100 * time.Millisecond) + + // 尝试重新注册 + if err := r.registerWithKV(ttl); err == nil { + log.Println("[EtcdRegistry] 服务重新注册成功") + return // 退出当前 goroutine,让新的 watcher 启动 + } else { + log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err) + } + } + } + + time.Sleep(checkInterval) + waited += checkInterval } - // 使用指数退避重试 - delay := time.Second + // 如果等待超时,使用指数退避重试 + log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...") + delay := 2 * time.Second maxDelay := 30 * time.Second for { @@ -151,6 +226,16 @@ func (r *EtcdRegistry) watcher(ttl int64) { default: } + // 再次检查连接状态 + if r.etcdClient != nil { + state := r.etcdClient.State() + if state == StateConnected { + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + r.mu.Unlock() + } + } + if err := r.registerWithKV(ttl); err != nil { log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay) time.Sleep(delay) @@ -159,6 +244,7 @@ func (r *EtcdRegistry) watcher(ttl int64) { delay = maxDelay } } else { + log.Println("[EtcdRegistry] 服务重新注册成功") return // 退出当前 goroutine,让新的 watcher 启动 } } @@ -197,4 +283,3 @@ func (r *EtcdRegistry) State() ConnectionState { } return StateDisconnected } - diff --git a/manager.go b/manager.go index 7d6913d..ba190e8 100644 --- a/manager.go +++ b/manager.go @@ -223,4 +223,3 @@ func (cm *ConnectionManager) Close() error { func (cm *ConnectionManager) TriggerReconnect() { cm.reconnect() } -