Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fd523d34ed |
1
go.mod
1
go.mod
@@ -19,6 +19,7 @@ require (
|
|||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/jinzhu/now v1.1.5 // indirect
|
github.com/jinzhu/now v1.1.5 // indirect
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
|
||||||
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
|
go.etcd.io/etcd/api/v3 v3.6.5 // indirect
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
|
go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -41,6 +41,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
|||||||
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||||
|
|||||||
205
rabbitmq.go
Normal file
205
rabbitmq.go
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
package reconnect
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RabbitMQClientConfig RabbitMQ客户端配置
|
||||||
|
type RabbitMQClientConfig struct {
|
||||||
|
// URL RabbitMQ连接URL
|
||||||
|
URL string
|
||||||
|
// Config amqp配置
|
||||||
|
Config *amqp.Config
|
||||||
|
// ReconnectConfig 重连配置
|
||||||
|
ReconnectConfig Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// RabbitMQClient 带重连功能的RabbitMQ客户端
|
||||||
|
type RabbitMQClient struct {
|
||||||
|
config RabbitMQClientConfig
|
||||||
|
conn *amqp.Connection
|
||||||
|
manager *ConnectionManager
|
||||||
|
mu sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// rabbitmqConnector RabbitMQ连接器
|
||||||
|
type rabbitmqConnector struct {
|
||||||
|
client *RabbitMQClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitmqConnector) Connect(ctx context.Context) error {
|
||||||
|
r.client.mu.Lock()
|
||||||
|
defer r.client.mu.Unlock()
|
||||||
|
|
||||||
|
var conn *amqp.Connection
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if r.client.config.Config != nil {
|
||||||
|
conn, err = amqp.DialConfig(r.client.config.URL, *r.client.config.Config)
|
||||||
|
} else {
|
||||||
|
conn, err = amqp.Dial(r.client.config.URL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 验证连接
|
||||||
|
if conn.IsClosed() {
|
||||||
|
conn.Close()
|
||||||
|
return amqp.ErrClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
r.client.conn = conn
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitmqConnector) Close() error {
|
||||||
|
r.client.mu.Lock()
|
||||||
|
defer r.client.mu.Unlock()
|
||||||
|
|
||||||
|
if r.client.conn != nil && !r.client.conn.IsClosed() {
|
||||||
|
err := r.client.conn.Close()
|
||||||
|
r.client.conn = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// rabbitmqHealthChecker RabbitMQ健康检查器
|
||||||
|
type rabbitmqHealthChecker struct {
|
||||||
|
client *RabbitMQClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rabbitmqHealthChecker) HealthCheck(ctx context.Context) error {
|
||||||
|
r.client.mu.RLock()
|
||||||
|
conn := r.client.conn
|
||||||
|
r.client.mu.RUnlock()
|
||||||
|
|
||||||
|
if conn == nil || conn.IsClosed() {
|
||||||
|
return amqp.ErrClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
// 尝试创建通道来验证连接
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRabbitMQClient 创建带重连功能的RabbitMQ客户端
|
||||||
|
func NewRabbitMQClient(cfg RabbitMQClientConfig) (*RabbitMQClient, error) {
|
||||||
|
client := &RabbitMQClient{
|
||||||
|
config: cfg,
|
||||||
|
}
|
||||||
|
|
||||||
|
connector := &rabbitmqConnector{client: client}
|
||||||
|
checker := &rabbitmqHealthChecker{client: client}
|
||||||
|
|
||||||
|
client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig)
|
||||||
|
|
||||||
|
// 首次连接
|
||||||
|
ctx := context.Background()
|
||||||
|
if err := client.manager.ConnectWithRetry(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConn 获取RabbitMQ连接
|
||||||
|
func (c *RabbitMQClient) GetConn() *amqp.Connection {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.conn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Channel 创建通道
|
||||||
|
func (c *RabbitMQClient) Channel() (*amqp.Channel, error) {
|
||||||
|
conn := c.GetConn()
|
||||||
|
if conn == nil || conn.IsClosed() {
|
||||||
|
return nil, amqp.ErrClosed
|
||||||
|
}
|
||||||
|
return conn.Channel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// State 获取连接状态
|
||||||
|
func (c *RabbitMQClient) State() ConnectionState {
|
||||||
|
return c.manager.State()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close 关闭客户端
|
||||||
|
func (c *RabbitMQClient) Close() error {
|
||||||
|
return c.manager.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriggerReconnect 手动触发重连
|
||||||
|
func (c *RabbitMQClient) TriggerReconnect() {
|
||||||
|
c.manager.TriggerReconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsClosed 检查连接是否关闭
|
||||||
|
func (c *RabbitMQClient) IsClosed() bool {
|
||||||
|
conn := c.GetConn()
|
||||||
|
return conn == nil || conn.IsClosed()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish 发布消息(带重连支持)
|
||||||
|
func (c *RabbitMQClient) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
|
||||||
|
ch, err := c.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
return ch.Publish(exchange, key, mandatory, immediate, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consume 消费消息(带重连支持)
|
||||||
|
func (c *RabbitMQClient) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
|
||||||
|
ch, err := c.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueDeclare 声明队列(带重连支持)
|
||||||
|
func (c *RabbitMQClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
|
||||||
|
ch, err := c.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return amqp.Queue{}, err
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
return ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExchangeDeclare 声明交换机(带重连支持)
|
||||||
|
func (c *RabbitMQClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
|
||||||
|
ch, err := c.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
return ch.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueBind 绑定队列到交换机(带重连支持)
|
||||||
|
func (c *RabbitMQClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
|
||||||
|
ch, err := c.Channel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
return ch.QueueBind(name, key, exchange, noWait, args)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user