From fd523d34edfa7c3e1f2b839e590b81ea152ad104 Mon Sep 17 00:00:00 2001 From: ray Date: Thu, 18 Dec 2025 15:17:00 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(rabbitmq):=20=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E5=B8=A6=E9=87=8D=E8=BF=9E=E5=8A=9F=E8=83=BD=E7=9A=84?= =?UTF-8?q?=20RabbitMQ=20=E5=AE=A2=E6=88=B7=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 `rabbitmq.go` 文件,实现了一个具有自动重连和健康检查机制的 RabbitMQ 客户端 - 引入 `github.com/rabbitmq/amqp091-go` 依赖,用于 RabbitMQ 交互 - 封装了连接、通道管理、发布、消费、声明等操作,提供健壮的连接管理 - 简化应用层与 RabbitMQ 的交互,自动处理连接中断和恢复 --- go.mod | 1 + go.sum | 2 + rabbitmq.go | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 rabbitmq.go diff --git a/go.mod b/go.mod index 4f01de7..fc069fc 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect go.etcd.io/etcd/api/v3 v3.6.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 07e50bf..6b6ed0e 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..111c7ab --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,205 @@ +package reconnect + +import ( + "context" + "sync" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// RabbitMQClientConfig RabbitMQ客户端配置 +type RabbitMQClientConfig struct { + // URL RabbitMQ连接URL + URL string + // Config amqp配置 + Config *amqp.Config + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// RabbitMQClient 带重连功能的RabbitMQ客户端 +type RabbitMQClient struct { + config RabbitMQClientConfig + conn *amqp.Connection + manager *ConnectionManager + mu sync.RWMutex +} + +// rabbitmqConnector RabbitMQ连接器 +type rabbitmqConnector struct { + client *RabbitMQClient +} + +func (r *rabbitmqConnector) Connect(ctx context.Context) error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + var conn *amqp.Connection + var err error + + if r.client.config.Config != nil { + conn, err = amqp.DialConfig(r.client.config.URL, *r.client.config.Config) + } else { + conn, err = amqp.Dial(r.client.config.URL) + } + + if err != nil { + return err + } + + // 验证连接 + if conn.IsClosed() { + conn.Close() + return amqp.ErrClosed + } + + r.client.conn = conn + return nil +} + +func (r *rabbitmqConnector) Close() error { + r.client.mu.Lock() + defer r.client.mu.Unlock() + + if r.client.conn != nil && !r.client.conn.IsClosed() { + err := r.client.conn.Close() + r.client.conn = nil + return err + } + return nil +} + +// rabbitmqHealthChecker RabbitMQ健康检查器 +type rabbitmqHealthChecker struct { + client *RabbitMQClient +} + +func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error { + r.client.mu.RLock() + conn := r.client.conn + r.client.mu.RUnlock() + + if conn == nil || conn.IsClosed() { + return amqp.ErrClosed + } + + // 尝试创建通道来验证连接 + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + return nil +} + +// NewRabbitMQClient 创建带重连功能的RabbitMQ客户端 +func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) { + client := &RabbitMQClient{ + config: cfg, + } + + connector := &rabbitmqConnector{client: client} + checker := &rabbitmqHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetConn 获取RabbitMQ连接 +func (c *RabbitMQClient) GetConn() *amqp.Connection { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// Channel 创建通道 +func (c *RabbitMQClient) Channel() (*amqp.Channel, error) { + conn := c.GetConn() + if conn == nil || conn.IsClosed() { + return nil, amqp.ErrClosed + } + return conn.Channel() +} + +// State 获取连接状态 +func (c *RabbitMQClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *RabbitMQClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *RabbitMQClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// IsClosed 检查连接是否关闭 +func (c *RabbitMQClient) IsClosed() bool { + conn := c.GetConn() + return conn == nil || conn.IsClosed() +} + +// Publish 发布消息(带重连支持) +func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + ch, err := c.Channel() + if err != nil { + return err + } + defer ch.Close() + + return ch.Publish(exchange, key, mandatory, immediate, msg) +} + +// Consume 消费消息(带重连支持) +func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { + ch, err := c.Channel() + if err != nil { + return nil, err + } + + return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) +} + +// QueueDeclare 声明队列(带重连支持) +func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { + ch, err := c.Channel() + if err != nil { + return amqp.Queue{}, err + } + defer ch.Close() + + return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args) +} + +// ExchangeDeclare 声明交换机(带重连支持) +func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { + ch, err := c.Channel() + if err != nil { + return err + } + defer ch.Close() + + return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args) +} + +// QueueBind 绑定队列到交换机(带重连支持) +func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { + ch, err := c.Channel() + if err != nil { + return err + } + defer ch.Close() + + return ch.QueueBind(name, key, exchange, noWait, args) +}