Files
reconnect/etcd_registry.go
ray 2beefb2491 feat(etcd): 增强 etcd 服务注册功能
- 添加防抖机制,防止短时间内重复注册服务
- 引入 watcher 机制,支持动态监控服务续约状态
- 优化 watcher 的上下文管理,确保资源的正确释放
- 改进日志输出,提供更详细的注册和重试信息
2025-12-18 22:50:14 +08:00

339 lines
8.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package reconnect
import (
"context"
"fmt"
"log"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EtcdRegistryConfig holds the settings used to construct an EtcdRegistry.
// NewEtcdRegistry copies each field into the EtcdClientConfig it passes to
// NewEtcdClient.
type EtcdRegistryConfig struct {
// Endpoints is the list of etcd server addresses.
Endpoints []string
// DialTimeout is the timeout applied when connecting to etcd.
DialTimeout time.Duration
// ReconnectConfig is the reconnection configuration forwarded to the
// underlying etcd client.
ReconnectConfig Config
}
// EtcdRegistry registers a service in etcd and keeps the registration alive,
// re-registering automatically after the connection is lost and restored.
type EtcdRegistry struct {
// config is the configuration the registry was created with.
config EtcdRegistryConfig
// etcdClient is the reconnecting client wrapper created by NewEtcdClient;
// it is queried for the connection state and the current raw client.
etcdClient *EtcdClient
// cli is the raw etcd client last obtained from etcdClient.GetClient().
cli *clientv3.Client
// key is the registered key, formatted as "/<serviceName>/<addr>".
key string
// val is the registered value (the service address).
val string
// serviceName is the service name passed to Register.
serviceName string
// ttl is the lease TTL passed to Register.
ttl int64
// leaseID is the ID of the lease granted for the registration.
leaseID clientv3.LeaseID
// keepAliveCh is the keep-alive response channel returned by cli.KeepAlive.
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
// mu is held whenever cli, key, val, serviceName, ttl, leaseID or
// keepAliveCh is written.
mu sync.RWMutex
// ctx is the registry-wide context; cancel is called by UnRegister.
ctx context.Context
cancel context.CancelFunc
// watcherCtx/watcherCancel control the currently running watcher goroutine;
// they are replaced each time a registration succeeds.
watcherCtx context.Context
watcherCancel context.CancelFunc
// watcherMu guards watcherCtx and watcherCancel.
watcherMu sync.Mutex
// lastRegisterTime is the time of the most recent registration attempt,
// used to reject attempts made less than two seconds apart.
lastRegisterTime time.Time
// registerMu guards lastRegisterTime.
registerMu sync.Mutex
}
// NewEtcdRegistry builds an etcd service registry on top of a reconnecting
// etcd client.
//
// The client is created from cfg via NewEtcdClient; if that fails, its error
// is returned unchanged and no registry is created. On success the registry
// owns two independent cancellable contexts: one for its whole lifetime and
// one for the watcher goroutine.
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
	etcdClient, err := NewEtcdClient(EtcdClientConfig{
		Endpoints:       cfg.Endpoints,
		DialTimeout:     cfg.DialTimeout,
		ReconnectConfig: cfg.ReconnectConfig,
	})
	if err != nil {
		return nil, err
	}

	registry := &EtcdRegistry{
		config:     cfg,
		etcdClient: etcdClient,
		cli:        etcdClient.GetClient(),
	}
	registry.ctx, registry.cancel = context.WithCancel(context.Background())
	registry.watcherCtx, registry.watcherCancel = context.WithCancel(context.Background())
	return registry, nil
}
// Register stores the service identity on the registry and performs a
// registration attempt.
//
// The etcd key is "/<serviceName>/<addr>" and the value is addr. ttl is the
// lease TTL handed to the registration step. The returned error is whatever
// registerWithKV reports.
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
	registryKey := fmt.Sprintf("/%s/%s", serviceName, addr)

	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.serviceName, r.ttl = serviceName, ttl
		r.key, r.val = registryKey, addr
	}()

	return r.registerWithKV(ttl)
}
// registerWithKV performs one registration attempt: it grants a lease with
// the given ttl, writes the registry's key/value under that lease, starts the
// lease keep-alive and launches a watcher goroutine for it.
//
// Attempts made less than two seconds after the previous attempt are rejected
// with the error "register too frequently, please wait" (the watcher matches
// on that exact text, so it must not change).
//
// If the key cannot be written or the keep-alive cannot be started, the lease
// that was just granted is revoked (best effort) so it is not left orphaned.
// The keep-alive is bound to the per-registration watcher context, so
// cancelling that context (on the next successful registration or on
// UnRegister) also stops renewing the superseded lease.
//
// Registry state (leaseID, keepAliveCh, watcher context) is only updated once
// the whole attempt has succeeded.
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
	// Debounce: reject attempts that follow the previous one too closely.
	r.registerMu.Lock()
	now := time.Now()
	if !r.lastRegisterTime.IsZero() && now.Sub(r.lastRegisterTime) < 2*time.Second {
		r.registerMu.Unlock()
		return fmt.Errorf("register too frequently, please wait")
	}
	r.lastRegisterTime = now
	r.registerMu.Unlock()

	r.mu.RLock()
	key, val := r.key, r.val
	r.mu.RUnlock()

	cli, err := r.activeClient()
	if err != nil {
		return err
	}

	// Every request gets its own bounded context so a dead connection cannot
	// block the caller indefinitely.
	grant := func(c *clientv3.Client) (*clientv3.LeaseGrantResponse, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return c.Grant(ctx, ttl)
	}

	resp, err := grant(cli)
	if err != nil && r.etcdClient != nil {
		// The cached client may belong to a closed connection; reload it and
		// retry once with the fresh client.
		cli = r.refreshClient()
		if cli != nil {
			resp, err = grant(cli)
		}
	}
	if err != nil {
		return fmt.Errorf("failed to grant lease: %w", err)
	}

	putCtx, cancelPut := context.WithTimeout(context.Background(), 5*time.Second)
	_, err = cli.Put(putCtx, key, val, clientv3.WithLease(resp.ID))
	cancelPut()
	if err != nil {
		r.releaseLease(cli, resp.ID)
		return fmt.Errorf("failed to put key: %w", err)
	}

	// The keep-alive needs a long-lived context (no timeout). It shares the
	// context of the watcher that will monitor it, so both stop together.
	watcherCtx, watcherCancel := context.WithCancel(context.Background())
	keepAliveCh, err := cli.KeepAlive(watcherCtx, resp.ID)
	if err != nil {
		watcherCancel()
		r.releaseLease(cli, resp.ID)
		return fmt.Errorf("failed to start keep-alive: %w", err)
	}

	r.mu.Lock()
	r.leaseID = resp.ID
	r.keepAliveCh = keepAliveCh
	r.mu.Unlock()

	// Stop the previous watcher (if any) only now that the new registration
	// is fully established, then install the new watcher context.
	r.watcherMu.Lock()
	if r.watcherCancel != nil {
		r.watcherCancel()
	}
	r.watcherCtx, r.watcherCancel = watcherCtx, watcherCancel
	r.watcherMu.Unlock()

	go r.watcher(ttl, watcherCtx)
	log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
	return nil
}

