5 Commits

Author SHA1 Message Date
ray
c0dd21b0f0 feat(etcd): 增强 etcd 客户端连接管理和重试机制
- 添加连接状态检查,确保在使用 etcd 客户端前验证其连接状态
- 实现超时控制,避免长时间阻塞在 etcd 操作中
- 优化重试逻辑,增加连接恢复后的重试机制,确保服务注册的可靠性
- 改进日志输出,提供更清晰的状态信息和错误处理
2025-12-18 22:47:10 +08:00
ray
1df38ff4bc feat(etcd): 添加 etcd 服务注册和 gRPC 客户端支持
- 新增 `etcd_registry.go` 文件,实现带自动重连的 etcd 服务注册功能
- 新增 `grpc_etcd.go` 文件,提供基于 etcd 的 gRPC 客户端,支持服务发现和重连机制
- 更新 `go.mod` 文件,添加 `github.com/rabbitmq/amqp091-go` 依赖
- 实现了服务注册、注销、续约及健康检查等功能,增强了连接管理能力
2025-12-18 16:51:35 +08:00
ray
4a1f66dd34 add readme 2025-12-18 15:36:07 +08:00
ray
2947bf150d 📦 build(module): 更新go模块路径
- 将模块路径从 blueocean.local/reconnect 更新为 git.whblueocean.cn/blueocean-go/reconnect
- 统一模块命名规范,与新的代码仓库地址保持一致
2025-12-18 15:20:35 +08:00
ray
fd523d34ed feat(rabbitmq): 引入带重连功能的 RabbitMQ 客户端
- 新增 `rabbitmq.go` 文件,实现了一个具有自动重连和健康检查机制的 RabbitMQ 客户端
- 引入 `github.com/rabbitmq/amqp091-go` 依赖,用于 RabbitMQ 交互
- 封装了连接、通道管理、发布、消费、声明等操作,提供健壮的连接管理
- 简化应用层与 RabbitMQ 的交互,自动处理连接中断和恢复
2025-12-18 15:17:00 +08:00
7 changed files with 1215 additions and 2 deletions

447
README.md Normal file
View File

