Files
reconnect/reconnect.go

163 lines
3.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package reconnect
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)
// Reconnectable is the contract a client must satisfy so that a Manager can
// connect it, watch its health and reconnect it.
type Reconnectable interface {
// Connect establishes the connection. The Manager passes a context that
// carries the configured connect timeout.
Connect(ctx context.Context) error
// Close shuts the connection down. The Manager calls it from Stop.
Close() error
// Ping checks whether the connection is healthy. The Manager calls it
// periodically while IsConnected reports true.
Ping(ctx context.Context) error
// IsConnected reports the client's own view of the connection state.
IsConnected() bool
}
// Config holds the reconnection settings used by a Manager.
type Config struct {
// MaxRetries is the maximum number of consecutive failed reconnect
// attempts before the monitor gives up. 0 means retry forever.
MaxRetries int
// RetryInterval is the period of the monitor loop, i.e. the time between
// health checks and between reconnect attempts.
RetryInterval time.Duration
// Timeout bounds each Connect call.
Timeout time.Duration
// OnReconnect, if non-nil, is called after a successful Connect. This
// includes the initial connection made by Start.
OnReconnect func()
// OnDisconnect, if non-nil, is called by the monitor when the client is
// found disconnected or a Ping fails. It receives the error that
// describes the failure.
OnDisconnect func(error)
}
// DefaultConfig returns a freshly allocated Config populated with the package
// defaults: unlimited retries, a 3 second retry interval and a 10 second
// connect timeout. No callbacks are set.
func DefaultConfig() *Config {
	// MaxRetries is left at its zero value, which means "retry forever".
	cfg := new(Config)
	cfg.RetryInterval = 3 * time.Second
	cfg.Timeout = 10 * time.Second
	return cfg
}
// Manager keeps a Reconnectable client connected. Start performs the initial
// connection and launches a background monitor that, every RetryInterval,
// either health-checks the client with Ping or, when the client reports that
// it is disconnected, tries to reconnect it. Stop ends the monitor and closes
// the client.
//
// IsConnected, WaitForError and Stop may be called from any goroutine while
// the monitor is running. The OnReconnect and OnDisconnect callbacks are
// invoked without any Manager lock held, so they may call back into the
// Manager.
type Manager struct {
	client Reconnectable
	config *Config
	ctx    context.Context // cancelled by Stop; bounds the monitor and the calls it makes
	cancel context.CancelFunc
	errCh  chan error // hands the monitor's terminal error to WaitForError

	mu         sync.Mutex // guards connected and monitoring
	connected  bool       // the Manager's own view of the connection state
	monitoring bool       // true while a monitor goroutine is running
}