// activeClient returns the raw etcd client to use for a registration attempt.
//
// A cached client is used as-is while the wrapper reports StateConnected (or
// when there is no wrapper). A missing client is loaded from the wrapper only
// when it is connected; a cached client is reloaded when the wrapper is not
// connected, because the cached reference may point at a closed connection.
func (r *EtcdRegistry) activeClient() (*clientv3.Client, error) {
	r.mu.RLock()
	cli := r.cli
	r.mu.RUnlock()

	if r.etcdClient == nil {
		if cli == nil {
			return nil, fmt.Errorf("etcd client is nil")
		}
		return cli, nil
	}

	state := r.etcdClient.State()
	switch {
	case state == StateConnected && cli != nil:
		return cli, nil
	case state == StateConnected:
		if cli = r.refreshClient(); cli == nil {
			return nil, fmt.Errorf("etcd client is nil")
		}
		return cli, nil
	case cli == nil:
		return nil, fmt.Errorf("etcd client is not connected, current state: %s", state)
	default:
		if cli = r.refreshClient(); cli == nil {
			return nil, fmt.Errorf("etcd client is not connected, current state: %s", state)
		}
		return cli, nil
	}
}

// refreshClient reloads the cached raw client from the reconnecting wrapper
// and returns it. It must only be called when r.etcdClient is non-nil.
func (r *EtcdRegistry) refreshClient() *clientv3.Client {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.cli = r.etcdClient.GetClient()
	return r.cli
}

// releaseLease revokes a lease that was granted for a registration attempt
// that subsequently failed. Failure to revoke is only logged: the lease will
// still expire on its own once its TTL elapses.
func (r *EtcdRegistry) releaseLease(cli *clientv3.Client, id clientv3.LeaseID) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := cli.Revoke(ctx, id); err != nil {
		log.Printf("[EtcdRegistry] failed to revoke lease %x after aborted registration: %v", id, err)
	}
}
// watcher monitors the keep-alive channel of the current registration and
// re-registers the service when that channel is closed.
//
// ttl is the lease TTL to use for re-registration. watcherCtx is the context
// of this particular watcher; the goroutine exits when either watcherCtx or
// the registry-wide context is cancelled.
//
// The keep-alive channel is read once, under the lock, when the watcher
// starts. This avoids an unsynchronised read of r.keepAliveCh on every loop
// iteration and guarantees that a superseded watcher never consumes responses
// from a newer registration's channel.
func (r *EtcdRegistry) watcher(ttl int64, watcherCtx context.Context) {
	r.mu.RLock()
	keepAliveCh := r.keepAliveCh
	r.mu.RUnlock()

	for {
		select {
		case <-r.ctx.Done():
			log.Println("[EtcdRegistry] context done, watcher exiting.")
			return
		case <-watcherCtx.Done():
			log.Println("[EtcdRegistry] watcher context done, exiting.")
			return
		case _, ok := <-keepAliveCh:
			if ok {
				// Lease renewed; nothing else to do.
				continue
			}
			log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
			// A successful re-registration starts a fresh watcher, so this
			// goroutine always ends here.
			r.reRegister(ttl, watcherCtx)
			return
		}
	}
}

