# Reconnect - Go 连接重连库 [![Go Version](https://img.shields.io/badge/Go-1.24+-blue.svg)](https://golang.org) [![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE) 一个用于 Go 语言的智能连接重连库,支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。 ## 特性 - 🔄 **智能重连**: 支持多种重连策略(指数退避、固定间隔、线性退避) - 💗 **健康检查**: 定期检查连接状态,自动发现问题并重连 - 🔌 **多服务支持**: 内置支持数据库(GORM)、Redis、etcd、gRPC - ⚙️ **高度可配置**: 灵活的配置选项,满足不同场景需求 - 📊 **状态监控**: 实时连接状态监控和回调通知 - 🛡️ **线程安全**: 完全的并发安全设计 ## 支持的服务 - **数据库**: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等 - **Redis**: 基于 go-redis/v8 客户端 - **etcd**: 基于 etcd client/v3 - **gRPC**: 基于 google.golang.org/grpc ## 安装 ```bash go get git.whblueocean.cn/blueocean-go/reconnect ``` ## 快速开始 ### 数据库重连 ```go package main import ( "fmt" "log" "time" "gorm.io/driver/postgres" "gorm.io/gorm" "git.whblueocean.cn/blueocean-go/reconnect" ) func main() { // 配置数据库连接 config := reconnect.DatabaseConfig{ Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"), GormConfig: &gorm.Config{}, MaxIdleConns: 10, MaxOpenConns: 100, ConnMaxLifetime: time.Hour, ReconnectConfig: reconnect.Config{ Strategy: reconnect.StrategyExponentialBackoff, MaxRetries: -1, // 无限重试 InitialDelay: 1 * time.Second, MaxDelay: 30 * time.Second, Multiplier: 2.0, HealthCheckInterval: 10 * time.Second, HealthCheckTimeout: 5 * time.Second, OnStateChange: func(oldState, newState reconnect.ConnectionState) { log.Printf("数据库状态变化: %s -> %s", oldState, newState) }, OnError: func(err error) { log.Printf("数据库错误: %v", err) }, }, } // 创建客户端 client, err := reconnect.NewDatabaseClient(config) if err != nil { log.Fatal(err) } defer client.Close() // 使用数据库 db := client.GetDB() // ... 执行数据库操作 } ``` ### Redis 重连 ```go package main import ( "log" "time" "github.com/go-redis/redis/v8" "git.whblueocean.cn/blueocean-go/reconnect" ) func main() { // 配置 Redis 连接 config := reconnect.RedisClientConfig{ Options: &redis.Options{ Addr: "localhost:6379", Password: "", // 无密码 DB: 0, // 默认数据库 }, ReconnectConfig: reconnect.Config{ Strategy: reconnect.StrategyFixedInterval, MaxRetries: 10, InitialDelay: 2 * time.Second, HealthCheckInterval: 15 * time.Second, OnReconnect: func(attempt int) { log.Printf("Redis 重连成功,尝试次数: %d", attempt) }, }, } // 创建客户端 client, err := reconnect.NewRedisClient(config) if err != nil { log.Fatal(err) } defer client.Close() // 使用 Redis rdb := client.GetClient() // ... 执行 Redis 操作 } ``` ### etcd 重连 ```go package main import ( "log" "time" "git.whblueocean.cn/blueocean-go/reconnect" ) func main() { // 配置 etcd 连接 config := reconnect.EtcdClientConfig{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, Username: "", Password: "", ReconnectConfig: reconnect.Config{ Strategy: reconnect.StrategyLinearBackoff, MaxRetries: -1, InitialDelay: 1 * time.Second, MaxDelay: 20 * time.Second, LinearIncrement: 1 * time.Second, HealthCheckInterval: 30 * time.Second, }, } // 创建客户端 client, err := reconnect.NewEtcdClient(config) if err != nil { log.Fatal(err) } defer client.Close() // 使用 etcd etcdClient := client.GetClient() // ... 执行 etcd 操作 } ``` ### gRPC 重连 ```go package main import ( "log" "time" "google.golang.org/grpc" "git.whblueocean.cn/blueocean-go/reconnect" ) func main() { // 配置 gRPC 连接 config := reconnect.GRPCClientConfig{ Target: "localhost:50051", DialOptions: []grpc.DialOption{ grpc.WithInsecure(), // 其他 gRPC 选项... }, ReconnectConfig: reconnect.Config{ Strategy: reconnect.StrategyExponentialBackoff, MaxRetries: 5, InitialDelay: 1 * time.Second, MaxDelay: 60 * time.Second, Multiplier: 1.5, HealthCheckInterval: 20 * time.Second, }, } // 创建客户端 client, err := reconnect.NewGRPCClient(config) if err != nil { log.Fatal(err) } defer client.Close() // 使用 gRPC 连接 conn := client.GetConn() // ... 创建 gRPC 客户端并调用服务 } ``` ### RabbitMQ 重连 ```go package main import ( "log" "time" amqp "github.com/rabbitmq/amqp091-go" "git.whblueocean.cn/blueocean-go/reconnect" ) func main() { // 配置 RabbitMQ 连接 config := reconnect.RabbitMQClientConfig{ URL: "amqp://guest:guest@localhost:5672/", ReconnectConfig: reconnect.Config{ Strategy: reconnect.StrategyExponentialBackoff, MaxRetries: -1, // 无限重试 InitialDelay: 1 * time.Second, MaxDelay: 30 * time.Second, Multiplier: 2.0, HealthCheckInterval: 15 * time.Second, OnStateChange: func(oldState, newState reconnect.ConnectionState) { log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState) }, OnError: func(err error) { log.Printf("RabbitMQ 错误: %v", err) }, }, } // 创建客户端 client, err := reconnect.NewRabbitMQClient(config) if err != nil { log.Fatal(err) } defer client.Close() // 声明队列 queue, err := client.QueueDeclare( "hello", // 队列名 true, // 持久化 false, // 自动删除 false, // 独占 false, // 不等待 nil, // 参数 ) if err != nil { log.Fatal(err) } // 发布消息 err = client.Publish( "", // 交换机 queue.Name, // 路由键 false, // 强制 false, // 立即 amqp.Publishing{ ContentType: "text/plain", Body: []byte("Hello World!"), }, ) if err != nil { log.Fatal(err) } // 消费消息 msgs, err := client.Consume( queue.Name, // 队列 "", // 消费者 true, // 自动确认 false, // 独占 false, // 不等待 false, // 参数 nil, ) if err != nil { log.Fatal(err) } // 处理消息 for msg := range msgs { log.Printf("收到消息: %s", msg.Body) } } ``` ## 重连策略 ### 指数退避策略 (Exponential Backoff) 默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。 ```go config := reconnect.Config{ Strategy: reconnect.StrategyExponentialBackoff, InitialDelay: 1 * time.Second, MaxDelay: 30 * time.Second, Multiplier: 2.0, // 每次延迟翻倍 } ``` ### 固定间隔策略 (Fixed Interval) 每次重试使用固定的时间间隔。 ```go config := reconnect.Config{ Strategy: reconnect.StrategyFixedInterval, InitialDelay: 5 * time.Second, // 固定 5 秒间隔 } ``` ### 线性退避策略 (Linear Backoff) 每次重试间隔线性增长。 ```go config := reconnect.Config{ Strategy: reconnect.StrategyLinearBackoff, InitialDelay: 1 * time.Second, MaxDelay: 30 * time.Second, LinearIncrement: 2 * time.Second, // 每次增加 2 秒 } ``` ## 配置选项 | 选项 | 类型 | 默认值 | 说明 | |------|------|--------|------| | `Strategy` | `StrategyType` | `ExponentialBackoff` | 重连策略类型 | | `MaxRetries` | `int` | `-1` | 最大重试次数,-1 表示无限重试 | | `InitialDelay` | `time.Duration` | `1s` | 初始延迟时间 | | `MaxDelay` | `time.Duration` | `30s` | 最大延迟时间 | | `Multiplier` | `float64` | `2.0` | 指数退避乘数 | | `LinearIncrement` | `time.Duration` | `1s` | 线性退避增量 | | `HealthCheckInterval` | `time.Duration` | `10s` | 健康检查间隔 | | `HealthCheckTimeout` | `time.Duration` | `5s` | 健康检查超时时间 | | `OnStateChange` | `func(ConnectionState, ConnectionState)` | `nil` | 状态变化回调 | | `OnReconnect` | `func(int)` | `nil` | 重连成功回调 | | `OnError` | `func(error)` | `nil` | 错误回调 | ## 连接状态 - `StateDisconnected`: 断开连接 - `StateConnecting`: 正在连接 - `StateConnected`: 已连接 - `StateReconnecting`: 正在重连 ## 高级用法 ### 自定义连接器 如果需要支持其他类型的连接,可以实现 `Connector` 和 `HealthChecker` 接口: ```go type CustomConnector struct { // 自定义连接逻辑 } func (c *CustomConnector) Connect(ctx context.Context) error { // 实现连接逻辑 return nil } func (c *CustomConnector) Close() error { // 实现关闭逻辑 return nil } type CustomHealthChecker struct { // 自定义健康检查逻辑 } func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error { // 实现健康检查逻辑 return nil } // 使用自定义连接器 connector := &CustomConnector{} checker := &CustomHealthChecker{} manager := reconnect.NewConnectionManager(connector, checker, config) ``` ### 手动触发重连 ```go // 手动触发重连 client.TriggerReconnect() // 获取当前状态 state := client.State() fmt.Printf("当前状态: %s\n", state) ``` ## 最佳实践 1. **合理设置重试次数**: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限 2. **配置适当的健康检查间隔**: 太频繁会影响性能,太稀疏会延迟发现问题 3. **使用回调函数监控状态**: 通过 `OnStateChange`、`OnReconnect`、`OnError` 回调监控连接状态 4. **设置合适的超时时间**: 避免长时间阻塞 5. **优雅关闭**: 在应用关闭时调用 `Close()` 方法释放资源 ## 依赖 - [go-redis/v8](https://github.com/go-redis/redis) - Redis 客户端 - [etcd client/v3](https://etcd.io/) - etcd 客户端 - [google.golang.org/grpc](https://grpc.io/) - gRPC 框架 - [gorm.io/gorm](https://gorm.io/) - ORM 框架 ## 许可证 MIT License ## 贡献 欢迎提交 Issue 和 Pull Request! ## 更新日志 ### v1.1.0 - 新增 RabbitMQ 重连支持 - 添加 RabbitMQ 客户端封装和健康检查 - 完善文档和使用示例 ### v1.0.0 - 初始版本发布 - 支持数据库、Redis、etcd、gRPC 重连 - 实现三种重连策略