Files
reconnect/etcd_registry.go
ray c0dd21b0f0 feat(etcd): 增强 etcd 客户端连接管理和重试机制
- 添加连接状态检查,确保在使用 etcd 客户端前验证其连接状态
- 实现超时控制,避免长时间阻塞在 etcd 操作中
- 优化重试逻辑,增加连接恢复后的重试机制,确保服务注册的可靠性
- 改进日志输出,提供更清晰的状态信息和错误处理
2025-12-18 22:47:10 +08:00

286 lines
6.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package reconnect
import (
"context"
"fmt"
"log"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EtcdRegistryConfig configures an EtcdRegistry. Every field is passed through
// unchanged to the EtcdClientConfig of the underlying reconnecting client.
type EtcdRegistryConfig struct {
	// Endpoints lists the etcd server addresses.
	Endpoints []string

	// DialTimeout is the connection (dial) timeout.
	DialTimeout time.Duration

	// ReconnectConfig controls how the underlying client reconnects.
	ReconnectConfig Config
}
// EtcdRegistry registers a service key in etcd under a lease and, through a
// background watcher, re-registers it when the lease keep-alive stream ends.
// It is backed by a reconnecting EtcdClient.
type EtcdRegistry struct {
	config     EtcdRegistryConfig
	etcdClient *EtcdClient      // reconnecting wrapper; source of fresh clients
	cli        *clientv3.Client // cached client, refreshed from etcdClient

	// Registration state; written under mu.
	key         string // "/<serviceName>/<addr>"
	val         string // addr
	serviceName string
	ttl         int64 // TTL passed to the etcd lease Grant
	leaseID     clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	mu sync.RWMutex

	// ctx is cancelled by UnRegister to stop background work.
	ctx    context.Context
	cancel context.CancelFunc
}
// NewEtcdRegistry creates an EtcdRegistry backed by a reconnecting etcd client
// built from cfg. The registry owns a cancellable context that UnRegister
// cancels to stop its background work. An error from client construction is
// returned unchanged.
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
	etcdClient, err := NewEtcdClient(EtcdClientConfig{
		Endpoints:       cfg.Endpoints,
		DialTimeout:     cfg.DialTimeout,
		ReconnectConfig: cfg.ReconnectConfig,
	})
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	reg := &EtcdRegistry{
		config:     cfg,
		etcdClient: etcdClient,
		cli:        etcdClient.GetClient(),
		ctx:        ctx,
		cancel:     cancel,
	}
	return reg, nil
}
// Register records the service identity (key "/<serviceName>/<addr>", value
// addr, lease TTL ttl) and then registers it in etcd via registerWithKV.
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
	key := fmt.Sprintf("/%s/%s", serviceName, addr)

	r.mu.Lock()
	r.serviceName, r.ttl = serviceName, ttl
	r.key, r.val = key, addr
	r.mu.Unlock()

	return r.registerWithKV(ttl)
}
// registerWithKV registers the service currently recorded in r.key / r.val:
// it grants a lease with the given ttl, writes the key bound to that lease,
// starts keep-alive for the lease and launches a watcher goroutine that
// re-registers when the keep-alive stream ends.
//
// The cached client r.cli is refreshed from r.etcdClient when it is nil, or
// when r.etcdClient reports a state other than StateConnected (the cached
// reference may be stale after a reconnect). Each etcd round-trip is bounded
// by a 5s timeout derived from r.ctx, so UnRegister also aborts in-flight
// calls. It returns an error if the registry was stopped, no usable client is
// available, or Grant / Put / KeepAlive fails.
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
	const opTimeout = 5 * time.Second

	// After UnRegister there is nothing to keep alive; don't spawn another watcher.
	if err := r.ctx.Err(); err != nil {
		return fmt.Errorf("registry stopped: %w", err)
	}

	r.mu.RLock()
	cli, key, val := r.cli, r.key, r.val
	r.mu.RUnlock()

	if r.etcdClient != nil {
		state := r.etcdClient.State()
		switch {
		case state != StateConnected && cli == nil:
			return fmt.Errorf("etcd client is not connected, current state: %s", state)
		case state != StateConnected:
			// The cached reference may point at a dead connection; pick up the current one.
			if cli = r.refreshClient(); cli == nil {
				return fmt.Errorf("etcd client is not connected, current state: %s", state)
			}
		case cli == nil:
			cli = r.refreshClient()
		}
	}
	if cli == nil {
		return fmt.Errorf("etcd client is nil")
	}

	grantCtx, cancelGrant := context.WithTimeout(r.ctx, opTimeout)
	resp, err := cli.Grant(grantCtx, ttl)
	cancelGrant()
	if err != nil && r.etcdClient != nil {
		// Grant may have failed because the connection was replaced underneath
		// us; retry once with the wrapper's current client.
		if cli = r.refreshClient(); cli != nil {
			retryCtx, cancelRetry := context.WithTimeout(r.ctx, opTimeout)
			resp, err = cli.Grant(retryCtx, ttl)
			cancelRetry()
		}
	}
	if err != nil {
		return fmt.Errorf("failed to grant lease: %w", err)
	}

	r.mu.Lock()
	r.leaseID = resp.ID
	r.mu.Unlock()

	putCtx, cancelPut := context.WithTimeout(r.ctx, opTimeout)
	_, err = cli.Put(putCtx, key, val, clientv3.WithLease(resp.ID))
	cancelPut()
	if err != nil {
		return fmt.Errorf("failed to put key: %w", err)
	}

	// KeepAlive renews the lease only while its ctx is alive: clientv3 stops
	// renewing and closes the channel once ctx is done. It must therefore be
	// bound to the registry lifetime (r.ctx), not to a per-call timeout that is
	// cancelled when this function returns. That would end keep-alive
	// immediately and make the watcher re-register in a loop.
	keepAliveCh, err := cli.KeepAlive(r.ctx, resp.ID)
	if err != nil {
		return fmt.Errorf("failed to start keep-alive: %w", err)
	}

	r.mu.Lock()
	r.keepAliveCh = keepAliveCh
	r.mu.Unlock()

	go r.watcher(ttl)
	log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
	return nil
}

