Files
reconnect/manager.go
ray c0dd21b0f0 feat(etcd): 增强 etcd 客户端连接管理和重试机制
- 添加连接状态检查,确保在使用 etcd 客户端前验证其连接状态
- 实现超时控制,避免长时间阻塞在 etcd 操作中
- 优化重试逻辑,增加连接恢复后的重试机制,确保服务注册的可靠性
- 改进日志输出,提供更清晰的状态信息和错误处理
2025-12-18 22:47:10 +08:00

226 lines
4.7 KiB
Go

package reconnect
import (
"context"
"log"
"sync"
"sync/atomic"
"time"
)
// HealthChecker is implemented by anything that can probe whether a
// connection is still healthy.
type HealthChecker interface {
// HealthCheck runs a single health probe. A nil return means healthy.
HealthCheck(ctx context.Context) error
}
// Connector abstracts the underlying connection that ConnectionManager
// opens and closes.
type Connector interface {
// Connect establishes the connection.
Connect(ctx context.Context) error
// Close closes the connection.
Close() error
}
// ConnectionManager drives a Connector through connect, health-check and
// reconnect cycles, and tracks the resulting connection state.
type ConnectionManager struct {
// config holds the settings and optional callbacks read by the methods
// (MaxRetries, HealthCheckInterval, HealthCheckTimeout, OnStateChange,
// OnError, OnReconnect).
config Config
// strategy supplies the delay between retries (NextDelay) and is Reset
// after a successful connection.
strategy Strategy
// connector is the connection being managed.
connector Connector
// checker is the optional health probe; when it is nil no health-check
// loop is started.
checker HealthChecker
// state is the current ConnectionState, stored as an int32 for atomic access.
state atomic.Int32
// attempts counts connection attempts: ConnectWithRetry increments it and
// a successful connection resets it to zero.
attempts atomic.Int32
// mu is not used by any method visible in this section.
// NOTE(review): confirm whether it is still needed.
mu sync.RWMutex
// ctx is the manager's lifetime context; cancel (invoked by Close) ends it.
ctx context.Context
cancel context.CancelFunc
// wg tracks running healthCheckLoop goroutines so Close can wait for them.
wg sync.WaitGroup
// closeOnce makes Close's shutdown sequence run only once.
closeOnce sync.Once
}
// NewConnectionManager builds a ConnectionManager around the given connector
// and optional health checker.
//
// cfg.Validate() is invoked before cfg is used; any value it may return is
// not inspected here. The manager gets its own cancellable lifetime context
// derived from context.Background() and starts in StateDisconnected. No
// connection is attempted by the constructor.
func NewConnectionManager(connector Connector, checker HealthChecker, cfg Config) *ConnectionManager {
	cfg.Validate()

	lifetime, stop := context.WithCancel(context.Background())

	cm := new(ConnectionManager)
	cm.config = cfg
	cm.strategy = NewStrategy(cfg)
	cm.connector = connector
	cm.checker = checker
	cm.ctx, cm.cancel = lifetime, stop

	// The zero value of the atomic is not guaranteed to equal
	// StateDisconnected, so store the initial state explicitly.
	cm.state.Store(int32(StateDisconnected))
	return cm
}
// State returns the current connection state, read atomically.
func (cm *ConnectionManager) State() ConnectionState {
	raw := cm.state.Load()
	return ConnectionState(raw)
}
// setState atomically replaces the connection state with newState and, when
// the state actually changed and an OnStateChange callback is configured,
// invokes that callback with the previous and the new state.
func (cm *ConnectionManager) setState(newState ConnectionState) {
	previous := ConnectionState(cm.state.Swap(int32(newState)))
	if previous == newState {
		return // no transition, nothing to report
	}
	if notify := cm.config.OnStateChange; notify != nil {
		notify(previous, newState)
	}
}
// Connect makes a single connection attempt and, on success, starts the
// health-check loop.
//
// The state moves to StateConnecting first. If the connector fails, the state
// goes back to StateDisconnected, the OnError callback (if any) receives the
// error, and the error is returned. On success the state becomes
// StateConnected, the retry strategy and the attempt counter are reset, and a
// healthCheckLoop goroutine is launched when a checker is configured and
// HealthCheckInterval is positive.
func (cm *ConnectionManager) Connect(ctx context.Context) error {
	cm.setState(StateConnecting)

	connErr := cm.connector.Connect(ctx)
	if connErr != nil {
		cm.setState(StateDisconnected)
		if report := cm.config.OnError; report != nil {
			report(connErr)
		}
		return connErr
	}

	cm.setState(StateConnected)
	cm.strategy.Reset()
	cm.attempts.Store(0)

	healthChecksEnabled := cm.checker != nil && cm.config.HealthCheckInterval > 0
	if !healthChecksEnabled {
		return nil
	}

	// Register with the WaitGroup before the goroutine starts so Close can
	// wait for the loop to exit.
	cm.wg.Add(1)
	go cm.healthCheckLoop()
	return nil
}
// ConnectWithRetry connects through the connector, retrying with the delay
// supplied by the retry strategy until it succeeds, the retry budget is used
// up, or either ctx or the manager's own lifetime context is cancelled.
//
// Parameters:
//   - ctx: bounds this call; it is also passed to every connector.Connect.
//
// Returns nil on success. On failure it returns the context error when the
// call was cancelled, or the last connection error when MaxRetries is
// non-negative and the attempt counter has reached it. In every failure case
// the state is left at StateDisconnected.
//
// On success the state becomes StateConnected, the strategy and the attempt
// counter are reset, OnReconnect (if set) is called when more than one
// attempt was needed, and a healthCheckLoop goroutine is started when a
// checker is configured and HealthCheckInterval is positive.
//
// NOTE(review): the attempt counter is shared across calls and is only reset
// on success, so a call made after a previous one gave up continues counting
// from the old value — confirm this is intended.
func (cm *ConnectionManager) ConnectWithRetry(ctx context.Context) error {
	cm.setState(StateConnecting)

	// abort records the disconnected state and passes the cause through.
	abort := func(cause error) error {
		cm.setState(StateDisconnected)
		return cause
	}

	var attempt int
	for {
		// Non-blocking cancellation check before every attempt.
		select {
		case <-ctx.Done():
			return abort(ctx.Err())
		case <-cm.ctx.Done():
			return abort(cm.ctx.Err())
		default:
		}

		attempt = int(cm.attempts.Add(1))
		err := cm.connector.Connect(ctx)
		if err == nil {
			break
		}

		if cm.config.OnError != nil {
			cm.config.OnError(err)
		}

		// Give up once the retry budget is exhausted; a negative MaxRetries
		// means retry without limit.
		if cm.config.MaxRetries >= 0 && attempt >= cm.config.MaxRetries {
			return abort(err)
		}

		delay := cm.strategy.NextDelay(attempt)
		log.Printf("[reconnect] 连接失败 (尝试 %d): %v, %v 后重试...", attempt, err, delay)

		// Use an explicit timer instead of time.After so that it is released
		// right away when the wait is cut short by cancellation.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return abort(ctx.Err())
		case <-cm.ctx.Done():
			timer.Stop()
			return abort(cm.ctx.Err())
		case <-timer.C:
		}
	}

	// Connected.
	cm.setState(StateConnected)
	cm.strategy.Reset()
	cm.attempts.Store(0)

	if cm.config.OnReconnect != nil && attempt > 1 {
		cm.config.OnReconnect(attempt)
	}

	// Start health checking for the new connection.
	if cm.checker != nil && cm.config.HealthCheckInterval > 0 {
		cm.wg.Add(1)
		go cm.healthCheckLoop()
	}

	log.Printf("[reconnect] 连接成功")
	return nil
}
// healthCheckLoop probes the connection every HealthCheckInterval and hands
// off to reconnect when a probe fails. It must be started with cm.wg already
// incremented; it calls cm.wg.Done on exit.
//
// The loop ends when:
//   - the manager's lifetime context is cancelled, or
//   - a probe fails and reconnect has been triggered. ConnectWithRetry starts
//     a fresh loop after every successful connection, so staying alive here
//     would leave one extra loop running per reconnect.
//
// Ticks that arrive while the state is not StateConnected are skipped. Each
// probe is bounded by HealthCheckTimeout. A probe failure is logged and
// reported through OnError (if set) before reconnect is called.
func (cm *ConnectionManager) healthCheckLoop() {
	defer cm.wg.Done()

	ticker := time.NewTicker(cm.config.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-cm.ctx.Done():
			return
		case <-ticker.C:
		}

		if cm.State() != StateConnected {
			continue
		}

		probeCtx, cancel := context.WithTimeout(cm.ctx, cm.config.HealthCheckTimeout)
		err := cm.checker.HealthCheck(probeCtx)
		cancel()
		if err == nil {
			continue
		}

		// A probe that was interrupted by shutdown is not a connection
		// failure; do not start a reconnect (and a second connector.Close)
		// while Close is tearing things down.
		if cm.ctx.Err() != nil {
			return
		}

		log.Printf("[reconnect] 健康检查失败: %v, 开始重连...", err)
		if cm.config.OnError != nil {
			cm.config.OnError(err)
		}
		cm.reconnect()

		// The reconnect path launches a new health-check loop once the
		// connection is back, so this one is finished.
		return
	}
}
// reconnect closes the current connection and re-establishes it in a
// background goroutine via ConnectWithRetry, using the manager's lifetime
// context. It returns immediately; a failed reconnect is only logged.
//
// The call is a no-op when:
//   - the manager's lifetime context is already cancelled (Close was called), or
//   - a connection attempt is already in progress (state is StateReconnecting
//     or StateConnecting).
//
// The transition to StateReconnecting is claimed with a compare-and-swap so
// that concurrent callers cannot both pass the guard and start two reconnect
// goroutines. StateConnecting is part of the guard because ConnectWithRetry
// replaces StateReconnecting with StateConnecting as soon as it starts.
func (cm *ConnectionManager) reconnect() {
	if cm.ctx.Err() != nil {
		return // manager is shut down
	}

	for {
		current := cm.State()
		if current == StateReconnecting || current == StateConnecting {
			return // a (re)connect is already running
		}
		if cm.state.CompareAndSwap(int32(current), int32(StateReconnecting)) {
			// Same notification setState would issue; current is known to
			// differ from StateReconnecting here.
			if cm.config.OnStateChange != nil {
				cm.config.OnStateChange(current, StateReconnecting)
			}
			break
		}
		// The state changed underneath us; re-evaluate.
	}

	// Drop the old connection. The error is deliberately ignored: the
	// connection is being replaced regardless of how the close went.
	_ = cm.connector.Close()

	// Reconnect in the background so the caller is not blocked by retries.
	go func() {
		if err := cm.ConnectWithRetry(cm.ctx); err != nil {
			log.Printf("[reconnect] 重连失败: %v", err)
		}
	}()
}
// Close shuts the manager down: it cancels the lifetime context, waits for
// the goroutines tracked by the WaitGroup, closes the connector and sets the
// state to StateDisconnected.
//
// The shutdown sequence runs at most once. The first call returns the
// connector's Close error; later calls do nothing and return nil.
func (cm *ConnectionManager) Close() error {
	var closeErr error

	shutdown := func() {
		cm.cancel()
		cm.wg.Wait()
		closeErr = cm.connector.Close()
		cm.setState(StateDisconnected)
	}
	cm.closeOnce.Do(shutdown)

	return closeErr
}
// TriggerReconnect manually starts a reconnect by delegating to reconnect.
// reconnect performs the connection attempts in a separate goroutine, so this
// call does not wait for the outcome.
func (cm *ConnectionManager) TriggerReconnect() {
cm.reconnect()
}