- 添加连接状态检查,确保在使用 etcd 客户端前验证其连接状态
- 实现超时控制,避免长时间阻塞在 etcd 操作中
- 优化重试逻辑,增加连接恢复后的重试机制,确保服务注册的可靠性
- 改进日志输出,提供更清晰的状态信息和错误处理
226 lines
4.7 KiB
Go
226 lines
4.7 KiB
Go
package reconnect
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// HealthChecker is implemented by anything that can report whether an
// established connection is still usable.
type HealthChecker interface {
	// HealthCheck performs one health probe. It returns nil when the
	// connection is healthy and a non-nil error otherwise.
	HealthCheck(ctx context.Context) error
}
|
|
|
|
// Connector abstracts the lifecycle of the underlying connection that a
// ConnectionManager supervises.
type Connector interface {
	// Connect establishes the connection.
	Connect(ctx context.Context) error

	// Close tears the connection down.
	Close() error
}
|
|
|
|
// ConnectionManager 连接管理器
|
|
type ConnectionManager struct {
|
|
config Config
|
|
strategy Strategy
|
|
connector Connector
|
|
checker HealthChecker
|
|
|
|
state atomic.Int32
|
|
attempts atomic.Int32
|
|
mu sync.RWMutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
closeOnce sync.Once
|
|
}
|
|
|
|
// NewConnectionManager 创建连接管理器
|
|
func NewConnectionManager(connector Connector, checker HealthChecker, cfg Config) *ConnectionManager {
|
|
cfg.Validate()
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
cm := &ConnectionManager{
|
|
config: cfg,
|
|
strategy: NewStrategy(cfg),
|
|
connector: connector,
|
|
checker: checker,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
cm.state.Store(int32(StateDisconnected))
|
|
return cm
|
|
}
|
|
|
|
// State 获取当前连接状态
|
|
func (cm *ConnectionManager) State() ConnectionState {
|
|
return ConnectionState(cm.state.Load())
|
|
}
|
|
|
|
// setState 设置连接状态
|
|
func (cm *ConnectionManager) setState(newState ConnectionState) {
|
|
oldState := ConnectionState(cm.state.Swap(int32(newState)))
|
|
if oldState != newState && cm.config.OnStateChange != nil {
|
|
cm.config.OnStateChange(oldState, newState)
|
|
}
|
|
}
|
|
|
|
// Connect 建立连接并启动健康检查
|
|
func (cm *ConnectionManager) Connect(ctx context.Context) error {
|
|
cm.setState(StateConnecting)
|
|
|
|
if err := cm.connector.Connect(ctx); err != nil {
|
|
cm.setState(StateDisconnected)
|
|
if cm.config.OnError != nil {
|
|
cm.config.OnError(err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
cm.setState(StateConnected)
|
|
cm.strategy.Reset()
|
|
cm.attempts.Store(0)
|
|
|
|
// 启动健康检查
|
|
if cm.checker != nil && cm.config.HealthCheckInterval > 0 {
|
|
cm.wg.Add(1)
|
|
go cm.healthCheckLoop()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConnectWithRetry 带重试的连接
|
|
func (cm *ConnectionManager) ConnectWithRetry(ctx context.Context) error {
|
|
cm.setState(StateConnecting)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
cm.setState(StateDisconnected)
|
|
return ctx.Err()
|
|
case <-cm.ctx.Done():
|
|
cm.setState(StateDisconnected)
|
|
return cm.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
attempt := int(cm.attempts.Add(1))
|
|
|
|
if err := cm.connector.Connect(ctx); err != nil {
|
|
if cm.config.OnError != nil {
|
|
cm.config.OnError(err)
|
|
}
|
|
|
|
// 检查是否超过最大重试次数
|
|
if cm.config.MaxRetries >= 0 && attempt >= cm.config.MaxRetries {
|
|
cm.setState(StateDisconnected)
|
|
return err
|
|
}
|
|
|
|
delay := cm.strategy.NextDelay(attempt)
|
|
log.Printf("[reconnect] 连接失败 (尝试 %d): %v, %v 后重试...", attempt, err, delay)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
cm.setState(StateDisconnected)
|
|
return ctx.Err()
|
|
case <-cm.ctx.Done():
|
|
cm.setState(StateDisconnected)
|
|
return cm.ctx.Err()
|
|
case <-time.After(delay):
|
|
continue
|
|
}
|
|
}
|
|
|
|
// 连接成功
|
|
cm.setState(StateConnected)
|
|
cm.strategy.Reset()
|
|
cm.attempts.Store(0)
|
|
|
|
if cm.config.OnReconnect != nil && attempt > 1 {
|
|
cm.config.OnReconnect(attempt)
|
|
}
|
|
|
|
// 启动健康检查
|
|
if cm.checker != nil && cm.config.HealthCheckInterval > 0 {
|
|
cm.wg.Add(1)
|
|
go cm.healthCheckLoop()
|
|
}
|
|
|
|
log.Printf("[reconnect] 连接成功")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// healthCheckLoop 健康检查循环
|
|
func (cm *ConnectionManager) healthCheckLoop() {
|
|
defer cm.wg.Done()
|
|
|
|
ticker := time.NewTicker(cm.config.HealthCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cm.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
if cm.State() != StateConnected {
|
|
continue
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(cm.ctx, cm.config.HealthCheckTimeout)
|
|
err := cm.checker.HealthCheck(ctx)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
log.Printf("[reconnect] 健康检查失败: %v, 开始重连...", err)
|
|
if cm.config.OnError != nil {
|
|
cm.config.OnError(err)
|
|
}
|
|
cm.reconnect()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// reconnect 执行重连
|
|
func (cm *ConnectionManager) reconnect() {
|
|
if cm.State() == StateReconnecting {
|
|
return // 已经在重连中
|
|
}
|
|
|
|
cm.setState(StateReconnecting)
|
|
|
|
// 关闭旧连接
|
|
_ = cm.connector.Close()
|
|
|
|
// 重新连接
|
|
go func() {
|
|
if err := cm.ConnectWithRetry(cm.ctx); err != nil {
|
|
log.Printf("[reconnect] 重连失败: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Close 关闭连接管理器
|
|
func (cm *ConnectionManager) Close() error {
|
|
var err error
|
|
cm.closeOnce.Do(func() {
|
|
cm.cancel()
|
|
cm.wg.Wait()
|
|
err = cm.connector.Close()
|
|
cm.setState(StateDisconnected)
|
|
})
|
|
return err
|
|
}
|
|
|
|
// TriggerReconnect 手动触发重连
|
|
func (cm *ConnectionManager) TriggerReconnect() {
|
|
cm.reconnect()
|
|
}
|