// refreshClient replaces the cached client with r.etcdClient's current client
// and returns it. The result may be nil, and callers check for that.
// r.etcdClient must be non-nil.
func (r *EtcdRegistry) refreshClient() *clientv3.Client {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cli = r.etcdClient.GetClient()
	return r.cli
}
// watcher consumes keep-alive responses for the lease registered just before
// it was started. When the keep-alive channel closes, it hands off to
// recoverRegistration. A successful re-registration starts a fresh watcher, so
// this one then exits. The watcher also exits when r.ctx is cancelled
// (UnRegister).
func (r *EtcdRegistry) watcher(ttl int64) {
	// Read the channel once under the lock, rather than reading r.keepAliveCh
	// unsynchronized on every iteration while registerWithKV may write it.
	r.mu.RLock()
	keepAliveCh := r.keepAliveCh
	r.mu.RUnlock()

	for {
		select {
		case <-r.ctx.Done():
			log.Println("[EtcdRegistry] context done, watcher exiting.")
			return
		case _, ok := <-keepAliveCh:
			if ok {
				continue // lease renewed; nothing to do
			}
			log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
			r.recoverRegistration(ttl)
			return
		}
	}
}

// recoverRegistration re-registers the service after its keep-alive stream
// ended. It returns when registration succeeds or r.ctx is cancelled.
//
// Phase 1 polls the client state every 500ms for up to 60s and re-registers as
// soon as it reports StateConnected. Phase 2 keeps retrying registerWithKV with
// exponential backoff (2s, doubling, capped at 30s). All waits are interrupted
// by r.ctx so UnRegister stops this goroutine promptly.
func (r *EtcdRegistry) recoverRegistration(ttl int64) {
	const (
		maxWaitTime   = 60 * time.Second
		checkInterval = 500 * time.Millisecond
		initialDelay  = 2 * time.Second
		maxDelay      = 30 * time.Second
	)

	for waited := time.Duration(0); waited < maxWaitTime; waited += checkInterval {
		if r.ctx.Err() != nil {
			return
		}
		if r.etcdClient != nil && r.etcdClient.State() == StateConnected {
			r.mu.Lock()
			r.cli = r.etcdClient.GetClient()
			r.mu.Unlock()

			// Give the recovered connection a moment to settle.
			if !r.sleepCtx(100 * time.Millisecond) {
				return
			}
			err := r.registerWithKV(ttl)
			if err == nil {
				log.Println("[EtcdRegistry] 服务重新注册成功")
				return // the new registration started its own watcher
			}
			log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
		}
		if !r.sleepCtx(checkInterval) {
			return
		}
	}

	log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
	delay := initialDelay
	for {
		if r.ctx.Err() != nil {
			return
		}
		if r.etcdClient != nil && r.etcdClient.State() == StateConnected {
			r.mu.Lock()
			r.cli = r.etcdClient.GetClient()
			r.mu.Unlock()
		}
		err := r.registerWithKV(ttl)
		if err == nil {
			log.Println("[EtcdRegistry] 服务重新注册成功")
			return // the new registration started its own watcher
		}
		log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
		if !r.sleepCtx(delay) {
			return
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// sleepCtx waits for d and reports true, or returns false as soon as r.ctx is
// cancelled.
func (r *EtcdRegistry) sleepCtx(d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-r.ctx.Done():
		return false
	case <-t.C:
		return true
	}
}
// UnRegister stops background re-registration by cancelling r.ctx. It then
// revokes the registration lease, deletes the registered key and closes the
// underlying reconnecting client.
//
// Each etcd call is bounded by a 5s timeout so shutdown cannot block
// indefinitely on an unreachable cluster. Failures are logged because the
// method has no error result.
func (r *EtcdRegistry) UnRegister() {
	const opTimeout = 5 * time.Second

	r.cancel() // stop the watcher and any work bound to r.ctx

	r.mu.RLock()
	cli, leaseID, key := r.cli, r.leaseID, r.key
	r.mu.RUnlock()

	// Skip the etcd calls entirely when nothing was registered.
	if cli != nil && key != "" {
		// r.ctx is already cancelled, so each call gets a fresh bounded context.
		if leaseID != 0 {
			ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
			if _, err := cli.Revoke(ctx, leaseID); err != nil {
				log.Printf("[EtcdRegistry] failed to revoke lease %x: %v", leaseID, err)
			}
			cancel()
		}

		// Revoking a lease deletes the keys attached to it. The explicit Delete
		// still removes the key if Revoke failed.
		ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
		_, err := cli.Delete(ctx, key)
		cancel()
		if err != nil {
			log.Printf("[EtcdRegistry] failed to delete key %s: %v", key, err)
		} else {
			log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
		}
	}

	if r.etcdClient != nil {
		r.etcdClient.Close()
	}
}
// State reports the connection state of the underlying reconnecting client,
// or StateDisconnected when the registry has no client.
func (r *EtcdRegistry) State() ConnectionState {
	if r.etcdClient == nil {
		return StateDisconnected
	}
	return r.etcdClient.State()
}