Files
reconnect/etcd.go
2025-12-18 15:11:02 +08:00

145 lines
2.8 KiB
Go

package reconnect
import (
"context"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EtcdClientConfig etcd客户端配置
type EtcdClientConfig struct {
// Endpoints etcd服务地址列表
Endpoints []string
// DialTimeout 连接超时时间
DialTimeout time.Duration
// Username 用户名(可选)
Username string
// Password 密码(可选)
Password string
// ReconnectConfig 重连配置
ReconnectConfig Config
}
// EtcdClient 带重连功能的etcd客户端
type EtcdClient struct {
config EtcdClientConfig
client *clientv3.Client
manager *ConnectionManager
mu sync.RWMutex
}
// etcdConnector etcd连接器
type etcdConnector struct {
client *EtcdClient
}
func (e *etcdConnector) Connect(ctx context.Context) error {
e.client.mu.Lock()
defer e.client.mu.Unlock()
cfg := clientv3.Config{
Endpoints: e.client.config.Endpoints,
DialTimeout: e.client.config.DialTimeout,
Username: e.client.config.Username,
Password: e.client.config.Password,
}
if cfg.DialTimeout == 0 {
cfg.DialTimeout = 5 * time.Second
}
cli, err := clientv3.New(cfg)
if err != nil {
return err
}
// 验证连接
ctx, cancel := context.WithTimeout(ctx, cfg.DialTimeout)
defer cancel()
_, err = cli.Status(ctx, cfg.Endpoints[0])
if err != nil {
cli.Close()
return err
}
e.client.client = cli
return nil
}
func (e *etcdConnector) Close() error {
e.client.mu.Lock()
defer e.client.mu.Unlock()
if e.client.client != nil {
err := e.client.client.Close()
e.client.client = nil
return err
}
return nil
}
// etcdHealthChecker etcd健康检查器
type etcdHealthChecker struct {
client *EtcdClient
}
func (e *etcdHealthChecker) HealthCheck(ctx context.Context) error {
e.client.mu.RLock()
cli := e.client.client
endpoints := e.client.config.Endpoints
e.client.mu.RUnlock()
if cli == nil || len(endpoints) == 0 {
return context.Canceled
}
_, err := cli.Status(ctx, endpoints[0])
return err
}
// NewEtcdClient 创建带重连功能的etcd客户端
func NewEtcdClient(cfg EtcdClientConfig) (*EtcdClient, error) {
client := &EtcdClient{
config: cfg,
}
connector := &etcdConnector{client: client}
checker := &etcdHealthChecker{client: client}
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
// 首次连接
ctx := context.Background()
if err := client.manager.ConnectWithRetry(ctx); err != nil {
return nil, err
}
return client, nil
}
// GetClient 获取etcd客户端
func (c *EtcdClient) GetClient() *clientv3.Client {
c.mu.RLock()
defer c.mu.RUnlock()
return c.client
}
// State 获取连接状态
func (c *EtcdClient) State() ConnectionState {
return c.manager.State()
}
// Close 关闭客户端
func (c *EtcdClient) Close() error {
return c.manager.Close()
}
// TriggerReconnect 手动触发重连
func (c *EtcdClient) TriggerReconnect() {
c.manager.TriggerReconnect()
}