diff --git a/etcd_registry.go b/etcd_registry.go index c7b3507..39c7ced 100644 --- a/etcd_registry.go +++ b/etcd_registry.go @@ -22,18 +22,23 @@ type EtcdRegistryConfig struct { // EtcdRegistry etcd 服务注册(带自动重连) type EtcdRegistry struct { - config EtcdRegistryConfig - etcdClient *EtcdClient - cli *clientv3.Client - key string - val string - serviceName string - ttl int64 - leaseID clientv3.LeaseID - keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc + config EtcdRegistryConfig + etcdClient *EtcdClient + cli *clientv3.Client + key string + val string + serviceName string + ttl int64 + leaseID clientv3.LeaseID + keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + watcherCtx context.Context + watcherCancel context.CancelFunc + watcherMu sync.Mutex + lastRegisterTime time.Time + registerMu sync.Mutex } // NewEtcdRegistry 创建带重连功能的 etcd 服务注册器 @@ -53,12 +58,15 @@ func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) { return nil, err } + watcherCtx, watcherCancel := context.WithCancel(context.Background()) return &EtcdRegistry{ - config: cfg, - etcdClient: etcdClient, - cli: etcdClient.GetClient(), - ctx: ctx, - cancel: cancel, + config: cfg, + etcdClient: etcdClient, + cli: etcdClient.GetClient(), + ctx: ctx, + cancel: cancel, + watcherCtx: watcherCtx, + watcherCancel: watcherCancel, }, nil } @@ -75,6 +83,16 @@ func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error { } func (r *EtcdRegistry) registerWithKV(ttl int64) error { + // 防抖:防止短时间内重复注册 + r.registerMu.Lock() + now := time.Now() + if !r.lastRegisterTime.IsZero() && now.Sub(r.lastRegisterTime) < 2*time.Second { + r.registerMu.Unlock() + return fmt.Errorf("register too frequently, please wait") + } + r.lastRegisterTime = now + r.registerMu.Unlock() + r.mu.RLock() cli := r.cli key := r.key @@ -149,9 +167,8 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error { return fmt.Errorf("failed to put key: %w", err) } - ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel4() - keepAliveCh, err := cli.KeepAlive(ctx4, resp.ID) + // KeepAlive 需要使用长期有效的 context,不能使用带超时的 context + keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID) if err != nil { return fmt.Errorf("failed to start keep-alive: %w", err) } @@ -160,18 +177,31 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error { r.keepAliveCh = keepAliveCh r.mu.Unlock() - go r.watcher(ttl) + // 停止旧的 watcher(如果存在) + r.watcherMu.Lock() + if r.watcherCancel != nil { + r.watcherCancel() + } + // 创建新的 watcher context + r.watcherCtx, r.watcherCancel = context.WithCancel(context.Background()) + watcherCtx := r.watcherCtx + r.watcherMu.Unlock() + + go r.watcher(ttl, watcherCtx) log.Printf("[EtcdRegistry] 服务注册成功: %s", key) return nil } // watcher 监听续约 -func (r *EtcdRegistry) watcher(ttl int64) { +func (r *EtcdRegistry) watcher(ttl int64, watcherCtx context.Context) { for { select { case <-r.ctx.Done(): log.Println("[EtcdRegistry] context done, watcher exiting.") return + case <-watcherCtx.Done(): + log.Println("[EtcdRegistry] watcher context done, exiting.") + return case ka, ok := <-r.keepAliveCh: if !ok { log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...") @@ -185,6 +215,8 @@ func (r *EtcdRegistry) watcher(ttl int64) { select { case <-r.ctx.Done(): return + case <-watcherCtx.Done(): + return default: } @@ -197,15 +229,21 @@ func (r *EtcdRegistry) watcher(ttl int64) { r.cli = r.etcdClient.GetClient() r.mu.Unlock() - // 等待一小段时间确保连接稳定 - time.Sleep(100 * time.Millisecond) + // 等待一段时间确保连接稳定,避免立即重新注册 + time.Sleep(1 * time.Second) // 尝试重新注册 if err := r.registerWithKV(ttl); err == nil { log.Println("[EtcdRegistry] 服务重新注册成功") return // 退出当前 goroutine,让新的 watcher 启动 } else { - log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err) + // 如果是防抖错误,继续等待 + if err.Error() == "register too frequently, please wait" { + log.Println("[EtcdRegistry] 注册过于频繁,继续等待...") + time.Sleep(2 * time.Second) + } else { + log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err) + } } } } @@ -223,6 +261,8 @@ func (r *EtcdRegistry) watcher(ttl int64) { select { case <-r.ctx.Done(): return + case <-watcherCtx.Done(): + return default: } @@ -237,6 +277,12 @@ func (r *EtcdRegistry) watcher(ttl int64) { } if err := r.registerWithKV(ttl); err != nil { + // 如果是防抖错误,等待更长时间 + if err.Error() == "register too frequently, please wait" { + log.Println("[EtcdRegistry] 注册过于频繁,等待更长时间...") + time.Sleep(3 * time.Second) + continue + } log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay) time.Sleep(delay) delay *= 2 @@ -257,7 +303,14 @@ func (r *EtcdRegistry) watcher(ttl int64) { // UnRegister 注销服务 func (r *EtcdRegistry) UnRegister() { - r.cancel() // 停止 watcher + r.cancel() // 停止主 context + + // 停止 watcher + r.watcherMu.Lock() + if r.watcherCancel != nil { + r.watcherCancel() + } + r.watcherMu.Unlock() r.mu.RLock() cli := r.cli