// reRegister tries to restore the registration after the keep-alive channel
// was closed. It returns when re-registration succeeded or when either the
// registry context or watcherCtx was cancelled.
//
// Phase 1 polls the connection state every 500ms for up to 60s of polling
// time and attempts to register once the client reports StateConnected.
// Phase 2 retries unconditionally with exponential backoff (2s doubling up to
// 30s). All waits are interruptible, so no registration attempt is made after
// cancellation was observed during a wait.
func (r *EtcdRegistry) reRegister(ttl int64, watcherCtx context.Context) {
	const (
		maxWaitTime   = 60 * time.Second
		checkInterval = 500 * time.Millisecond
		maxDelay      = 30 * time.Second
		// debounceMsg must match the debounce error text produced by
		// registerWithKV.
		debounceMsg = "register too frequently, please wait"
	)

	// cancelled reports whether the watcher should stop.
	cancelled := func() bool {
		select {
		case <-r.ctx.Done():
			return true
		case <-watcherCtx.Done():
			return true
		default:
			return false
		}
	}

	// pause waits for d and reports false if cancellation happened first.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-r.ctx.Done():
			return false
		case <-watcherCtx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	// syncClient reloads the cached raw client when the wrapper reports a
	// live connection, and reports whether it did so.
	syncClient := func() bool {
		if r.etcdClient == nil || r.etcdClient.State() != StateConnected {
			return false
		}
		r.mu.Lock()
		r.cli = r.etcdClient.GetClient()
		r.mu.Unlock()
		return true
	}

	// Phase 1: wait for the connection to come back instead of retrying
	// blindly.
	for waited := time.Duration(0); waited < maxWaitTime; waited += checkInterval {
		if cancelled() {
			return
		}
		if syncClient() {
			// Give the connection a moment to settle before registering.
			if !pause(1 * time.Second) {
				return
			}
			err := r.registerWithKV(ttl)
			if err == nil {
				log.Println("[EtcdRegistry] 服务重新注册成功")
				return
			}
			if err.Error() == debounceMsg {
				log.Println("[EtcdRegistry] 注册过于频繁,继续等待...")
				if !pause(2 * time.Second) {
					return
				}
			} else {
				log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
			}
		}
		if !pause(checkInterval) {
			return
		}
	}

	// Phase 2: the connection did not recover in time; keep retrying with
	// exponential backoff.
	log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
	delay := 2 * time.Second
	for {
		if cancelled() {
			return
		}
		syncClient()

		err := r.registerWithKV(ttl)
		if err == nil {
			log.Println("[EtcdRegistry] 服务重新注册成功")
			return
		}
		if err.Error() == debounceMsg {
			log.Println("[EtcdRegistry] 注册过于频繁,等待更长时间...")
			if !pause(3 * time.Second) {
				return
			}
			continue
		}

		log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
		if !pause(delay) {
			return
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}
// UnRegister removes the service registration and shuts the registry down.
//
// It cancels the registry-wide context and the watcher context, then — if a
// service was registered — revokes the lease and deletes the key, and finally
// closes the underlying etcd client.
//
// The revoke and delete calls are each bounded by a five-second timeout so an
// unreachable etcd cannot block shutdown forever. Their errors are logged, and
// the success message is only logged when the key deletion succeeded. A
// revoke failure alone is not treated as fatal, because the explicit delete
// that follows removes the key regardless.
func (r *EtcdRegistry) UnRegister() {
	r.cancel() // stop the registry-wide context

	// Stop the watcher.
	r.watcherMu.Lock()
	if r.watcherCancel != nil {
		r.watcherCancel()
	}
	r.watcherMu.Unlock()

	r.mu.RLock()
	cli, leaseID, key := r.cli, r.leaseID, r.key
	r.mu.RUnlock()

	// Only talk to etcd when something was actually registered; otherwise
	// there is no key to delete and no lease to revoke.
	if cli != nil && key != "" {
		if leaseID != clientv3.NoLease {
			revokeCtx, cancelRevoke := context.WithTimeout(context.Background(), 5*time.Second)
			_, err := cli.Revoke(revokeCtx, leaseID)
			cancelRevoke()
			if err != nil {
				log.Printf("[EtcdRegistry] failed to revoke lease %x: %v", leaseID, err)
			}
		}

		deleteCtx, cancelDelete := context.WithTimeout(context.Background(), 5*time.Second)
		_, err := cli.Delete(deleteCtx, key)
		cancelDelete()
		if err != nil {
			log.Printf("[EtcdRegistry] failed to delete key %s: %v", key, err)
		} else {
			log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
		}
	}

	if r.etcdClient != nil {
		r.etcdClient.Close()
	}
}
// State reports the connection state of the underlying etcd client. When the
// registry has no client it reports StateDisconnected.
func (r *EtcdRegistry) State() ConnectionState {
	if r.etcdClient == nil {
		return StateDisconnected
	}
	return r.etcdClient.State()
}