448 lines
11 KiB
Markdown
448 lines
11 KiB
Markdown
# Reconnect - Go 连接重连库
|
||
|
||
[](https://golang.org)
|
||
[](LICENSE)
|
||
|
||
一个用于 Go 语言的智能连接重连库,支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。
|
||
|
||
## 特性
|
||
|
||
- 🔄 **智能重连**: 支持多种重连策略(指数退避、固定间隔、线性退避)
|
||
- 💗 **健康检查**: 定期检查连接状态,自动发现问题并重连
|
||
- 🔌 **多服务支持**: 内置支持数据库(GORM)、Redis、etcd、gRPC
|
||
- ⚙️ **高度可配置**: 灵活的配置选项,满足不同场景需求
|
||
- 📊 **状态监控**: 实时连接状态监控和回调通知
|
||
- 🛡️ **线程安全**: 完全的并发安全设计
|
||
|
||
## 支持的服务
|
||
|
||
- **数据库**: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等
|
||
- **Redis**: 基于 go-redis/v8 客户端
|
||
- **etcd**: 基于 etcd client/v3
|
||
- **gRPC**: 基于 google.golang.org/grpc
|
||
|
||
## 安装
|
||
|
||
```bash
|
||
go get git.whblueocean.cn/blueocean-go/reconnect
|
||
```
|
||
|
||
## 快速开始
|
||
|
||
### 数据库重连
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"fmt"
|
||
"log"
|
||
"time"
|
||
|
||
"gorm.io/driver/postgres"
|
||
"gorm.io/gorm"
|
||
"git.whblueocean.cn/blueocean-go/reconnect"
|
||
)
|
||
|
||
func main() {
|
||
// 配置数据库连接
|
||
config := reconnect.DatabaseConfig{
|
||
Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"),
|
||
GormConfig: &gorm.Config{},
|
||
MaxIdleConns: 10,
|
||
MaxOpenConns: 100,
|
||
ConnMaxLifetime: time.Hour,
|
||
ReconnectConfig: reconnect.Config{
|
||
Strategy: reconnect.StrategyExponentialBackoff,
|
||
MaxRetries: -1, // 无限重试
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 30 * time.Second,
|
||
Multiplier: 2.0,
|
||
HealthCheckInterval: 10 * time.Second,
|
||
HealthCheckTimeout: 5 * time.Second,
|
||
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
|
||
log.Printf("数据库状态变化: %s -> %s", oldState, newState)
|
||
},
|
||
OnError: func(err error) {
|
||
log.Printf("数据库错误: %v", err)
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建客户端
|
||
client, err := reconnect.NewDatabaseClient(config)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer client.Close()
|
||
|
||
// 使用数据库
|
||
db := client.GetDB()
|
||
// ... 执行数据库操作
|
||
}
|
||
```
|
||
|
||
### Redis 重连
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"log"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
"git.whblueocean.cn/blueocean-go/reconnect"
|
||
)
|
||
|
||
func main() {
|
||
// 配置 Redis 连接
|
||
config := reconnect.RedisClientConfig{
|
||
Options: &redis.Options{
|
||
Addr: "localhost:6379",
|
||
Password: "", // 无密码
|
||
DB: 0, // 默认数据库
|
||
},
|
||
ReconnectConfig: reconnect.Config{
|
||
Strategy: reconnect.StrategyFixedInterval,
|
||
MaxRetries: 10,
|
||
InitialDelay: 2 * time.Second,
|
||
HealthCheckInterval: 15 * time.Second,
|
||
OnReconnect: func(attempt int) {
|
||
log.Printf("Redis 重连成功,尝试次数: %d", attempt)
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建客户端
|
||
client, err := reconnect.NewRedisClient(config)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer client.Close()
|
||
|
||
// 使用 Redis
|
||
rdb := client.GetClient()
|
||
// ... 执行 Redis 操作
|
||
}
|
||
```
|
||
|
||
### etcd 重连
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"log"
|
||
"time"
|
||
|
||
"git.whblueocean.cn/blueocean-go/reconnect"
|
||
)
|
||
|
||
func main() {
|
||
// 配置 etcd 连接
|
||
config := reconnect.EtcdClientConfig{
|
||
Endpoints: []string{"localhost:2379"},
|
||
DialTimeout: 5 * time.Second,
|
||
Username: "",
|
||
Password: "",
|
||
ReconnectConfig: reconnect.Config{
|
||
Strategy: reconnect.StrategyLinearBackoff,
|
||
MaxRetries: -1,
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 20 * time.Second,
|
||
LinearIncrement: 1 * time.Second,
|
||
HealthCheckInterval: 30 * time.Second,
|
||
},
|
||
}
|
||
|
||
// 创建客户端
|
||
client, err := reconnect.NewEtcdClient(config)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer client.Close()
|
||
|
||
// 使用 etcd
|
||
etcdClient := client.GetClient()
|
||
// ... 执行 etcd 操作
|
||
}
|
||
```
|
||
|
||
### gRPC 重连
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"log"
|
||
"time"
|
||
|
||
"google.golang.org/grpc"
|
||
"git.whblueocean.cn/blueocean-go/reconnect"
|
||
)
|
||
|
||
func main() {
|
||
// 配置 gRPC 连接
|
||
config := reconnect.GRPCClientConfig{
|
||
Target: "localhost:50051",
|
||
DialOptions: []grpc.DialOption{
|
||
grpc.WithInsecure(),
|
||
// 其他 gRPC 选项...
|
||
},
|
||
ReconnectConfig: reconnect.Config{
|
||
Strategy: reconnect.StrategyExponentialBackoff,
|
||
MaxRetries: 5,
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 60 * time.Second,
|
||
Multiplier: 1.5,
|
||
HealthCheckInterval: 20 * time.Second,
|
||
},
|
||
}
|
||
|
||
// 创建客户端
|
||
client, err := reconnect.NewGRPCClient(config)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer client.Close()
|
||
|
||
// 使用 gRPC 连接
|
||
conn := client.GetConn()
|
||
// ... 创建 gRPC 客户端并调用服务
|
||
}
|
||
```
|
||
|
||
### RabbitMQ 重连
|
||
|
||
```go
|
||
package main
|
||
|
||
import (
|
||
"log"
|
||
"time"
|
||
|
||
amqp "github.com/rabbitmq/amqp091-go"
|
||
"git.whblueocean.cn/blueocean-go/reconnect"
|
||
)
|
||
|
||
func main() {
|
||
// 配置 RabbitMQ 连接
|
||
config := reconnect.RabbitMQClientConfig{
|
||
URL: "amqp://guest:guest@localhost:5672/",
|
||
ReconnectConfig: reconnect.Config{
|
||
Strategy: reconnect.StrategyExponentialBackoff,
|
||
MaxRetries: -1, // 无限重试
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 30 * time.Second,
|
||
Multiplier: 2.0,
|
||
HealthCheckInterval: 15 * time.Second,
|
||
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
|
||
log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState)
|
||
},
|
||
OnError: func(err error) {
|
||
log.Printf("RabbitMQ 错误: %v", err)
|
||
},
|
||
},
|
||
}
|
||
|
||
// 创建客户端
|
||
client, err := reconnect.NewRabbitMQClient(config)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer client.Close()
|
||
|
||
// 声明队列
|
||
queue, err := client.QueueDeclare(
|
||
"hello", // 队列名
|
||
true, // 持久化
|
||
false, // 自动删除
|
||
false, // 独占
|
||
false, // 不等待
|
||
nil, // 参数
|
||
)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
|
||
// 发布消息
|
||
err = client.Publish(
|
||
"", // 交换机
|
||
queue.Name, // 路由键
|
||
false, // 强制
|
||
false, // 立即
|
||
amqp.Publishing{
|
||
ContentType: "text/plain",
|
||
Body: []byte("Hello World!"),
|
||
},
|
||
)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
|
||
// 消费消息
|
||
msgs, err := client.Consume(
|
||
queue.Name, // 队列
|
||
"", // 消费者
|
||
true, // 自动确认
|
||
false, // 独占
|
||
false, // 不等待
|
||
false, // 参数
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
|
||
// 处理消息
|
||
for msg := range msgs {
|
||
log.Printf("收到消息: %s", msg.Body)
|
||
}
|
||
}
|
||
```
|
||
|
||
## 重连策略
|
||
|
||
### 指数退避策略 (Exponential Backoff)
|
||
|
||
默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。
|
||
|
||
```go
|
||
config := reconnect.Config{
|
||
Strategy: reconnect.StrategyExponentialBackoff,
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 30 * time.Second,
|
||
Multiplier: 2.0, // 每次延迟翻倍
|
||
}
|
||
```
|
||
|
||
### 固定间隔策略 (Fixed Interval)
|
||
|
||
每次重试使用固定的时间间隔。
|
||
|
||
```go
|
||
config := reconnect.Config{
|
||
Strategy: reconnect.StrategyFixedInterval,
|
||
InitialDelay: 5 * time.Second, // 固定 5 秒间隔
|
||
}
|
||
```
|
||
|
||
### 线性退避策略 (Linear Backoff)
|
||
|
||
每次重试间隔线性增长。
|
||
|
||
```go
|
||
config := reconnect.Config{
|
||
Strategy: reconnect.StrategyLinearBackoff,
|
||
InitialDelay: 1 * time.Second,
|
||
MaxDelay: 30 * time.Second,
|
||
LinearIncrement: 2 * time.Second, // 每次增加 2 秒
|
||
}
|
||
```
|
||
|
||
## 配置选项
|
||
|
||
| 选项 | 类型 | 默认值 | 说明 |
|
||
|------|------|--------|------|
|
||
| `Strategy` | `StrategyType` | `ExponentialBackoff` | 重连策略类型 |
|
||
| `MaxRetries` | `int` | `-1` | 最大重试次数,-1 表示无限重试 |
|
||
| `InitialDelay` | `time.Duration` | `1s` | 初始延迟时间 |
|
||
| `MaxDelay` | `time.Duration` | `30s` | 最大延迟时间 |
|
||
| `Multiplier` | `float64` | `2.0` | 指数退避乘数 |
|
||
| `LinearIncrement` | `time.Duration` | `1s` | 线性退避增量 |
|
||
| `HealthCheckInterval` | `time.Duration` | `10s` | 健康检查间隔 |
|
||
| `HealthCheckTimeout` | `time.Duration` | `5s` | 健康检查超时时间 |
|
||
| `OnStateChange` | `func(ConnectionState, ConnectionState)` | `nil` | 状态变化回调 |
|
||
| `OnReconnect` | `func(int)` | `nil` | 重连成功回调 |
|
||
| `OnError` | `func(error)` | `nil` | 错误回调 |
|
||
|
||
## 连接状态
|
||
|
||
- `StateDisconnected`: 断开连接
|
||
- `StateConnecting`: 正在连接
|
||
- `StateConnected`: 已连接
|
||
- `StateReconnecting`: 正在重连
|
||
|
||
## 高级用法
|
||
|
||
### 自定义连接器
|
||
|
||
如果需要支持其他类型的连接,可以实现 `Connector` 和 `HealthChecker` 接口:
|
||
|
||
```go
|
||
type CustomConnector struct {
|
||
// 自定义连接逻辑
|
||
}
|
||
|
||
func (c *CustomConnector) Connect(ctx context.Context) error {
|
||
// 实现连接逻辑
|
||
return nil
|
||
}
|
||
|
||
func (c *CustomConnector) Close() error {
|
||
// 实现关闭逻辑
|
||
return nil
|
||
}
|
||
|
||
type CustomHealthChecker struct {
|
||
// 自定义健康检查逻辑
|
||
}
|
||
|
||
func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error {
|
||
// 实现健康检查逻辑
|
||
return nil
|
||
}
|
||
|
||
// 使用自定义连接器
|
||
connector := &CustomConnector{}
|
||
checker := &CustomHealthChecker{}
|
||
manager := reconnect.NewConnectionManager(connector, checker, config)
|
||
```
|
||
|
||
### 手动触发重连
|
||
|
||
```go
|
||
// 手动触发重连
|
||
client.TriggerReconnect()
|
||
|
||
// 获取当前状态
|
||
state := client.State()
|
||
fmt.Printf("当前状态: %s\n", state)
|
||
```
|
||
|
||
## 最佳实践
|
||
|
||
1. **合理设置重试次数**: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限
|
||
2. **配置适当的健康检查间隔**: 太频繁会影响性能,太稀疏会延迟发现问题
|
||
3. **使用回调函数监控状态**: 通过 `OnStateChange`、`OnReconnect`、`OnError` 回调监控连接状态
|
||
4. **设置合适的超时时间**: 避免长时间阻塞
|
||
5. **优雅关闭**: 在应用关闭时调用 `Close()` 方法释放资源
|
||
|
||
## 依赖
|
||
|
||
- [go-redis/v8](https://github.com/go-redis/redis) - Redis 客户端
|
||
- [etcd client/v3](https://etcd.io/) - etcd 客户端
|
||
- [google.golang.org/grpc](https://grpc.io/) - gRPC 框架
|
||
- [gorm.io/gorm](https://gorm.io/) - ORM 框架
|
||
|
||
## 许可证
|
||
|
||
MIT License
|
||
|
||
## 贡献
|
||
|
||
欢迎提交 Issue 和 Pull Request!
|
||
|
||
## 更新日志
|
||
|
||
### v1.1.0
|
||
- 新增 RabbitMQ 重连支持
|
||
- 添加 RabbitMQ 客户端封装和健康检查
|
||
- 完善文档和使用示例
|
||
|
||
### v1.0.0
|
||
- 初始版本发布
|
||
- 支持数据库、Redis、etcd、gRPC 重连
|
||
- 实现三种重连策略
|