2 Commits
v0.0.4 ... main

Author SHA1 Message Date
ray
2beefb2491 feat(etcd): 增强 etcd 服务注册功能
- 添加防抖机制,防止短时间内重复注册服务
- 引入 watcher 机制,支持动态监控服务续约状态
- 优化 watcher 的上下文管理,确保资源的正确释放
- 改进日志输出,提供更详细的注册和重试信息
2025-12-18 22:50:14 +08:00
ray
c0dd21b0f0 feat(etcd): 增强 etcd 客户端连接管理和重试机制
- 添加连接状态检查,确保在使用 etcd 客户端前验证其连接状态
- 实现超时控制,避免长时间阻塞在 etcd 操作中
- 优化重试逻辑,增加连接恢复后的重试机制,确保服务注册的可靠性
- 改进日志输出,提供更清晰的状态信息和错误处理
2025-12-18 22:47:10 +08:00
2 changed files with 173 additions and 36 deletions

View File

@@ -22,18 +22,23 @@ type EtcdRegistryConfig struct {
// EtcdRegistry etcd 服务注册(带自动重连)
type EtcdRegistry struct {
config EtcdRegistryConfig
etcdClient *EtcdClient
cli *clientv3.Client
key string
val string
serviceName string
ttl int64
leaseID clientv3.LeaseID
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
config EtcdRegistryConfig
etcdClient *EtcdClient
cli *clientv3.Client
key string
val string
serviceName string
ttl int64
leaseID clientv3.LeaseID
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
watcherCtx context.Context
watcherCancel context.CancelFunc
watcherMu sync.Mutex
lastRegisterTime time.Time
registerMu sync.Mutex
}
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
@@ -53,12 +58,15 @@ func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
return nil, err
}
watcherCtx, watcherCancel := context.WithCancel(context.Background())
return &EtcdRegistry{
config: cfg,
etcdClient: etcdClient,
cli: etcdClient.GetClient(),
ctx: ctx,
cancel: cancel,
config: cfg,
etcdClient: etcdClient,
cli: etcdClient.GetClient(),
ctx: ctx,
cancel: cancel,
watcherCtx: watcherCtx,
watcherCancel: watcherCancel,
}, nil
}
@@ -75,15 +83,30 @@ func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
}
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
// 防抖:防止短时间内重复注册
r.registerMu.Lock()
now := time.Now()
if !r.lastRegisterTime.IsZero() && now.Sub(r.lastRegisterTime) < 2*time.Second {
r.registerMu.Unlock()
return fmt.Errorf("register too frequently, please wait")
}
r.lastRegisterTime = now
r.registerMu.Unlock()
r.mu.RLock()
cli := r.cli
key := r.key
val := r.val
r.mu.RUnlock()
// 如果 cli 为 nil 或连接已关闭,尝试重新获取客户端
if cli == nil {
// 尝试重新获取客户端
if r.etcdClient != nil {
// 检查连接状态
state := r.etcdClient.State()
if state != StateConnected {
return fmt.Errorf("etcd client is not connected, current state: %s", state)
}
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
@@ -92,66 +115,174 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error {
if cli == nil {
return fmt.Errorf("etcd client is nil")
}
} else {
// 即使 cli 不为 nil也要检查连接状态因为可能连接已断开但 cli 引用还在
if r.etcdClient != nil {
state := r.etcdClient.State()
if state != StateConnected {
// 连接已断开,尝试更新 cli 引用
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
r.mu.Unlock()
if cli == nil {
return fmt.Errorf("etcd client is not connected, current state: %s", state)
}
}
}
}
resp, err := cli.Grant(context.Background(), ttl)
// 使用带超时的 context 来避免长时间阻塞
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := cli.Grant(ctx, ttl)
if err != nil {
return err
// 如果 Grant 失败,可能是连接已关闭,尝试更新 client 引用
if r.etcdClient != nil {
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
r.mu.Unlock()
if cli != nil {
// 使用新的 client 重试
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
resp, err = cli.Grant(ctx2, ttl)
}
}
if err != nil {
return fmt.Errorf("failed to grant lease: %w", err)
}
}
r.mu.Lock()
r.leaseID = resp.ID
r.mu.Unlock()
_, err = cli.Put(context.Background(), key, val, clientv3.WithLease(resp.ID))
ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel3()
_, err = cli.Put(ctx3, key, val, clientv3.WithLease(resp.ID))
if err != nil {
return err
return fmt.Errorf("failed to put key: %w", err)
}
// KeepAlive 需要使用长期有效的 context不能使用带超时的 context
keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
return fmt.Errorf("failed to start keep-alive: %w", err)
}
r.mu.Lock()
r.keepAliveCh = keepAliveCh
r.mu.Unlock()
go r.watcher(ttl)
// 停止旧的 watcher如果存在
r.watcherMu.Lock()
if r.watcherCancel != nil {
r.watcherCancel()
}
// 创建新的 watcher context
r.watcherCtx, r.watcherCancel = context.WithCancel(context.Background())
watcherCtx := r.watcherCtx
r.watcherMu.Unlock()
go r.watcher(ttl, watcherCtx)
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
return nil
}
// watcher 监听续约
func (r *EtcdRegistry) watcher(ttl int64) {
func (r *EtcdRegistry) watcher(ttl int64, watcherCtx context.Context) {
for {
select {
case <-r.ctx.Done():
log.Println("[EtcdRegistry] context done, watcher exiting.")
return
case <-watcherCtx.Done():
log.Println("[EtcdRegistry] watcher context done, exiting.")
return
case ka, ok := <-r.keepAliveCh:
if !ok {
log.Println("[EtcdRegistry] keep-alive channel closed, attempting to re-register...")
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
// 更新 cli 引用
if r.etcdClient != nil {
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
r.mu.Unlock()
// 等待连接恢复,而不是立即重试
maxWaitTime := 60 * time.Second
checkInterval := 500 * time.Millisecond
waited := time.Duration(0)
for waited < maxWaitTime {
select {
case <-r.ctx.Done():
return
case <-watcherCtx.Done():
return
default:
}
// 检查连接状态
if r.etcdClient != nil {
state := r.etcdClient.State()
if state == StateConnected {
// 连接已恢复,更新 cli 引用
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
r.mu.Unlock()
// 等待一段时间确保连接稳定,避免立即重新注册
time.Sleep(1 * time.Second)
// 尝试重新注册
if err := r.registerWithKV(ttl); err == nil {
log.Println("[EtcdRegistry] 服务重新注册成功")
return // 退出当前 goroutine让新的 watcher 启动
} else {
// 如果是防抖错误,继续等待
if err.Error() == "register too frequently, please wait" {
log.Println("[EtcdRegistry] 注册过于频繁,继续等待...")
time.Sleep(2 * time.Second)
} else {
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
}
}
}
}
time.Sleep(checkInterval)
waited += checkInterval
}
// 使用指数退避重试
delay := time.Second
// 如果等待超时,使用指数退避重试
log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
delay := 2 * time.Second
maxDelay := 30 * time.Second
for {
select {
case <-r.ctx.Done():
return
case <-watcherCtx.Done():
return
default:
}
// 再次检查连接状态
if r.etcdClient != nil {
state := r.etcdClient.State()
if state == StateConnected {
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
r.mu.Unlock()
}
}
if err := r.registerWithKV(ttl); err != nil {
// 如果是防抖错误,等待更长时间
if err.Error() == "register too frequently, please wait" {
log.Println("[EtcdRegistry] 注册过于频繁,等待更长时间...")
time.Sleep(3 * time.Second)
continue
}
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
time.Sleep(delay)
delay *= 2
@@ -159,6 +290,7 @@ func (r *EtcdRegistry) watcher(ttl int64) {
delay = maxDelay
}
} else {
log.Println("[EtcdRegistry] 服务重新注册成功")
return // 退出当前 goroutine让新的 watcher 启动
}
}
@@ -171,7 +303,14 @@ func (r *EtcdRegistry) watcher(ttl int64) {
// UnRegister 注销服务
func (r *EtcdRegistry) UnRegister() {
r.cancel() // 停止 watcher
r.cancel() // 停止主 context
// 停止 watcher
r.watcherMu.Lock()
if r.watcherCancel != nil {
r.watcherCancel()
}
r.watcherMu.Unlock()
r.mu.RLock()
cli := r.cli
@@ -197,4 +336,3 @@ func (r *EtcdRegistry) State() ConnectionState {
}
return StateDisconnected
}

View File

@@ -223,4 +223,3 @@ func (cm *ConnectionManager) Close() error {
func (cm *ConnectionManager) TriggerReconnect() {
cm.reconnect()
}