package reconnect

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
	"time"
)

// HealthChecker probes whether an established connection is still usable.
type HealthChecker interface {
	// HealthCheck runs a single probe; a nil error means the connection is healthy.
	HealthCheck(ctx context.Context) error
}

// Connector opens and closes the underlying connection.
type Connector interface {
	// Connect establishes the connection.
	Connect(ctx context.Context) error
	// Close tears the connection down.
	Close() error
}

// ConnectionManager drives a Connector: it connects (optionally with
// retries), periodically probes the connection through a HealthChecker, and
// reconnects in the background when a probe fails.
type ConnectionManager struct {
	config    Config
	strategy  Strategy
	connector Connector
	checker   HealthChecker

	state    atomic.Int32 // current ConnectionState, stored as int32
	attempts atomic.Int32 // connection attempts since the last success

	// mu orders the start of background goroutines against Close: a goroutine
	// is registered with wg only while mu is held and ctx is still live, and
	// Close cancels ctx while holding mu. It also guards healthStarted.
	mu            sync.RWMutex
	healthStarted bool

	// reconnecting is true from the moment reconnect is accepted until its
	// retry goroutine has finished.
	reconnecting atomic.Bool

	ctx       context.Context // cancelled by Close; bounds all background work
	cancel    context.CancelFunc
	wg        sync.WaitGroup
	closeOnce sync.Once
}

// NewConnectionManager builds a manager in StateDisconnected for the given
// connector. checker may be nil, in which case no health checking is done.
// cfg.Validate is invoked on the manager's copy of the configuration before
// it is stored.
func NewConnectionManager(connector Connector, checker HealthChecker, cfg Config) *ConnectionManager {
	cfg.Validate()
	ctx, cancel := context.WithCancel(context.Background())
	cm := &ConnectionManager{
		config:    cfg,
		strategy:  NewStrategy(cfg),
		connector: connector,
		checker:   checker,
		ctx:       ctx,
		cancel:    cancel,
	}
	cm.state.Store(int32(StateDisconnected))
	return cm
}

// State returns the current connection state.
func (cm *ConnectionManager) State() ConnectionState {
	return ConnectionState(cm.state.Load())
}

// setState atomically replaces the state and, when the value actually
// changed, reports the transition through config.OnStateChange (if set).
func (cm *ConnectionManager) setState(newState ConnectionState) {
	oldState := ConnectionState(cm.state.Swap(int32(newState)))
	if oldState != newState && cm.config.OnStateChange != nil {
		cm.config.OnStateChange(oldState, newState)
	}
}

// markConnected records a successful connection: the state becomes
// StateConnected and the backoff strategy and attempt counter are reset.
func (cm *ConnectionManager) markConnected() {
	cm.setState(StateConnected)
	cm.strategy.Reset()
	cm.attempts.Store(0)
}

// startHealthCheck launches the health-check loop at most once per manager.
// It does nothing when no checker is configured, when
// config.HealthCheckInterval is not positive, when the loop is already
// running, or when the manager has been closed.
func (cm *ConnectionManager) startHealthCheck() {
	if cm.checker == nil || cm.config.HealthCheckInterval <= 0 {
		return
	}

	cm.mu.Lock()
	defer cm.mu.Unlock()
	// The loop only exits when cm.ctx is cancelled, so one instance serves
	// every later reconnect; starting another would duplicate the probes.
	if cm.healthStarted || cm.ctx.Err() != nil {
		return
	}
	cm.healthStarted = true
	cm.wg.Add(1)
	go cm.healthCheckLoop()
}

// Connect makes a single connection attempt and, on success, starts health
// checking. On failure the state returns to StateDisconnected, the error is
// passed to config.OnError (if set) and returned.
func (cm *ConnectionManager) Connect(ctx context.Context) error {
	cm.setState(StateConnecting)
	if err := cm.connector.Connect(ctx); err != nil {
		cm.setState(StateDisconnected)
		if cm.config.OnError != nil {
			cm.config.OnError(err)
		}
		return err
	}

	cm.markConnected()
	cm.startHealthCheck()
	return nil
}

// doneErr reports, without blocking, whether ctx or the manager's own context
// has been cancelled. It returns nil when neither has.
func (cm *ConnectionManager) doneErr(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-cm.ctx.Done():
		return cm.ctx.Err()
	default:
		return nil
	}
}

