package reconnect import ( "context" "errors" "fmt" "time" ) // Reconnectable 定义可重连的接口 type Reconnectable interface { // Connect 建立连接 Connect(ctx context.Context) error // Close 关闭连接 Close() error // Ping 检查连接是否健康 Ping(ctx context.Context) error // IsConnected 检查连接状态 IsConnected() bool } // Config 重连配置 type Config struct { // MaxRetries 最大重试次数,0表示无限重试 MaxRetries int // RetryInterval 重试间隔 RetryInterval time.Duration // Timeout 连接超时时间 Timeout time.Duration // OnReconnect 重连成功后的回调函数 OnReconnect func() // OnDisconnect 断开连接时的回调函数 OnDisconnect func(error) } // DefaultConfig 返回默认配置 func DefaultConfig() *Config { return &Config{ MaxRetries: 0, // 无限重试 RetryInterval: 3 * time.Second, Timeout: 10 * time.Second, } } // Manager 重连管理器 type Manager struct { client Reconnectable config *Config ctx context.Context cancel context.CancelFunc connected bool errCh chan error } // NewManager 创建新的重连管理器 func NewManager(client Reconnectable, config *Config) *Manager { if config == nil { config = DefaultConfig() } ctx, cancel := context.WithCancel(context.Background()) return &Manager{ client: client, config: config, ctx: ctx, cancel: cancel, errCh: make(chan error, 1), } } // Start 启动连接并开始监控 func (m *Manager) Start(ctx context.Context) error { // 初始连接 if err := m.connect(ctx); err != nil { return fmt.Errorf("initial connection failed: %w", err) } // 启动监控 goroutine go m.monitor() return nil } // Stop 停止重连管理器 func (m *Manager) Stop() error { m.cancel() if m.client != nil { return m.client.Close() } return nil } // connect 执行连接 func (m *Manager) connect(ctx context.Context) error { connectCtx, cancel := context.WithTimeout(ctx, m.config.Timeout) defer cancel() if err := m.client.Connect(connectCtx); err != nil { return err } m.connected = true if m.config.OnReconnect != nil { m.config.OnReconnect() } return nil } // monitor 监控连接状态并自动重连 func (m *Manager) monitor() { ticker := time.NewTicker(m.config.RetryInterval) defer ticker.Stop() retryCount := 0 for { select { case <-m.ctx.Done(): return case <-ticker.C: // 检查连接状态 if !m.client.IsConnected() { m.connected = false if m.config.OnDisconnect != nil { m.config.OnDisconnect(errors.New("connection lost")) } // 检查是否超过最大重试次数 if m.config.MaxRetries > 0 && retryCount >= m.config.MaxRetries { m.errCh <- fmt.Errorf("max retries (%d) exceeded", m.config.MaxRetries) return } // 尝试重连 if err := m.connect(context.Background()); err != nil { retryCount++ continue } // 重连成功,重置计数器 retryCount = 0 } else { // 连接正常,执行健康检查 if err := m.client.Ping(context.Background()); err != nil { m.connected = false if m.config.OnDisconnect != nil { m.config.OnDisconnect(err) } } } } } } // IsConnected 返回当前连接状态 func (m *Manager) IsConnected() bool { return m.connected && m.client.IsConnected() } // WaitForError 等待错误(用于阻塞等待) func (m *Manager) WaitForError() error { return <-m.errCh }