171 lines
3.5 KiB
Go
171 lines
3.5 KiB
Go
package reconnect
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"sync"
|
||
"time"
|
||
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// DatabaseConfig 数据库配置
|
||
type DatabaseConfig struct {
|
||
// Dialector gorm dialector(如 postgres.Open(dsn), mysql.Open(dsn))
|
||
Dialector gorm.Dialector
|
||
// GormConfig gorm配置
|
||
GormConfig *gorm.Config
|
||
// MaxIdleConns 最大空闲连接数
|
||
MaxIdleConns int
|
||
// MaxOpenConns 最大打开连接数
|
||
MaxOpenConns int
|
||
// ConnMaxLifetime 连接最大生命周期
|
||
ConnMaxLifetime time.Duration
|
||
// ReconnectConfig 重连配置
|
||
ReconnectConfig Config
|
||
}
|
||
|
||
// DatabaseClient 带重连功能的数据库客户端
|
||
type DatabaseClient struct {
|
||
config DatabaseConfig
|
||
db *gorm.DB
|
||
sqlDB *sql.DB
|
||
manager *ConnectionManager
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// databaseConnector 数据库连接器
|
||
type databaseConnector struct {
|
||
client *DatabaseClient
|
||
}
|
||
|
||
func (d *databaseConnector) Connect(ctx context.Context) error {
|
||
d.client.mu.Lock()
|
||
defer d.client.mu.Unlock()
|
||
|
||
gormConfig := d.client.config.GormConfig
|
||
if gormConfig == nil {
|
||
gormConfig = &gorm.Config{}
|
||
}
|
||
|
||
db, err := gorm.Open(d.client.config.Dialector, gormConfig)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
sqlDB, err := db.DB()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 配置连接池
|
||
if d.client.config.MaxIdleConns > 0 {
|
||
sqlDB.SetMaxIdleConns(d.client.config.MaxIdleConns)
|
||
}
|
||
if d.client.config.MaxOpenConns > 0 {
|
||
sqlDB.SetMaxOpenConns(d.client.config.MaxOpenConns)
|
||
}
|
||
if d.client.config.ConnMaxLifetime > 0 {
|
||
sqlDB.SetConnMaxLifetime(d.client.config.ConnMaxLifetime)
|
||
}
|
||
|
||
// 验证连接
|
||
if err := sqlDB.PingContext(ctx); err != nil {
|
||
sqlDB.Close()
|
||
return err
|
||
}
|
||
|
||
d.client.db = db
|
||
d.client.sqlDB = sqlDB
|
||
return nil
|
||
}
|
||
|
||
func (d *databaseConnector) Close() error {
|
||
d.client.mu.Lock()
|
||
defer d.client.mu.Unlock()
|
||
|
||
if d.client.sqlDB != nil {
|
||
err := d.client.sqlDB.Close()
|
||
d.client.db = nil
|
||
d.client.sqlDB = nil
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// databaseHealthChecker 数据库健康检查器
|
||
type databaseHealthChecker struct {
|
||
client *DatabaseClient
|
||
}
|
||
|
||
func (d *databaseHealthChecker) HealthCheck(ctx context.Context) error {
|
||
d.client.mu.RLock()
|
||
sqlDB := d.client.sqlDB
|
||
d.client.mu.RUnlock()
|
||
|
||
if sqlDB == nil {
|
||
return context.Canceled
|
||
}
|
||
|
||
return sqlDB.PingContext(ctx)
|
||
}
|
||
|
||
// NewDatabaseClient 创建带重连功能的数据库客户端
|
||
func NewDatabaseClient(cfg DatabaseConfig) (*DatabaseClient, error) {
|
||
client := &DatabaseClient{
|
||
config: cfg,
|
||
}
|
||
|
||
connector := &databaseConnector{client: client}
|
||
checker := &databaseHealthChecker{client: client}
|
||
|
||
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
||
|
||
// 首次连接
|
||
ctx := context.Background()
|
||
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
|
||
// GetDB 获取gorm.DB实例
|
||
func (c *DatabaseClient) GetDB() *gorm.DB {
|
||
c.mu.RLock()
|
||
defer c.mu.RUnlock()
|
||
return c.db
|
||
}
|
||
|
||
// GetSqlDB 获取sql.DB实例
|
||
func (c *DatabaseClient) GetSqlDB() *sql.DB {
|
||
c.mu.RLock()
|
||
defer c.mu.RUnlock()
|
||
return c.sqlDB
|
||
}
|
||
|
||
// State 获取连接状态
|
||
func (c *DatabaseClient) State() ConnectionState {
|
||
return c.manager.State()
|
||
}
|
||
|
||
// Close 关闭客户端
|
||
func (c *DatabaseClient) Close() error {
|
||
return c.manager.Close()
|
||
}
|
||
|
||
// TriggerReconnect 手动触发重连
|
||
func (c *DatabaseClient) TriggerReconnect() {
|
||
c.manager.TriggerReconnect()
|
||
}
|
||
|
||
// Ping 执行Ping命令
|
||
func (c *DatabaseClient) Ping(ctx context.Context) error {
|
||
sqlDB := c.GetSqlDB()
|
||
if sqlDB == nil {
|
||
return context.Canceled
|
||
}
|
||
return sqlDB.PingContext(ctx)
|
||
}
|
||
|