ray 2beefb2491 feat(etcd): 增强 etcd 服务注册功能
- 添加防抖机制,防止短时间内重复注册服务
- 引入 watcher 机制,支持动态监控服务续约状态
- 优化 watcher 的上下文管理,确保资源的正确释放
- 改进日志输出,提供更详细的注册和重试信息
2025-12-18 22:50:14 +08:00
2025-12-18 15:11:02 +08:00
2025-12-18 15:11:02 +08:00
2025-12-18 15:11:02 +08:00
2025-12-18 15:11:02 +08:00
2025-12-18 15:36:07 +08:00
2025-12-18 15:11:02 +08:00
2025-12-18 15:11:02 +08:00

Reconnect - Go 连接重连库

Go Version License

一个用于 Go 语言的智能连接重连库支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。

特性

  • 🔄 智能重连: 支持多种重连策略(指数退避、固定间隔、线性退避)
  • 💗 健康检查: 定期检查连接状态,自动发现问题并重连
  • 🔌 多服务支持: 内置支持数据库GORM、Redis、etcd、gRPC
  • ⚙️ 高度可配置: 灵活的配置选项,满足不同场景需求
  • 📊 状态监控: 实时连接状态监控和回调通知
  • 🛡️ 线程安全: 完全的并发安全设计

支持的服务

  • 数据库: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等
  • Redis: 基于 go-redis/v8 客户端
  • etcd: 基于 etcd client/v3
  • gRPC: 基于 google.golang.org/grpc

安装

go get git.whblueocean.cn/blueocean-go/reconnect

快速开始

数据库重连

package main

import (
    "fmt"
    "log"
    "time"
    
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "git.whblueocean.cn/blueocean-go/reconnect"
)

func main() {
    // 配置数据库连接
    config := reconnect.DatabaseConfig{
        Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"),
        GormConfig: &gorm.Config{},
        MaxIdleConns: 10,
        MaxOpenConns: 100,
        ConnMaxLifetime: time.Hour,
        ReconnectConfig: reconnect.Config{
            Strategy:            reconnect.StrategyExponentialBackoff,
            MaxRetries:          -1, // 无限重试
            InitialDelay:        1 * time.Second,
            MaxDelay:            30 * time.Second,
            Multiplier:          2.0,
            HealthCheckInterval: 10 * time.Second,
            HealthCheckTimeout:  5 * time.Second,
            OnStateChange: func(oldState, newState reconnect.ConnectionState) {
                log.Printf("数据库状态变化: %s -> %s", oldState, newState)
            },
            OnError: func(err error) {
                log.Printf("数据库错误: %v", err)
            },
        },
    }
    
    // 创建客户端
    client, err := reconnect.NewDatabaseClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // 使用数据库
    db := client.GetDB()
    // ... 执行数据库操作
}

Redis 重连

package main

import (
    "log"
    "time"
    
    "github.com/go-redis/redis/v8"
    "git.whblueocean.cn/blueocean-go/reconnect"
)

func main() {
    // 配置 Redis 连接
    config := reconnect.RedisClientConfig{
        Options: &redis.Options{
            Addr:     "localhost:6379",
            Password: "", // 无密码
            DB:       0,  // 默认数据库
        },
        ReconnectConfig: reconnect.Config{
            Strategy:            reconnect.StrategyFixedInterval,
            MaxRetries:          10,
            InitialDelay:        2 * time.Second,
            HealthCheckInterval: 15 * time.Second,
            OnReconnect: func(attempt int) {
                log.Printf("Redis 重连成功,尝试次数: %d", attempt)
            },
        },
    }
    
    // 创建客户端
    client, err := reconnect.NewRedisClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // 使用 Redis
    rdb := client.GetClient()
    // ... 执行 Redis 操作
}

etcd 重连

package main

import (
    "log"
    "time"
    
    "git.whblueocean.cn/blueocean-go/reconnect"
)

func main() {
    // 配置 etcd 连接
    config := reconnect.EtcdClientConfig{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
        Username:    "",
        Password:    "",
        ReconnectConfig: reconnect.Config{
            Strategy:            reconnect.StrategyLinearBackoff,
            MaxRetries:          -1,
            InitialDelay:        1 * time.Second,
            MaxDelay:            20 * time.Second,
            LinearIncrement:     1 * time.Second,
            HealthCheckInterval: 30 * time.Second,
        },
    }
    
    // 创建客户端
    client, err := reconnect.NewEtcdClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // 使用 etcd
    etcdClient := client.GetClient()
    // ... 执行 etcd 操作
}

gRPC 重连

package main

import (
    "log"
    "time"
    
    "google.golang.org/grpc"
    "git.whblueocean.cn/blueocean-go/reconnect"
)

func main() {
    // 配置 gRPC 连接
    config := reconnect.GRPCClientConfig{
        Target: "localhost:50051",
        DialOptions: []grpc.DialOption{
            grpc.WithInsecure(),
            // 其他 gRPC 选项...
        },
        ReconnectConfig: reconnect.Config{
            Strategy:            reconnect.StrategyExponentialBackoff,
            MaxRetries:          5,
            InitialDelay:        1 * time.Second,
            MaxDelay:            60 * time.Second,
            Multiplier:          1.5,
            HealthCheckInterval: 20 * time.Second,
        },
    }
    
    // 创建客户端
    client, err := reconnect.NewGRPCClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // 使用 gRPC 连接
    conn := client.GetConn()
    // ... 创建 gRPC 客户端并调用服务
}

