✨ feat(rabbitmq): 引入带重连功能的 RabbitMQ 客户端
- 新增 `rabbitmq.go` 文件,实现了一个具有自动重连和健康检查机制的 RabbitMQ 客户端 - 引入 `github.com/rabbitmq/amqp091-go` 依赖,用于 RabbitMQ 交互 - 封装了连接、通道管理、发布、消费、声明等操作,提供健壮的连接管理 - 简化应用层与 RabbitMQ 的交互,自动处理连接中断和恢复
This commit is contained in:
1
go.mod
1
go.mod
@@ -19,6 +19,7 @@ require (
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/jinzhu/now v1.1.5 // indirect
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
|
||||
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -41,6 +41,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
||||
205
rabbitmq.go
Normal file
205
rabbitmq.go
Normal file
@@ -0,0 +1,205 @@
|
||||
package reconnect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// RabbitMQClientConfig RabbitMQ客户端配置
|
||||
type RabbitMQClientConfig struct {
|
||||
// URL RabbitMQ连接URL
|
||||
URL string
|
||||
// Config amqp配置
|
||||
Config *amqp.Config
|
||||
// ReconnectConfig 重连配置
|
||||
ReconnectConfig Config
|
||||
}
|
||||
|
||||
// RabbitMQClient 带重连功能的RabbitMQ客户端
|
||||
type RabbitMQClient struct {
|
||||
config RabbitMQClientConfig
|
||||
conn *amqp.Connection
|
||||
manager *ConnectionManager
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// rabbitmqConnector RabbitMQ连接器
|
||||
type rabbitmqConnector struct {
|
||||
client *RabbitMQClient
|
||||
}
|
||||
|
||||
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
|
||||
r.client.mu.Lock()
|
||||
defer r.client.mu.Unlock()
|
||||
|
||||
var conn *amqp.Connection
|
||||
var err error
|
||||
|
||||
if r.client.config.Config != nil {
|
||||
conn, err = amqp.DialConfig(r.client.config.URL, *r.client.config.Config)
|
||||
} else {
|
||||
conn, err = amqp.Dial(r.client.config.URL)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 验证连接
|
||||
if conn.IsClosed() {
|
||||
conn.Close()
|
||||
return amqp.ErrClosed
|
||||
}
|
||||
|
||||
r.client.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitmqConnector) Close() error {
|
||||
r.client.mu.Lock()
|
||||
defer r.client.mu.Unlock()
|
||||
|
||||
if r.client.conn != nil && !r.client.conn.IsClosed() {
|
||||
err := r.client.conn.Close()
|
||||
r.client.conn = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// rabbitmqHealthChecker RabbitMQ健康检查器
|
||||
type rabbitmqHealthChecker struct {
|
||||
client *RabbitMQClient
|
||||
}
|
||||
|
||||
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
|
||||
r.client.mu.RLock()
|
||||
conn := r.client.conn
|
||||
r.client.mu.RUnlock()
|
||||
|
||||
if conn == nil || conn.IsClosed() {
|
||||
return amqp.ErrClosed
|
||||
}
|
||||
|
||||
// 尝试创建通道来验证连接
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewRabbitMQClient 创建带重连功能的RabbitMQ客户端
|
||||
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
|
||||
client := &RabbitMQClient{
|
||||
config: cfg,
|
||||
}
|
||||
|
||||
connector := &rabbitmqConnector{client: client}
|
||||
checker := &rabbitmqHealthChecker{client: client}
|
||||
|
||||
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
||||
|
||||
// 首次连接
|
||||
ctx := context.Background()
|
||||
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// GetConn 获取RabbitMQ连接
|
||||
func (c *RabbitMQClient) GetConn() *amqp.Connection {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.conn
|
||||
}
|
||||
|
||||
// Channel 创建通道
|
||||
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
|
||||
conn := c.GetConn()
|
||||
if conn == nil || conn.IsClosed() {
|
||||
return nil, amqp.ErrClosed
|
||||
}
|
||||
return conn.Channel()
|
||||
}
|
||||
|
||||
// State 获取连接状态
|
||||
func (c *RabbitMQClient) State() ConnectionState {
|
||||
return c.manager.State()
|
||||
}
|
||||
|
||||
// Close 关闭客户端
|
||||
func (c *RabbitMQClient) Close() error {
|
||||
return c.manager.Close()
|
||||
}
|
||||
|
||||
// TriggerReconnect 手动触发重连
|
||||
func (c *RabbitMQClient) TriggerReconnect() {
|
||||
c.manager.TriggerReconnect()
|
||||
}
|
||||
|
||||
// IsClosed 检查连接是否关闭
|
||||
func (c *RabbitMQClient) IsClosed() bool {
|
||||
conn := c.GetConn()
|
||||
return conn == nil || conn.IsClosed()
|
||||
}
|
||||
|
||||
// Publish 发布消息(带重连支持)
|
||||
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
|
||||
ch, err := c.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
return ch.Publish(exchange, key, mandatory, immediate, msg)
|
||||
}
|
||||
|
||||
// Consume 消费消息(带重连支持)
|
||||
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
|
||||
ch, err := c.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
|
||||
}
|
||||
|
||||
// QueueDeclare 声明队列(带重连支持)
|
||||
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
|
||||
ch, err := c.Channel()
|
||||
if err != nil {
|
||||
return amqp.Queue{}, err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
|
||||
}
|
||||
|
||||
// ExchangeDeclare 声明交换机(带重连支持)
|
||||
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
|
||||
ch, err := c.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
|
||||
}
|
||||
|
||||
// QueueBind 绑定队列到交换机(带重连支持)
|
||||
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
|
||||
ch, err := c.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
return ch.QueueBind(name, key, exchange, noWait, args)
|
||||
}
|
||||
Reference in New Issue
Block a user