From 4382d345f61fad16b4a3a1ddf2d8f024bed1aa3f Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:48 +0800 Subject: [PATCH] Add Reconnect package for managing automatic reconnections, including a Reconnectable interface, configuration options, and a Manager for connection monitoring and retries. --- reconnect.go | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 reconnect.go diff --git a/reconnect.go b/reconnect.go new file mode 100644 index 0000000..2a84ef8 --- /dev/null +++ b/reconnect.go @@ -0,0 +1,162 @@ +package reconnect + +import ( + "context" + "errors" + "fmt" + "time" +) + +// Reconnectable 定义可重连的接口 +type Reconnectable interface { + // Connect 建立连接 + Connect(ctx context.Context) error + // Close 关闭连接 + Close() error + // Ping 检查连接是否健康 + Ping(ctx context.Context) error + // IsConnected 检查连接状态 + IsConnected() bool +} + +// Config 重连配置 +type Config struct { + // MaxRetries 最大重试次数,0表示无限重试 + MaxRetries int + // RetryInterval 重试间隔 + RetryInterval time.Duration + // Timeout 连接超时时间 + Timeout time.Duration + // OnReconnect 重连成功后的回调函数 + OnReconnect func() + // OnDisconnect 断开连接时的回调函数 + OnDisconnect func(error) +} + +// DefaultConfig 返回默认配置 +func DefaultConfig() *Config { + return &Config{ + MaxRetries: 0, // 无限重试 + RetryInterval: 3 * time.Second, + Timeout: 10 * time.Second, + } +} + +// Manager 重连管理器 +type Manager struct { + client Reconnectable + config *Config + ctx context.Context + cancel context.CancelFunc + connected bool + errCh chan error +} + +// NewManager 创建新的重连管理器 +func NewManager(client Reconnectable, config *Config) *Manager { + if config == nil { + config = DefaultConfig() + } + ctx, cancel := context.WithCancel(context.Background()) + return &Manager{ + client: client, + config: config, + ctx: ctx, + cancel: cancel, + errCh: make(chan error, 1), + } +} + +// Start 启动连接并开始监控 +func (m *Manager) Start(ctx context.Context) error { + // 初始连接 + if err := m.connect(ctx); err != nil { + return fmt.Errorf("initial connection failed: %w", err) + } + + // 启动监控 goroutine + go m.monitor() + + return nil +} + +// Stop 停止重连管理器 +func (m *Manager) Stop() error { + m.cancel() + if m.client != nil { + return m.client.Close() + } + return nil +} + +// connect 执行连接 +func (m *Manager) connect(ctx context.Context) error { + connectCtx, cancel := context.WithTimeout(ctx, m.config.Timeout) + defer cancel() + + if err := m.client.Connect(connectCtx); err != nil { + return err + } + + m.connected = true + if m.config.OnReconnect != nil { + m.config.OnReconnect() + } + return nil +} + +// monitor 监控连接状态并自动重连 +func (m *Manager) monitor() { + ticker := time.NewTicker(m.config.RetryInterval) + defer ticker.Stop() + + retryCount := 0 + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + // 检查连接状态 + if !m.client.IsConnected() { + m.connected = false + if m.config.OnDisconnect != nil { + m.config.OnDisconnect(errors.New("connection lost")) + } + + // 检查是否超过最大重试次数 + if m.config.MaxRetries > 0 && retryCount >= m.config.MaxRetries { + m.errCh <- fmt.Errorf("max retries (%d) exceeded", m.config.MaxRetries) + return + } + + // 尝试重连 + if err := m.connect(context.Background()); err != nil { + retryCount++ + continue + } + + // 重连成功,重置计数器 + retryCount = 0 + } else { + // 连接正常,执行健康检查 + if err := m.client.Ping(context.Background()); err != nil { + m.connected = false + if m.config.OnDisconnect != nil { + m.config.OnDisconnect(err) + } + } + } + } + } +} + +// IsConnected 返回当前连接状态 +func (m *Manager) IsConnected() bool { + return m.connected && m.client.IsConnected() +} + +// WaitForError 等待错误(用于阻塞等待) +func (m *Manager) WaitForError() error { + return <-m.errCh +}