Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c0dd21b0f0 | |||
| 1df38ff4bc | |||
| 4a1f66dd34 |
447
README.md
Normal file
447
README.md
Normal file
@@ -0,0 +1,447 @@
|
|||||||
|
# Reconnect - Go 连接重连库
|
||||||
|
|
||||||
|
[](https://golang.org)
|
||||||
|
[](LICENSE)
|
||||||
|
|
||||||
|
一个用于 Go 语言的智能连接重连库,支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。
|
||||||
|
|
||||||
|
## 特性
|
||||||
|
|
||||||
|
- 🔄 **智能重连**: 支持多种重连策略(指数退避、固定间隔、线性退避)
|
||||||
|
- 💗 **健康检查**: 定期检查连接状态,自动发现问题并重连
|
||||||
|
- 🔌 **多服务支持**: 内置支持数据库(GORM)、Redis、etcd、gRPC
|
||||||
|
- ⚙️ **高度可配置**: 灵活的配置选项,满足不同场景需求
|
||||||
|
- 📊 **状态监控**: 实时连接状态监控和回调通知
|
||||||
|
- 🛡️ **线程安全**: 完全的并发安全设计
|
||||||
|
|
||||||
|
## 支持的服务
|
||||||
|
|
||||||
|
- **数据库**: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等
|
||||||
|
- **Redis**: 基于 go-redis/v8 客户端
|
||||||
|
- **etcd**: 基于 etcd client/v3
|
||||||
|
- **gRPC**: 基于 google.golang.org/grpc
|
||||||
|
|
||||||
|
## 安装
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get git.whblueocean.cn/blueocean-go/reconnect
|
||||||
|
```
|
||||||
|
|
||||||
|
## 快速开始
|
||||||
|
|
||||||
|
### 数据库重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gorm.io/driver/postgres"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"git.whblueocean.cn/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 配置数据库连接
|
||||||
|
config := reconnect.DatabaseConfig{
|
||||||
|
Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"),
|
||||||
|
GormConfig: &gorm.Config{},
|
||||||
|
MaxIdleConns: 10,
|
||||||
|
MaxOpenConns: 100,
|
||||||
|
ConnMaxLifetime: time.Hour,
|
||||||
|
ReconnectConfig: reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyExponentialBackoff,
|
||||||
|
MaxRetries: -1, // 无限重试
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 30 * time.Second,
|
||||||
|
Multiplier: 2.0,
|
||||||
|
HealthCheckInterval: 10 * time.Second,
|
||||||
|
HealthCheckTimeout: 5 * time.Second,
|
||||||
|
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
|
||||||
|
log.Printf("数据库状态变化: %s -> %s", oldState, newState)
|
||||||
|
},
|
||||||
|
OnError: func(err error) {
|
||||||
|
log.Printf("数据库错误: %v", err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建客户端
|
||||||
|
client, err := reconnect.NewDatabaseClient(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 使用数据库
|
||||||
|
db := client.GetDB()
|
||||||
|
// ... 执行数据库操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Redis 重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"git.whblueocean.cn/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 配置 Redis 连接
|
||||||
|
config := reconnect.RedisClientConfig{
|
||||||
|
Options: &redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Password: "", // 无密码
|
||||||
|
DB: 0, // 默认数据库
|
||||||
|
},
|
||||||
|
ReconnectConfig: reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyFixedInterval,
|
||||||
|
MaxRetries: 10,
|
||||||
|
InitialDelay: 2 * time.Second,
|
||||||
|
HealthCheckInterval: 15 * time.Second,
|
||||||
|
OnReconnect: func(attempt int) {
|
||||||
|
log.Printf("Redis 重连成功,尝试次数: %d", attempt)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建客户端
|
||||||
|
client, err := reconnect.NewRedisClient(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 使用 Redis
|
||||||
|
rdb := client.GetClient()
|
||||||
|
// ... 执行 Redis 操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### etcd 重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.whblueocean.cn/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 配置 etcd 连接
|
||||||
|
config := reconnect.EtcdClientConfig{
|
||||||
|
Endpoints: []string{"localhost:2379"},
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
Username: "",
|
||||||
|
Password: "",
|
||||||
|
ReconnectConfig: reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyLinearBackoff,
|
||||||
|
MaxRetries: -1,
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 20 * time.Second,
|
||||||
|
LinearIncrement: 1 * time.Second,
|
||||||
|
HealthCheckInterval: 30 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建客户端
|
||||||
|
client, err := reconnect.NewEtcdClient(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 使用 etcd
|
||||||
|
etcdClient := client.GetClient()
|
||||||
|
// ... 执行 etcd 操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### gRPC 重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"git.whblueocean.cn/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 配置 gRPC 连接
|
||||||
|
config := reconnect.GRPCClientConfig{
|
||||||
|
Target: "localhost:50051",
|
||||||
|
DialOptions: []grpc.DialOption{
|
||||||
|
grpc.WithInsecure(),
|
||||||
|
// 其他 gRPC 选项...
|
||||||
|
},
|
||||||
|
ReconnectConfig: reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyExponentialBackoff,
|
||||||
|
MaxRetries: 5,
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 60 * time.Second,
|
||||||
|
Multiplier: 1.5,
|
||||||
|
HealthCheckInterval: 20 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建客户端
|
||||||
|
client, err := reconnect.NewGRPCClient(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 使用 gRPC 连接
|
||||||
|
conn := client.GetConn()
|
||||||
|
// ... 创建 gRPC 客户端并调用服务
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### RabbitMQ 重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
"git.whblueocean.cn/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 配置 RabbitMQ 连接
|
||||||
|
config := reconnect.RabbitMQClientConfig{
|
||||||
|
URL: "amqp://guest:guest@localhost:5672/",
|
||||||
|
ReconnectConfig: reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyExponentialBackoff,
|
||||||
|
MaxRetries: -1, // 无限重试
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 30 * time.Second,
|
||||||
|
Multiplier: 2.0,
|
||||||
|
HealthCheckInterval: 15 * time.Second,
|
||||||
|
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
|
||||||
|
log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState)
|
||||||
|
},
|
||||||
|
OnError: func(err error) {
|
||||||
|
log.Printf("RabbitMQ 错误: %v", err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建客户端
|
||||||
|
client, err := reconnect.NewRabbitMQClient(config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// 声明队列
|
||||||
|
queue, err := client.QueueDeclare(
|
||||||
|
"hello", // 队列名
|
||||||
|
true, // 持久化
|
||||||
|
false, // 自动删除
|
||||||
|
false, // 独占
|
||||||
|
false, // 不等待
|
||||||
|
nil, // 参数
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发布消息
|
||||||
|
err = client.Publish(
|
||||||
|
"", // 交换机
|
||||||
|
queue.Name, // 路由键
|
||||||
|
false, // 强制
|
||||||
|
false, // 立即
|
||||||
|
amqp.Publishing{
|
||||||
|
ContentType: "text/plain",
|
||||||
|
Body: []byte("Hello World!"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 消费消息
|
||||||
|
msgs, err := client.Consume(
|
||||||
|
queue.Name, // 队列
|
||||||
|
"", // 消费者
|
||||||
|
true, // 自动确认
|
||||||
|
false, // 独占
|
||||||
|
false, // 不等待
|
||||||
|
false, // 参数
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理消息
|
||||||
|
for msg := range msgs {
|
||||||
|
log.Printf("收到消息: %s", msg.Body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 重连策略
|
||||||
|
|
||||||
|
### 指数退避策略 (Exponential Backoff)
|
||||||
|
|
||||||
|
默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。
|
||||||
|
|
||||||
|
```go
|
||||||
|
config := reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyExponentialBackoff,
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 30 * time.Second,
|
||||||
|
Multiplier: 2.0, // 每次延迟翻倍
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 固定间隔策略 (Fixed Interval)
|
||||||
|
|
||||||
|
每次重试使用固定的时间间隔。
|
||||||
|
|
||||||
|
```go
|
||||||
|
config := reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyFixedInterval,
|
||||||
|
InitialDelay: 5 * time.Second, // 固定 5 秒间隔
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### 线性退避策略 (Linear Backoff)
|
||||||
|
|
||||||
|
每次重试间隔线性增长。
|
||||||
|
|
||||||
|
```go
|
||||||
|
config := reconnect.Config{
|
||||||
|
Strategy: reconnect.StrategyLinearBackoff,
|
||||||
|
InitialDelay: 1 * time.Second,
|
||||||
|
MaxDelay: 30 * time.Second,
|
||||||
|
LinearIncrement: 2 * time.Second, // 每次增加 2 秒
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 配置选项
|
||||||
|
|
||||||
|
| 选项 | 类型 | 默认值 | 说明 |
|
||||||
|
|------|------|--------|------|
|
||||||
|
| `Strategy` | `StrategyType` | `ExponentialBackoff` | 重连策略类型 |
|
||||||
|
| `MaxRetries` | `int` | `-1` | 最大重试次数,-1 表示无限重试 |
|
||||||
|
| `InitialDelay` | `time.Duration` | `1s` | 初始延迟时间 |
|
||||||
|
| `MaxDelay` | `time.Duration` | `30s` | 最大延迟时间 |
|
||||||
|
| `Multiplier` | `float64` | `2.0` | 指数退避乘数 |
|
||||||
|
| `LinearIncrement` | `time.Duration` | `1s` | 线性退避增量 |
|
||||||
|
| `HealthCheckInterval` | `time.Duration` | `10s` | 健康检查间隔 |
|
||||||
|
| `HealthCheckTimeout` | `time.Duration` | `5s` | 健康检查超时时间 |
|
||||||
|
| `OnStateChange` | `func(ConnectionState, ConnectionState)` | `nil` | 状态变化回调 |
|
||||||
|
| `OnReconnect` | `func(int)` | `nil` | 重连成功回调 |
|
||||||
|
| `OnError` | `func(error)` | `nil` | 错误回调 |
|
||||||
|
|
||||||
|
## 连接状态
|
||||||
|
|
||||||
|
- `StateDisconnected`: 断开连接
|
||||||
|
- `StateConnecting`: 正在连接
|
||||||
|
- `StateConnected`: 已连接
|
||||||
|
- `StateReconnecting`: 正在重连
|
||||||
|
|
||||||
|
## 高级用法
|
||||||
|
|
||||||
|
### 自定义连接器
|
||||||
|
|
||||||
|
如果需要支持其他类型的连接,可以实现 `Connector` 和 `HealthChecker` 接口:
|
||||||
|
|
||||||
|
```go
|
||||||
|
type CustomConnector struct {
|
||||||
|
// 自定义连接逻辑
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CustomConnector) Connect(ctx context.Context) error {
|
||||||
|
// 实现连接逻辑
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CustomConnector) Close() error {
|
||||||
|
// 实现关闭逻辑
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type CustomHealthChecker struct {
|
||||||
|
// 自定义健康检查逻辑
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error {
|
||||||
|
// 实现健康检查逻辑
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用自定义连接器
|
||||||
|
connector := &CustomConnector{}
|
||||||
|
checker := &CustomHealthChecker{}
|
||||||
|
manager := reconnect.NewConnectionManager(connector, checker, config)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 手动触发重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
// 手动触发重连
|
||||||
|
client.TriggerReconnect()
|
||||||
|
|
||||||
|
// 获取当前状态
|
||||||
|
state := client.State()
|
||||||
|
fmt.Printf("当前状态: %s\n", state)
|
||||||
|
```
|
||||||
|
|
||||||
|
## 最佳实践
|
||||||
|
|
||||||
|
1. **合理设置重试次数**: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限
|
||||||
|
2. **配置适当的健康检查间隔**: 太频繁会影响性能,太稀疏会延迟发现问题
|
||||||
|
3. **使用回调函数监控状态**: 通过 `OnStateChange`、`OnReconnect`、`OnError` 回调监控连接状态
|
||||||
|
4. **设置合适的超时时间**: 避免长时间阻塞
|
||||||
|
5. **优雅关闭**: 在应用关闭时调用 `Close()` 方法释放资源
|
||||||
|
|
||||||
|
## 依赖
|
||||||
|
|
||||||
|
- [go-redis/v8](https://github.com/go-redis/redis) - Redis 客户端
|
||||||
|
- [etcd client/v3](https://etcd.io/) - etcd 客户端
|
||||||
|
- [google.golang.org/grpc](https://grpc.io/) - gRPC 框架
|
||||||
|
- [gorm.io/gorm](https://gorm.io/) - ORM 框架
|
||||||
|
|
||||||
|
## 许可证
|
||||||
|
|
||||||
|
MIT License
|
||||||
|
|
||||||
|
## 贡献
|
||||||
|
|
||||||
|
欢迎提交 Issue 和 Pull Request!
|
||||||
|
|
||||||
|
## 更新日志
|
||||||
|
|
||||||
|
### v1.1.0
|
||||||
|
- 新增 RabbitMQ 重连支持
|
||||||
|
- 添加 RabbitMQ 客户端封装和健康检查
|
||||||
|
- 完善文档和使用示例
|
||||||
|
|
||||||
|
### v1.0.0
|
||||||
|
- 初始版本发布
|
||||||
|
- 支持数据库、Redis、etcd、gRPC 重连
|
||||||
|
- 实现三种重连策略
|
||||||
285
etcd_registry.go
Normal file
285
etcd_registry.go
Normal file
@@ -0,0 +1,285 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EtcdRegistryConfig etcd 服务注册配置
|
||||||
|
type EtcdRegistryConfig struct {
|
||||||
|
// Endpoints etcd 服务地址列表
|
||||||
|
Endpoints []string
|
||||||
|
// DialTimeout 连接超时时间
|
||||||
|
DialTimeout time.Duration
|
||||||
|
// ReconnectConfig 重连配置
|
||||||
|
ReconnectConfig Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// EtcdRegistry etcd 服务注册(带自动重连)
|
||||||
|
type EtcdRegistry struct {
|
||||||
|
config EtcdRegistryConfig
|
||||||
|
etcdClient *EtcdClient
|
||||||
|
cli *clientv3.Client
|
||||||
|
key string
|
||||||
|
val string
|
||||||
|
serviceName string
|
||||||
|
ttl int64
|
||||||
|
leaseID clientv3.LeaseID
|
||||||
|
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
|
||||||
|
mu sync.RWMutex
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
|
||||||
|
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// 创建带重连的 etcd 客户端
|
||||||
|
etcdCfg := EtcdClientConfig{
|
||||||
|
Endpoints: cfg.Endpoints,
|
||||||
|
DialTimeout: cfg.DialTimeout,
|
||||||
|
ReconnectConfig: cfg.ReconnectConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
etcdClient, err := NewEtcdClient(etcdCfg)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &EtcdRegistry{
|
||||||
|
config: cfg,
|
||||||
|
etcdClient: etcdClient,
|
||||||
|
cli: etcdClient.GetClient(),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register 注册服务
|
||||||
|
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.serviceName = serviceName
|
||||||
|
r.ttl = ttl
|
||||||
|
r.key = fmt.Sprintf("/%s/%s", serviceName, addr)
|
||||||
|
r.val = addr
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
return r.registerWithKV(ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
||||||
|
r.mu.RLock()
|
||||||
|
cli := r.cli
|
||||||
|
key := r.key
|
||||||
|
val := r.val
|
||||||
|
r.mu.RUnlock()
|
||||||
|
|
||||||
|
// 如果 cli 为 nil 或连接已关闭,尝试重新获取客户端
|
||||||
|
if cli == nil {
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
// 检查连接状态
|
||||||
|
state := r.etcdClient.State()
|
||||||
|
if state != StateConnected {
|
||||||
|
return fmt.Errorf("etcd client is not connected, current state: %s", state)
|
||||||
|
}
|
||||||
|
r.mu.Lock()
|
||||||
|
r.cli = r.etcdClient.GetClient()
|
||||||
|
cli = r.cli
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
if cli == nil {
|
||||||
|
return fmt.Errorf("etcd client is nil")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 即使 cli 不为 nil,也要检查连接状态,因为可能连接已断开但 cli 引用还在
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
state := r.etcdClient.State()
|
||||||
|
if state != StateConnected {
|
||||||
|
// 连接已断开,尝试更新 cli 引用
|
||||||
|
r.mu.Lock()
|
||||||
|
r.cli = r.etcdClient.GetClient()
|
||||||
|
cli = r.cli
|
||||||
|
r.mu.Unlock()
|
||||||
|
if cli == nil {
|
||||||
|
return fmt.Errorf("etcd client is not connected, current state: %s", state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用带超时的 context 来避免长时间阻塞
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resp, err := cli.Grant(ctx, ttl)
|
||||||
|
if err != nil {
|
||||||
|
// 如果 Grant 失败,可能是连接已关闭,尝试更新 client 引用
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.cli = r.etcdClient.GetClient()
|
||||||
|
cli = r.cli
|
||||||
|
r.mu.Unlock()
|
||||||
|
if cli != nil {
|
||||||
|
// 使用新的 client 重试
|
||||||
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel2()
|
||||||
|
resp, err = cli.Grant(ctx2, ttl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to grant lease: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
r.leaseID = resp.ID
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel3()
|
||||||
|
_, err = cli.Put(ctx3, key, val, clientv3.WithLease(resp.ID))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to put key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel4()
|
||||||
|
keepAliveCh, err := cli.KeepAlive(ctx4, resp.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to start keep-alive: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
r.keepAliveCh = keepAliveCh
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
go r.watcher(ttl)
|
||||||
|
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// watcher 监听续约
|
||||||
|
func (r *EtcdRegistry) watcher(ttl int64) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-r.ctx.Done():
|
||||||
|
log.Println("[EtcdRegistry] context done, watcher exiting.")
|
||||||
|
return
|
||||||
|
case ka, ok := <-r.keepAliveCh:
|
||||||
|
if !ok {
|
||||||
|
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
|
||||||
|
|
||||||
|
// 等待连接恢复,而不是立即重试
|
||||||
|
maxWaitTime := 60 * time.Second
|
||||||
|
checkInterval := 500 * time.Millisecond
|
||||||
|
waited := time.Duration(0)
|
||||||
|
|
||||||
|
for waited < maxWaitTime {
|
||||||
|
select {
|
||||||
|
case <-r.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查连接状态
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
state := r.etcdClient.State()
|
||||||
|
if state == StateConnected {
|
||||||
|
// 连接已恢复,更新 cli 引用
|
||||||
|
r.mu.Lock()
|
||||||
|
r.cli = r.etcdClient.GetClient()
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
// 等待一小段时间确保连接稳定
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// 尝试重新注册
|
||||||
|
if err := r.registerWithKV(ttl); err == nil {
|
||||||
|
log.Println("[EtcdRegistry] 服务重新注册成功")
|
||||||
|
return // 退出当前 goroutine,让新的 watcher 启动
|
||||||
|
} else {
|
||||||
|
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(checkInterval)
|
||||||
|
waited += checkInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果等待超时,使用指数退避重试
|
||||||
|
log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
|
||||||
|
delay := 2 * time.Second
|
||||||
|
maxDelay := 30 * time.Second
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-r.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// 再次检查连接状态
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
state := r.etcdClient.State()
|
||||||
|
if state == StateConnected {
|
||||||
|
r.mu.Lock()
|
||||||
|
r.cli = r.etcdClient.GetClient()
|
||||||
|
r.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.registerWithKV(ttl); err != nil {
|
||||||
|
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
|
||||||
|
time.Sleep(delay)
|
||||||
|
delay *= 2
|
||||||
|
if delay > maxDelay {
|
||||||
|
delay = maxDelay
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Println("[EtcdRegistry] 服务重新注册成功")
|
||||||
|
return // 退出当前 goroutine,让新的 watcher 启动
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 续约成功
|
||||||
|
_ = ka
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnRegister 注销服务
|
||||||
|
func (r *EtcdRegistry) UnRegister() {
|
||||||
|
r.cancel() // 停止 watcher
|
||||||
|
|
||||||
|
r.mu.RLock()
|
||||||
|
cli := r.cli
|
||||||
|
leaseID := r.leaseID
|
||||||
|
key := r.key
|
||||||
|
r.mu.RUnlock()
|
||||||
|
|
||||||
|
if cli != nil {
|
||||||
|
cli.Revoke(context.Background(), leaseID)
|
||||||
|
cli.Delete(context.Background(), key)
|
||||||
|
log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
r.etcdClient.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// State 获取连接状态
|
||||||
|
func (r *EtcdRegistry) State() ConnectionState {
|
||||||
|
if r.etcdClient != nil {
|
||||||
|
return r.etcdClient.State()
|
||||||
|
}
|
||||||
|
return StateDisconnected
|
||||||
|
}
|
||||||
2
go.mod
2
go.mod
@@ -4,6 +4,7 @@ go 1.24.0
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/go-redis/redis/v8 v8.11.5
|
github.com/go-redis/redis/v8 v8.11.5
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0
|
||||||
go.etcd.io/etcd/client/v3 v3.6.5
|
go.etcd.io/etcd/client/v3 v3.6.5
|
||||||
google.golang.org/grpc v1.75.1
|
google.golang.org/grpc v1.75.1
|
||||||
gorm.io/gorm v1.31.1
|
gorm.io/gorm v1.31.1
|
||||||
@@ -19,7 +20,6 @@ require (
|
|||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
|
|
||||||
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
|
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
|
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
|
|||||||
274
grpc_etcd.go
Normal file
274
grpc_etcd.go
Normal file
@@ -0,0 +1,274 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GRPCEtcdClientConfig 使用 etcd 服务发现的 gRPC 客户端配置
|
||||||
|
type GRPCEtcdClientConfig struct {
|
||||||
|
// EtcdEndpoints etcd 服务地址列表
|
||||||
|
EtcdEndpoints []string
|
||||||
|
// EtcdDialTimeout etcd 连接超时时间
|
||||||
|
EtcdDialTimeout time.Duration
|
||||||
|
// ServiceName 服务名称(在 etcd 中注册的 key 前缀)
|
||||||
|
ServiceName string
|
||||||
|
// DialOptions 额外的 gRPC 拨号选项
|
||||||
|
DialOptions []grpc.DialOption
|
||||||
|
// ReconnectConfig 重连配置
|
||||||
|
ReconnectConfig Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// GRPCEtcdClient 带 etcd 服务发现和重连功能的 gRPC 客户端
|
||||||
|
type GRPCEtcdClient struct {
|
||||||
|
config GRPCEtcdClientConfig
|
||||||
|
etcdClient *clientv3.Client
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
manager *ConnectionManager
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// grpcEtcdConnector gRPC + etcd 连接器
|
||||||
|
type grpcEtcdConnector struct {
|
||||||
|
client *GRPCEtcdClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEtcdConnector) Connect(ctx context.Context) error {
|
||||||
|
g.client.mu.Lock()
|
||||||
|
defer g.client.mu.Unlock()
|
||||||
|
|
||||||
|
cfg := g.client.config
|
||||||
|
|
||||||
|
// 创建 etcd 客户端
|
||||||
|
etcdTimeout := cfg.EtcdDialTimeout
|
||||||
|
if etcdTimeout == 0 {
|
||||||
|
etcdTimeout = 5 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
|
Endpoints: cfg.EtcdEndpoints,
|
||||||
|
DialTimeout: etcdTimeout,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 注册 etcd resolver
|
||||||
|
etcdResolver := &etcdResolverBuilder{cli: cli}
|
||||||
|
resolver.Register(etcdResolver)
|
||||||
|
|
||||||
|
target := "etcd:///" + cfg.ServiceName + "/"
|
||||||
|
|
||||||
|
// 默认 DialOptions
|
||||||
|
opts := []grpc.DialOption{
|
||||||
|
grpc.WithResolvers(etcdResolver),
|
||||||
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
|
||||||
|
}
|
||||||
|
// 追加用户自定义选项
|
||||||
|
opts = append(opts, cfg.DialOptions...)
|
||||||
|
|
||||||
|
conn, err := grpc.NewClient(target, opts...)
|
||||||
|
if err != nil {
|
||||||
|
cli.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 等待连接就绪
|
||||||
|
connCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn.Connect()
|
||||||
|
for {
|
||||||
|
state := conn.GetState()
|
||||||
|
if state == connectivity.Ready {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if state == connectivity.TransientFailure || state == connectivity.Shutdown {
|
||||||
|
conn.Close()
|
||||||
|
cli.Close()
|
||||||
|
return context.DeadlineExceeded
|
||||||
|
}
|
||||||
|
if !conn.WaitForStateChange(connCtx, state) {
|
||||||
|
conn.Close()
|
||||||
|
cli.Close()
|
||||||
|
return connCtx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
g.client.etcdClient = cli
|
||||||
|
g.client.conn = conn
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEtcdConnector) Close() error {
|
||||||
|
g.client.mu.Lock()
|
||||||
|
defer g.client.mu.Unlock()
|
||||||
|
|
||||||
|
if g.client.conn != nil {
|
||||||
|
g.client.conn.Close()
|
||||||
|
g.client.conn = nil
|
||||||
|
}
|
||||||
|
if g.client.etcdClient != nil {
|
||||||
|
g.client.etcdClient.Close()
|
||||||
|
g.client.etcdClient = nil
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// grpcEtcdHealthChecker gRPC + etcd 健康检查器
|
||||||
|
type grpcEtcdHealthChecker struct {
|
||||||
|
client *GRPCEtcdClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *grpcEtcdHealthChecker) HealthCheck(ctx context.Context) error {
|
||||||
|
g.client.mu.RLock()
|
||||||
|
conn := g.client.conn
|
||||||
|
g.client.mu.RUnlock()
|
||||||
|
|
||||||
|
if conn == nil {
|
||||||
|
return context.Canceled
|
||||||
|
}
|
||||||
|
|
||||||
|
state := conn.GetState()
|
||||||
|
if state == connectivity.Ready || state == connectivity.Idle {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.Connect()
|
||||||
|
if !conn.WaitForStateChange(ctx, state) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
newState := conn.GetState()
|
||||||
|
if newState != connectivity.Ready && newState != connectivity.Idle {
|
||||||
|
return context.DeadlineExceeded
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGRPCClientWithEtcd 创建带 etcd 服务发现和重连功能的 gRPC 客户端
|
||||||
|
func NewGRPCClientWithEtcd(cfg GRPCEtcdClientConfig) (*GRPCEtcdClient, error) {
|
||||||
|
client := &GRPCEtcdClient{
|
||||||
|
config: cfg,
|
||||||
|
}
|
||||||
|
|
||||||
|
connector := &grpcEtcdConnector{client: client}
|
||||||
|
checker := &grpcEtcdHealthChecker{client: client}
|
||||||
|
|
||||||
|
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
||||||
|
|
||||||
|
// 首次连接
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConn 获取 gRPC 连接
|
||||||
|
func (c *GRPCEtcdClient) GetConn() *grpc.ClientConn {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.conn
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEtcdClient 获取 etcd 客户端
|
||||||
|
func (c *GRPCEtcdClient) GetEtcdClient() *clientv3.Client {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.etcdClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// State 获取连接状态
|
||||||
|
func (c *GRPCEtcdClient) State() ConnectionState {
|
||||||
|
return c.manager.State()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭客户端
|
||||||
|
func (c *GRPCEtcdClient) Close() error {
|
||||||
|
return c.manager.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriggerReconnect 手动触发重连
|
||||||
|
func (c *GRPCEtcdClient) TriggerReconnect() {
|
||||||
|
c.manager.TriggerReconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================== etcd resolver 实现 ==================
|
||||||
|
|
||||||
|
// etcdResolverBuilder 实现 resolver.Builder
|
||||||
|
type etcdResolverBuilder struct {
|
||||||
|
cli *clientv3.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *etcdResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||||
|
r := &etcdResolver{
|
||||||
|
cli: b.cli,
|
||||||
|
cc: cc,
|
||||||
|
target: target,
|
||||||
|
}
|
||||||
|
go r.watch(target.Endpoint())
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *etcdResolverBuilder) Scheme() string {
|
||||||
|
return "etcd"
|
||||||
|
}
|
||||||
|
|
||||||
|
// etcdResolver 实现 resolver.Resolver
|
||||||
|
type etcdResolver struct {
|
||||||
|
cli *clientv3.Client
|
||||||
|
cc resolver.ClientConn
|
||||||
|
target resolver.Target
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *etcdResolver) watch(keyPrefix string) {
|
||||||
|
// 首次获取服务列表
|
||||||
|
resp, err := r.cli.Get(context.Background(), "/"+keyPrefix, clientv3.WithPrefix())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[etcdResolver] 获取服务列表失败: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
addrs := make([]resolver.Address, 0, len(resp.Kvs))
|
||||||
|
for _, kv := range resp.Kvs {
|
||||||
|
addrs = append(addrs, resolver.Address{Addr: string(kv.Value)})
|
||||||
|
}
|
||||||
|
r.cc.UpdateState(resolver.State{Addresses: addrs})
|
||||||
|
|
||||||
|
// 监听变化
|
||||||
|
watchCh := r.cli.Watch(context.Background(), "/"+keyPrefix, clientv3.WithPrefix())
|
||||||
|
for wresp := range watchCh {
|
||||||
|
for _, ev := range wresp.Events {
|
||||||
|
switch ev.Type {
|
||||||
|
case clientv3.EventTypePut:
|
||||||
|
addrs = append(addrs, resolver.Address{Addr: string(ev.Kv.Value)})
|
||||||
|
case clientv3.EventTypeDelete:
|
||||||
|
newAddrs := make([]resolver.Address, 0)
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if addr.Addr != string(ev.Kv.Value) {
|
||||||
|
newAddrs = append(newAddrs, addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addrs = newAddrs
|
||||||
|
}
|
||||||
|
r.cc.UpdateState(resolver.State{Addresses: addrs})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *etcdResolver) ResolveNow(opts resolver.ResolveNowOptions) {}
|
||||||
|
|
||||||
|
func (r *etcdResolver) Close() {}
|
||||||
|
|
||||||
@@ -223,4 +223,3 @@ func (cm *ConnectionManager) Close() error {
|
|||||||
func (cm *ConnectionManager) TriggerReconnect() {
|
func (cm *ConnectionManager) TriggerReconnect() {
|
||||||
cm.reconnect()
|
cm.reconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user