package reconnect

import (
	"context"
	"sync"

	amqp "github.com/rabbitmq/amqp091-go"
)

// RabbitMQClientConfig holds the settings used to build a RabbitMQClient.
type RabbitMQClientConfig struct {
	// URL is the RabbitMQ connection URL passed to amqp.Dial / amqp.DialConfig.
	URL string
	// Config is the optional amqp configuration. When nil, amqp.Dial is used
	// with the library defaults.
	Config *amqp.Config
	// ReconnectConfig is handed to the ConnectionManager that drives
	// (re)connection.
	ReconnectConfig Config
}

// RabbitMQClient is a RabbitMQ client whose connection is managed by a
// ConnectionManager.
type RabbitMQClient struct {
	config  RabbitMQClientConfig
	conn    *amqp.Connection
	manager *ConnectionManager
	// mu guards conn.
	mu sync.RWMutex
}

// rabbitmqConnector opens and closes the client's AMQP connection on behalf
// of the ConnectionManager.
type rabbitmqConnector struct {
	client *RabbitMQClient
}

// Connect dials the broker and installs the new connection on the client.
//
// The client's write lock is held for the whole call, so readers of the
// connection wait until dialing has finished. ctx is accepted to satisfy the
// connector contract but is not consulted: the dial calls used here take no
// context.
//
// If a previous connection is still open when the new one is installed, it is
// closed first so that it is not leaked.
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
	c := r.client
	c.mu.Lock()
	defer c.mu.Unlock()

	var (
		conn *amqp.Connection
		err  error
	)
	if c.config.Config != nil {
		conn, err = amqp.DialConfig(c.config.URL, *c.config.Config)
	} else {
		conn, err = amqp.Dial(c.config.URL)
	}
	if err != nil {
		return err
	}

	// Guard against a connection that was torn down right after the dial.
	if conn.IsClosed() {
		_ = conn.Close() // best effort; the connection is already unusable
		return amqp.ErrClosed
	}

	// Release the connection being replaced; without this an earlier,
	// still-open connection would stay open with nothing referencing it.
	if old := c.conn; old != nil && !old.IsClosed() {
		_ = old.Close() // best effort; the replacement is already established
	}

	c.conn = conn
	return nil
}

// Close closes the client's current connection, if any, and clears the stored
// reference. It returns the error from closing an open connection and nil
// when there is no connection or it is already closed.
func (r *rabbitmqConnector) Close() error {
	c := r.client
	c.mu.Lock()
	defer c.mu.Unlock()

	conn := c.conn
	// Always drop the reference, even when the connection is already closed,
	// so a stale handle is not kept around.
	c.conn = nil
	if conn == nil || conn.IsClosed() {
		return nil
	}
	return conn.Close()
}

// rabbitmqHealthChecker verifies that the client's connection is usable.
type rabbitmqHealthChecker struct {
	client *RabbitMQClient
}

// HealthCheck reports whether the current connection is usable. It returns
// amqp.ErrClosed when there is no connection or it is closed, and otherwise
// the result of opening a throwaway channel on it. ctx is not consulted.
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
	r.client.mu.RLock()
	conn := r.client.conn
	r.client.mu.RUnlock()

	if conn == nil || conn.IsClosed() {
		return amqp.ErrClosed
	}

	// Opening a channel exercises the connection beyond the closed flag.
	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	return nil
}

// NewRabbitMQClient builds a client from cfg, wires it to a ConnectionManager
// and performs the initial connection via ConnectWithRetry using a background
// context. It returns the connection error if that initial attempt fails.
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
	client := &RabbitMQClient{
		config: cfg,
	}

	connector := &rabbitmqConnector{client: client}
	checker := &rabbitmqHealthChecker{client: client}
	client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)

	// Initial connection.
	ctx := context.Background()
	if err := client.manager.ConnectWithRetry(ctx); err != nil {
		return nil, err
	}

	return client, nil
}

// GetConn returns the current AMQP connection. The result may be nil when no
// connection is installed.
func (c *RabbitMQClient) GetConn() *amqp.Connection {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn
}

// Channel opens a new channel on the current connection. It returns
// amqp.ErrClosed when there is no connection or the connection is closed.
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
	conn := c.GetConn()
	if conn == nil || conn.IsClosed() {
		return nil, amqp.ErrClosed
	}
	return conn.Channel()
}

// State returns the connection state reported by the ConnectionManager.
func (c *RabbitMQClient) State() ConnectionState {
	return c.manager.State()
}

// Close shuts the client down by closing its ConnectionManager.
func (c *RabbitMQClient) Close() error {
	return c.manager.Close()
}

// TriggerReconnect asks the ConnectionManager to reconnect.
func (c *RabbitMQClient) TriggerReconnect() {
	c.manager.TriggerReconnect()
}

// IsClosed reports whether the client has no connection or its connection is
// closed.
func (c *RabbitMQClient) IsClosed() bool {
	conn := c.GetConn()
	return conn == nil || conn.IsClosed()
}

// Publish opens a short-lived channel on the current connection, publishes
// msg with the given routing parameters and closes the channel again. It
// returns amqp.ErrClosed when no open connection is available; it does not
// retry.
//
// NOTE(review): recent amqp091-go releases mark Channel.Publish as deprecated
// in favor of PublishWithContext; confirm the pinned version before migrating.
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	ch, err := c.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	return ch.Publish(exchange, key, mandatory, immediate, msg)
}

// Consume opens a channel on the current connection and starts a consumer on
// queue, returning the delivery stream.
//
// On success the channel is intentionally left open, since the deliveries
// flow over it. If starting the consumer fails, the channel is closed before
// the error is returned so that it is not leaked.
//
// NOTE(review): the returned stream belongs to the connection that was
// current at call time; callers presumably need to call Consume again after
// a reconnect. Verify against the callers.
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
	ch, err := c.Channel()
	if err != nil {
		return nil, err
	}

	deliveries, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
	if err != nil {
		_ = ch.Close() // best effort; report the Consume error, not the close error
		return nil, err
	}
	return deliveries, nil
}

// QueueDeclare declares a queue using a short-lived channel on the current
// connection. It returns amqp.ErrClosed when no open connection is available.
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
	ch, err := c.Channel()
	if err != nil {
		return amqp.Queue{}, err
	}
	defer ch.Close()

	return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}

// ExchangeDeclare declares an exchange using a short-lived channel on the
// current connection. It returns amqp.ErrClosed when no open connection is
// available.
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
	ch, err := c.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}

// QueueBind binds a queue to an exchange using a short-lived channel on the
// current connection. It returns amqp.ErrClosed when no open connection is
// available.
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
	ch, err := c.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	return ch.QueueBind(name, key, exchange, noWait, args)
}