From c70c2e9f1c802f0b3c78ee60fb6086bb3c0b690a Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:13 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) diff --git a/README.md b/README.md index dccb4e8..e803fe5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,218 @@ # reconnect +一个通用的Go语言重连库,支持Redis、PostgreSQL和etcd的自动重连功能。 + +## 特性 + +- 🔄 自动重连机制 +- ⚙️ 可配置的重连策略 +- 🔍 连接健康检查 +- 📊 连接状态监控 +- 🎯 支持多种服务(Redis、PostgreSQL、etcd) +- 🛡️ 线程安全 + +## 安装 + +```bash +go get github.com/blueocean-go/reconnect +``` + +## 快速开始 + +### Redis重连 + +```go +package main + +import ( + "context" + "time" + + "github.com/go-redis/redis/v8" + "github.com/blueocean-go/reconnect" +) + +func main() { + options := &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + } + + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.OnReconnect = func() { + fmt.Println("Redis重连成功!") + } + + manager := reconnect.RedisManager(options, config) + + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatal(err) + } + defer manager.Stop() + + // 使用Redis客户端 + redisClient := reconnect.NewRedisClient(options) + redisClient.Connect(ctx) + client := redisClient.GetClient() + // ... 使用client进行操作 +} +``` + +### PostgreSQL重连 + +```go +package main + +import ( + "context" + "time" + + "github.com/blueocean-go/reconnect" +) + +func main() { + dsn := "host=localhost user=postgres password=postgres dbname=testdb sslmode=disable" + + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.OnReconnect = func() { + fmt.Println("PostgreSQL重连成功!") + } + + manager := reconnect.PostgresManager( + dsn, + 25, // maxConnections + 5, // maxIdleConnections + 30*time.Minute, // maxConnectionAge + config, + ) + + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatal(err) + } + defer manager.Stop() + + // 使用数据库连接 + pgClient := reconnect.NewPostgresClient(dsn, 25, 5, 30*time.Minute) + pgClient.Connect(ctx) + db := pgClient.GetDB() + // ... 使用db进行操作 +} +``` + +### etcd重连 + +```go +package main + +import ( + "context" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/blueocean-go/reconnect" +) + +func main() { + etcdConfig := clientv3.Config{ + Endpoints: []string{"localhost:2379"}, + DialTimeout: 5 * time.Second, + } + + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.OnReconnect = func() { + fmt.Println("etcd重连成功!") + } + + manager := reconnect.EtcdManager(etcdConfig, config) + + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatal(err) + } + defer manager.Stop() + + // 使用etcd客户端 + etcdClient := reconnect.NewEtcdClient(etcdConfig) + etcdClient.Connect(ctx) + client := etcdClient.GetClient() + // ... 使用client进行操作 +} +``` + +## 配置选项 + +```go +type Config struct { + // MaxRetries 最大重试次数,0表示无限重试 + MaxRetries int + + // RetryInterval 重试间隔 + RetryInterval time.Duration + + // Timeout 连接超时时间 + Timeout time.Duration + + // OnReconnect 重连成功后的回调函数 + OnReconnect func() + + // OnDisconnect 断开连接时的回调函数 + OnDisconnect func(error) +} +``` + +## API文档 + +### Manager + +重连管理器,负责监控连接状态并自动重连。 + +- `Start(ctx context.Context) error` - 启动连接并开始监控 +- `Stop() error` - 停止重连管理器 +- `IsConnected() bool` - 返回当前连接状态 +- `WaitForError() error` - 等待错误(用于阻塞等待) + +### RedisClient + +Redis客户端包装器。 + +- `Connect(ctx context.Context) error` - 建立连接 +- `Close() error` - 关闭连接 +- `Ping(ctx context.Context) error` - 健康检查 +- `IsConnected() bool` - 检查连接状态 +- `GetClient() *redis.Client` - 获取底层Redis客户端 + +### PostgresClient + +PostgreSQL客户端包装器。 + +- `Connect(ctx context.Context) error` - 建立连接 +- `Close() error` - 关闭连接 +- `Ping(ctx context.Context) error` - 健康检查 +- `IsConnected() bool` - 检查连接状态 +- `GetDB() *sql.DB` - 获取底层数据库连接 + +### EtcdClient + +etcd客户端包装器。 + +- `Connect(ctx context.Context) error` - 建立连接 +- `Close() error` - 关闭连接 +- `Ping(ctx context.Context) error` - 健康检查 +- `IsConnected() bool` - 检查连接状态 +- `GetClient() *clientv3.Client` - 获取底层etcd客户端 + +## 示例 + +更多示例代码请查看 `examples/` 目录。 + +## 许可证 + +MIT License + +Copyright (c) 2025 blueocean-go From 896d518c54f3505bf443efc980726c7ffadd7eb8 Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:25 +0800 Subject: [PATCH 2/7] Add go.mod and go.sum files for dependency management --- go.mod | 31 +++++++++++++++++ go.sum | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 go.mod create mode 100644 go.sum diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ef0b1a3 --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module github.com/blueocean-go/reconnect + +go 1.21 + +require ( + github.com/lib/pq v1.10.9 + github.com/redis/go-redis/v9 v9.3.0 + go.etcd.io/etcd/client/v3 v3.5.10 +) + +require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + go.etcd.io/etcd/api/v3 v3.5.10 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.17.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/grpc v1.59.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..57e945d --- /dev/null +++ b/go.sum @@ -0,0 +1,104 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= +github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/etcd/api/v3 v3.5.10 h1:szRajuUUbLyppkhs9K6BRtjY37l66XQQmw7oZRANE4k= +go.etcd.io/etcd/api/v3 v3.5.10/go.mod h1:TidfmT4Uycad3NM/o25fG3J07odo4GBB9hoxaodFCtI= +go.etcd.io/etcd/client/pkg/v3 v3.5.10 h1:kfYIdQftBnbAq8pUWFXfpuuxFSKzlmM5cSn76JByiT0= +go.etcd.io/etcd/client/pkg/v3 v3.5.10/go.mod h1:DYivfIviIuQ8+/lCq4vcxuseg2P2XbHygkKwFo9fc8U= +go.etcd.io/etcd/client/v3 v3.5.10 h1:W9TXNZ+oB3MCd/8UjxHTWK5J9Nquw9fQBLJd5ne5/Ao= +go.etcd.io/etcd/client/v3 v3.5.10/go.mod h1:RVeBnDz2PUEZqTpgqwAtUd8nAPf5kjyFyND7P1VkOKc= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= +go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 915b5dd0bb61b7794bd9b24348560c7fdf50128c Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:33 +0800 Subject: [PATCH 3/7] Add EtcdClient implementation for managing etcd connections with methods for connecting, closing, pinging, and checking connection status. --- etcd.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 etcd.go diff --git a/etcd.go b/etcd.go new file mode 100644 index 0000000..29bccd1 --- /dev/null +++ b/etcd.go @@ -0,0 +1,108 @@ +package reconnect + +import ( + "context" + "fmt" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EtcdClient etcd客户端包装器 +type EtcdClient struct { + config clientv3.Config + client *clientv3.Client + mu sync.RWMutex +} + +// NewEtcdClient 创建新的etcd客户端 +func NewEtcdClient(config clientv3.Config) *EtcdClient { + return &EtcdClient{ + config: config, + } +} + +// Connect 建立etcd连接 +func (e *EtcdClient) Connect(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.client != nil { + _ = e.client.Close() + } + + client, err := clientv3.New(e.config) + if err != nil { + return fmt.Errorf("etcd new client failed: %w", err) + } + + // 测试连接 + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, err = client.Status(timeoutCtx, e.config.Endpoints[0]) + if err != nil { + _ = client.Close() + return fmt.Errorf("etcd status check failed: %w", err) + } + + e.client = client + return nil +} + +// Close 关闭etcd连接 +func (e *EtcdClient) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.client != nil { + err := e.client.Close() + e.client = nil + return err + } + return nil +} + +// Ping 检查etcd连接是否健康 +func (e *EtcdClient) Ping(ctx context.Context) error { + e.mu.RLock() + defer e.mu.RUnlock() + + if e.client == nil { + return fmt.Errorf("etcd client is nil") + } + + if len(e.config.Endpoints) == 0 { + return fmt.Errorf("etcd endpoints is empty") + } + + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _, err := e.client.Status(timeoutCtx, e.config.Endpoints[0]) + return err +} + +// IsConnected 检查连接状态 +func (e *EtcdClient) IsConnected() bool { + e.mu.RLock() + defer e.mu.RUnlock() + + return e.client != nil +} + +// GetClient 获取底层的etcd客户端 +func (e *EtcdClient) GetClient() *clientv3.Client { + e.mu.RLock() + defer e.mu.RUnlock() + + return e.client +} + +// EtcdManager 创建etcd重连管理器 +func EtcdManager(config clientv3.Config, reconnectConfig *Config) *Manager { + client := NewEtcdClient(config) + return NewManager(client, reconnectConfig) +} + From a8180f5115752550886b020fc74f9499527af90f Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:39 +0800 Subject: [PATCH 4/7] Add PostgresClient implementation for managing PostgreSQL connections, including methods for connecting, closing, pinging, and checking connection status. --- postgres.go | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 postgres.go diff --git a/postgres.go b/postgres.go new file mode 100644 index 0000000..ce4df1f --- /dev/null +++ b/postgres.go @@ -0,0 +1,106 @@ +package reconnect + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + _ "github.com/lib/pq" +) + +// PostgresClient PostgreSQL客户端包装器 +type PostgresClient struct { + dsn string + db *sql.DB + mu sync.RWMutex + maxAge time.Duration +} + +// NewPostgresClient 创建新的PostgreSQL客户端 +func NewPostgresClient(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration) *PostgresClient { + return &PostgresClient{ + dsn: dsn, + maxAge: maxConnectionAge, + } +} + +// Connect 建立PostgreSQL连接 +func (p *PostgresClient) Connect(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.db != nil { + _ = p.db.Close() + } + + db, err := sql.Open("postgres", p.dsn) + if err != nil { + return fmt.Errorf("postgres open failed: %w", err) + } + + // 设置连接池参数 + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + if p.maxAge > 0 { + db.SetConnMaxLifetime(p.maxAge) + } + + // 测试连接 + if err := db.PingContext(ctx); err != nil { + _ = db.Close() + return fmt.Errorf("postgres ping failed: %w", err) + } + + p.db = db + return nil +} + +// Close 关闭PostgreSQL连接 +func (p *PostgresClient) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.db != nil { + err := p.db.Close() + p.db = nil + return err + } + return nil +} + +// Ping 检查PostgreSQL连接是否健康 +func (p *PostgresClient) Ping(ctx context.Context) error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.db == nil { + return fmt.Errorf("postgres db is nil") + } + + return p.db.PingContext(ctx) +} + +// IsConnected 检查连接状态 +func (p *PostgresClient) IsConnected() bool { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.db != nil +} + +// GetDB 获取底层的数据库连接 +func (p *PostgresClient) GetDB() *sql.DB { + p.mu.RLock() + defer p.mu.RUnlock() + + return p.db +} + +// PostgresManager 创建PostgreSQL重连管理器 +func PostgresManager(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration, config *Config) *Manager { + client := NewPostgresClient(dsn, maxConnections, maxIdleConnections, maxConnectionAge) + return NewManager(client, config) +} + From 4382d345f61fad16b4a3a1ddf2d8f024bed1aa3f Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:48 +0800 Subject: [PATCH 5/7] Add Reconnect package for managing automatic reconnections, including a Reconnectable interface, configuration options, and a Manager for connection monitoring and retries. --- reconnect.go | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 reconnect.go diff --git a/reconnect.go b/reconnect.go new file mode 100644 index 0000000..2a84ef8 --- /dev/null +++ b/reconnect.go @@ -0,0 +1,162 @@ +package reconnect + +import ( + "context" + "errors" + "fmt" + "time" +) + +// Reconnectable 定义可重连的接口 +type Reconnectable interface { + // Connect 建立连接 + Connect(ctx context.Context) error + // Close 关闭连接 + Close() error + // Ping 检查连接是否健康 + Ping(ctx context.Context) error + // IsConnected 检查连接状态 + IsConnected() bool +} + +// Config 重连配置 +type Config struct { + // MaxRetries 最大重试次数,0表示无限重试 + MaxRetries int + // RetryInterval 重试间隔 + RetryInterval time.Duration + // Timeout 连接超时时间 + Timeout time.Duration + // OnReconnect 重连成功后的回调函数 + OnReconnect func() + // OnDisconnect 断开连接时的回调函数 + OnDisconnect func(error) +} + +// DefaultConfig 返回默认配置 +func DefaultConfig() *Config { + return &Config{ + MaxRetries: 0, // 无限重试 + RetryInterval: 3 * time.Second, + Timeout: 10 * time.Second, + } +} + +// Manager 重连管理器 +type Manager struct { + client Reconnectable + config *Config + ctx context.Context + cancel context.CancelFunc + connected bool + errCh chan error +} + +// NewManager 创建新的重连管理器 +func NewManager(client Reconnectable, config *Config) *Manager { + if config == nil { + config = DefaultConfig() + } + ctx, cancel := context.WithCancel(context.Background()) + return &Manager{ + client: client, + config: config, + ctx: ctx, + cancel: cancel, + errCh: make(chan error, 1), + } +} + +// Start 启动连接并开始监控 +func (m *Manager) Start(ctx context.Context) error { + // 初始连接 + if err := m.connect(ctx); err != nil { + return fmt.Errorf("initial connection failed: %w", err) + } + + // 启动监控 goroutine + go m.monitor() + + return nil +} + +// Stop 停止重连管理器 +func (m *Manager) Stop() error { + m.cancel() + if m.client != nil { + return m.client.Close() + } + return nil +} + +// connect 执行连接 +func (m *Manager) connect(ctx context.Context) error { + connectCtx, cancel := context.WithTimeout(ctx, m.config.Timeout) + defer cancel() + + if err := m.client.Connect(connectCtx); err != nil { + return err + } + + m.connected = true + if m.config.OnReconnect != nil { + m.config.OnReconnect() + } + return nil +} + +// monitor 监控连接状态并自动重连 +func (m *Manager) monitor() { + ticker := time.NewTicker(m.config.RetryInterval) + defer ticker.Stop() + + retryCount := 0 + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + // 检查连接状态 + if !m.client.IsConnected() { + m.connected = false + if m.config.OnDisconnect != nil { + m.config.OnDisconnect(errors.New("connection lost")) + } + + // 检查是否超过最大重试次数 + if m.config.MaxRetries > 0 && retryCount >= m.config.MaxRetries { + m.errCh <- fmt.Errorf("max retries (%d) exceeded", m.config.MaxRetries) + return + } + + // 尝试重连 + if err := m.connect(context.Background()); err != nil { + retryCount++ + continue + } + + // 重连成功,重置计数器 + retryCount = 0 + } else { + // 连接正常,执行健康检查 + if err := m.client.Ping(context.Background()); err != nil { + m.connected = false + if m.config.OnDisconnect != nil { + m.config.OnDisconnect(err) + } + } + } + } + } +} + +// IsConnected 返回当前连接状态 +func (m *Manager) IsConnected() bool { + return m.connected && m.client.IsConnected() +} + +// WaitForError 等待错误(用于阻塞等待) +func (m *Manager) WaitForError() error { + return <-m.errCh +} From 597e1cf840125bfa7fffcad289c6b727de156208 Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:48:54 +0800 Subject: [PATCH 6/7] Add RedisClient implementation for managing Redis connections, including methods for connecting, closing, pinging, and checking connection status. --- redis.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 redis.go diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..d831b86 --- /dev/null +++ b/redis.go @@ -0,0 +1,88 @@ +package reconnect + +import ( + "context" + "fmt" + "sync" + + "github.com/redis/go-redis/v9" +) + +// RedisClient Redis客户端包装器 +type RedisClient struct { + options *redis.Options + client *redis.Client + mu sync.RWMutex +} + +// NewRedisClient 创建新的Redis客户端 +func NewRedisClient(options *redis.Options) *RedisClient { + return &RedisClient{ + options: options, + } +} + +// Connect 建立Redis连接 +func (r *RedisClient) Connect(ctx context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.client != nil { + _ = r.client.Close() + } + + client := redis.NewClient(r.options) + if err := client.Ping(ctx).Err(); err != nil { + return fmt.Errorf("redis ping failed: %w", err) + } + + r.client = client + return nil +} + +// Close 关闭Redis连接 +func (r *RedisClient) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.client != nil { + err := r.client.Close() + r.client = nil + return err + } + return nil +} + +// Ping 检查Redis连接是否健康 +func (r *RedisClient) Ping(ctx context.Context) error { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.client == nil { + return fmt.Errorf("redis client is nil") + } + + return r.client.Ping(ctx).Err() +} + +// IsConnected 检查连接状态 +func (r *RedisClient) IsConnected() bool { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.client != nil +} + +// GetClient 获取底层的Redis客户端 +func (r *RedisClient) GetClient() *redis.Client { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.client +} + +// RedisManager 创建Redis重连管理器 +func RedisManager(options *redis.Options, config *Config) *Manager { + client := NewRedisClient(options) + return NewManager(client, config) +} From c68b77b1b7f99977160af6c330a3e4b7a7c05c8a Mon Sep 17 00:00:00 2001 From: mayiming <1627832236@qq.com> Date: Fri, 12 Dec 2025 15:49:00 +0800 Subject: [PATCH 7/7] Add example implementations for etcd, PostgreSQL, and Redis clients demonstrating automatic reconnection management and basic operations. --- examples/etcd_example.go | 73 ++++++++++++++++++++++++++++++++++++ examples/postgres_example.go | 65 ++++++++++++++++++++++++++++++++ examples/redis_example.go | 70 ++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 examples/etcd_example.go create mode 100644 examples/postgres_example.go create mode 100644 examples/redis_example.go diff --git a/examples/etcd_example.go b/examples/etcd_example.go new file mode 100644 index 0000000..1274244 --- /dev/null +++ b/examples/etcd_example.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/blueocean-go/reconnect" +) + +func main() { + // etcd配置 + etcdConfig := clientv3.Config{ + Endpoints: []string{"localhost:2379"}, + DialTimeout: 5 * time.Second, + } + + // 创建重连配置 + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.MaxRetries = 0 // 无限重试 + config.OnReconnect = func() { + fmt.Println("etcd重连成功!") + } + config.OnDisconnect = func(err error) { + fmt.Printf("etcd连接断开: %v\n", err) + } + + // 创建重连管理器 + manager := reconnect.EtcdManager(etcdConfig, config) + + // 启动连接 + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatalf("启动失败: %v", err) + } + defer manager.Stop() + + // 获取etcd客户端并使用 + etcdClient := reconnect.NewEtcdClient(etcdConfig) + if err := etcdClient.Connect(ctx); err != nil { + log.Fatalf("连接失败: %v", err) + } + + client := etcdClient.GetClient() + if client == nil { + log.Fatal("客户端为空") + } + + // 使用etcd客户端 + kv := clientv3.NewKV(client) + _, err := kv.Put(ctx, "key", "value") + if err != nil { + log.Printf("设置键值失败: %v", err) + } else { + fmt.Println("设置键值成功") + } + + resp, err := kv.Get(ctx, "key") + if err != nil { + log.Printf("获取键值失败: %v", err) + } else { + if len(resp.Kvs) > 0 { + fmt.Printf("获取键值: %s = %s\n", resp.Kvs[0].Key, resp.Kvs[0].Value) + } + } + + // 保持运行 + select {} +} + diff --git a/examples/postgres_example.go b/examples/postgres_example.go new file mode 100644 index 0000000..e380979 --- /dev/null +++ b/examples/postgres_example.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/blueocean-go/reconnect" +) + +func main() { + // PostgreSQL连接字符串 + dsn := "host=localhost user=postgres password=postgres dbname=testdb sslmode=disable" + + // 创建重连配置 + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.MaxRetries = 0 // 无限重试 + config.OnReconnect = func() { + fmt.Println("PostgreSQL重连成功!") + } + config.OnDisconnect = func(err error) { + fmt.Printf("PostgreSQL连接断开: %v\n", err) + } + + // 创建重连管理器 + manager := reconnect.PostgresManager( + dsn, + 25, // maxConnections + 5, // maxIdleConnections + 30*time.Minute, // maxConnectionAge + config, + ) + + // 启动连接 + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatalf("启动失败: %v", err) + } + defer manager.Stop() + + // 获取数据库连接并使用 + pgClient := reconnect.NewPostgresClient(dsn, 25, 5, 30*time.Minute) + if err := pgClient.Connect(ctx); err != nil { + log.Fatalf("连接失败: %v", err) + } + + db := pgClient.GetDB() + if db == nil { + log.Fatal("数据库连接为空") + } + + // 使用数据库连接 + var version string + err := db.QueryRow("SELECT version()").Scan(&version) + if err != nil { + log.Printf("查询失败: %v", err) + } else { + fmt.Printf("PostgreSQL版本: %s\n", version) + } + + // 保持运行 + select {} +} diff --git a/examples/redis_example.go b/examples/redis_example.go new file mode 100644 index 0000000..a364714 --- /dev/null +++ b/examples/redis_example.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/blueocean-go/reconnect" + "github.com/redis/go-redis/v9" +) + +func main() { + // 创建Redis选项 + options := &redis.Options{ + Addr: "localhost:6379", + Password: "", // 无密码 + DB: 0, + } + + // 创建重连配置 + config := reconnect.DefaultConfig() + config.RetryInterval = 3 * time.Second + config.MaxRetries = 0 // 无限重试 + config.OnReconnect = func() { + fmt.Println("Redis重连成功!") + } + config.OnDisconnect = func(err error) { + fmt.Printf("Redis连接断开: %v\n", err) + } + + // 创建重连管理器 + manager := reconnect.RedisManager(options, config) + + // 启动连接 + ctx := context.Background() + if err := manager.Start(ctx); err != nil { + log.Fatalf("启动失败: %v", err) + } + defer manager.Stop() + + // 获取Redis客户端并使用 + redisClient := reconnect.NewRedisClient(options) + if err := redisClient.Connect(ctx); err != nil { + log.Fatalf("连接失败: %v", err) + } + + client := redisClient.GetClient() + if client == nil { + log.Fatal("客户端为空") + } + + // 使用Redis客户端 + err := client.Set(ctx, "key", "value", 0).Err() + if err != nil { + log.Printf("设置键值失败: %v", err) + } else { + fmt.Println("设置键值成功") + } + + val, err := client.Get(ctx, "key").Result() + if err != nil { + log.Printf("获取键值失败: %v", err) + } else { + fmt.Printf("获取键值: %s\n", val) + } + + // 保持运行 + select {} +}