127 lines
2.4 KiB
Go
127 lines
2.4 KiB
Go
package reconnect
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/go-redis/redis/v8"
|
|
)
|
|
|
|
// RedisClientConfig Redis客户端配置
|
|
type RedisClientConfig struct {
|
|
// Options Redis连接选项
|
|
Options *redis.Options
|
|
// ReconnectConfig 重连配置
|
|
ReconnectConfig Config
|
|
}
|
|
|
|
// RedisClient 带重连功能的Redis客户端
|
|
type RedisClient struct {
|
|
config RedisClientConfig
|
|
client *redis.Client
|
|
manager *ConnectionManager
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// redisConnector Redis连接器
|
|
type redisConnector struct {
|
|
client *RedisClient
|
|
}
|
|
|
|
func (r *redisConnector) Connect(ctx context.Context) error {
|
|
r.client.mu.Lock()
|
|
defer r.client.mu.Unlock()
|
|
|
|
rdb := redis.NewClient(r.client.config.Options)
|
|
|
|
// 验证连接
|
|
if err := rdb.Ping(ctx).Err(); err != nil {
|
|
rdb.Close()
|
|
return err
|
|
}
|
|
|
|
r.client.client = rdb
|
|
return nil
|
|
}
|
|
|
|
func (r *redisConnector) Close() error {
|
|
r.client.mu.Lock()
|
|
defer r.client.mu.Unlock()
|
|
|
|
if r.client.client != nil {
|
|
err := r.client.client.Close()
|
|
r.client.client = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// redisHealthChecker Redis健康检查器
|
|
type redisHealthChecker struct {
|
|
client *RedisClient
|
|
}
|
|
|
|
func (r *redisHealthChecker) HealthCheck(ctx context.Context) error {
|
|
r.client.mu.RLock()
|
|
rdb := r.client.client
|
|
r.client.mu.RUnlock()
|
|
|
|
if rdb == nil {
|
|
return context.Canceled
|
|
}
|
|
|
|
return rdb.Ping(ctx).Err()
|
|
}
|
|
|
|
// NewRedisClient 创建带重连功能的Redis客户端
|
|
func NewRedisClient(cfg RedisClientConfig) (*RedisClient, error) {
|
|
client := &RedisClient{
|
|
config: cfg,
|
|
}
|
|
|
|
connector := &redisConnector{client: client}
|
|
checker := &redisHealthChecker{client: client}
|
|
|
|
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
|
|
|
// 首次连接
|
|
ctx := context.Background()
|
|
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// GetClient 获取Redis客户端
|
|
func (c *RedisClient) GetClient() *redis.Client {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.client
|
|
}
|
|
|
|
// State 获取连接状态
|
|
func (c *RedisClient) State() ConnectionState {
|
|
return c.manager.State()
|
|
}
|
|
|
|
// Close 关闭客户端
|
|
func (c *RedisClient) Close() error {
|
|
return c.manager.Close()
|
|
}
|
|
|
|
// TriggerReconnect 手动触发重连
|
|
func (c *RedisClient) TriggerReconnect() {
|
|
c.manager.TriggerReconnect()
|
|
}
|
|
|
|
// Ping 执行Ping命令
|
|
func (c *RedisClient) Ping(ctx context.Context) error {
|
|
client := c.GetClient()
|
|
if client == nil {
|
|
return context.Canceled
|
|
}
|
|
return client.Ping(ctx).Err()
|
|
}
|
|
|