- 新增 `rabbitmq.go` 文件,实现了一个具有自动重连和健康检查机制的 RabbitMQ 客户端 - 引入 `github.com/rabbitmq/amqp091-go` 依赖,用于 RabbitMQ 交互 - 封装了连接、通道管理、发布、消费、声明等操作,提供健壮的连接管理 - 简化应用层与 RabbitMQ 的交互,自动处理连接中断和恢复
206 lines
4.6 KiB
Go
206 lines
4.6 KiB
Go
package reconnect
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// RabbitMQClientConfig RabbitMQ客户端配置
|
|
type RabbitMQClientConfig struct {
|
|
// URL RabbitMQ连接URL
|
|
URL string
|
|
// Config amqp配置
|
|
Config *amqp.Config
|
|
// ReconnectConfig 重连配置
|
|
ReconnectConfig Config
|
|
}
|
|
|
|
// RabbitMQClient 带重连功能的RabbitMQ客户端
|
|
type RabbitMQClient struct {
|
|
config RabbitMQClientConfig
|
|
conn *amqp.Connection
|
|
manager *ConnectionManager
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// rabbitmqConnector RabbitMQ连接器
|
|
type rabbitmqConnector struct {
|
|
client *RabbitMQClient
|
|
}
|
|
|
|
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
|
|
r.client.mu.Lock()
|
|
defer r.client.mu.Unlock()
|
|
|
|
var conn *amqp.Connection
|
|
var err error
|
|
|
|
if r.client.config.Config != nil {
|
|
conn, err = amqp.DialConfig(r.client.config.URL, *r.client.config.Config)
|
|
} else {
|
|
conn, err = amqp.Dial(r.client.config.URL)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 验证连接
|
|
if conn.IsClosed() {
|
|
conn.Close()
|
|
return amqp.ErrClosed
|
|
}
|
|
|
|
r.client.conn = conn
|
|
return nil
|
|
}
|
|
|
|
func (r *rabbitmqConnector) Close() error {
|
|
r.client.mu.Lock()
|
|
defer r.client.mu.Unlock()
|
|
|
|
if r.client.conn != nil && !r.client.conn.IsClosed() {
|
|
err := r.client.conn.Close()
|
|
r.client.conn = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// rabbitmqHealthChecker RabbitMQ健康检查器
|
|
type rabbitmqHealthChecker struct {
|
|
client *RabbitMQClient
|
|
}
|
|
|
|
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
|
|
r.client.mu.RLock()
|
|
conn := r.client.conn
|
|
r.client.mu.RUnlock()
|
|
|
|
if conn == nil || conn.IsClosed() {
|
|
return amqp.ErrClosed
|
|
}
|
|
|
|
// 尝试创建通道来验证连接
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewRabbitMQClient 创建带重连功能的RabbitMQ客户端
|
|
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
|
|
client := &RabbitMQClient{
|
|
config: cfg,
|
|
}
|
|
|
|
connector := &rabbitmqConnector{client: client}
|
|
checker := &rabbitmqHealthChecker{client: client}
|
|
|
|
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
|
|
|
// 首次连接
|
|
ctx := context.Background()
|
|
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// GetConn 获取RabbitMQ连接
|
|
func (c *RabbitMQClient) GetConn() *amqp.Connection {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.conn
|
|
}
|
|
|
|
// Channel 创建通道
|
|
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
|
|
conn := c.GetConn()
|
|
if conn == nil || conn.IsClosed() {
|
|
return nil, amqp.ErrClosed
|
|
}
|
|
return conn.Channel()
|
|
}
|
|
|
|
// State 获取连接状态
|
|
func (c *RabbitMQClient) State() ConnectionState {
|
|
return c.manager.State()
|
|
}
|
|
|
|
// Close 关闭客户端
|
|
func (c *RabbitMQClient) Close() error {
|
|
return c.manager.Close()
|
|
}
|
|
|
|
// TriggerReconnect 手动触发重连
|
|
func (c *RabbitMQClient) TriggerReconnect() {
|
|
c.manager.TriggerReconnect()
|
|
}
|
|
|
|
// IsClosed 检查连接是否关闭
|
|
func (c *RabbitMQClient) IsClosed() bool {
|
|
conn := c.GetConn()
|
|
return conn == nil || conn.IsClosed()
|
|
}
|
|
|
|
// Publish 发布消息(带重连支持)
|
|
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
|
|
ch, err := c.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
|
|
return ch.Publish(exchange, key, mandatory, immediate, msg)
|
|
}
|
|
|
|
// Consume 消费消息(带重连支持)
|
|
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
|
|
ch, err := c.Channel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
|
|
}
|
|
|
|
// QueueDeclare 声明队列(带重连支持)
|
|
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
|
|
ch, err := c.Channel()
|
|
if err != nil {
|
|
return amqp.Queue{}, err
|
|
}
|
|
defer ch.Close()
|
|
|
|
return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
|
|
}
|
|
|
|
// ExchangeDeclare 声明交换机(带重连支持)
|
|
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
|
|
ch, err := c.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
|
|
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
|
|
}
|
|
|
|
// QueueBind 绑定队列到交换机(带重连支持)
|
|
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
|
|
ch, err := c.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
|
|
return ch.QueueBind(name, key, exchange, noWait, args)
|
|
}
|