@@ -0,0 +1,447 @@
# Reconnect - Go 连接重连库
[![Go Version](https://img.shields.io/badge/Go-1.24+-blue.svg)](https://golang.org)
[![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE)
一个用于 Go 语言的智能连接重连库支持数据库、Redis、etcd、gRPC 等多种服务的自动重连和健康检查。
## 特性
- 🔄 **智能重连**: 支持多种重连策略(指数退避、固定间隔、线性退避)
- 💗 **健康检查**: 定期检查连接状态,自动发现问题并重连
- 🔌 **多服务支持**: 内置支持数据库GORM、Redis、etcd、gRPC
- ⚙️ **高度可配置**: 灵活的配置选项,满足不同场景需求
- 📊 **状态监控**: 实时连接状态监控和回调通知
- 🛡️ **线程安全**: 完全的并发安全设计
## 支持的服务
- **数据库**: 通过 GORM 支持 MySQL、PostgreSQL、SQLite 等
- **Redis**: 基于 go-redis/v8 客户端
- **etcd**: 基于 etcd client/v3
- **gRPC**: 基于 google.golang.org/grpc
## 安装
```bash
go get git.whblueocean.cn/blueocean-go/reconnect
```
## 快速开始
### 数据库重连
```go
package main
import (
"fmt"
"log"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置数据库连接
config := reconnect.DatabaseConfig{
Dialector: postgres.Open("host=localhost user=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai"),
GormConfig: &gorm.Config{},
MaxIdleConns: 10,
MaxOpenConns: 100,
ConnMaxLifetime: time.Hour,
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: -1, // 无限重试
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
HealthCheckInterval: 10 * time.Second,
HealthCheckTimeout: 5 * time.Second,
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
log.Printf("数据库状态变化: %s -> %s", oldState, newState)
},
OnError: func(err error) {
log.Printf("数据库错误: %v", err)
},
},
}
// 创建客户端
client, err := reconnect.NewDatabaseClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用数据库
db := client.GetDB()
// ... 执行数据库操作
}
```
### Redis 重连
```go
package main
import (
"log"
"time"
"github.com/go-redis/redis/v8"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 Redis 连接
config := reconnect.RedisClientConfig{
Options: &redis.Options{
Addr: "localhost:6379",
Password: "", // 无密码
DB: 0, // 默认数据库
},
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyFixedInterval,
MaxRetries: 10,
InitialDelay: 2 * time.Second,
HealthCheckInterval: 15 * time.Second,
OnReconnect: func(attempt int) {
log.Printf("Redis 重连成功,尝试次数: %d", attempt)
},
},
}
// 创建客户端
client, err := reconnect.NewRedisClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 Redis
rdb := client.GetClient()
// ... 执行 Redis 操作
}
```
### etcd 重连
```go
package main
import (
"log"
"time"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 etcd 连接
config := reconnect.EtcdClientConfig{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
Username: "",
Password: "",
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyLinearBackoff,
MaxRetries: -1,
InitialDelay: 1 * time.Second,
MaxDelay: 20 * time.Second,
LinearIncrement: 1 * time.Second,
HealthCheckInterval: 30 * time.Second,
},
}
// 创建客户端
client, err := reconnect.NewEtcdClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 etcd
etcdClient := client.GetClient()
// ... 执行 etcd 操作
}
```
### gRPC 重连
```go
package main
import (
"log"
"time"
"google.golang.org/grpc"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 gRPC 连接
config := reconnect.GRPCClientConfig{
Target: "localhost:50051",
DialOptions: []grpc.DialOption{
grpc.WithInsecure(),
// 其他 gRPC 选项...
},
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: 5,
InitialDelay: 1 * time.Second,
MaxDelay: 60 * time.Second,
Multiplier: 1.5,
HealthCheckInterval: 20 * time.Second,
},
}
// 创建客户端
client, err := reconnect.NewGRPCClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 使用 gRPC 连接
conn := client.GetConn()
// ... 创建 gRPC 客户端并调用服务
}
```
### RabbitMQ 重连
```go
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"git.whblueocean.cn/blueocean-go/reconnect"
)
func main() {
// 配置 RabbitMQ 连接
config := reconnect.RabbitMQClientConfig{
URL: "amqp://guest:guest@localhost:5672/",
ReconnectConfig: reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
MaxRetries: -1, // 无限重试
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0,
HealthCheckInterval: 15 * time.Second,
OnStateChange: func(oldState, newState reconnect.ConnectionState) {
log.Printf("RabbitMQ 状态变化: %s -> %s", oldState, newState)
},
OnError: func(err error) {
log.Printf("RabbitMQ 错误: %v", err)
},
},
}
// 创建客户端
client, err := reconnect.NewRabbitMQClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 声明队列
queue, err := client.QueueDeclare(
"hello", // 队列名
true, // 持久化
false, // 自动删除
false, // 独占
false, // 不等待
nil, // 参数
)
if err != nil {
log.Fatal(err)
}
// 发布消息
err = client.Publish(
"", // 交换机
queue.Name, // 路由键
false, // 强制
false, // 立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
},
)
if err != nil {
log.Fatal(err)
}
// 消费消息
msgs, err := client.Consume(
queue.Name, // 队列
"", // 消费者
true, // 自动确认
false, // 独占
false, // 不等待
false, // 参数
nil,
)
if err != nil {
log.Fatal(err)
}
// 处理消息
for msg := range msgs {
log.Printf("收到消息: %s", msg.Body)
}
}
```
## 重连策略
### 指数退避策略 (Exponential Backoff)
默认策略,每次重试间隔按指数增长,避免对服务器造成过大压力。
```go
config := reconnect.Config{
Strategy: reconnect.StrategyExponentialBackoff,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
Multiplier: 2.0, // 每次延迟翻倍
}
```
### 固定间隔策略 (Fixed Interval)
每次重试使用固定的时间间隔。
```go
config := reconnect.Config{
Strategy: reconnect.StrategyFixedInterval,
InitialDelay: 5 * time.Second, // 固定 5 秒间隔
}
```
### 线性退避策略 (Linear Backoff)
每次重试间隔线性增长。
```go
config := reconnect.Config{
Strategy: reconnect.StrategyLinearBackoff,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
LinearIncrement: 2 * time.Second, // 每次增加 2 秒
}
```
## 配置选项
| 选项 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `Strategy` | `StrategyType` | `ExponentialBackoff` | 重连策略类型 |
| `MaxRetries` | `int` | `-1` | 最大重试次数,-1 表示无限重试 |
| `InitialDelay` | `time.Duration` | `1s` | 初始延迟时间 |
| `MaxDelay` | `time.Duration` | `30s` | 最大延迟时间 |
| `Multiplier` | `float64` | `2.0` | 指数退避乘数 |
| `LinearIncrement` | `time.Duration` | `1s` | 线性退避增量 |
| `HealthCheckInterval` | `time.Duration` | `10s` | 健康检查间隔 |
| `HealthCheckTimeout` | `time.Duration` | `5s` | 健康检查超时时间 |
| `OnStateChange` | `func(ConnectionState, ConnectionState)` | `nil` | 状态变化回调 |
| `OnReconnect` | `func(int)` | `nil` | 重连成功回调 |
| `OnError` | `func(error)` | `nil` | 错误回调 |
## 连接状态
- `StateDisconnected`: 断开连接
- `StateConnecting`: 正在连接
- `StateConnected`: 已连接
- `StateReconnecting`: 正在重连
## 高级用法
### 自定义连接器
如果需要支持其他类型的连接,可以实现 `Connector``HealthChecker` 接口:
```go
type CustomConnector struct {
// 自定义连接逻辑
}
func (c *CustomConnector) Connect(ctx context.Context) error {
// 实现连接逻辑
return nil
}
func (c *CustomConnector) Close() error {
// 实现关闭逻辑
return nil
}
type CustomHealthChecker struct {
// 自定义健康检查逻辑
}
func (c *CustomHealthChecker) HealthCheck(ctx context.Context) error {
// 实现健康检查逻辑
return nil
}
// 使用自定义连接器
connector := &CustomConnector{}
checker := &CustomHealthChecker{}
manager := reconnect.NewConnectionManager(connector, checker, config)
```
### 手动触发重连
```go
// 手动触发重连
client.TriggerReconnect()
// 获取当前状态
state := client.State()
fmt.Printf("当前状态: %s\n", state)
```
## 最佳实践
1. **合理设置重试次数**: 对于关键服务建议设置为无限重试,非关键服务设置合理的重试上限
2. **配置适当的健康检查间隔**: 太频繁会影响性能,太稀疏会延迟发现问题
3. **使用回调函数监控状态**: 通过 `OnStateChange``OnReconnect``OnError` 回调监控连接状态
4. **设置合适的超时时间**: 避免长时间阻塞
5. **优雅关闭**: 在应用关闭时调用 `Close()` 方法释放资源
## 依赖
- [go-redis/v8](https://github.com/go-redis/redis) - Redis 客户端
- [etcd client/v3](https://etcd.io/) - etcd 客户端
- [google.golang.org/grpc](https://grpc.io/) - gRPC 框架
- [gorm.io/gorm](https://gorm.io/) - ORM 框架
## 许可证
MIT License
## 贡献
欢迎提交 Issue 和 Pull Request
## 更新日志
### v1.1.0
- 新增 RabbitMQ 重连支持
- 添加 RabbitMQ 客户端封装和健康检查
- 完善文档和使用示例
### v1.0.0
- 初始版本发布
- 支持数据库、Redis、etcd、gRPC 重连
- 实现三种重连策略

285
etcd_registry.go Normal file
View File

@@ -0,0 +1,285 @@
package reconnect
import (
"context"
"fmt"
"log"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EtcdRegistryConfig etcd 服务注册配置
type EtcdRegistryConfig struct {
// Endpoints etcd 服务地址列表
Endpoints []string
// DialTimeout 连接超时时间
DialTimeout time.Duration
// ReconnectConfig 重连配置
ReconnectConfig Config
}
// EtcdRegistry etcd 服务注册(带自动重连)
type EtcdRegistry struct {
config EtcdRegistryConfig
etcdClient *EtcdClient
cli *clientv3.Client
key string
val string
serviceName string
ttl int64
leaseID clientv3.LeaseID
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
ctx, cancel := context.WithCancel(context.Background())
// 创建带重连的 etcd 客户端
etcdCfg := EtcdClientConfig{
Endpoints: cfg.Endpoints,
DialTimeout: cfg.DialTimeout,
ReconnectConfig: cfg.ReconnectConfig,
}
etcdClient, err := NewEtcdClient(etcdCfg)
if err != nil {
cancel()
return nil, err
}
return &EtcdRegistry{
config: cfg,
etcdClient: etcdClient,
cli: etcdClient.GetClient(),
ctx: ctx,
cancel: cancel,
}, nil
}
// Register 注册服务
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
r.mu.Lock()
r.serviceName = serviceName
r.ttl = ttl
r.key = fmt.Sprintf("/%s/%s", serviceName, addr)
r.val = addr
r.mu.Unlock()
return r.registerWithKV(ttl)
}
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
r.mu.RLock()
cli := r.cli
key := r.key
val := r.val
r.mu.RUnlock()
// 如果 cli 为 nil 或连接已关闭,尝试重新获取客户端
if cli == nil {
if r.etcdClient != nil {
// 检查连接状态
state := r.etcdClient.State()
if state != StateConnected {
return fmt.Errorf("etcd client is not connected, current state: %s", state)
}
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
r.mu.Unlock()
}
if cli == nil {
return fmt.Errorf("etcd client is nil")
}
} else {
// 即使 cli 不为 nil也要检查连接状态因为可能连接已断开但 cli 引用还在
if r.etcdClient != nil {
state := r.etcdClient.State()
if state != StateConnected {
// 连接已断开,尝试更新 cli 引用
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
r.mu.Unlock()
if cli == nil {
return fmt.Errorf("etcd client is not connected, current state: %s", state)
}
}
}
}
// 使用带超时的 context 来避免长时间阻塞
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := cli.Grant(ctx, ttl)
if err != nil {
// 如果 Grant 失败,可能是连接已关闭,尝试更新 client 引用
if r.etcdClient != nil {
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
cli = r.cli
r.mu.Unlock()
if cli != nil {
// 使用新的 client 重试
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
resp, err = cli.Grant(ctx2, ttl)
}
}
if err != nil {
return fmt.Errorf("failed to grant lease: %w", err)
}
}
r.mu.Lock()
r.leaseID = resp.ID
r.mu.Unlock()
ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel3()
_, err = cli.Put(ctx3, key, val, clientv3.WithLease(resp.ID))
if err != nil {
return fmt.Errorf("failed to put key: %w", err)
}
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel4()
keepAliveCh, err := cli.KeepAlive(ctx4, resp.ID)
if err != nil {
return fmt.Errorf("failed to start keep-alive: %w", err)
}
r.mu.Lock()
r.keepAliveCh = keepAliveCh
r.mu.Unlock()
go r.watcher(ttl)
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
return nil
}
// watcher 监听续约
func (r *EtcdRegistry) watcher(ttl int64) {
for {
select {
case <-r.ctx.Done():
log.Println("[EtcdRegistry] context done, watcher exiting.")
return
case ka, ok := <-r.keepAliveCh:
if !ok {
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
// 等待连接恢复,而不是立即重试
maxWaitTime := 60 * time.Second
checkInterval := 500 * time.Millisecond
waited := time.Duration(0)
for waited < maxWaitTime {
select {
case <-r.ctx.Done():
return
default:
}
// 检查连接状态
if r.etcdClient != nil {
state := r.etcdClient.State()
if state == StateConnected {
// 连接已恢复,更新 cli 引用
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
r.mu.Unlock()
// 等待一小段时间确保连接稳定
time.Sleep(100 * time.Millisecond)
// 尝试重新注册
if err := r.registerWithKV(ttl); err == nil {
log.Println("[EtcdRegistry] 服务重新注册成功")
return // 退出当前 goroutine让新的 watcher 启动
} else {
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
}
}
}
time.Sleep(checkInterval)
waited += checkInterval
}
// 如果等待超时,使用指数退避重试
log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
delay := 2 * time.Second
maxDelay := 30 * time.Second
for {
select {
case <-r.ctx.Done():
return
default:
}
// 再次检查连接状态
if r.etcdClient != nil {
state := r.etcdClient.State()
if state == StateConnected {
r.mu.Lock()
r.cli = r.etcdClient.GetClient()
r.mu.Unlock()
}
}
if err := r.registerWithKV(ttl); err != nil {
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
time.Sleep(delay)
delay *= 2
if delay > maxDelay {
delay = maxDelay
}
} else {
log.Println("[EtcdRegistry] 服务重新注册成功")
return // 退出当前 goroutine让新的 watcher 启动
}
}
}
// 续约成功
_ = ka
}
}
}
// UnRegister 注销服务
func (r *EtcdRegistry) UnRegister() {
r.cancel() // 停止 watcher
r.mu.RLock()
cli := r.cli
leaseID := r.leaseID
key := r.key
r.mu.RUnlock()
if cli != nil {
cli.Revoke(context.Background(), leaseID)
cli.Delete(context.Background(), key)
log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
}
if r.etcdClient != nil {
r.etcdClient.Close()
}
}
// State 获取连接状态
func (r *EtcdRegistry) State() ConnectionState {
if r.etcdClient != nil {
return r.etcdClient.State()
}
return StateDisconnected
}

3
go.mod
View File

@@ -1,9 +1,10 @@
module blueocean.local/reconnect
module git.whblueocean.cn/blueocean-go/reconnect
go 1.24.0
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/rabbitmq/amqp091-go v1.10.0
go.etcd.io/etcd/client/v3 v3.6.5
google.golang.org/grpc v1.75.1
gorm.io/gorm v1.31.1

2
go.sum
View File

@@ -41,6 +41,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

274
grpc_etcd.go Normal file
View File

@@ -0,0 +1,274 @@
package reconnect
import (
"context"
"log"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
)
// GRPCEtcdClientConfig 使用 etcd 服务发现的 gRPC 客户端配置
type GRPCEtcdClientConfig struct {
// EtcdEndpoints etcd 服务地址列表
EtcdEndpoints []string
// EtcdDialTimeout etcd 连接超时时间
EtcdDialTimeout time.Duration
// ServiceName 服务名称(在 etcd 中注册的 key 前缀)
ServiceName string
// DialOptions 额外的 gRPC 拨号选项
DialOptions []grpc.DialOption
// ReconnectConfig 重连配置
ReconnectConfig Config
}
// GRPCEtcdClient 带 etcd 服务发现和重连功能的 gRPC 客户端
type GRPCEtcdClient struct {
config GRPCEtcdClientConfig
etcdClient *clientv3.Client
conn *grpc.ClientConn
manager *ConnectionManager
mu sync.RWMutex
}
// grpcEtcdConnector gRPC + etcd 连接器
type grpcEtcdConnector struct {
client *GRPCEtcdClient
}
func (g *grpcEtcdConnector) Connect(ctx context.Context) error {
g.client.mu.Lock()
defer g.client.mu.Unlock()
cfg := g.client.config
// 创建 etcd 客户端
etcdTimeout := cfg.EtcdDialTimeout
if etcdTimeout == 0 {
etcdTimeout = 5 * time.Second
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: cfg.EtcdEndpoints,
DialTimeout: etcdTimeout,
})
if err != nil {
return err
}
// 注册 etcd resolver
etcdResolver := &etcdResolverBuilder{cli: cli}
resolver.Register(etcdResolver)
target := "etcd:///" + cfg.ServiceName + "/"
// 默认 DialOptions
opts := []grpc.DialOption{
grpc.WithResolvers(etcdResolver),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
}
// 追加用户自定义选项
opts = append(opts, cfg.DialOptions...)
conn, err := grpc.NewClient(target, opts...)
if err != nil {
cli.Close()
return err
}
// 等待连接就绪
connCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
conn.Connect()
for {
state := conn.GetState()
if state == connectivity.Ready {
break
}
if state == connectivity.TransientFailure || state == connectivity.Shutdown {
conn.Close()
cli.Close()
return context.DeadlineExceeded
}
if !conn.WaitForStateChange(connCtx, state) {
conn.Close()
cli.Close()
return connCtx.Err()
}
}
g.client.etcdClient = cli
g.client.conn = conn
return nil
}
func (g *grpcEtcdConnector) Close() error {
g.client.mu.Lock()
defer g.client.mu.Unlock()
if g.client.conn != nil {
g.client.conn.Close()
g.client.conn = nil
}
if g.client.etcdClient != nil {
g.client.etcdClient.Close()
g.client.etcdClient = nil
}
return nil
}
// grpcEtcdHealthChecker gRPC + etcd 健康检查器
type grpcEtcdHealthChecker struct {
client *GRPCEtcdClient
}
func (g *grpcEtcdHealthChecker) HealthCheck(ctx context.Context) error {
g.client.mu.RLock()
conn := g.client.conn
g.client.mu.RUnlock()
if conn == nil {
return context.Canceled
}
state := conn.GetState()
if state == connectivity.Ready || state == connectivity.Idle {
return nil
}
conn.Connect()
if !conn.WaitForStateChange(ctx, state) {
return ctx.Err()
}
newState := conn.GetState()
if newState != connectivity.Ready && newState != connectivity.Idle {
return context.DeadlineExceeded
}
return nil
}
// NewGRPCClientWithEtcd 创建带 etcd 服务发现和重连功能的 gRPC 客户端
func NewGRPCClientWithEtcd(cfg GRPCEtcdClientConfig) (*GRPCEtcdClient, error) {
client := &GRPCEtcdClient{
config: cfg,
}
connector := &grpcEtcdConnector{client: client}
checker := &grpcEtcdHealthChecker{client: client}
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
// 首次连接
ctx := context.Background()
if err := client.manager.ConnectWithRetry(ctx); err != nil {
return nil, err
}
return client, nil
}
// GetConn 获取 gRPC 连接
func (c *GRPCEtcdClient) GetConn() *grpc.ClientConn {
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn
}
// GetEtcdClient 获取 etcd 客户端
func (c *GRPCEtcdClient) GetEtcdClient() *clientv3.Client {
c.mu.RLock()
defer c.mu.RUnlock()
return c.etcdClient
}
// State 获取连接状态
func (c *GRPCEtcdClient) State() ConnectionState {
return c.manager.State()
}
// Close 关闭客户端
func (c *GRPCEtcdClient) Close() error {
return c.manager.Close()
}
// TriggerReconnect 手动触发重连
func (c *GRPCEtcdClient) TriggerReconnect() {
c.manager.TriggerReconnect()
}
// ================== etcd resolver 实现 ==================
// etcdResolverBuilder 实现 resolver.Builder
type etcdResolverBuilder struct {
cli *clientv3.Client
}
func (b *etcdResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &etcdResolver{
cli: b.cli,
cc: cc,
target: target,
}
go r.watch(target.Endpoint())
return r, nil
}
func (b *etcdResolverBuilder) Scheme() string {
return "etcd"
}
// etcdResolver 实现 resolver.Resolver
type etcdResolver struct {
cli *clientv3.Client
cc resolver.ClientConn
target resolver.Target
}
func (r *etcdResolver) watch(keyPrefix string) {
// 首次获取服务列表
resp, err := r.cli.Get(context.Background(), "/"+keyPrefix, clientv3.WithPrefix())
if err != nil {
log.Printf("[etcdResolver] 获取服务列表失败: %v", err)
return
}
addrs := make([]resolver.Address, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
addrs = append(addrs, resolver.Address{Addr: string(kv.Value)})
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
// 监听变化
watchCh := r.cli.Watch(context.Background(), "/"+keyPrefix, clientv3.WithPrefix())
for wresp := range watchCh {
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut:
addrs = append(addrs, resolver.Address{Addr: string(ev.Kv.Value)})
case clientv3.EventTypeDelete:
newAddrs := make([]resolver.Address, 0)
for _, addr := range addrs {
if addr.Addr != string(ev.Kv.Value) {
newAddrs = append(newAddrs, addr)
}
}
addrs = newAddrs
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
}
}
func (r *etcdResolver) ResolveNow(opts resolver.ResolveNowOptions) {}
func (r *etcdResolver) Close() {}

View File

@@ -223,4 +223,3 @@ func (cm *ConnectionManager) Close() error {
func (cm *ConnectionManager) TriggerReconnect() {
cm.reconnect()
}

205
rabbitmq.go Normal file
View File

@@ -0,0 +1,205 @@
package reconnect
import (
"context"
"sync"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitMQClientConfig RabbitMQ客户端配置
type RabbitMQClientConfig struct {
// URL RabbitMQ连接URL
URL string
// Config amqp配置
Config *amqp.Config
// ReconnectConfig 重连配置
ReconnectConfig Config
}
// RabbitMQClient 带重连功能的RabbitMQ客户端
type RabbitMQClient struct {
config RabbitMQClientConfig
conn *amqp.Connection
manager *ConnectionManager
mu sync.RWMutex
}
// rabbitmqConnector RabbitMQ连接器
type rabbitmqConnector struct {
client *RabbitMQClient
}
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
r.client.mu.Lock()
defer r.client.mu.Unlock()
var conn *amqp.Connection
var err error
if r.client.config.Config != nil {
conn, err = amqp.DialConfig(r.client.config.URL, *r.client.config.Config)
} else {
conn, err = amqp.Dial(r.client.config.URL)
}
if err != nil {
return err
}
// 验证连接
if conn.IsClosed() {
conn.Close()
return amqp.ErrClosed
}
r.client.conn = conn
return nil
}
func (r *rabbitmqConnector) Close() error {
r.client.mu.Lock()
defer r.client.mu.Unlock()
if r.client.conn != nil && !r.client.conn.IsClosed() {
err := r.client.conn.Close()
r.client.conn = nil
return err
}
return nil
}
// rabbitmqHealthChecker RabbitMQ健康检查器
type rabbitmqHealthChecker struct {
client *RabbitMQClient
}
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
r.client.mu.RLock()
conn := r.client.conn
r.client.mu.RUnlock()
if conn == nil || conn.IsClosed() {
return amqp.ErrClosed
}
// 尝试创建通道来验证连接
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
return nil
}
// NewRabbitMQClient 创建带重连功能的RabbitMQ客户端
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
client := &RabbitMQClient{
config: cfg,
}
connector := &rabbitmqConnector{client: client}
checker := &rabbitmqHealthChecker{client: client}
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
// 首次连接
ctx := context.Background()
if err := client.manager.ConnectWithRetry(ctx); err != nil {
return nil, err
}
return client, nil
}
// GetConn 获取RabbitMQ连接
func (c *RabbitMQClient) GetConn() *amqp.Connection {
c.mu.RLock()
defer c.mu.RUnlock()
return c.conn
}
// Channel 创建通道
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
conn := c.GetConn()
if conn == nil || conn.IsClosed() {
return nil, amqp.ErrClosed
}
return conn.Channel()
}
// State 获取连接状态
func (c *RabbitMQClient) State() ConnectionState {
return c.manager.State()
}
// Close 关闭客户端
func (c *RabbitMQClient) Close() error {
return c.manager.Close()
}
// TriggerReconnect 手动触发重连
func (c *RabbitMQClient) TriggerReconnect() {
c.manager.TriggerReconnect()
}
// IsClosed 检查连接是否关闭
func (c *RabbitMQClient) IsClosed() bool {
conn := c.GetConn()
return conn == nil || conn.IsClosed()
}
// Publish 发布消息(带重连支持)
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
ch, err := c.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.Publish(exchange, key, mandatory, immediate, msg)
}
// Consume 消费消息(带重连支持)
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
ch, err := c.Channel()
if err != nil {
return nil, err
}
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}
// QueueDeclare 声明队列(带重连支持)
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
ch, err := c.Channel()
if err != nil {
return amqp.Queue{}, err
}
defer ch.Close()
return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}
// ExchangeDeclare 声明交换机(带重连支持)
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
ch, err := c.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}
// QueueBind 绑定队列到交换机(带重连支持)
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
ch, err := c.Channel()
if err != nil {
return err
}
defer ch.Close()
return ch.QueueBind(name, key, exchange, noWait, args)
}