// ConnectWithRetry keeps attempting to connect until it succeeds, ctx or the
// manager is cancelled, or the retry budget is exhausted.
//
// Every failed attempt is reported to config.OnError (if set). When
// config.MaxRetries is non-negative, the loop gives up and returns the last
// connect error once the attempt counter reaches MaxRetries; otherwise it
// waits strategy.NextDelay(attempt) and tries again. On success
// config.OnReconnect (if set) is called when more than one attempt was
// needed, and health checking is started.
func (cm *ConnectionManager) ConnectWithRetry(ctx context.Context) error {
	cm.setState(StateConnecting)

	for {
		if err := cm.doneErr(ctx); err != nil {
			cm.setState(StateDisconnected)
			return err
		}

		attempt := int(cm.attempts.Add(1))
		err := cm.connector.Connect(ctx)
		if err == nil {
			cm.markConnected()
			if cm.config.OnReconnect != nil && attempt > 1 {
				cm.config.OnReconnect(attempt)
			}
			cm.startHealthCheck()
			log.Printf("[reconnect] 连接成功")
			return nil
		}

		if cm.config.OnError != nil {
			cm.config.OnError(err)
		}
		// Stop once the retry budget is used up.
		if cm.config.MaxRetries >= 0 && attempt >= cm.config.MaxRetries {
			cm.setState(StateDisconnected)
			return err
		}

		delay := cm.strategy.NextDelay(attempt)
		log.Printf("[reconnect] 连接失败 (尝试 %d): %v, %v 后重试...", attempt, err, delay)

		// An explicit timer (rather than time.After) lets the wait be released
		// immediately when the retry is abandoned.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			cm.setState(StateDisconnected)
			return ctx.Err()
		case <-cm.ctx.Done():
			timer.Stop()
			cm.setState(StateDisconnected)
			return cm.ctx.Err()
		case <-timer.C:
		}
	}
}

// healthCheckLoop probes the connection every config.HealthCheckInterval
// until the manager is closed. Ticks are skipped while the state is not
// StateConnected. A failed probe is reported to config.OnError (if set) and
// triggers a background reconnect.
func (cm *ConnectionManager) healthCheckLoop() {
	defer cm.wg.Done()

	ticker := time.NewTicker(cm.config.HealthCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-cm.ctx.Done():
			return
		case <-ticker.C:
		}

		if cm.State() != StateConnected {
			continue
		}

		ctx, cancel := context.WithTimeout(cm.ctx, cm.config.HealthCheckTimeout)
		err := cm.checker.HealthCheck(ctx)
		cancel()
		if err == nil {
			continue
		}
		// A probe interrupted by shutdown is not a connection failure.
		if cm.ctx.Err() != nil {
			return
		}

		log.Printf("[reconnect] 健康检查失败: %v, 开始重连...", err)
		if cm.config.OnError != nil {
			cm.config.OnError(err)
		}
		cm.reconnect()
	}
}

// reconnect closes the current connection and retries connecting in a
// background goroutine. It is a no-op when a reconnect is already in flight
// or the manager has been closed.
func (cm *ConnectionManager) reconnect() {
	cm.mu.Lock()
	// The state value cannot serve as the in-flight guard: ConnectWithRetry
	// overwrites StateReconnecting with StateConnecting as soon as it starts.
	// A dedicated flag, claimed with CAS, covers the whole reconnect.
	if cm.ctx.Err() != nil || !cm.reconnecting.CompareAndSwap(false, true) {
		cm.mu.Unlock()
		return
	}
	// Register with wg before releasing mu so Close cannot miss the goroutine.
	cm.wg.Add(1)
	cm.mu.Unlock()

	cm.setState(StateReconnecting)

	// Closing the old connection is best-effort; a failure must not prevent
	// the reconnect, so it is only logged.
	if err := cm.connector.Close(); err != nil {
		log.Printf("[reconnect] 关闭旧连接失败: %v", err)
	}

	go func() {
		defer cm.wg.Done()
		defer cm.reconnecting.Store(false)
		if err := cm.ConnectWithRetry(cm.ctx); err != nil {
			log.Printf("[reconnect] 重连失败: %v", err)
		}
	}()
}

// Close shuts the manager down: it cancels the manager context, waits for the
// health-check loop and any in-flight background reconnect to finish, closes
// the connector and sets the state to StateDisconnected. Only the first call
// does any work and returns the connector's Close error; later calls return
// nil.
//
// Because Close waits for the manager's goroutines, it must not be called
// from a callback (OnError, OnStateChange, OnReconnect) that is running on
// one of them.
func (cm *ConnectionManager) Close() error {
	var err error
	cm.closeOnce.Do(func() {
		// Cancel under mu so that no background goroutine can be registered
		// with wg after this point.
		cm.mu.Lock()
		cm.cancel()
		cm.mu.Unlock()

		cm.wg.Wait()
		err = cm.connector.Close()
		cm.setState(StateDisconnected)
	})
	return err
}

// TriggerReconnect manually requests a reconnect. It has no effect while a
// reconnect is already in progress or after Close.
func (cm *ConnectionManager) TriggerReconnect() {
	cm.reconnect()
}