Add Reconnect package for managing automatic reconnections, including a Reconnectable interface, configuration options, and a Manager for connection monitoring and retries.

This commit is contained in:
2025-12-12 15:48:48 +08:00
parent a8180f5115
commit 4382d345f6

162
reconnect.go Normal file
View File

@@ -0,0 +1,162 @@
package reconnect
import (
"context"
"errors"
"fmt"
"time"
)
// Reconnectable 定义可重连的接口
type Reconnectable interface {
// Connect 建立连接
Connect(ctx context.Context) error
// Close 关闭连接
Close() error
// Ping 检查连接是否健康
Ping(ctx context.Context) error
// IsConnected 检查连接状态
IsConnected() bool
}
// Config 重连配置
type Config struct {
// MaxRetries 最大重试次数0表示无限重试
MaxRetries int
// RetryInterval 重试间隔
RetryInterval time.Duration
// Timeout 连接超时时间
Timeout time.Duration
// OnReconnect 重连成功后的回调函数
OnReconnect func()
// OnDisconnect 断开连接时的回调函数
OnDisconnect func(error)
}
// DefaultConfig 返回默认配置
func DefaultConfig() *Config {
return &Config{
MaxRetries: 0, // 无限重试
RetryInterval: 3 * time.Second,
Timeout: 10 * time.Second,
}
}
// Manager 重连管理器
type Manager struct {
client Reconnectable
config *Config
ctx context.Context
cancel context.CancelFunc
connected bool
errCh chan error
}
// NewManager 创建新的重连管理器
func NewManager(client Reconnectable, config *Config) *Manager {
if config == nil {
config = DefaultConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &Manager{
client: client,
config: config,
ctx: ctx,
cancel: cancel,
errCh: make(chan error, 1),
}
}
// Start 启动连接并开始监控
func (m *Manager) Start(ctx context.Context) error {
// 初始连接
if err := m.connect(ctx); err != nil {
return fmt.Errorf("initial connection failed: %w", err)
}
// 启动监控 goroutine
go m.monitor()
return nil
}
// Stop 停止重连管理器
func (m *Manager) Stop() error {
m.cancel()
if m.client != nil {
return m.client.Close()
}
return nil
}
// connect 执行连接
func (m *Manager) connect(ctx context.Context) error {
connectCtx, cancel := context.WithTimeout(ctx, m.config.Timeout)
defer cancel()
if err := m.client.Connect(connectCtx); err != nil {
return err
}
m.connected = true
if m.config.OnReconnect != nil {
m.config.OnReconnect()
}
return nil
}
// monitor 监控连接状态并自动重连
func (m *Manager) monitor() {
ticker := time.NewTicker(m.config.RetryInterval)
defer ticker.Stop()
retryCount := 0
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
// 检查连接状态
if !m.client.IsConnected() {
m.connected = false
if m.config.OnDisconnect != nil {
m.config.OnDisconnect(errors.New("connection lost"))
}
// 检查是否超过最大重试次数
if m.config.MaxRetries > 0 && retryCount >= m.config.MaxRetries {
m.errCh <- fmt.Errorf("max retries (%d) exceeded", m.config.MaxRetries)
return
}
// 尝试重连
if err := m.connect(context.Background()); err != nil {
retryCount++
continue
}
// 重连成功,重置计数器
retryCount = 0
} else {
// 连接正常,执行健康检查
if err := m.client.Ping(context.Background()); err != nil {
m.connected = false
if m.config.OnDisconnect != nil {
m.config.OnDisconnect(err)
}
}
}
}
}
}
// IsConnected 返回当前连接状态
func (m *Manager) IsConnected() bool {
return m.connected && m.client.IsConnected()
}
// WaitForError 等待错误(用于阻塞等待)
func (m *Manager) WaitForError() error {
return <-m.errCh
}