package reconnect

import (
	"context"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// defaultEtcdDialTimeout is applied when EtcdClientConfig.DialTimeout is zero.
const defaultEtcdDialTimeout = 5 * time.Second

// EtcdClientConfig holds the settings for an etcd client with reconnect support.
type EtcdClientConfig struct {
	// Endpoints is the list of etcd server addresses.
	Endpoints []string
	// DialTimeout is the connection timeout; zero selects a 5 second default.
	DialTimeout time.Duration
	// Username is the optional user name.
	Username string
	// Password is the optional password.
	Password string
	// ReconnectConfig is handed to the ConnectionManager unchanged.
	ReconnectConfig Config
}

// EtcdClient is an etcd client whose connection is driven by a
// ConnectionManager.
type EtcdClient struct {
	config  EtcdClientConfig
	client  *clientv3.Client
	manager *ConnectionManager
	// mu guards client. config is written only in NewEtcdClient, before the
	// connector and health checker are handed to the manager.
	mu sync.RWMutex
}

// etcdConnector is the connector handed to the ConnectionManager; it creates
// and tears down the underlying clientv3.Client of an EtcdClient.
type etcdConnector struct {
	client *EtcdClient
}

// Connect creates a new etcd client from the stored configuration, verifies
// that at least one configured endpoint answers a Status request, and
// installs it as the current client.
//
// Each endpoint probe is bounded by the effective dial timeout and by ctx.
// The client lock is held for the whole call, so GetClient and HealthCheck
// wait until Connect returns. A previously installed client, if any, is
// closed once the new one has been verified.
func (e *etcdConnector) Connect(ctx context.Context) error {
	e.client.mu.Lock()
	defer e.client.mu.Unlock()

	cfg := clientv3.Config{
		Endpoints:   e.client.config.Endpoints,
		DialTimeout: e.client.config.DialTimeout,
		Username:    e.client.config.Username,
		Password:    e.client.config.Password,
	}
	if cfg.DialTimeout == 0 {
		cfg.DialTimeout = defaultEtcdDialTimeout
	}

	cli, err := clientv3.New(cfg)
	if err != nil {
		return err
	}

	// Verify the connection. Probing only the first endpoint would make the
	// whole connection fail whenever that single member is down, even though
	// the remaining members could serve requests.
	if err := probeEtcdEndpoints(ctx, cli, cfg.Endpoints, cfg.DialTimeout); err != nil {
		// Best-effort cleanup; the probe error is the one worth reporting.
		_ = cli.Close()
		return err
	}

	previous := e.client.client
	e.client.client = cli
	if previous != nil {
		// A reconnect that was not preceded by Close would otherwise leak the
		// old client. Its Close error is not actionable because the client is
		// being discarded anyway.
		_ = previous.Close()
	}
	return nil
}

// Close closes the current etcd client, if there is one, and clears it so
// that GetClient returns nil afterwards. It returns the error reported by the
// client's Close, or nil when no client was installed.
func (e *etcdConnector) Close() error {
	e.client.mu.Lock()
	defer e.client.mu.Unlock()

	cli := e.client.client
	if cli == nil {
		return nil
	}
	e.client.client = nil
	return cli.Close()
}

// etcdHealthChecker is the health checker handed to the ConnectionManager.
type etcdHealthChecker struct {
	client *EtcdClient
}

// HealthCheck reports nil when at least one configured endpoint answers a
// Status request within ctx.
//
// It returns context.Canceled when no client is installed or no endpoints are
// configured; otherwise it returns the error of the last endpoint probed.
func (e *etcdHealthChecker) HealthCheck(ctx context.Context) error {
	// Snapshot under the read lock so the network calls below run unlocked.
	e.client.mu.RLock()
	cli := e.client.client
	endpoints := e.client.config.Endpoints
	e.client.mu.RUnlock()

	if cli == nil || len(endpoints) == 0 {
		return context.Canceled
	}
	// Zero means "no extra deadline": the caller's ctx alone bounds the probes.
	return probeEtcdEndpoints(ctx, cli, endpoints, 0)
}

// probeEtcdEndpoints asks each endpoint for its status, in order, and returns
// nil as soon as one of them answers. If none answers it returns the error of
// the last endpoint tried. Probing stops early once ctx itself is done.
//
// A non-zero perEndpoint bounds every individual probe with its own timeout
// derived from ctx; zero leaves ctx as the only bound.
func probeEtcdEndpoints(ctx context.Context, cli *clientv3.Client, endpoints []string, perEndpoint time.Duration) error {
	// Same sentinel HealthCheck uses when there is nothing to probe.
	lastErr := context.Canceled
	for _, endpoint := range endpoints {
		lastErr = statusEtcdEndpoint(ctx, cli, endpoint, perEndpoint)
		if lastErr == nil {
			return nil
		}
		if ctx.Err() != nil {
			// The caller's context is finished; further probes cannot succeed.
			break
		}
	}
	return lastErr
}

// statusEtcdEndpoint issues a single Status request against endpoint. It is a
// separate function so the deferred cancel runs after every probe rather than
// accumulating until the surrounding loop finishes.
func statusEtcdEndpoint(ctx context.Context, cli *clientv3.Client, endpoint string, timeout time.Duration) error {
	if timeout != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	_, err := cli.Status(ctx, endpoint)
	return err
}

// NewEtcdClient creates an etcd client with reconnect support and performs the
// initial connection through the manager's ConnectWithRetry, using a
// background context. It returns the error from ConnectWithRetry if that
// initial connection fails.
func NewEtcdClient(cfg EtcdClientConfig) (*EtcdClient, error) {
	// Detach from the caller's slice: the endpoints are read again on every
	// reconnect and health check, so later mutation by the caller must not
	// leak into this client.
	cfg.Endpoints = append([]string(nil), cfg.Endpoints...)

	client := &EtcdClient{config: cfg}
	client.manager = NewConnectionManager(
		&etcdConnector{client: client},
		&etcdHealthChecker{client: client},
		cfg.ReconnectConfig,
	)

	// Initial connection.
	if err := client.manager.ConnectWithRetry(context.Background()); err != nil {
		return nil, err
	}
	return client, nil
}

// GetClient returns the current underlying etcd client. The result is nil
// when no client is installed (for example after the connector's Close), and
// a previously returned client is closed when a later Connect replaces it, so
// callers should fetch the client again rather than caching it.
func (c *EtcdClient) GetClient() *clientv3.Client {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.client
}

// State returns the connection state reported by the connection manager.
func (c *EtcdClient) State() ConnectionState {
	return c.manager.State()
}

// Close shuts the client down by delegating to the connection manager's Close.
func (c *EtcdClient) Close() error {
	return c.manager.Close()
}

// TriggerReconnect asks the connection manager to reconnect.
func (c *EtcdClient) TriggerReconnect() {
	c.manager.TriggerReconnect()
}