Compare commits
7 Commits
65fe52ceb0
...
c68b77b1b7
| Author | SHA1 | Date | |
|---|---|---|---|
| c68b77b1b7 | |||
| 597e1cf840 | |||
| 4382d345f6 | |||
| a8180f5115 | |||
| 915b5dd0bb | |||
| 896d518c54 | |||
| c70c2e9f1c |
216
README.md
216
README.md
@@ -1,2 +1,218 @@
|
|||||||
# reconnect
|
# reconnect
|
||||||
|
|
||||||
|
一个通用的Go语言重连库,支持Redis、PostgreSQL和etcd的自动重连功能。
|
||||||
|
|
||||||
|
## 特性
|
||||||
|
|
||||||
|
- 🔄 自动重连机制
|
||||||
|
- ⚙️ 可配置的重连策略
|
||||||
|
- 🔍 连接健康检查
|
||||||
|
- 📊 连接状态监控
|
||||||
|
- 🎯 支持多种服务(Redis、PostgreSQL、etcd)
|
||||||
|
- 🛡️ 线程安全
|
||||||
|
|
||||||
|
## 安装
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get github.com/blueocean-go/reconnect
|
||||||
|
```
|
||||||
|
|
||||||
|
## 快速开始
|
||||||
|
|
||||||
|
### Redis重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
options := &redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Password: "",
|
||||||
|
DB: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("Redis重连成功!")
|
||||||
|
}
|
||||||
|
|
||||||
|
manager := reconnect.RedisManager(options, config)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 使用Redis客户端
|
||||||
|
redisClient := reconnect.NewRedisClient(options)
|
||||||
|
redisClient.Connect(ctx)
|
||||||
|
client := redisClient.GetClient()
|
||||||
|
// ... 使用client进行操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### PostgreSQL重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
dsn := "host=localhost user=postgres password=postgres dbname=testdb sslmode=disable"
|
||||||
|
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("PostgreSQL重连成功!")
|
||||||
|
}
|
||||||
|
|
||||||
|
manager := reconnect.PostgresManager(
|
||||||
|
dsn,
|
||||||
|
25, // maxConnections
|
||||||
|
5, // maxIdleConnections
|
||||||
|
30*time.Minute, // maxConnectionAge
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 使用数据库连接
|
||||||
|
pgClient := reconnect.NewPostgresClient(dsn, 25, 5, 30*time.Minute)
|
||||||
|
pgClient.Connect(ctx)
|
||||||
|
db := pgClient.GetDB()
|
||||||
|
// ... 使用db进行操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### etcd重连
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
etcdConfig := clientv3.Config{
|
||||||
|
Endpoints: []string{"localhost:2379"},
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("etcd重连成功!")
|
||||||
|
}
|
||||||
|
|
||||||
|
manager := reconnect.EtcdManager(etcdConfig, config)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 使用etcd客户端
|
||||||
|
etcdClient := reconnect.NewEtcdClient(etcdConfig)
|
||||||
|
etcdClient.Connect(ctx)
|
||||||
|
client := etcdClient.GetClient()
|
||||||
|
// ... 使用client进行操作
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 配置选项
|
||||||
|
|
||||||
|
```go
|
||||||
|
type Config struct {
|
||||||
|
// MaxRetries 最大重试次数,0表示无限重试
|
||||||
|
MaxRetries int
|
||||||
|
|
||||||
|
// RetryInterval 重试间隔
|
||||||
|
RetryInterval time.Duration
|
||||||
|
|
||||||
|
// Timeout 连接超时时间
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// OnReconnect 重连成功后的回调函数
|
||||||
|
OnReconnect func()
|
||||||
|
|
||||||
|
// OnDisconnect 断开连接时的回调函数
|
||||||
|
OnDisconnect func(error)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## API文档
|
||||||
|
|
||||||
|
### Manager
|
||||||
|
|
||||||
|
重连管理器,负责监控连接状态并自动重连。
|
||||||
|
|
||||||
|
- `Start(ctx context.Context) error` - 启动连接并开始监控
|
||||||
|
- `Stop() error` - 停止重连管理器
|
||||||
|
- `IsConnected() bool` - 返回当前连接状态
|
||||||
|
- `WaitForError() error` - 等待错误(用于阻塞等待)
|
||||||
|
|
||||||
|
### RedisClient
|
||||||
|
|
||||||
|
Redis客户端包装器。
|
||||||
|
|
||||||
|
- `Connect(ctx context.Context) error` - 建立连接
|
||||||
|
- `Close() error` - 关闭连接
|
||||||
|
- `Ping(ctx context.Context) error` - 健康检查
|
||||||
|
- `IsConnected() bool` - 检查连接状态
|
||||||
|
- `GetClient() *redis.Client` - 获取底层Redis客户端
|
||||||
|
|
||||||
|
### PostgresClient
|
||||||
|
|
||||||
|
PostgreSQL客户端包装器。
|
||||||
|
|
||||||
|
- `Connect(ctx context.Context) error` - 建立连接
|
||||||
|
- `Close() error` - 关闭连接
|
||||||
|
- `Ping(ctx context.Context) error` - 健康检查
|
||||||
|
- `IsConnected() bool` - 检查连接状态
|
||||||
|
- `GetDB() *sql.DB` - 获取底层数据库连接
|
||||||
|
|
||||||
|
### EtcdClient
|
||||||
|
|
||||||
|
etcd客户端包装器。
|
||||||
|
|
||||||
|
- `Connect(ctx context.Context) error` - 建立连接
|
||||||
|
- `Close() error` - 关闭连接
|
||||||
|
- `Ping(ctx context.Context) error` - 健康检查
|
||||||
|
- `IsConnected() bool` - 检查连接状态
|
||||||
|
- `GetClient() *clientv3.Client` - 获取底层etcd客户端
|
||||||
|
|
||||||
|
## 示例
|
||||||
|
|
||||||
|
更多示例代码请查看 `examples/` 目录。
|
||||||
|
|
||||||
|
## 许可证
|
||||||
|
|
||||||
|
MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2025 blueocean-go
|
||||||
|
|||||||
108
etcd.go
Normal file
108
etcd.go
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EtcdClient etcd客户端包装器
|
||||||
|
type EtcdClient struct {
|
||||||
|
config clientv3.Config
|
||||||
|
client *clientv3.Client
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEtcdClient 创建新的etcd客户端
|
||||||
|
func NewEtcdClient(config clientv3.Config) *EtcdClient {
|
||||||
|
return &EtcdClient{
|
||||||
|
config: config,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect 建立etcd连接
|
||||||
|
func (e *EtcdClient) Connect(ctx context.Context) error {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
if e.client != nil {
|
||||||
|
_ = e.client.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
client, err := clientv3.New(e.config)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("etcd new client failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试连接
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err = client.Status(timeoutCtx, e.config.Endpoints[0])
|
||||||
|
if err != nil {
|
||||||
|
_ = client.Close()
|
||||||
|
return fmt.Errorf("etcd status check failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.client = client
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭etcd连接
|
||||||
|
func (e *EtcdClient) Close() error {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
if e.client != nil {
|
||||||
|
err := e.client.Close()
|
||||||
|
e.client = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping 检查etcd连接是否健康
|
||||||
|
func (e *EtcdClient) Ping(ctx context.Context) error {
|
||||||
|
e.mu.RLock()
|
||||||
|
defer e.mu.RUnlock()
|
||||||
|
|
||||||
|
if e.client == nil {
|
||||||
|
return fmt.Errorf("etcd client is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(e.config.Endpoints) == 0 {
|
||||||
|
return fmt.Errorf("etcd endpoints is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := e.client.Status(timeoutCtx, e.config.Endpoints[0])
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsConnected 检查连接状态
|
||||||
|
func (e *EtcdClient) IsConnected() bool {
|
||||||
|
e.mu.RLock()
|
||||||
|
defer e.mu.RUnlock()
|
||||||
|
|
||||||
|
return e.client != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClient 获取底层的etcd客户端
|
||||||
|
func (e *EtcdClient) GetClient() *clientv3.Client {
|
||||||
|
e.mu.RLock()
|
||||||
|
defer e.mu.RUnlock()
|
||||||
|
|
||||||
|
return e.client
|
||||||
|
}
|
||||||
|
|
||||||
|
// EtcdManager 创建etcd重连管理器
|
||||||
|
func EtcdManager(config clientv3.Config, reconnectConfig *Config) *Manager {
|
||||||
|
client := NewEtcdClient(config)
|
||||||
|
return NewManager(client, reconnectConfig)
|
||||||
|
}
|
||||||
|
|
||||||
73
examples/etcd_example.go
Normal file
73
examples/etcd_example.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// etcd配置
|
||||||
|
etcdConfig := clientv3.Config{
|
||||||
|
Endpoints: []string{"localhost:2379"},
|
||||||
|
DialTimeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建重连配置
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.MaxRetries = 0 // 无限重试
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("etcd重连成功!")
|
||||||
|
}
|
||||||
|
config.OnDisconnect = func(err error) {
|
||||||
|
fmt.Printf("etcd连接断开: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建重连管理器
|
||||||
|
manager := reconnect.EtcdManager(etcdConfig, config)
|
||||||
|
|
||||||
|
// 启动连接
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatalf("启动失败: %v", err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 获取etcd客户端并使用
|
||||||
|
etcdClient := reconnect.NewEtcdClient(etcdConfig)
|
||||||
|
if err := etcdClient.Connect(ctx); err != nil {
|
||||||
|
log.Fatalf("连接失败: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client := etcdClient.GetClient()
|
||||||
|
if client == nil {
|
||||||
|
log.Fatal("客户端为空")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用etcd客户端
|
||||||
|
kv := clientv3.NewKV(client)
|
||||||
|
_, err := kv.Put(ctx, "key", "value")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("设置键值失败: %v", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("设置键值成功")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := kv.Get(ctx, "key")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("获取键值失败: %v", err)
|
||||||
|
} else {
|
||||||
|
if len(resp.Kvs) > 0 {
|
||||||
|
fmt.Printf("获取键值: %s = %s\n", resp.Kvs[0].Key, resp.Kvs[0].Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保持运行
|
||||||
|
select {}
|
||||||
|
}
|
||||||
|
|
||||||
65
examples/postgres_example.go
Normal file
65
examples/postgres_example.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// PostgreSQL连接字符串
|
||||||
|
dsn := "host=localhost user=postgres password=postgres dbname=testdb sslmode=disable"
|
||||||
|
|
||||||
|
// 创建重连配置
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.MaxRetries = 0 // 无限重试
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("PostgreSQL重连成功!")
|
||||||
|
}
|
||||||
|
config.OnDisconnect = func(err error) {
|
||||||
|
fmt.Printf("PostgreSQL连接断开: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建重连管理器
|
||||||
|
manager := reconnect.PostgresManager(
|
||||||
|
dsn,
|
||||||
|
25, // maxConnections
|
||||||
|
5, // maxIdleConnections
|
||||||
|
30*time.Minute, // maxConnectionAge
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 启动连接
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatalf("启动失败: %v", err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 获取数据库连接并使用
|
||||||
|
pgClient := reconnect.NewPostgresClient(dsn, 25, 5, 30*time.Minute)
|
||||||
|
if err := pgClient.Connect(ctx); err != nil {
|
||||||
|
log.Fatalf("连接失败: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db := pgClient.GetDB()
|
||||||
|
if db == nil {
|
||||||
|
log.Fatal("数据库连接为空")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用数据库连接
|
||||||
|
var version string
|
||||||
|
err := db.QueryRow("SELECT version()").Scan(&version)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("查询失败: %v", err)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("PostgreSQL版本: %s\n", version)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保持运行
|
||||||
|
select {}
|
||||||
|
}
|
||||||
70
examples/redis_example.go
Normal file
70
examples/redis_example.go
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/blueocean-go/reconnect"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// 创建Redis选项
|
||||||
|
options := &redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Password: "", // 无密码
|
||||||
|
DB: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建重连配置
|
||||||
|
config := reconnect.DefaultConfig()
|
||||||
|
config.RetryInterval = 3 * time.Second
|
||||||
|
config.MaxRetries = 0 // 无限重试
|
||||||
|
config.OnReconnect = func() {
|
||||||
|
fmt.Println("Redis重连成功!")
|
||||||
|
}
|
||||||
|
config.OnDisconnect = func(err error) {
|
||||||
|
fmt.Printf("Redis连接断开: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建重连管理器
|
||||||
|
manager := reconnect.RedisManager(options, config)
|
||||||
|
|
||||||
|
// 启动连接
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := manager.Start(ctx); err != nil {
|
||||||
|
log.Fatalf("启动失败: %v", err)
|
||||||
|
}
|
||||||
|
defer manager.Stop()
|
||||||
|
|
||||||
|
// 获取Redis客户端并使用
|
||||||
|
redisClient := reconnect.NewRedisClient(options)
|
||||||
|
if err := redisClient.Connect(ctx); err != nil {
|
||||||
|
log.Fatalf("连接失败: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client := redisClient.GetClient()
|
||||||
|
if client == nil {
|
||||||
|
log.Fatal("客户端为空")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用Redis客户端
|
||||||
|
err := client.Set(ctx, "key", "value", 0).Err()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("设置键值失败: %v", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("设置键值成功")
|
||||||
|
}
|
||||||
|
|
||||||
|
val, err := client.Get(ctx, "key").Result()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("获取键值失败: %v", err)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("获取键值: %s\n", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 保持运行
|
||||||
|
select {}
|
||||||
|
}
|
||||||
31
go.mod
Normal file
31
go.mod
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
module github.com/blueocean-go/reconnect
|
||||||
|
|
||||||
|
go 1.21
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/lib/pq v1.10.9
|
||||||
|
github.com/redis/go-redis/v9 v9.3.0
|
||||||
|
go.etcd.io/etcd/client/v3 v3.5.10
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||||
|
github.com/coreos/go-semver v0.3.0 // indirect
|
||||||
|
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
|
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
|
||||||
|
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
|
||||||
|
go.uber.org/atomic v1.7.0 // indirect
|
||||||
|
go.uber.org/multierr v1.6.0 // indirect
|
||||||
|
go.uber.org/zap v1.17.0 // indirect
|
||||||
|
golang.org/x/net v0.17.0 // indirect
|
||||||
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
|
golang.org/x/text v0.13.0 // indirect
|
||||||
|
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
|
||||||
|
google.golang.org/grpc v1.59.0 // indirect
|
||||||
|
google.golang.org/protobuf v1.31.0 // indirect
|
||||||
|
)
|
||||||
104
go.sum
Normal file
104
go.sum
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||||
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
|
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||||
|
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||||
|
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
|
||||||
|
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
|
||||||
|
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
|
||||||
|
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||||
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||||
|
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||||
|
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||||
|
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||||
|
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||||
|
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||||
|
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||||
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||||
|
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||||
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||||
|
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
|
||||||
|
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
|
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
|
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
|
go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k=
|
||||||
|
go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI=
|
||||||
|
go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0=
|
||||||
|
go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U=
|
||||||
|
go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao=
|
||||||
|
go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc=
|
||||||
|
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||||
|
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||||
|
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
|
||||||
|
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
|
||||||
|
go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
|
||||||
|
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||||
|
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||||
|
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
|
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
|
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||||
|
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||||
|
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||||
|
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||||
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||||
|
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||||
|
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
|
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||||
|
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||||
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY=
|
||||||
|
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
|
||||||
|
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||||
|
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||||
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||||
|
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
106
postgres.go
Normal file
106
postgres.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PostgresClient PostgreSQL客户端包装器
|
||||||
|
type PostgresClient struct {
|
||||||
|
dsn string
|
||||||
|
db *sql.DB
|
||||||
|
mu sync.RWMutex
|
||||||
|
maxAge time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPostgresClient 创建新的PostgreSQL客户端
|
||||||
|
func NewPostgresClient(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration) *PostgresClient {
|
||||||
|
return &PostgresClient{
|
||||||
|
dsn: dsn,
|
||||||
|
maxAge: maxConnectionAge,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect 建立PostgreSQL连接
|
||||||
|
func (p *PostgresClient) Connect(ctx context.Context) error {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
if p.db != nil {
|
||||||
|
_ = p.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := sql.Open("postgres", p.dsn)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("postgres open failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置连接池参数
|
||||||
|
db.SetMaxOpenConns(25)
|
||||||
|
db.SetMaxIdleConns(5)
|
||||||
|
if p.maxAge > 0 {
|
||||||
|
db.SetConnMaxLifetime(p.maxAge)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试连接
|
||||||
|
if err := db.PingContext(ctx); err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return fmt.Errorf("postgres ping failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.db = db
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭PostgreSQL连接
|
||||||
|
func (p *PostgresClient) Close() error {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
if p.db != nil {
|
||||||
|
err := p.db.Close()
|
||||||
|
p.db = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping 检查PostgreSQL连接是否健康
|
||||||
|
func (p *PostgresClient) Ping(ctx context.Context) error {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
if p.db == nil {
|
||||||
|
return fmt.Errorf("postgres db is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.db.PingContext(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsConnected 检查连接状态
|
||||||
|
func (p *PostgresClient) IsConnected() bool {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
return p.db != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDB 获取底层的数据库连接
|
||||||
|
func (p *PostgresClient) GetDB() *sql.DB {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
return p.db
|
||||||
|
}
|
||||||
|
|
||||||
|
// PostgresManager 创建PostgreSQL重连管理器
|
||||||
|
func PostgresManager(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration, config *Config) *Manager {
|
||||||
|
client := NewPostgresClient(dsn, maxConnections, maxIdleConnections, maxConnectionAge)
|
||||||
|
return NewManager(client, config)
|
||||||
|
}
|
||||||
|
|
||||||
162
reconnect.go
Normal file
162
reconnect.go
Normal file
@@ -0,0 +1,162 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Reconnectable 定义可重连的接口
|
||||||
|
type Reconnectable interface {
|
||||||
|
// Connect 建立连接
|
||||||
|
Connect(ctx context.Context) error
|
||||||
|
// Close 关闭连接
|
||||||
|
Close() error
|
||||||
|
// Ping 检查连接是否健康
|
||||||
|
Ping(ctx context.Context) error
|
||||||
|
// IsConnected 检查连接状态
|
||||||
|
IsConnected() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config 重连配置
|
||||||
|
type Config struct {
|
||||||
|
// MaxRetries 最大重试次数,0表示无限重试
|
||||||
|
MaxRetries int
|
||||||
|
// RetryInterval 重试间隔
|
||||||
|
RetryInterval time.Duration
|
||||||
|
// Timeout 连接超时时间
|
||||||
|
Timeout time.Duration
|
||||||
|
// OnReconnect 重连成功后的回调函数
|
||||||
|
OnReconnect func()
|
||||||
|
// OnDisconnect 断开连接时的回调函数
|
||||||
|
OnDisconnect func(error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultConfig 返回默认配置
|
||||||
|
func DefaultConfig() *Config {
|
||||||
|
return &Config{
|
||||||
|
MaxRetries: 0, // 无限重试
|
||||||
|
RetryInterval: 3 * time.Second,
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manager 重连管理器
|
||||||
|
type Manager struct {
|
||||||
|
client Reconnectable
|
||||||
|
config *Config
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
connected bool
|
||||||
|
errCh chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager 创建新的重连管理器
|
||||||
|
func NewManager(client Reconnectable, config *Config) *Manager {
|
||||||
|
if config == nil {
|
||||||
|
config = DefaultConfig()
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
return &Manager{
|
||||||
|
client: client,
|
||||||
|
config: config,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
errCh: make(chan error, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start 启动连接并开始监控
|
||||||
|
func (m *Manager) Start(ctx context.Context) error {
|
||||||
|
// 初始连接
|
||||||
|
if err := m.connect(ctx); err != nil {
|
||||||
|
return fmt.Errorf("initial connection failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 启动监控 goroutine
|
||||||
|
go m.monitor()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop 停止重连管理器
|
||||||
|
func (m *Manager) Stop() error {
|
||||||
|
m.cancel()
|
||||||
|
if m.client != nil {
|
||||||
|
return m.client.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// connect 执行连接
|
||||||
|
func (m *Manager) connect(ctx context.Context) error {
|
||||||
|
connectCtx, cancel := context.WithTimeout(ctx, m.config.Timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := m.client.Connect(connectCtx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.connected = true
|
||||||
|
if m.config.OnReconnect != nil {
|
||||||
|
m.config.OnReconnect()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitor 监控连接状态并自动重连
|
||||||
|
func (m *Manager) monitor() {
|
||||||
|
ticker := time.NewTicker(m.config.RetryInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
retryCount := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-m.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
// 检查连接状态
|
||||||
|
if !m.client.IsConnected() {
|
||||||
|
m.connected = false
|
||||||
|
if m.config.OnDisconnect != nil {
|
||||||
|
m.config.OnDisconnect(errors.New("connection lost"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查是否超过最大重试次数
|
||||||
|
if m.config.MaxRetries > 0 && retryCount >= m.config.MaxRetries {
|
||||||
|
m.errCh <- fmt.Errorf("max retries (%d) exceeded", m.config.MaxRetries)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试重连
|
||||||
|
if err := m.connect(context.Background()); err != nil {
|
||||||
|
retryCount++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 重连成功,重置计数器
|
||||||
|
retryCount = 0
|
||||||
|
} else {
|
||||||
|
// 连接正常,执行健康检查
|
||||||
|
if err := m.client.Ping(context.Background()); err != nil {
|
||||||
|
m.connected = false
|
||||||
|
if m.config.OnDisconnect != nil {
|
||||||
|
m.config.OnDisconnect(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsConnected 返回当前连接状态
|
||||||
|
func (m *Manager) IsConnected() bool {
|
||||||
|
return m.connected && m.client.IsConnected()
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForError 等待错误(用于阻塞等待)
|
||||||
|
func (m *Manager) WaitForError() error {
|
||||||
|
return <-m.errCh
|
||||||
|
}
|
||||||
88
redis.go
Normal file
88
redis.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RedisClient Redis客户端包装器
|
||||||
|
type RedisClient struct {
|
||||||
|
options *redis.Options
|
||||||
|
client *redis.Client
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRedisClient 创建新的Redis客户端
|
||||||
|
func NewRedisClient(options *redis.Options) *RedisClient {
|
||||||
|
return &RedisClient{
|
||||||
|
options: options,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect 建立Redis连接
|
||||||
|
func (r *RedisClient) Connect(ctx context.Context) error {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
if r.client != nil {
|
||||||
|
_ = r.client.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
client := redis.NewClient(r.options)
|
||||||
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
|
return fmt.Errorf("redis ping failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.client = client
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭Redis连接
|
||||||
|
func (r *RedisClient) Close() error {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
if r.client != nil {
|
||||||
|
err := r.client.Close()
|
||||||
|
r.client = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping 检查Redis连接是否健康
|
||||||
|
func (r *RedisClient) Ping(ctx context.Context) error {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
if r.client == nil {
|
||||||
|
return fmt.Errorf("redis client is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.client.Ping(ctx).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsConnected 检查连接状态
|
||||||
|
func (r *RedisClient) IsConnected() bool {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
return r.client != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetClient 获取底层的Redis客户端
|
||||||
|
func (r *RedisClient) GetClient() *redis.Client {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
return r.client
|
||||||
|
}
|
||||||
|
|
||||||
|
// RedisManager 创建Redis重连管理器
|
||||||
|
func RedisManager(options *redis.Options, config *Config) *Manager {
|
||||||
|
client := NewRedisClient(options)
|
||||||
|
return NewManager(client, config)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user