// NewManager creates a Manager for client. A nil config selects
// DefaultConfig(). The config pointer is retained, not copied.
func NewManager(client Reconnectable, config *Config) *Manager {
	if config == nil {
		config = DefaultConfig()
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &Manager{
		client: client,
		config: config,
		ctx:    ctx,
		cancel: cancel,
		errCh:  make(chan error, 1),
	}
}

// Start connects the client, using ctx for the initial attempt, and then
// starts the background monitor. It returns an error, without starting the
// monitor, if the Manager has no client or the initial connection fails.
// Calling Start while a monitor is already running reconnects the client but
// does not start a second monitor.
func (m *Manager) Start(ctx context.Context) error {
	if m.client == nil {
		return errors.New("reconnect: client is nil")
	}
	if err := m.connect(ctx); err != nil {
		return fmt.Errorf("initial connection failed: %w", err)
	}

	m.mu.Lock()
	launch := !m.monitoring
	m.monitoring = true
	m.mu.Unlock()
	if launch {
		go m.monitor()
	}
	return nil
}

// Stop cancels the monitor, which also aborts any reconnect or ping it has in
// flight, and closes the client. It returns the client's Close error, or nil
// when the Manager has no client.
func (m *Manager) Stop() error {
	m.cancel()
	if m.client == nil {
		return nil
	}
	return m.client.Close()
}

// timeout returns the per-call timeout. A non-positive Config.Timeout would
// produce an already-expired context, so it falls back to the default.
func (m *Manager) timeout() time.Duration {
	if d := m.config.Timeout; d > 0 {
		return d
	}
	return DefaultConfig().Timeout
}

// retryInterval returns the monitor period. time.NewTicker panics on a
// non-positive duration, so such values fall back to the default.
func (m *Manager) retryInterval() time.Duration {
	if d := m.config.RetryInterval; d > 0 {
		return d
	}
	return DefaultConfig().RetryInterval
}

// setConnected records the Manager's view of the connection state.
func (m *Manager) setConnected(up bool) {
	m.mu.Lock()
	m.connected = up
	m.mu.Unlock()
}

// markDisconnected records that the connection is down and, only on the
// transition from connected to disconnected, invokes OnDisconnect with cause.
// An outage that spans many monitor ticks therefore produces one notification.
func (m *Manager) markDisconnected(cause error) {
	m.mu.Lock()
	wasUp := m.connected
	m.connected = false
	m.mu.Unlock()

	if !wasUp {
		return
	}
	if cb := m.config.OnDisconnect; cb != nil {
		cb(cause)
	}
}

// connect performs one Connect attempt bounded by the configured timeout.
// On success it marks the Manager connected and invokes OnReconnect.
func (m *Manager) connect(ctx context.Context) error {
	connectCtx, cancel := context.WithTimeout(ctx, m.timeout())
	defer cancel()

	if err := m.client.Connect(connectCtx); err != nil {
		return err
	}
	m.setConnected(true)
	if cb := m.config.OnReconnect; cb != nil {
		cb()
	}
	return nil
}

// healthCheck pings a client that reports itself connected. A failed ping
// marks the Manager disconnected; a successful one marks it connected again,
// so a transient ping failure does not leave IsConnected false forever. No
// callback fires on recovery because no new connection was established.
func (m *Manager) healthCheck() {
	pingCtx, cancel := context.WithTimeout(m.ctx, m.timeout())
	defer cancel()

	if err := m.client.Ping(pingCtx); err != nil {
		if m.ctx.Err() != nil {
			// Stop interrupted the ping; that is not a connection failure.
			return
		}
		m.markDisconnected(err)
		return
	}
	m.setConnected(true)
}

// reportTerminal publishes the error that ends the monitor. The send never
// blocks: if an earlier terminal error is still unread, that one is kept.
func (m *Manager) reportTerminal(err error) {
	select {
	case m.errCh <- err:
	default:
	}
}

// monitor runs until Stop is called or MaxRetries consecutive reconnect
// attempts have failed. On every tick it health-checks a connected client or
// tries to reconnect a disconnected one.
func (m *Manager) monitor() {
	defer func() {
		m.mu.Lock()
		m.monitoring = false
		m.mu.Unlock()
	}()

	ticker := time.NewTicker(m.retryInterval())
	defer ticker.Stop()

	retryCount := 0
	for {
		select {
		case <-m.ctx.Done():
			return
		case <-ticker.C:
		}
		// select picks at random when both channels are ready; re-check so
		// that no work (and no callback) happens after Stop.
		if m.ctx.Err() != nil {
			return
		}

		if m.client.IsConnected() {
			m.healthCheck()
			continue
		}

		m.markDisconnected(errors.New("connection lost"))

		if limit := m.config.MaxRetries; limit > 0 && retryCount >= limit {
			m.reportTerminal(fmt.Errorf("max retries (%d) exceeded", limit))
			return
		}

		// Derive the attempt from m.ctx so that Stop can abort it.
		if err := m.connect(m.ctx); err != nil {
			retryCount++
			continue
		}
		retryCount = 0
	}
}

// IsConnected reports whether both the Manager and the client consider the
// connection to be up.
func (m *Manager) IsConnected() bool {
	m.mu.Lock()
	up := m.connected
	m.mu.Unlock()
	return up && m.client.IsConnected()
}

// WaitForError blocks until the monitor gives up, returning its terminal
// error, or until the Manager is stopped, returning context.Canceled. If both
// have happened, the monitor's error takes precedence.
func (m *Manager) WaitForError() error {
	select {
	case err := <-m.errCh:
		return err
	case <-m.ctx.Done():
	}
	select {
	case err := <-m.errCh:
		return err
	default:
		return m.ctx.Err()
	}
}