package reconnect

import (
	"context"
	"fmt"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// etcdStatusTimeout bounds the Status call used as a health probe by
// Connect and Ping.
const etcdStatusTimeout = 5 * time.Second

// EtcdClient wraps an etcd v3 client together with the configuration needed
// to (re)create it.
type EtcdClient struct {
	config clientv3.Config  // configuration passed to clientv3.New on every Connect
	client *clientv3.Client // current client; nil when no connection is held
	mu     sync.RWMutex     // guards client
}

// NewEtcdClient returns an EtcdClient that holds config but has not connected
// yet; call Connect to establish the connection.
func NewEtcdClient(config clientv3.Config) *EtcdClient {
	return &EtcdClient{
		config: config,
	}
}

// Connect creates a fresh etcd client from the stored configuration and
// verifies it with a Status call against the first configured endpoint.
//
// Any previously held client is closed and dropped first, so when Connect
// returns an error the wrapper holds no client at all (IsConnected reports
// false) rather than a closed one.
func (e *EtcdClient) Connect(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Guard the Endpoints[0] access below, mirroring the check in Ping.
	if len(e.config.Endpoints) == 0 {
		return fmt.Errorf("etcd endpoints is empty")
	}

	if e.client != nil {
		// Best-effort close: the old client is being replaced, so a close
		// error is not actionable here.
		_ = e.client.Close()
		// Clear the reference so a failure below cannot leave a closed
		// client visible through IsConnected/GetClient.
		e.client = nil
	}

	client, err := clientv3.New(e.config)
	if err != nil {
		return fmt.Errorf("etcd new client failed: %w", err)
	}

	// Probe the connection before publishing the new client.
	timeoutCtx, cancel := context.WithTimeout(ctx, etcdStatusTimeout)
	defer cancel()

	if _, err = client.Status(timeoutCtx, e.config.Endpoints[0]); err != nil {
		// Best-effort close: the probe error is the one worth reporting.
		_ = client.Close()
		return fmt.Errorf("etcd status check failed: %w", err)
	}

	e.client = client
	return nil
}

// Close closes the current etcd client, if any, and clears it. It returns
// the error from the underlying Close call, or nil when no client is held.
func (e *EtcdClient) Close() error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.client == nil {
		return nil
	}

	err := e.client.Close()
	e.client = nil
	return err
}

// Ping checks connection health by issuing a Status call, bounded by
// etcdStatusTimeout, against the first configured endpoint. It returns an
// error when no client is held, when no endpoints are configured, or when
// the Status call fails.
func (e *EtcdClient) Ping(ctx context.Context) error {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.client == nil {
		return fmt.Errorf("etcd client is nil")
	}

	if len(e.config.Endpoints) == 0 {
		return fmt.Errorf("etcd endpoints is empty")
	}

	timeoutCtx, cancel := context.WithTimeout(ctx, etcdStatusTimeout)
	defer cancel()

	_, err := e.client.Status(timeoutCtx, e.config.Endpoints[0])
	return err
}

// IsConnected reports whether a client is currently held. It does not
// contact the server; use Ping for a live health check.
func (e *EtcdClient) IsConnected() bool {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.client != nil
}

// GetClient returns the underlying etcd client, or nil when no client is
// held.
func (e *EtcdClient) GetClient() *clientv3.Client {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.client
}

// EtcdManager creates an EtcdClient from config and passes it, together with
// reconnectConfig, to NewManager.
func EtcdManager(config clientv3.Config, reconnectConfig *Config) *Manager {
	client := NewEtcdClient(config)
	return NewManager(client, reconnectConfig)
}