145 lines
2.8 KiB
Go
145 lines
2.8 KiB
Go
package reconnect
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
// EtcdClientConfig etcd客户端配置
|
|
type EtcdClientConfig struct {
|
|
// Endpoints etcd服务地址列表
|
|
Endpoints []string
|
|
// DialTimeout 连接超时时间
|
|
DialTimeout time.Duration
|
|
// Username 用户名(可选)
|
|
Username string
|
|
// Password 密码(可选)
|
|
Password string
|
|
// ReconnectConfig 重连配置
|
|
ReconnectConfig Config
|
|
}
|
|
|
|
// EtcdClient 带重连功能的etcd客户端
|
|
type EtcdClient struct {
|
|
config EtcdClientConfig
|
|
client *clientv3.Client
|
|
manager *ConnectionManager
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// etcdConnector etcd连接器
|
|
type etcdConnector struct {
|
|
client *EtcdClient
|
|
}
|
|
|
|
func (e *etcdConnector) Connect(ctx context.Context) error {
|
|
e.client.mu.Lock()
|
|
defer e.client.mu.Unlock()
|
|
|
|
cfg := clientv3.Config{
|
|
Endpoints: e.client.config.Endpoints,
|
|
DialTimeout: e.client.config.DialTimeout,
|
|
Username: e.client.config.Username,
|
|
Password: e.client.config.Password,
|
|
}
|
|
|
|
if cfg.DialTimeout == 0 {
|
|
cfg.DialTimeout = 5 * time.Second
|
|
}
|
|
|
|
cli, err := clientv3.New(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 验证连接
|
|
ctx, cancel := context.WithTimeout(ctx, cfg.DialTimeout)
|
|
defer cancel()
|
|
|
|
_, err = cli.Status(ctx, cfg.Endpoints[0])
|
|
if err != nil {
|
|
cli.Close()
|
|
return err
|
|
}
|
|
|
|
e.client.client = cli
|
|
return nil
|
|
}
|
|
|
|
func (e *etcdConnector) Close() error {
|
|
e.client.mu.Lock()
|
|
defer e.client.mu.Unlock()
|
|
|
|
if e.client.client != nil {
|
|
err := e.client.client.Close()
|
|
e.client.client = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// etcdHealthChecker etcd健康检查器
|
|
type etcdHealthChecker struct {
|
|
client *EtcdClient
|
|
}
|
|
|
|
func (e *etcdHealthChecker) HealthCheck(ctx context.Context) error {
|
|
e.client.mu.RLock()
|
|
cli := e.client.client
|
|
endpoints := e.client.config.Endpoints
|
|
e.client.mu.RUnlock()
|
|
|
|
if cli == nil || len(endpoints) == 0 {
|
|
return context.Canceled
|
|
}
|
|
|
|
_, err := cli.Status(ctx, endpoints[0])
|
|
return err
|
|
}
|
|
|
|
// NewEtcdClient 创建带重连功能的etcd客户端
|
|
func NewEtcdClient(cfg EtcdClientConfig) (*EtcdClient, error) {
|
|
client := &EtcdClient{
|
|
config: cfg,
|
|
}
|
|
|
|
connector := &etcdConnector{client: client}
|
|
checker := &etcdHealthChecker{client: client}
|
|
|
|
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
|
|
|
// 首次连接
|
|
ctx := context.Background()
|
|
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// GetClient 获取etcd客户端
|
|
func (c *EtcdClient) GetClient() *clientv3.Client {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.client
|
|
}
|
|
|
|
// State 获取连接状态
|
|
func (c *EtcdClient) State() ConnectionState {
|
|
return c.manager.State()
|
|
}
|
|
|
|
// Close 关闭客户端
|
|
func (c *EtcdClient) Close() error {
|
|
return c.manager.Close()
|
|
}
|
|
|
|
// TriggerReconnect 手动触发重连
|
|
func (c *EtcdClient) TriggerReconnect() {
|
|
c.manager.TriggerReconnect()
|
|
}
|
|
|