From 915b5dd0bb61b7794bd9b24348560c7fdf50128c Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:33 +0800 Subject: [PATCH] Add EtcdClient implementation for managing etcd connections with methods for connecting, closing, pinging, and checking connection status. --- etcd.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 etcd.go diff --git a/etcd.go b/etcd.go new file mode 100644 index 0000000..29bccd1 --- /dev/null +++ b/etcd.go @@ -0,0 +1,108 @@ +package reconnect + +import ( + "context" + "fmt" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EtcdClient etcd客户端包装器 +type EtcdClient struct { + config clientv3.Config + client *clientv3.Client + mu sync.RWMutex +} + +// NewEtcdClient 创建新的etcd客户端 +func NewEtcdClient(config clientv3.Config) *EtcdClient { + return &EtcdClient{ + config: config, + } +} + +// Connect 建立etcd连接 +func (e *EtcdClient) Connect(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.client != nil { + _ = e.client.Close() + } + + client, err := clientv3.New(e.config) + if err != nil { + return fmt.Errorf("etcd new client failed: %w", err) + } + + // 测试连接 + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, err = client.Status(timeoutCtx, e.config.Endpoints[0]) + if err != nil { + _ = client.Close() + return fmt.Errorf("etcd status check failed: %w", err) + } + + e.client = client + return nil +} + +// Close 关闭etcd连接 +func (e *EtcdClient) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.client != nil { + err := e.client.Close() + e.client = nil + return err + } + return nil +} + +// Ping 检查etcd连接是否健康 +func (e *EtcdClient) Ping(ctx context.Context) error { + e.mu.RLock() + defer e.mu.RUnlock() + + if e.client == nil { + return fmt.Errorf("etcd client is nil") + } + + if len(e.config.Endpoints) == 0 { + return fmt.Errorf("etcd endpoints is empty") + } + + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, err := e.client.Status(timeoutCtx, e.config.Endpoints[0]) + return err +} + +// IsConnected 检查连接状态 +func (e *EtcdClient) IsConnected() bool { + e.mu.RLock() + defer e.mu.RUnlock() + + return e.client != nil +} + +// GetClient 获取底层的etcd客户端 +func (e *EtcdClient) GetClient() *clientv3.Client { + e.mu.RLock() + defer e.mu.RUnlock() + + return e.client +} + +// EtcdManager 创建etcd重连管理器 +func EtcdManager(config clientv3.Config, reconnectConfig *Config) *Manager { + client := NewEtcdClient(config) + return NewManager(client, reconnectConfig) +} +