reconnect-mym #1
106
postgres.go
Normal file
106
postgres.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package reconnect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
// PostgresClient PostgreSQL客户端包装器
|
||||
type PostgresClient struct {
|
||||
dsn string
|
||||
db *sql.DB
|
||||
mu sync.RWMutex
|
||||
maxAge time.Duration
|
||||
}
|
||||
|
||||
// NewPostgresClient 创建新的PostgreSQL客户端
|
||||
func NewPostgresClient(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration) *PostgresClient {
|
||||
return &PostgresClient{
|
||||
dsn: dsn,
|
||||
maxAge: maxConnectionAge,
|
||||
}
|
||||
}
|
||||
|
||||
// Connect 建立PostgreSQL连接
|
||||
func (p *PostgresClient) Connect(ctx context.Context) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.db != nil {
|
||||
_ = p.db.Close()
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", p.dsn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("postgres open failed: %w", err)
|
||||
}
|
||||
|
||||
// 设置连接池参数
|
||||
db.SetMaxOpenConns(25)
|
||||
db.SetMaxIdleConns(5)
|
||||
if p.maxAge > 0 {
|
||||
db.SetConnMaxLifetime(p.maxAge)
|
||||
}
|
||||
|
||||
// 测试连接
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
_ = db.Close()
|
||||
return fmt.Errorf("postgres ping failed: %w", err)
|
||||
}
|
||||
|
||||
p.db = db
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close 关闭PostgreSQL连接
|
||||
func (p *PostgresClient) Close() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.db != nil {
|
||||
err := p.db.Close()
|
||||
p.db = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ping 检查PostgreSQL连接是否健康
|
||||
func (p *PostgresClient) Ping(ctx context.Context) error {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
if p.db == nil {
|
||||
return fmt.Errorf("postgres db is nil")
|
||||
}
|
||||
|
||||
return p.db.PingContext(ctx)
|
||||
}
|
||||
|
||||
// IsConnected 检查连接状态
|
||||
func (p *PostgresClient) IsConnected() bool {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
return p.db != nil
|
||||
}
|
||||
|
||||
// GetDB 获取底层的数据库连接
|
||||
func (p *PostgresClient) GetDB() *sql.DB {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
return p.db
|
||||
}
|
||||
|
||||
// PostgresManager 创建PostgreSQL重连管理器
|
||||
func PostgresManager(dsn string, maxConnections int, maxIdleConnections int, maxConnectionAge time.Duration, config *Config) *Manager {
|
||||
client := NewPostgresClient(dsn, maxConnections, maxIdleConnections, maxConnectionAge)
|
||||
return NewManager(client, config)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user