commit 96858c70e686022d2d29dbdd1545fb296a419e9c Author: ray Date: Thu Dec 18 15:11:02 2025 +0800 init diff --git a/config.go b/config.go new file mode 100644 index 0000000..5518f8d --- /dev/null +++ b/config.go @@ -0,0 +1,117 @@ +package reconnect + +import "time" + +// StrategyType 定义重连策略类型 +type StrategyType string + +const ( + // StrategyExponentialBackoff 指数退避策略 + StrategyExponentialBackoff StrategyType = "exponential" + // StrategyFixedInterval 固定间隔策略 + StrategyFixedInterval StrategyType = "fixed" + // StrategyLinearBackoff 线性退避策略 + StrategyLinearBackoff StrategyType = "linear" +) + +// ConnectionState 连接状态 +type ConnectionState int + +const ( + // StateDisconnected 断开连接 + StateDisconnected ConnectionState = iota + // StateConnecting 正在连接 + StateConnecting + // StateConnected 已连接 + StateConnected + // StateReconnecting 正在重连 + StateReconnecting +) + +func (s ConnectionState) String() string { + switch s { + case StateDisconnected: + return "disconnected" + case StateConnecting: + return "connecting" + case StateConnected: + return "connected" + case StateReconnecting: + return "reconnecting" + default: + return "unknown" + } +} + +// Config 重连配置 +type Config struct { + // Strategy 重连策略类型 + Strategy StrategyType + + // MaxRetries 最大重试次数,-1 表示无限重试 + MaxRetries int + + // InitialDelay 初始延迟时间 + InitialDelay time.Duration + + // MaxDelay 最大延迟时间 + MaxDelay time.Duration + + // Multiplier 指数退避乘数(仅用于指数退避策略) + Multiplier float64 + + // LinearIncrement 线性增量(仅用于线性退避策略) + LinearIncrement time.Duration + + // HealthCheckInterval 健康检查间隔 + HealthCheckInterval time.Duration + + // HealthCheckTimeout 健康检查超时时间 + HealthCheckTimeout time.Duration + + // OnStateChange 状态变化回调函数 + OnStateChange func(oldState, newState ConnectionState) + + // OnReconnect 重连回调函数(重连成功时调用) + OnReconnect func(attempt int) + + // OnError 错误回调函数 + OnError func(err error) +} + +// DefaultConfig 返回默认配置 +func DefaultConfig() Config { + return Config{ + Strategy: StrategyExponentialBackoff, + MaxRetries: -1, // 无限重试 + InitialDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + Multiplier: 2.0, + LinearIncrement: 1 * time.Second, + HealthCheckInterval: 10 * time.Second, + HealthCheckTimeout: 5 * time.Second, + } +} + +// Validate 验证配置 +func (c *Config) Validate() { + if c.InitialDelay <= 0 { + c.InitialDelay = 1 * time.Second + } + if c.MaxDelay <= 0 { + c.MaxDelay = 30 * time.Second + } + if c.Multiplier <= 0 { + c.Multiplier = 2.0 + } + if c.LinearIncrement <= 0 { + c.LinearIncrement = 1 * time.Second + } + if c.HealthCheckInterval <= 0 { + c.HealthCheckInterval = 10 * time.Second + } + if c.HealthCheckTimeout <= 0 { + c.HealthCheckTimeout = 5 * time.Second + } +} + diff --git a/database.go b/database.go new file mode 100644 index 0000000..cd99b3b --- /dev/null +++ b/database.go @@ -0,0 +1,170 @@ +package reconnect + +import ( + "context" + "database/sql" + "sync" + "time" + + "gorm.io/gorm" +) + +// DatabaseConfig 数据库配置 +type DatabaseConfig struct { + // Dialector gorm dialector(如 postgres.Open(dsn), mysql.Open(dsn)) + Dialector gorm.Dialector + // GormConfig gorm配置 + GormConfig *gorm.Config + // MaxIdleConns 最大空闲连接数 + MaxIdleConns int + // MaxOpenConns 最大打开连接数 + MaxOpenConns int + // ConnMaxLifetime 连接最大生命周期 + ConnMaxLifetime time.Duration + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// DatabaseClient 带重连功能的数据库客户端 +type DatabaseClient struct { + config DatabaseConfig + db *gorm.DB + sqlDB *sql.DB + manager *ConnectionManager + mu sync.RWMutex +} + +// databaseConnector 数据库连接器 +type databaseConnector struct { + client *DatabaseClient +} + +func (d *databaseConnector) Connect(ctx context.Context) error { + d.client.mu.Lock() + defer d.client.mu.Unlock() + + gormConfig := d.client.config.GormConfig + if gormConfig == nil { + gormConfig = &gorm.Config{} + } + + db, err := gorm.Open(d.client.config.Dialector, gormConfig) + if err != nil { + return err + } + + sqlDB, err := db.DB() + if err != nil { + return err + } + + // 配置连接池 + if d.client.config.MaxIdleConns > 0 { + sqlDB.SetMaxIdleConns(d.client.config.MaxIdleConns) + } + if d.client.config.MaxOpenConns > 0 { + sqlDB.SetMaxOpenConns(d.client.config.MaxOpenConns) + } + if d.client.config.ConnMaxLifetime > 0 { + sqlDB.SetConnMaxLifetime(d.client.config.ConnMaxLifetime) + } + + // 验证连接 + if err := sqlDB.PingContext(ctx); err != nil { + sqlDB.Close() + return err + } + + d.client.db = db + d.client.sqlDB = sqlDB + return nil +} + +func (d *databaseConnector) Close() error { + d.client.mu.Lock() + defer d.client.mu.Unlock() + + if d.client.sqlDB != nil { + err := d.client.sqlDB.Close() + d.client.db = nil + d.client.sqlDB = nil + return err + } + return nil +} + +// databaseHealthChecker 数据库健康检查器 +type databaseHealthChecker struct { + client *DatabaseClient +} + +func (d *databaseHealthChecker) HealthCheck(ctx context.Context) error { + d.client.mu.RLock() + sqlDB := d.client.sqlDB + d.client.mu.RUnlock() + + if sqlDB == nil { + return context.Canceled + } + + return sqlDB.PingContext(ctx) +} + +// NewDatabaseClient 创建带重连功能的数据库客户端 +func NewDatabaseClient(cfg DatabaseConfig) (*DatabaseClient, error) { + client := &DatabaseClient{ + config: cfg, + } + + connector := &databaseConnector{client: client} + checker := &databaseHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetDB 获取gorm.DB实例 +func (c *DatabaseClient) GetDB() *gorm.DB { + c.mu.RLock() + defer c.mu.RUnlock() + return c.db +} + +// GetSqlDB 获取sql.DB实例 +func (c *DatabaseClient) GetSqlDB() *sql.DB { + c.mu.RLock() + defer c.mu.RUnlock() + return c.sqlDB +} + +// State 获取连接状态 +func (c *DatabaseClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *DatabaseClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *DatabaseClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// Ping 执行Ping命令 +func (c *DatabaseClient) Ping(ctx context.Context) error { + sqlDB := c.GetSqlDB() + if sqlDB == nil { + return context.Canceled + } + return sqlDB.PingContext(ctx) +} + diff --git a/etcd.go b/etcd.go new file mode 100644 index 0000000..eae982e --- /dev/null +++ b/etcd.go @@ -0,0 +1,144 @@ +package reconnect + +import ( + "context" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EtcdClientConfig etcd客户端配置 +type EtcdClientConfig struct { + // Endpoints etcd服务地址列表 + Endpoints []string + // DialTimeout 连接超时时间 + DialTimeout time.Duration + // Username 用户名(可选) + Username string + // Password 密码(可选) + Password string + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// EtcdClient 带重连功能的etcd客户端 +type EtcdClient struct { + config EtcdClientConfig + client *clientv3.Client + manager *ConnectionManager + mu sync.RWMutex +} + +// etcdConnector etcd连接器 +type etcdConnector struct { + client *EtcdClient +} + +func (e *etcdConnector) Connect(ctx context.Context) error { + e.client.mu.Lock() + defer e.client.mu.Unlock() + + cfg := clientv3.Config{ + Endpoints: e.client.config.Endpoints, + DialTimeout: e.client.config.DialTimeout, + Username: e.client.config.Username, + Password: e.client.config.Password, + } + + if cfg.DialTimeout == 0 { + cfg.DialTimeout = 5 * time.Second + } + + cli, err := clientv3.New(cfg) + if err != nil { + return err + } + + // 验证连接 + ctx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) + defer cancel() + + _, err = cli.Status(ctx, cfg.Endpoints[0]) + if err != nil { + cli.Close() + return err + } + + e.client.client = cli + return nil +} + +func (e *etcdConnector) Close() error { + e.client.mu.Lock() + defer e.client.mu.Unlock() + + if e.client.client != nil { + err := e.client.client.Close() + e.client.client = nil + return err + } + return nil +} + +// etcdHealthChecker etcd健康检查器 +type etcdHealthChecker struct { + client *EtcdClient +} + +func (e *etcdHealthChecker) HealthCheck(ctx context.Context) error { + e.client.mu.RLock() + cli := e.client.client + endpoints := e.client.config.Endpoints + e.client.mu.RUnlock() + + if cli == nil || len(endpoints) == 0 { + return context.Canceled + } + + _, err := cli.Status(ctx, endpoints[0]) + return err +} + +// NewEtcdClient 创建带重连功能的etcd客户端 +func NewEtcdClient(cfg EtcdClientConfig) (*EtcdClient, error) { + client := &EtcdClient{ + config: cfg, + } + + connector := &etcdConnector{client: client} + checker := &etcdHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetClient 获取etcd客户端 +func (c *EtcdClient) GetClient() *clientv3.Client { + c.mu.RLock() + defer c.mu.RUnlock() + return c.client +} + +// State 获取连接状态 +func (c *EtcdClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *EtcdClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *EtcdClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4f01de7 --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module blueocean.local/reconnect + +go 1.24.0 + +require ( + github.com/go-redis/redis/v8 v8.11.5 + go.etcd.io/etcd/client/v3 v3.6.5 + google.golang.org/grpc v1.75.1 + gorm.io/gorm v1.31.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + go.etcd.io/etcd/api/v3 v3.6.5 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..07e50bf --- /dev/null +++ b/go.sum @@ -0,0 +1,121 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/etcd/api/v3 v3.6.5 h1:pMMc42276sgR1j1raO/Qv3QI9Af/AuyQUW6CBAWuntA= +go.etcd.io/etcd/api/v3 v3.6.5/go.mod h1:ob0/oWA/UQQlT1BmaEkWQzI0sJ1M0Et0mMpaABxguOQ= +go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3RrBP8= +go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk= +go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U= +go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 h1:FiusG7LWj+4byqhbvmB+Q93B/mOxJLN2DTozDuZm4EU= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/grpc.go b/grpc.go new file mode 100644 index 0000000..26d0a3d --- /dev/null +++ b/grpc.go @@ -0,0 +1,162 @@ +package reconnect + +import ( + "context" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" +) + +// GRPCClientConfig gRPC客户端配置 +type GRPCClientConfig struct { + // Target gRPC服务地址 + Target string + // DialOptions gRPC拨号选项 + DialOptions []grpc.DialOption + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// GRPCClient 带重连功能的gRPC客户端 +type GRPCClient struct { + config GRPCClientConfig + conn *grpc.ClientConn + manager *ConnectionManager + mu sync.RWMutex +} + +// grpcConnector gRPC连接器 +type grpcConnector struct { + client *GRPCClient +} + +func (g *grpcConnector) Connect(ctx context.Context) error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + opts := g.client.config.DialOptions + if len(opts) == 0 { + opts = []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + } + + conn, err := grpc.NewClient(g.client.config.Target, opts...) + if err != nil { + return err + } + + // 等待连接就绪 + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + conn.Connect() + for { + state := conn.GetState() + if state == connectivity.Ready { + break + } + if state == connectivity.TransientFailure || state == connectivity.Shutdown { + conn.Close() + return context.DeadlineExceeded + } + if !conn.WaitForStateChange(ctx, state) { + conn.Close() + return ctx.Err() + } + } + + g.client.conn = conn + return nil +} + +func (g *grpcConnector) Close() error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + if g.client.conn != nil { + err := g.client.conn.Close() + g.client.conn = nil + return err + } + return nil +} + +// grpcHealthChecker gRPC健康检查器 +type grpcHealthChecker struct { + client *GRPCClient +} + +func (g *grpcHealthChecker) HealthCheck(ctx context.Context) error { + g.client.mu.RLock() + conn := g.client.conn + g.client.mu.RUnlock() + + if conn == nil { + return context.Canceled + } + + state := conn.GetState() + if state == connectivity.Ready || state == connectivity.Idle { + return nil + } + + // 尝试连接并等待 + conn.Connect() + if !conn.WaitForStateChange(ctx, state) { + return ctx.Err() + } + + newState := conn.GetState() + if newState != connectivity.Ready && newState != connectivity.Idle { + return context.DeadlineExceeded + } + + return nil +} + +// NewGRPCClient 创建带重连功能的gRPC客户端 +func NewGRPCClient(cfg GRPCClientConfig) (*GRPCClient, error) { + client := &GRPCClient{ + config: cfg, + } + + connector := &grpcConnector{client: client} + checker := &grpcHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetConn 获取gRPC连接 +func (c *GRPCClient) GetConn() *grpc.ClientConn { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// State 获取连接状态 +func (c *GRPCClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *GRPCClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *GRPCClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..7d6913d --- /dev/null +++ b/manager.go @@ -0,0 +1,226 @@ +package reconnect + +import ( + "context" + "log" + "sync" + "sync/atomic" + "time" +) + +// HealthChecker 健康检查接口 +type HealthChecker interface { + // HealthCheck 执行健康检查,返回 nil 表示健康 + HealthCheck(ctx context.Context) error +} + +// Connector 连接器接口 +type Connector interface { + // Connect 建立连接 + Connect(ctx context.Context) error + // Close 关闭连接 + Close() error +} + +// ConnectionManager 连接管理器 +type ConnectionManager struct { + config Config + strategy Strategy + connector Connector + checker HealthChecker + + state atomic.Int32 + attempts atomic.Int32 + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + closeOnce sync.Once +} + +// NewConnectionManager 创建连接管理器 +func NewConnectionManager(connector Connector, checker HealthChecker, cfg Config) *ConnectionManager { + cfg.Validate() + ctx, cancel := context.WithCancel(context.Background()) + + cm := &ConnectionManager{ + config: cfg, + strategy: NewStrategy(cfg), + connector: connector, + checker: checker, + ctx: ctx, + cancel: cancel, + } + + cm.state.Store(int32(StateDisconnected)) + return cm +} + +// State 获取当前连接状态 +func (cm *ConnectionManager) State() ConnectionState { + return ConnectionState(cm.state.Load()) +} + +// setState 设置连接状态 +func (cm *ConnectionManager) setState(newState ConnectionState) { + oldState := ConnectionState(cm.state.Swap(int32(newState))) + if oldState != newState && cm.config.OnStateChange != nil { + cm.config.OnStateChange(oldState, newState) + } +} + +// Connect 建立连接并启动健康检查 +func (cm *ConnectionManager) Connect(ctx context.Context) error { + cm.setState(StateConnecting) + + if err := cm.connector.Connect(ctx); err != nil { + cm.setState(StateDisconnected) + if cm.config.OnError != nil { + cm.config.OnError(err) + } + return err + } + + cm.setState(StateConnected) + cm.strategy.Reset() + cm.attempts.Store(0) + + // 启动健康检查 + if cm.checker != nil && cm.config.HealthCheckInterval > 0 { + cm.wg.Add(1) + go cm.healthCheckLoop() + } + + return nil +} + +// ConnectWithRetry 带重试的连接 +func (cm *ConnectionManager) ConnectWithRetry(ctx context.Context) error { + cm.setState(StateConnecting) + + for { + select { + case <-ctx.Done(): + cm.setState(StateDisconnected) + return ctx.Err() + case <-cm.ctx.Done(): + cm.setState(StateDisconnected) + return cm.ctx.Err() + default: + } + + attempt := int(cm.attempts.Add(1)) + + if err := cm.connector.Connect(ctx); err != nil { + if cm.config.OnError != nil { + cm.config.OnError(err) + } + + // 检查是否超过最大重试次数 + if cm.config.MaxRetries >= 0 && attempt >= cm.config.MaxRetries { + cm.setState(StateDisconnected) + return err + } + + delay := cm.strategy.NextDelay(attempt) + log.Printf("[reconnect] 连接失败 (尝试 %d): %v, %v 后重试...", attempt, err, delay) + + select { + case <-ctx.Done(): + cm.setState(StateDisconnected) + return ctx.Err() + case <-cm.ctx.Done(): + cm.setState(StateDisconnected) + return cm.ctx.Err() + case <-time.After(delay): + continue + } + } + + // 连接成功 + cm.setState(StateConnected) + cm.strategy.Reset() + cm.attempts.Store(0) + + if cm.config.OnReconnect != nil && attempt > 1 { + cm.config.OnReconnect(attempt) + } + + // 启动健康检查 + if cm.checker != nil && cm.config.HealthCheckInterval > 0 { + cm.wg.Add(1) + go cm.healthCheckLoop() + } + + log.Printf("[reconnect] 连接成功") + return nil + } +} + +// healthCheckLoop 健康检查循环 +func (cm *ConnectionManager) healthCheckLoop() { + defer cm.wg.Done() + + ticker := time.NewTicker(cm.config.HealthCheckInterval) + defer ticker.Stop() + + for { + select { + case <-cm.ctx.Done(): + return + case <-ticker.C: + if cm.State() != StateConnected { + continue + } + + ctx, cancel := context.WithTimeout(cm.ctx, cm.config.HealthCheckTimeout) + err := cm.checker.HealthCheck(ctx) + cancel() + + if err != nil { + log.Printf("[reconnect] 健康检查失败: %v, 开始重连...", err) + if cm.config.OnError != nil { + cm.config.OnError(err) + } + cm.reconnect() + } + } + } +} + +// reconnect 执行重连 +func (cm *ConnectionManager) reconnect() { + if cm.State() == StateReconnecting { + return // 已经在重连中 + } + + cm.setState(StateReconnecting) + + // 关闭旧连接 + _ = cm.connector.Close() + + // 重新连接 + go func() { + if err := cm.ConnectWithRetry(cm.ctx); err != nil { + log.Printf("[reconnect] 重连失败: %v", err) + } + }() +} + +// Close 关闭连接管理器 +func (cm *ConnectionManager) Close() error { + var err error + cm.closeOnce.Do(func() { + cm.cancel() + cm.wg.Wait() + err = cm.connector.Close() + cm.setState(StateDisconnected) + }) + return err +} + +// TriggerReconnect 手动触发重连 +func (cm *ConnectionManager) TriggerReconnect() { + cm.reconnect() +} + diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..86b25b8 --- /dev/null +++ b/redis.go @@ -0,0 +1,126 @@ +package reconnect + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" +) + +// RedisClientConfig Redis客户端配置 +type RedisClientConfig struct { + // Options Redis连接选项 + Options *redis.Options + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// RedisClient 带重连功能的Redis客户端 +type RedisClient struct { + config RedisClientConfig + client *redis.Client + manager *ConnectionManager + mu sync.RWMutex +} + +// redisConnector Redis连接器 +type redisConnector struct { + client *RedisClient +} + +func (r *redisConnector) Connect(ctx context.Context) error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + rdb := redis.NewClient(r.client.config.Options) + + // 验证连接 + if err := rdb.Ping(ctx).Err(); err != nil { + rdb.Close() + return err + } + + r.client.client = rdb + return nil +} + +func (r *redisConnector) Close() error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + if r.client.client != nil { + err := r.client.client.Close() + r.client.client = nil + return err + } + return nil +} + +// redisHealthChecker Redis健康检查器 +type redisHealthChecker struct { + client *RedisClient +} + +func (r *redisHealthChecker) HealthCheck(ctx context.Context) error { + r.client.mu.RLock() + rdb := r.client.client + r.client.mu.RUnlock() + + if rdb == nil { + return context.Canceled + } + + return rdb.Ping(ctx).Err() +} + +// NewRedisClient 创建带重连功能的Redis客户端 +func NewRedisClient(cfg RedisClientConfig) (*RedisClient, error) { + client := &RedisClient{ + config: cfg, + } + + connector := &redisConnector{client: client} + checker := &redisHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetClient 获取Redis客户端 +func (c *RedisClient) GetClient() *redis.Client { + c.mu.RLock() + defer c.mu.RUnlock() + return c.client +} + +// State 获取连接状态 +func (c *RedisClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *RedisClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *RedisClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// Ping 执行Ping命令 +func (c *RedisClient) Ping(ctx context.Context) error { + client := c.GetClient() + if client == nil { + return context.Canceled + } + return client.Ping(ctx).Err() +} + diff --git a/strategy.go b/strategy.go new file mode 100644 index 0000000..c702c87 --- /dev/null +++ b/strategy.go @@ -0,0 +1,125 @@ +package reconnect + +import "time" + +// Strategy 重连策略接口 +type Strategy interface { + // NextDelay 返回下一次重试的延迟时间 + // attempt 是当前重试次数(从1开始) + NextDelay(attempt int) time.Duration + + // Reset 重置策略状态 + Reset() +} + +// ExponentialBackoff 指数退避策略 +type ExponentialBackoff struct { + initialDelay time.Duration + maxDelay time.Duration + multiplier float64 +} + +// NewExponentialBackoff 创建指数退避策略 +func NewExponentialBackoff(initialDelay, maxDelay time.Duration, multiplier float64) *ExponentialBackoff { + if multiplier <= 0 { + multiplier = 2.0 + } + return &ExponentialBackoff{ + initialDelay: initialDelay, + maxDelay: maxDelay, + multiplier: multiplier, + } +} + +// NextDelay 计算下一次延迟时间 +func (e *ExponentialBackoff) NextDelay(attempt int) time.Duration { + if attempt <= 0 { + attempt = 1 + } + delay := float64(e.initialDelay) + for i := 1; i < attempt; i++ { + delay *= e.multiplier + if time.Duration(delay) >= e.maxDelay { + return e.maxDelay + } + } + if time.Duration(delay) > e.maxDelay { + return e.maxDelay + } + return time.Duration(delay) +} + +// Reset 重置策略 +func (e *ExponentialBackoff) Reset() { + // 无状态,无需重置 +} + +// FixedInterval 固定间隔策略 +type FixedInterval struct { + interval time.Duration +} + +// NewFixedInterval 创建固定间隔策略 +func NewFixedInterval(interval time.Duration) *FixedInterval { + return &FixedInterval{ + interval: interval, + } +} + +// NextDelay 返回固定延迟时间 +func (f *FixedInterval) NextDelay(attempt int) time.Duration { + return f.interval +} + +// Reset 重置策略 +func (f *FixedInterval) Reset() { + // 无状态,无需重置 +} + +// LinearBackoff 线性退避策略 +type LinearBackoff struct { + initialDelay time.Duration + maxDelay time.Duration + increment time.Duration +} + +// NewLinearBackoff 创建线性退避策略 +func NewLinearBackoff(initialDelay, maxDelay, increment time.Duration) *LinearBackoff { + return &LinearBackoff{ + initialDelay: initialDelay, + maxDelay: maxDelay, + increment: increment, + } +} + +// NextDelay 计算下一次延迟时间 +func (l *LinearBackoff) NextDelay(attempt int) time.Duration { + if attempt <= 0 { + attempt = 1 + } + delay := l.initialDelay + time.Duration(attempt-1)*l.increment + if delay > l.maxDelay { + return l.maxDelay + } + return delay +} + +// Reset 重置策略 +func (l *LinearBackoff) Reset() { + // 无状态,无需重置 +} + +// NewStrategy 根据配置创建策略 +func NewStrategy(cfg Config) Strategy { + switch cfg.Strategy { + case StrategyFixedInterval: + return NewFixedInterval(cfg.InitialDelay) + case StrategyLinearBackoff: + return NewLinearBackoff(cfg.InitialDelay, cfg.MaxDelay, cfg.LinearIncrement) + case StrategyExponentialBackoff: + fallthrough + default: + return NewExponentialBackoff(cfg.InitialDelay, cfg.MaxDelay, cfg.Multiplier) + } +} +