RabbitMQ 重连

package main

import (
    "log"
    "time"
    
    amqp "github.com/rabbitmq/amqp091-go"
    "git.whblueocean.cn/blueocean-go/reconnect"
)

func main() {
    // 配置 RabbitMQ 连接
    config := reconnect.RabbitMQClientConfig{
        URL: "amqp://guest:guest@localhost:5672/",
        ReconnectConfig: reconnect.Config{
            Strategy:            reconnect.StrategyExponentialBackoff,
            MaxRetries:          -1, // 无限重试
            InitialDelay:        1 * time.Second,
            MaxDelay:            30 * time.Second,
            Multiplier:          2.0,
            HealthCheckInterval: 15 * time.Second,
            OnStateChange: func(oldState, newState reconnect.ConnectionState) {
                log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState)
            },
            OnError: func(err error) {
                log.Printf("RabbitMQ 错误: %v", err)
            },
        },
    }
    
    // 创建客户端
    client, err := reconnect.NewRabbitMQClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // 声明队列
    queue, err := client.QueueDeclare(
        "hello", // 队列名
        true,    // 持久化
        false,   // 自动删除
        false,   // 独占
        false,   // 不等待
        nil,     // 参数
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 发布消息
    err = client.Publish(
        "",     // 交换机
        queue.Name, // 路由键
        false,  // 强制
        false,  // 立即
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("Hello World!"),
        },
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 消费消息
    msgs, err := client.Consume(
        queue.Name, // 队列
        "",         // 消费者
        true,       // 自动确认
        false,      // 独占
        false,      // 不等待
        false,      // 参数
        nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 处理消息
    for msg := range msgs {
        log.Printf("收到消息: %s", msg.Body)
    }
}

重连策略

指数退避策略 (Exponential Backoff)

默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。

config := reconnect.Config{
    Strategy:     reconnect.StrategyExponentialBackoff,
    InitialDelay: 1 * time.Second,
    MaxDelay:     30 * time.Second,
    Multiplier:   2.0, // 每次延迟翻倍
}

固定间隔策略 (Fixed Interval)

每次重试使用固定的时间间隔。

config := reconnect.Config{
    Strategy:     reconnect.StrategyFixedInterval,
    InitialDelay: 5 * time.Second, // 固定 5 秒间隔
}

线性退避策略 (Linear Backoff)

每次重试间隔线性增长。

config := reconnect.Config{
    Strategy:        reconnect.StrategyLinearBackoff,
    InitialDelay:    1 * time.Second,
    MaxDelay:        30 * time.Second,
    LinearIncrement: 2 * time.Second, // 每次增加 2 秒
}

配置选项

选项 类型 默认值 说明
Strategy StrategyType ExponentialBackoff 重连策略类型
MaxRetries int -1 最大重试次数,-1 表示无限重试
InitialDelay time.Duration 1s 初始延迟时间
MaxDelay time.Duration 30s 最大延迟时间
Multiplier float64 2.0 指数退避乘数
LinearIncrement time.Duration 1s 线性退避增量
HealthCheckInterval time.Duration 10s 健康检查间隔
HealthCheckTimeout time.Duration 5s 健康检查超时时间
OnStateChange func(ConnectionState, ConnectionState) nil 状态变化回调
OnReconnect func(int) nil 重连成功回调
OnError func(error) nil 错误回调

连接状态

  • StateDisconnected: 断开连接
  • StateConnecting: 正在连接
  • StateConnected: 已连接
  • StateReconnecting: 正在重连

高级用法

自定义连接器

如果需要支持其他类型的连接,可以实现 ConnectorHealthChecker 接口:

type CustomConnector struct {
    // 自定义连接逻辑
}

func (c *CustomConnector) Connect(ctx context.Context) error {
    // 实现连接逻辑
    return nil
}

func (c *CustomConnector) Close() error {
    // 实现关闭逻辑
    return nil
}

type CustomHealthChecker struct {
    // 自定义健康检查逻辑
}

func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error {
    // 实现健康检查逻辑
    return nil
}

// 使用自定义连接器
connector := &CustomConnector{}
checker := &CustomHealthChecker{}
manager := reconnect.NewConnectionManager(connector, checker, config)

手动触发重连

// 手动触发重连
client.TriggerReconnect()

// 获取当前状态
state := client.State()
fmt.Printf("当前状态: %s\n", state)

最佳实践

  1. 合理设置重试次数: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限
  2. 配置适当的健康检查间隔: 太频繁会影响性能,太稀疏会延迟发现问题
  3. 使用回调函数监控状态: 通过 OnStateChangeOnReconnectOnError 回调监控连接状态
  4. 设置合适的超时时间: 避免长时间阻塞
  5. 优雅关闭: 在应用关闭时调用 Close() 方法释放资源

依赖

许可证

MIT License

贡献

欢迎提交 Issue 和 Pull Request

更新日志

v1.1.0

  • 新增 RabbitMQ 重连支持
  • 添加 RabbitMQ 客户端封装和健康检查
  • 完善文档和使用示例

v1.0.0

  • 初始版本发布
  • 支持数据库、Redis、etcd、gRPC 重连
  • 实现三种重连策略
Description
No description provided
Readme 91 KiB
Languages
Go 100%