Files
reconnect/rabbitmq.go
ray fd523d34ed feat(rabbitmq): 引入带重连功能的 RabbitMQ 客户端
- 新增 `rabbitmq.go` 文件,实现了一个具有自动重连和健康检查机制的 RabbitMQ 客户端
- 引入 `github.com/rabbitmq/amqp091-go` 依赖,用于 RabbitMQ 交互
- 封装了连接、通道管理、发布、消费、声明等操作,提供健壮的连接管理
- 简化应用层与 RabbitMQ 的交互,自动处理连接中断和恢复
2025-12-18 15:17:00 +08:00

206 lines
4.6 KiB
Go

package reconnect
import (
"context"
"sync"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitMQClientConfig holds the settings used to build a RabbitMQClient.
type RabbitMQClientConfig struct {
// URL is the RabbitMQ connection URL passed to amqp.Dial / amqp.DialConfig.
URL string
// Config is an optional amqp configuration. When it is non-nil the
// connection is opened with amqp.DialConfig; otherwise amqp.Dial is used.
Config *amqp.Config
// ReconnectConfig is the reconnect configuration handed to
// NewConnectionManager when the client is created.
ReconnectConfig Config
}
// RabbitMQClient is a RabbitMQ client whose connection lifecycle is driven by
// a ConnectionManager.
type RabbitMQClient struct {
// config is the configuration the client was created with.
config RabbitMQClientConfig
// conn is the current AMQP connection; it is nil until a connection has
// been established. Access is guarded by mu.
conn *amqp.Connection
// manager is the ConnectionManager created in NewRabbitMQClient; State,
// Close and TriggerReconnect delegate to it.
manager *ConnectionManager
// mu guards conn.
mu sync.RWMutex
}
// rabbitmqConnector is the connector handed to NewConnectionManager; its
// Connect and Close methods open and close the client's AMQP connection.
type rabbitmqConnector struct {
// client is the RabbitMQClient whose conn field this connector manages.
client *RabbitMQClient
}
// Connect dials RabbitMQ and installs the resulting connection on the client.
//
// amqp.DialConfig is used when the client configuration carries a non-nil
// amqp.Config; otherwise amqp.Dial is used. ctx is only checked for
// cancellation before dialing; the dial itself is not context-aware.
//
// The dial happens without holding the client mutex so that readers of the
// connection are not blocked for the duration of a (possibly slow) network
// dial. Any connection that was previously installed and is still open is
// closed after the swap so it is not leaked.
//
// It returns the context error if ctx is already done, the dial error if
// dialing fails, or amqp.ErrClosed if the freshly dialed connection is
// already closed.
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
	// Do not start a network dial on behalf of a caller that already gave up.
	if err := ctx.Err(); err != nil {
		return err
	}

	var (
		conn *amqp.Connection
		err  error
	)
	if dialCfg := r.client.config.Config; dialCfg != nil {
		conn, err = amqp.DialConfig(r.client.config.URL, *dialCfg)
	} else {
		conn, err = amqp.Dial(r.client.config.URL)
	}
	if err != nil {
		return err
	}

	// Verify the connection is usable before publishing it to the client.
	if conn.IsClosed() {
		_ = conn.Close() // best effort; the connection is already unusable
		return amqp.ErrClosed
	}

	// Swap under the lock, but keep the critical section free of network I/O.
	r.client.mu.Lock()
	previous := r.client.conn
	r.client.conn = conn
	r.client.mu.Unlock()

	// Release the connection being replaced; otherwise it would stay open
	// with nothing referencing it.
	if previous != nil && !previous.IsClosed() {
		_ = previous.Close() // best effort; the new connection is already live
	}
	return nil
}
// Close detaches the client's current connection and closes it.
//
// The connection reference is always cleared, even when the connection is
// already closed, so a dead connection object is not retained. The close
// itself runs after the mutex has been released so that readers are not
// blocked while the close is in progress.
//
// It returns nil when there is no connection or it is already closed, and
// the error from amqp's Connection.Close otherwise.
func (r *rabbitmqConnector) Close() error {
	r.client.mu.Lock()
	conn := r.client.conn
	r.client.conn = nil
	r.client.mu.Unlock()

	if conn == nil || conn.IsClosed() {
		return nil
	}
	return conn.Close()
}
// rabbitmqHealthChecker is the health checker handed to NewConnectionManager;
// its HealthCheck method probes the client's current AMQP connection.
type rabbitmqHealthChecker struct {
// client is the RabbitMQClient whose connection is probed.
client *RabbitMQClient
}
// HealthCheck reports whether the client's current connection is usable.
//
// It returns amqp.ErrClosed when there is no connection or the connection is
// closed. Otherwise it opens a throwaway channel on the connection as a
// probe: a failure to open the channel is returned, and on success the probe
// channel is closed again and nil is returned. ctx is not consulted.
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
	// Snapshot the connection so the probe runs without holding the lock.
	r.client.mu.RLock()
	current := r.client.conn
	r.client.mu.RUnlock()

	if current == nil {
		return amqp.ErrClosed
	}
	if current.IsClosed() {
		return amqp.ErrClosed
	}

	// Opening a channel exercises the connection.
	probe, err := current.Channel()
	if err != nil {
		return err
	}
	_ = probe.Close() // the probe channel has served its purpose
	return nil
}
// NewRabbitMQClient builds a RabbitMQClient from cfg and establishes the
// initial connection.
//
// The client is wired to a ConnectionManager created from a connector, a
// health checker and cfg.ReconnectConfig. The first connection is made through
// the manager's ConnectWithRetry using a background context; if that fails the
// error is returned and no client is handed back.
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
	c := &RabbitMQClient{config: cfg}
	c.manager = NewConnectionManager(
		&rabbitmqConnector{client: c},
		&rabbitmqHealthChecker{client: c},
		cfg.ReconnectConfig,
	)

	// Initial connection.
	if err := c.manager.ConnectWithRetry(context.Background()); err != nil {
		return nil, err
	}
	return c, nil
}
// GetConn returns the client's current AMQP connection, read under the read
// lock. The result is nil when no connection is installed.
func (c *RabbitMQClient) GetConn() *amqp.Connection {
	c.mu.RLock()
	current := c.conn
	c.mu.RUnlock()
	return current
}
// Channel opens a new AMQP channel on the client's current connection.
// It returns amqp.ErrClosed when there is no connection or the connection is
// closed.
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
	if conn := c.GetConn(); conn != nil && !conn.IsClosed() {
		return conn.Channel()
	}
	return nil, amqp.ErrClosed
}
// State returns the connection state reported by the connection manager.
func (c *RabbitMQClient) State() ConnectionState {
	state := c.manager.State()
	return state
}
// Close shuts the client down by delegating to the connection manager's Close
// and returns its result.
func (c *RabbitMQClient) Close() error {
	err := c.manager.Close()
	return err
}
// TriggerReconnect manually requests a reconnect by delegating to the
// connection manager's TriggerReconnect.
func (c *RabbitMQClient) TriggerReconnect() {
	mgr := c.manager
	mgr.TriggerReconnect()
}
// IsClosed reports whether the client currently has no usable connection:
// it is true when no connection is installed or the installed one is closed.
func (c *RabbitMQClient) IsClosed() bool {
	if conn := c.GetConn(); conn != nil {
		return conn.IsClosed()
	}
	return true
}
// Publish sends msg to the given exchange with routing key key, using a
// short-lived channel opened on the client's current connection.
//
// exchange, key, mandatory, immediate and msg are passed through unchanged to
// amqp's Channel.PublishWithContext, called with a background context.
//
// It returns the error from Channel (amqp.ErrClosed when no open connection is
// available), the publish error, or, when the publish call itself succeeded,
// the error from closing the channel. The close error is reported because a
// failed close means the channel or connection broke right after publishing,
// so delivery of the message cannot be assumed.
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	ch, err := c.Channel()
	if err != nil {
		return err
	}

	if err := ch.PublishWithContext(context.Background(), exchange, key, mandatory, immediate, msg); err != nil {
		_ = ch.Close() // best effort; the publish error is the one that matters
		return err
	}
	return ch.Close()
}
// Consume starts a consumer on queue, using a new channel opened on the
// client's current connection, and returns the delivery channel.
//
// queue, consumer, autoAck, exclusive, noLocal, noWait and args are passed
// through unchanged to amqp's Channel.Consume.
//
// It returns the error from Channel (amqp.ErrClosed when no open connection is
// available) or the error from starting the consumer. When starting the
// consumer fails, the channel opened for it is closed so it is not leaked.
//
// On success the underlying AMQP channel is not returned to the caller and is
// not closed by this method. Nothing in this method re-subscribes after a
// reconnect.
// NOTE(review): callers presumably need to call Consume again once the
// returned delivery channel is closed — confirm against the manager's behavior.
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
	ch, err := c.Channel()
	if err != nil {
		return nil, err
	}

	deliveries, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
	if err != nil {
		_ = ch.Close() // best effort; do not leak the channel on failure
		return nil, err
	}
	return deliveries, nil
}
// QueueDeclare declares a queue using a short-lived channel opened on the
// client's current connection.
//
// All arguments are passed through unchanged to amqp's Channel.QueueDeclare,
// whose result is returned. If no channel can be opened, a zero amqp.Queue and
// the error from Channel are returned. The temporary channel is closed before
// returning; its close error is discarded.
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	ch, openErr := c.Channel()
	if openErr != nil {
		return amqp.Queue{}, openErr
	}
	defer func() { _ = ch.Close() }()

	queue, declareErr := ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
	return queue, declareErr
}
// ExchangeDeclare declares an exchange using a short-lived channel opened on
// the client's current connection.
//
// All arguments are passed through unchanged to amqp's
// Channel.ExchangeDeclare, whose error is returned. If no channel can be
// opened, the error from Channel is returned instead. The temporary channel is
// closed before returning; its close error is discarded.
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	ch, openErr := c.Channel()
	if openErr != nil {
		return openErr
	}
	defer func() { _ = ch.Close() }()

	declareErr := ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
	return declareErr
}
// QueueBind binds a queue to an exchange using a short-lived channel opened on
// the client's current connection.
//
// All arguments are passed through unchanged to amqp's Channel.QueueBind,
// whose error is returned. If no channel can be opened, the error from Channel
// is returned instead. The temporary channel is closed before returning; its
// close error is discarded.
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	ch, openErr := c.Channel()
	if openErr != nil {
		return openErr
	}
	defer func() { _ = ch.Close() }()

	bindErr := ch.QueueBind(name, key, exchange, noWait, args)
	return bindErr
}