11 KiB
11 KiB
Reconnect - Go 连接重连库
一个用于 Go 语言的智能连接重连库,支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。
特性
- 🔄 智能重连: 支持多种重连策略(指数退避、固定间隔、线性退避)
- 💗 健康检查: 定期检查连接状态,自动发现问题并重连
- 🔌 多服务支持: 内置支持数据库(GORM)、Redis、etcd、gRPC
- ⚙️ 高度可配置: 灵活的配置选项,满足不同场景需求
- 📊 状态监控: 实时连接状态监控和回调通知
- 🛡️ 线程安全: 完全的并发安全设计
支持的服务
- 数据库: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等
- Redis: 基于 go-redis/v8 客户端
- etcd: 基于 etcd client/v3
- gRPC: 基于 google.golang.org/grpc
安装
go get git.whblueocean.cn/blueocean-go/reconnect
快速开始
数据库重连
package main
import (
"fmt"
"log"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置数据库连接
config := reconnect.DatabaseConfig{
Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"),
GormConfig: &gorm.Config{},
MaxIdleConns: 10,
MaxOpenConns: 100,
ConnMaxLifetime: time.Hour,
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: -1, // 无限重试
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
HealthCheckInterval: 10 * time.Second,
HealthCheckTimeout: 5 * time.Second,
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
log.Printf("数据库状态变化: %s -> %s", oldState, newState)
},
OnError: func(err error) {
log.Printf("数据库错误: %v", err)
},
},
}
// 创建客户端
client, err := reconnect.NewDatabaseClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用数据库
db := client.GetDB()
// ... 执行数据库操作
}
Redis 重连
package main
import (
"log"
"time"
"github.com/go-redis/redis/v8"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 Redis 连接
config := reconnect.RedisClientConfig{
Options: &redis.Options{
Addr: "localhost:6379",
Password: "", // 无密码
DB: 0, // 默认数据库
},
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyFixedInterval,
MaxRetries: 10,
InitialDelay: 2 * time.Second,
HealthCheckInterval: 15 * time.Second,
OnReconnect: func(attempt int) {
log.Printf("Redis 重连成功,尝试次数: %d", attempt)
},
},
}
// 创建客户端
client, err := reconnect.NewRedisClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 Redis
rdb := client.GetClient()
// ... 执行 Redis 操作
}
etcd 重连
package main
import (
"log"
"time"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 etcd 连接
config := reconnect.EtcdClientConfig{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
Username: "",
Password: "",
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyLinearBackoff,
MaxRetries: -1,
InitialDelay: 1 * time.Second,
MaxDelay: 20 * time.Second,
LinearIncrement: 1 * time.Second,
HealthCheckInterval: 30 * time.Second,
},
}
// 创建客户端
client, err := reconnect.NewEtcdClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 etcd
etcdClient := client.GetClient()
// ... 执行 etcd 操作
}
gRPC 重连
package main
import (
"log"
"time"
"google.golang.org/grpc"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 gRPC 连接
config := reconnect.GRPCClientConfig{
Target: "localhost:50051",
DialOptions: []grpc.DialOption{
grpc.WithInsecure(),
// 其他 gRPC 选项...
},
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: 5,
InitialDelay: 1 * time.Second,
MaxDelay: 60 * time.Second,
Multiplier: 1.5,
HealthCheckInterval: 20 * time.Second,
},
}
// 创建客户端
client, err := reconnect.NewGRPCClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 gRPC 连接
conn := client.GetConn()
// ... 创建 gRPC 客户端并调用服务
}
RabbitMQ 重连
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 RabbitMQ 连接
config := reconnect.RabbitMQClientConfig{
URL: "amqp://guest:guest@localhost:5672/",
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: -1, // 无限重试
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
HealthCheckInterval: 15 * time.Second,
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState)
},
OnError: func(err error) {
log.Printf("RabbitMQ 错误: %v", err)
},
},
}
// 创建客户端
client, err := reconnect.NewRabbitMQClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 声明队列
queue, err := client.QueueDeclare(
"hello", // 队列名
true, // 持久化
false, // 自动删除
false, // 独占
false, // 不等待
nil, // 参数
)
if err != nil {
log.Fatal(err)
}
// 发布消息
err = client.Publish(
"", // 交换机
queue.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
},
)
if err != nil {
log.Fatal(err)
}
// 消费消息
msgs, err := client.Consume(
queue.Name, // 队列
"", // 消费者
true, // 自动确认
false, // 独占
false, // 不等待
false, // 参数
nil,
)
if err != nil {
log.Fatal(err)
}
// 处理消息
for msg := range msgs {
log.Printf("收到消息: %s", msg.Body)
}
}
重连策略
指数退避策略 (Exponential Backoff)
默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。
config := reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0, // 每次延迟翻倍
}
固定间隔策略 (Fixed Interval)
每次重试使用固定的时间间隔。
config := reconnect.Config{
Strategy: reconnect.StrategyFixedInterval,
InitialDelay: 5 * time.Second, // 固定 5 秒间隔
}
线性退避策略 (Linear Backoff)
每次重试间隔线性增长。
config := reconnect.Config{
Strategy: reconnect.StrategyLinearBackoff,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
LinearIncrement: 2 * time.Second, // 每次增加 2 秒
}
配置选项
| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
Strategy |
StrategyType |
ExponentialBackoff |
重连策略类型 |
MaxRetries |
int |
-1 |
最大重试次数,-1 表示无限重试 |
InitialDelay |
time.Duration |
1s |
初始延迟时间 |
MaxDelay |
time.Duration |
30s |
最大延迟时间 |
Multiplier |
float64 |
2.0 |
指数退避乘数 |
LinearIncrement |
time.Duration |
1s |
线性退避增量 |
HealthCheckInterval |
time.Duration |
10s |
健康检查间隔 |
HealthCheckTimeout |
time.Duration |
5s |
健康检查超时时间 |
OnStateChange |
func(ConnectionState, ConnectionState) |
nil |
状态变化回调 |
OnReconnect |
func(int) |
nil |
重连成功回调 |
OnError |
func(error) |
nil |
错误回调 |
连接状态
StateDisconnected: 断开连接StateConnecting: 正在连接StateConnected: 已连接StateReconnecting: 正在重连
高级用法
自定义连接器
如果需要支持其他类型的连接,可以实现 Connector 和 HealthChecker 接口:
type CustomConnector struct {
// 自定义连接逻辑
}
func (c *CustomConnector) Connect(ctx context.Context) error {
// 实现连接逻辑
return nil
}
func (c *CustomConnector) Close() error {
// 实现关闭逻辑
return nil
}
type CustomHealthChecker struct {
// 自定义健康检查逻辑
}
func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error {
// 实现健康检查逻辑
return nil
}
// 使用自定义连接器
connector := &CustomConnector{}
checker := &CustomHealthChecker{}
manager := reconnect.NewConnectionManager(connector, checker, config)
手动触发重连
// 手动触发重连
client.TriggerReconnect()
// 获取当前状态
state := client.State()
fmt.Printf("当前状态: %s\n", state)
最佳实践
- 合理设置重试次数: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限
- 配置适当的健康检查间隔: 太频繁会影响性能,太稀疏会延迟发现问题
- 使用回调函数监控状态: 通过
OnStateChange、OnReconnect、OnError回调监控连接状态 - 设置合适的超时时间: 避免长时间阻塞
- 优雅关闭: 在应用关闭时调用
Close()方法释放资源
依赖
- go-redis/v8 - Redis 客户端
- etcd client/v3 - etcd 客户端
- google.golang.org/grpc - gRPC 框架
- gorm.io/gorm - ORM 框架
许可证
MIT License
贡献
欢迎提交 Issue 和 Pull Request!
更新日志
v1.1.0
- 新增 RabbitMQ 重连支持
- 添加 RabbitMQ 客户端封装和健康检查
- 完善文档和使用示例
v1.0.0
- 初始版本发布
- 支持数据库、Redis、etcd、gRPC 重连
- 实现三种重连策略