From 96858c70e686022d2d29dbdd1545fb296a419e9c Mon Sep 17 00:00:00 2001 From: ray Date: Thu, 18 Dec 2025 15:11:02 +0800 Subject: [PATCH] init --- config.go | 117 +++++++++++++++++++++++++++ database.go | 170 +++++++++++++++++++++++++++++++++++++++ etcd.go | 144 +++++++++++++++++++++++++++++++++ go.mod | 32 ++++++++ go.sum | 121 ++++++++++++++++++++++++++++ grpc.go | 162 +++++++++++++++++++++++++++++++++++++ manager.go | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++++ redis.go | 126 +++++++++++++++++++++++++++++ strategy.go | 125 +++++++++++++++++++++++++++++ 9 files changed, 1223 insertions(+) create mode 100644 config.go create mode 100644 database.go create mode 100644 etcd.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 grpc.go create mode 100644 manager.go create mode 100644 redis.go create mode 100644 strategy.go diff --git a/config.go b/config.go new file mode 100644 index 0000000..5518f8d --- /dev/null +++ b/config.go @@ -0,0 +1,117 @@ +package reconnect + +import "time" + +// StrategyType 定义重连策略类型 +type StrategyType string + +const ( + // StrategyExponentialBackoff 指数退避策略 + StrategyExponentialBackoff StrategyType = "exponential" + // StrategyFixedInterval 固定间隔策略 + StrategyFixedInterval StrategyType = "fixed" + // StrategyLinearBackoff 线性退避策略 + StrategyLinearBackoff StrategyType = "linear" +) + +// ConnectionState 连接状态 +type ConnectionState int + +const ( + // StateDisconnected 断开连接 + StateDisconnected ConnectionState = iota + // StateConnecting 正在连接 + StateConnecting + // StateConnected 已连接 + StateConnected + // StateReconnecting 正在重连 + StateReconnecting +) + +func (s ConnectionState) String() string { + switch s { + case StateDisconnected: + return "disconnected" + case StateConnecting: + return "connecting" + case StateConnected: + return "connected" + case StateReconnecting: + return "reconnecting" + default: + return "unknown" + } +} + +// Config 重连配置 +type Config struct { + // Strategy 重连策略类型 + Strategy StrategyType + + // MaxRetries 最大重试次数,-1 表示无限重试 + MaxRetries int + + // InitialDelay 初始延迟时间 + InitialDelay time.Duration + + // MaxDelay 最大延迟时间 + MaxDelay time.Duration + + // Multiplier 指数退避乘数(仅用于指数退避策略) + Multiplier float64 + + // LinearIncrement 线性增量(仅用于线性退避策略) + LinearIncrement time.Duration + + // HealthCheckInterval 健康检查间隔 + HealthCheckInterval time.Duration + + // HealthCheckTimeout 健康检查超时时间 + HealthCheckTimeout time.Duration + + // OnStateChange 状态变化回调函数 + OnStateChange func(oldState, newState ConnectionState) + + // OnReconnect 重连回调函数(重连成功时调用) + OnReconnect func(attempt int) + + // OnError 错误回调函数 + OnError func(err error) +} + +// DefaultConfig 返回默认配置 +func DefaultConfig() Config { + return Config{ + Strategy: StrategyExponentialBackoff, + MaxRetries: -1, // 无限重试 + InitialDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + Multiplier: 2.0, + LinearIncrement: 1 * time.Second, + HealthCheckInterval: 10 * time.Second, + HealthCheckTimeout: 5 * time.Second, + } +} + +// Validate 验证配置 +func (c *Config) Validate() { + if c.InitialDelay <= 0 { + c.InitialDelay = 1 * time.Second + } + if c.MaxDelay <= 0 { + c.MaxDelay = 30 * time.Second + } + if c.Multiplier <= 0 { + c.Multiplier = 2.0 + } + if c.LinearIncrement <= 0 { + c.LinearIncrement = 1 * time.Second + } + if c.HealthCheckInterval <= 0 { + c.HealthCheckInterval = 10 * time.Second + } + if c.HealthCheckTimeout <= 0 { + c.HealthCheckTimeout = 5 * time.Second + } +} + diff --git a/database.go b/database.go new file mode 100644 index 0000000..cd99b3b --- /dev/null +++ b/database.go @@ -0,0 +1,170 @@ +package reconnect + +import ( + "context" + "database/sql" + "sync" + "time" + + "gorm.io/gorm" +) + +// DatabaseConfig 数据库配置 +type DatabaseConfig struct { + // Dialector gorm dialector(如 postgres.Open(dsn), mysql.Open(dsn)) + Dialector gorm.Dialector + // GormConfig gorm配置 + GormConfig *gorm.Config + // MaxIdleConns 最大空闲连接数 + MaxIdleConns int + // MaxOpenConns 最大打开连接数 + MaxOpenConns int + // ConnMaxLifetime 连接最大生命周期 + ConnMaxLifetime time.Duration + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// DatabaseClient 带重连功能的数据库客户端 +type DatabaseClient struct { + config DatabaseConfig + db *gorm.DB + sqlDB *sql.DB + manager *ConnectionManager + mu sync.RWMutex +} + +// databaseConnector 数据库连接器 +type databaseConnector struct { + client *DatabaseClient +} + +func (d *databaseConnector) Connect(ctx context.Context) error { + d.client.mu.Lock() + defer d.client.mu.Unlock() + + gormConfig := d.client.config.GormConfig + if gormConfig == nil { + gormConfig = &gorm.Config{} + } + + db, err := gorm.Open(d.client.config.Dialector, gormConfig) + if err != nil { + return err + } + + sqlDB, err := db.DB() + if err != nil { + return err + } + + // 配置连接池 + if d.client.config.MaxIdleConns > 0 { + sqlDB.SetMaxIdleConns(d.client.config.MaxIdleConns) + } + if d.client.config.MaxOpenConns > 0 { + sqlDB.SetMaxOpenConns(d.client.config.MaxOpenConns) + } + if d.client.config.ConnMaxLifetime > 0 { + sqlDB.SetConnMaxLifetime(d.client.config.ConnMaxLifetime) + } + + // 验证连接 + if err := sqlDB.PingContext(ctx); err != nil { + sqlDB.Close() + return err + } + + d.client.db = db + d.client.sqlDB = sqlDB + return nil +} + +func (d *databaseConnector) Close() error { + d.client.mu.Lock() + defer d.client.mu.Unlock() + + if d.client.sqlDB != nil { + err := d.client.sqlDB.Close() + d.client.db = nil + d.client.sqlDB = nil + return err + } + return nil +} + +// databaseHealthChecker 数据库健康检查器 +type databaseHealthChecker struct { + client *DatabaseClient +} + +func (d *databaseHealthChecker) HealthCheck(ctx context.Context) error { + d.client.mu.RLock() + sqlDB := d.client.sqlDB + d.client.mu.RUnlock() + + if sqlDB == nil { + return context.Canceled + } + + return sqlDB.PingContext(ctx) +} + +// NewDatabaseClient 创建带重连功能的数据库客户端 +func NewDatabaseClient(cfg DatabaseConfig) (*DatabaseClient, error) { + client := &DatabaseClient{ + config: cfg, + } + + connector := &databaseConnector{client: client} + checker := &databaseHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetDB 获取gorm.DB实例 +func (c *DatabaseClient) GetDB() *gorm.DB { + c.mu.RLock() + defer c.mu.RUnlock() + return c.db +} + +// GetSqlDB 获取sql.DB实例 +func (c *DatabaseClient) GetSqlDB() *sql.DB { + c.mu.RLock() + defer c.mu.RUnlock() + return c.sqlDB +} + +// State 获取连接状态 +func (c *DatabaseClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *DatabaseClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *DatabaseClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// Ping 执行Ping命令 +func (c *DatabaseClient) Ping(ctx context.Context) error { + sqlDB := c.GetSqlDB() + if sqlDB == nil { + return context.Canceled + } + return sqlDB.PingContext(ctx) +} + diff --git a/etcd.go b/etcd.go new file mode 100644 index 0000000..eae982e --- /dev/null +++ b/etcd.go @@ -0,0 +1,144 @@ +package reconnect + +import ( + "context" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EtcdClientConfig etcd客户端配置 +type EtcdClientConfig struct { + // Endpoints etcd服务地址列表 + Endpoints []string + // DialTimeout 连接超时时间 + DialTimeout time.Duration + // Username 用户名(可选) + Username string + // Password 密码(可选) + Password string + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// EtcdClient 带重连功能的etcd客户端 +type EtcdClient struct { + config EtcdClientConfig + client *clientv3.Client + manager *ConnectionManager + mu sync.RWMutex +} + +// etcdConnector etcd连接器 +type etcdConnector struct { + client *EtcdClient +} + +func (e *etcdConnector) Connect(ctx context.Context) error { + e.client.mu.Lock() + defer e.client.mu.Unlock() + + cfg := clientv3.Config{ + Endpoints: e.client.config.Endpoints, + DialTimeout: e.client.config.DialTimeout, + Username: e.client.config.Username, + Password: e.client.config.Password, + } + + if cfg.DialTimeout == 0 { + cfg.DialTimeout = 5 * time.Second + } + + cli, err := clientv3.New(cfg) + if err != nil { + return err + } + + // 验证连接 + ctx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) + defer cancel() + + _, err = cli.Status(ctx, cfg.Endpoints[0]) + if err != nil { + cli.Close() + return err + } + + e.client.client = cli + return nil +} + +func (e *etcdConnector) Close() error { + e.client.mu.Lock() + defer e.client.mu.Unlock() + + if e.client.client != nil { + err := e.client.client.Close() + e.client.client = nil + return err + } + return nil +} + +// etcdHealthChecker etcd健康检查器 +type etcdHealthChecker struct { + client *EtcdClient +} + +func (e *etcdHealthChecker) HealthCheck(ctx context.Context) error { + e.client.mu.RLock() + cli := e.client.client + endpoints := e.client.config.Endpoints + e.client.mu.RUnlock() + + if cli == nil || len(endpoints) == 0 { + return context.Canceled + } + + _, err := cli.Status(ctx, endpoints[0]) + return err +} + +// NewEtcdClient 创建带重连功能的etcd客户端 +func NewEtcdClient(cfg EtcdClientConfig) (*EtcdClient, error) { + client := &EtcdClient{ + config: cfg, + } + + connector := &etcdConnector{client: client} + checker := &etcdHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetClient 获取etcd客户端 +func (c *EtcdClient) GetClient() *clientv3.Client { + c.mu.RLock() + defer c.mu.RUnlock() + return c.client +} + +// State 获取连接状态 +func (c *EtcdClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *EtcdClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *EtcdClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4f01de7 --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module blueocean.local/reconnect + +go 1.24.0 + +require ( + github.com/go-redis/redis/v8 v8.11.5 + go.etcd.io/etcd/client/v3 v3.6.5 + google.golang.org/grpc v1.75.1 + gorm.io/gorm v1.31.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + go.etcd.io/etcd/api/v3 v3.6.5 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..07e50bf --- /dev/null +++ b/go.sum @@ -0,0 +1,121 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= +github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/etcd/api/v3 v3.6.5 h1:pMMc42276sgR1j1raO/Qv3QI9Af/AuyQUW6CBAWuntA= +go.etcd.io/etcd/api/v3 v3.6.5/go.mod h1:ob0/oWA/UQQlT1BmaEkWQzI0sJ1M0Et0mMpaABxguOQ= +go.etcd.io/etcd/client/pkg/v3 v3.6.5 h1:Duz9fAzIZFhYWgRjp/FgNq2gO1jId9Yae/rLn3RrBP8= +go.etcd.io/etcd/client/pkg/v3 v3.6.5/go.mod h1:8Wx3eGRPiy0qOFMZT/hfvdos+DjEaPxdIDiCDUv/FQk= +go.etcd.io/etcd/client/v3 v3.6.5 h1:yRwZNFBx/35VKHTcLDeO7XVLbCBFbPi+XV4OC3QJf2U= +go.etcd.io/etcd/client/v3 v3.6.5/go.mod h1:ZqwG/7TAFZ0BJ0jXRPoJjKQJtbFo/9NIY8uoFFKcCyo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 h1:FiusG7LWj+4byqhbvmB+Q93B/mOxJLN2DTozDuZm4EU= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= +gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= diff --git a/grpc.go b/grpc.go new file mode 100644 index 0000000..26d0a3d --- /dev/null +++ b/grpc.go @@ -0,0 +1,162 @@ +package reconnect + +import ( + "context" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" +) + +// GRPCClientConfig gRPC客户端配置 +type GRPCClientConfig struct { + // Target gRPC服务地址 + Target string + // DialOptions gRPC拨号选项 + DialOptions []grpc.DialOption + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// GRPCClient 带重连功能的gRPC客户端 +type GRPCClient struct { + config GRPCClientConfig + conn *grpc.ClientConn + manager *ConnectionManager + mu sync.RWMutex +} + +// grpcConnector gRPC连接器 +type grpcConnector struct { + client *GRPCClient +} + +func (g *grpcConnector) Connect(ctx context.Context) error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + opts := g.client.config.DialOptions + if len(opts) == 0 { + opts = []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + } + + conn, err := grpc.NewClient(g.client.config.Target, opts...) + if err != nil { + return err + } + + // 等待连接就绪 + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + conn.Connect() + for { + state := conn.GetState() + if state == connectivity.Ready { + break + } + if state == connectivity.TransientFailure || state == connectivity.Shutdown { + conn.Close() + return context.DeadlineExceeded + } + if !conn.WaitForStateChange(ctx, state) { + conn.Close() + return ctx.Err() + } + } + + g.client.conn = conn + return nil +} + +func (g *grpcConnector) Close() error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + if g.client.conn != nil { + err := g.client.conn.Close() + g.client.conn = nil + return err + } + return nil +} + +// grpcHealthChecker gRPC健康检查器 +type grpcHealthChecker struct { + client *GRPCClient +} + +func (g *grpcHealthChecker) HealthCheck(ctx context.Context) error { + g.client.mu.RLock() + conn := g.client.conn + g.client.mu.RUnlock() + + if conn == nil { + return context.Canceled + } + + state := conn.GetState() + if state == connectivity.Ready || state == connectivity.Idle { + return nil + } + + // 尝试连接并等待 + conn.Connect() + if !conn.WaitForStateChange(ctx, state) { + return ctx.Err() + } + + newState := conn.GetState() + if newState != connectivity.Ready && newState != connectivity.Idle { + return context.DeadlineExceeded + } + + return nil +} + +// NewGRPCClient 创建带重连功能的gRPC客户端 +func NewGRPCClient(cfg GRPCClientConfig) (*GRPCClient, error) { + client := &GRPCClient{ + config: cfg, + } + + connector := &grpcConnector{client: client} + checker := &grpcHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetConn 获取gRPC连接 +func (c *GRPCClient) GetConn() *grpc.ClientConn { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// State 获取连接状态 +func (c *GRPCClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *GRPCClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *GRPCClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..7d6913d --- /dev/null +++ b/manager.go @@ -0,0 +1,226 @@ +package reconnect + +import ( + "context" + "log" + "sync" + "sync/atomic" + "time" +) + +// HealthChecker 健康检查接口 +type HealthChecker interface { + // HealthCheck 执行健康检查,返回 nil 表示健康 + HealthCheck(ctx context.Context) error +} + +// Connector 连接器接口 +type Connector interface { + // Connect 建立连接 + Connect(ctx context.Context) error + // Close 关闭连接 + Close() error +} + +// ConnectionManager 连接管理器 +type ConnectionManager struct { + config Config + strategy Strategy + connector Connector + checker HealthChecker + + state atomic.Int32 + attempts atomic.Int32 + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + closeOnce sync.Once +} + +// NewConnectionManager 创建连接管理器 +func NewConnectionManager(connector Connector, checker HealthChecker, cfg Config) *ConnectionManager { + cfg.Validate() + ctx, cancel := context.WithCancel(context.Background()) + + cm := &ConnectionManager{ + config: cfg, + strategy: NewStrategy(cfg), + connector: connector, + checker: checker, + ctx: ctx, + cancel: cancel, + } + + cm.state.Store(int32(StateDisconnected)) + return cm +} + +// State 获取当前连接状态 +func (cm *ConnectionManager) State() ConnectionState { + return ConnectionState(cm.state.Load()) +} + +// setState 设置连接状态 +func (cm *ConnectionManager) setState(newState ConnectionState) { + oldState := ConnectionState(cm.state.Swap(int32(newState))) + if oldState != newState && cm.config.OnStateChange != nil { + cm.config.OnStateChange(oldState, newState) + } +} + +// Connect 建立连接并启动健康检查 +func (cm *ConnectionManager) Connect(ctx context.Context) error { + cm.setState(StateConnecting) + + if err := cm.connector.Connect(ctx); err != nil { + cm.setState(StateDisconnected) + if cm.config.OnError != nil { + cm.config.OnError(err) + } + return err + } + + cm.setState(StateConnected) + cm.strategy.Reset() + cm.attempts.Store(0) + + // 启动健康检查 + if cm.checker != nil && cm.config.HealthCheckInterval > 0 { + cm.wg.Add(1) + go cm.healthCheckLoop() + } + + return nil +} + +// ConnectWithRetry 带重试的连接 +func (cm *ConnectionManager) ConnectWithRetry(ctx context.Context) error { + cm.setState(StateConnecting) + + for { + select { + case <-ctx.Done(): + cm.setState(StateDisconnected) + return ctx.Err() + case <-cm.ctx.Done(): + cm.setState(StateDisconnected) + return cm.ctx.Err() + default: + } + + attempt := int(cm.attempts.Add(1)) + + if err := cm.connector.Connect(ctx); err != nil { + if cm.config.OnError != nil { + cm.config.OnError(err) + } + + // 检查是否超过最大重试次数 + if cm.config.MaxRetries >= 0 && attempt >= cm.config.MaxRetries { + cm.setState(StateDisconnected) + return err + } + + delay := cm.strategy.NextDelay(attempt) + log.Printf("[reconnect] 连接失败 (尝试 %d): %v, %v 后重试...", attempt, err, delay) + + select { + case <-ctx.Done(): + cm.setState(StateDisconnected) + return ctx.Err() + case <-cm.ctx.Done(): + cm.setState(StateDisconnected) + return cm.ctx.Err() + case <-time.After(delay): + continue + } + } + + // 连接成功 + cm.setState(StateConnected) + cm.strategy.Reset() + cm.attempts.Store(0) + + if cm.config.OnReconnect != nil && attempt > 1 { + cm.config.OnReconnect(attempt) + } + + // 启动健康检查 + if cm.checker != nil && cm.config.HealthCheckInterval > 0 { + cm.wg.Add(1) + go cm.healthCheckLoop() + } + + log.Printf("[reconnect] 连接成功") + return nil + } +} + +// healthCheckLoop 健康检查循环 +func (cm *ConnectionManager) healthCheckLoop() { + defer cm.wg.Done() + + ticker := time.NewTicker(cm.config.HealthCheckInterval) + defer ticker.Stop() + + for { + select { + case <-cm.ctx.Done(): + return + case <-ticker.C: + if cm.State() != StateConnected { + continue + } + + ctx, cancel := context.WithTimeout(cm.ctx, cm.config.HealthCheckTimeout) + err := cm.checker.HealthCheck(ctx) + cancel() + + if err != nil { + log.Printf("[reconnect] 健康检查失败: %v, 开始重连...", err) + if cm.config.OnError != nil { + cm.config.OnError(err) + } + cm.reconnect() + } + } + } +} + +// reconnect 执行重连 +func (cm *ConnectionManager) reconnect() { + if cm.State() == StateReconnecting { + return // 已经在重连中 + } + + cm.setState(StateReconnecting) + + // 关闭旧连接 + _ = cm.connector.Close() + + // 重新连接 + go func() { + if err := cm.ConnectWithRetry(cm.ctx); err != nil { + log.Printf("[reconnect] 重连失败: %v", err) + } + }() +} + +// Close 关闭连接管理器 +func (cm *ConnectionManager) Close() error { + var err error + cm.closeOnce.Do(func() { + cm.cancel() + cm.wg.Wait() + err = cm.connector.Close() + cm.setState(StateDisconnected) + }) + return err +} + +// TriggerReconnect 手动触发重连 +func (cm *ConnectionManager) TriggerReconnect() { + cm.reconnect() +} + diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..86b25b8 --- /dev/null +++ b/redis.go @@ -0,0 +1,126 @@ +package reconnect + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" +) + +// RedisClientConfig Redis客户端配置 +type RedisClientConfig struct { + // Options Redis连接选项 + Options *redis.Options + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// RedisClient 带重连功能的Redis客户端 +type RedisClient struct { + config RedisClientConfig + client *redis.Client + manager *ConnectionManager + mu sync.RWMutex +} + +// redisConnector Redis连接器 +type redisConnector struct { + client *RedisClient +} + +func (r *redisConnector) Connect(ctx context.Context) error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + rdb := redis.NewClient(r.client.config.Options) + + // 验证连接 + if err := rdb.Ping(ctx).Err(); err != nil { + rdb.Close() + return err + } + + r.client.client = rdb + return nil +} + +func (r *redisConnector) Close() error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + if r.client.client != nil { + err := r.client.client.Close() + r.client.client = nil + return err + } + return nil +} + +// redisHealthChecker Redis健康检查器 +type redisHealthChecker struct { + client *RedisClient +} + +func (r *redisHealthChecker) HealthCheck(ctx context.Context) error { + r.client.mu.RLock() + rdb := r.client.client + r.client.mu.RUnlock() + + if rdb == nil { + return context.Canceled + } + + return rdb.Ping(ctx).Err() +} + +// NewRedisClient 创建带重连功能的Redis客户端 +func NewRedisClient(cfg RedisClientConfig) (*RedisClient, error) { + client := &RedisClient{ + config: cfg, + } + + connector := &redisConnector{client: client} + checker := &redisHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetClient 获取Redis客户端 +func (c *RedisClient) GetClient() *redis.Client { + c.mu.RLock() + defer c.mu.RUnlock() + return c.client +} + +// State 获取连接状态 +func (c *RedisClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *RedisClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *RedisClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// Ping 执行Ping命令 +func (c *RedisClient) Ping(ctx context.Context) error { + client := c.GetClient() + if client == nil { + return context.Canceled + } + return client.Ping(ctx).Err() +} + diff --git a/strategy.go b/strategy.go new file mode 100644 index 0000000..c702c87 --- /dev/null +++ b/strategy.go @@ -0,0 +1,125 @@ +package reconnect + +import "time" + +// Strategy 重连策略接口 +type Strategy interface { + // NextDelay 返回下一次重试的延迟时间 + // attempt 是当前重试次数(从1开始) + NextDelay(attempt int) time.Duration + + // Reset 重置策略状态 + Reset() +} + +// ExponentialBackoff 指数退避策略 +type ExponentialBackoff struct { + initialDelay time.Duration + maxDelay time.Duration + multiplier float64 +} + +// NewExponentialBackoff 创建指数退避策略 +func NewExponentialBackoff(initialDelay, maxDelay time.Duration, multiplier float64) *ExponentialBackoff { + if multiplier <= 0 { + multiplier = 2.0 + } + return &ExponentialBackoff{ + initialDelay: initialDelay, + maxDelay: maxDelay, + multiplier: multiplier, + } +} + +// NextDelay 计算下一次延迟时间 +func (e *ExponentialBackoff) NextDelay(attempt int) time.Duration { + if attempt <= 0 { + attempt = 1 + } + delay := float64(e.initialDelay) + for i := 1; i < attempt; i++ { + delay *= e.multiplier + if time.Duration(delay) >= e.maxDelay { + return e.maxDelay + } + } + if time.Duration(delay) > e.maxDelay { + return e.maxDelay + } + return time.Duration(delay) +} + +// Reset 重置策略 +func (e *ExponentialBackoff) Reset() { + // 无状态,无需重置 +} + +// FixedInterval 固定间隔策略 +type FixedInterval struct { + interval time.Duration +} + +// NewFixedInterval 创建固定间隔策略 +func NewFixedInterval(interval time.Duration) *FixedInterval { + return &FixedInterval{ + interval: interval, + } +} + +// NextDelay 返回固定延迟时间 +func (f *FixedInterval) NextDelay(attempt int) time.Duration { + return f.interval +} + +// Reset 重置策略 +func (f *FixedInterval) Reset() { + // 无状态,无需重置 +} + +// LinearBackoff 线性退避策略 +type LinearBackoff struct { + initialDelay time.Duration + maxDelay time.Duration + increment time.Duration +} + +// NewLinearBackoff 创建线性退避策略 +func NewLinearBackoff(initialDelay, maxDelay, increment time.Duration) *LinearBackoff { + return &LinearBackoff{ + initialDelay: initialDelay, + maxDelay: maxDelay, + increment: increment, + } +} + +// NextDelay 计算下一次延迟时间 +func (l *LinearBackoff) NextDelay(attempt int) time.Duration { + if attempt <= 0 { + attempt = 1 + } + delay := l.initialDelay + time.Duration(attempt-1)*l.increment + if delay > l.maxDelay { + return l.maxDelay + } + return delay +} + +// Reset 重置策略 +func (l *LinearBackoff) Reset() { + // 无状态,无需重置 +} + +// NewStrategy 根据配置创建策略 +func NewStrategy(cfg Config) Strategy { + switch cfg.Strategy { + case StrategyFixedInterval: + return NewFixedInterval(cfg.InitialDelay) + case StrategyLinearBackoff: + return NewLinearBackoff(cfg.InitialDelay, cfg.MaxDelay, cfg.LinearIncrement) + case StrategyExponentialBackoff: + fallthrough + default: + return NewExponentialBackoff(cfg.InitialDelay, cfg.MaxDelay, cfg.Multiplier) + } +} +