Files
reconnect/grpc.go
2025-12-18 15:11:02 +08:00

163 lines
3.2 KiB
Go

package reconnect
import (
"context"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)
// GRPCClientConfig holds the settings used to build a GRPCClient.
type GRPCClientConfig struct {
// Target is the gRPC service address that is passed to grpc.NewClient.
Target string
// DialOptions are the gRPC dial options. When the slice is empty the
// connector dials with insecure transport credentials only.
DialOptions []grpc.DialOption
// ReconnectConfig is the reconnect configuration handed to
// NewConnectionManager.
ReconnectConfig Config
}
// GRPCClient is a gRPC client with reconnect support.
type GRPCClient struct {
// config is the configuration supplied to NewGRPCClient.
config GRPCClientConfig
// conn is the current connection; it is nil before the first successful
// connect and after the connector's Close. Access is guarded by mu.
conn *grpc.ClientConn
// manager is the connection manager that the client delegates connect,
// state, close and reconnect requests to.
manager *ConnectionManager
// mu guards conn.
mu sync.RWMutex
}
// grpcConnector is the gRPC connector: its Connect and Close methods create
// and tear down the connection stored on the owning GRPCClient.
type grpcConnector struct {
// client is the GRPCClient whose conn field this connector manages.
client *GRPCClient
}
// Connect creates a connection to the configured target, blocks until it
// reports Ready, and then installs it as the client's active connection.
//
// When no DialOptions are configured, insecure transport credentials are
// used. The readiness wait is capped at 10 seconds in addition to any
// deadline already carried by ctx. The client's write lock is held for the
// whole call.
//
// It returns the error from grpc.NewClient, context.DeadlineExceeded when the
// channel reaches TransientFailure or Shutdown while waiting, or ctx.Err()
// when the wait is cancelled or times out. On every failure path the newly
// created connection is closed and the previously stored one is left as is.
// On success, a previously stored connection is closed before being replaced
// so that repeated Connect calls do not leak connections.
func (g *grpcConnector) Connect(ctx context.Context) error {
	g.client.mu.Lock()
	defer g.client.mu.Unlock()

	opts := g.client.config.DialOptions
	if len(opts) == 0 {
		opts = []grpc.DialOption{
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		}
	}

	conn, err := grpc.NewClient(g.client.config.Target, opts...)
	if err != nil {
		return err
	}

	// Bound the wait for readiness.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Ask the channel to start connecting, then follow its state changes.
	conn.Connect()
	for state := conn.GetState(); state != connectivity.Ready; state = conn.GetState() {
		if state == connectivity.TransientFailure || state == connectivity.Shutdown {
			// Fail fast and let the caller decide whether to retry.
			_ = conn.Close() // best effort: the connection is being discarded
			return context.DeadlineExceeded
		}
		if !conn.WaitForStateChange(ctx, state) {
			_ = conn.Close() // best effort: the connection is being discarded
			return ctx.Err()
		}
	}

	// Release a connection left over from an earlier successful Connect;
	// overwriting the field alone would leak it.
	if old := g.client.conn; old != nil {
		_ = old.Close() // best effort: the stale connection is being discarded
	}
	g.client.conn = conn
	return nil
}
// Close closes the client's current connection, if any, and clears the
// stored reference under the client's write lock. It returns the error from
// closing the connection, or nil when there was nothing to close.
func (g *grpcConnector) Close() error {
	g.client.mu.Lock()
	defer g.client.mu.Unlock()

	current := g.client.conn
	if current == nil {
		return nil
	}
	g.client.conn = nil
	return current.Close()
}
// grpcHealthChecker is the gRPC health checker: its HealthCheck method
// inspects the connectivity state of the owning GRPCClient's connection.
type grpcHealthChecker struct {
// client is the GRPCClient whose connection is inspected.
client *GRPCClient
}
// HealthCheck reports whether the client's current connection is usable.
//
// It returns nil when the connection is Ready or Idle. It returns
// context.Canceled when there is no connection or when the connection has
// been shut down; a shut-down connection never changes state again, so
// waiting on it would only block until ctx ends. For any other state it asks
// the connection to connect and waits for a single state change: ctx.Err() is
// returned if ctx ends first, and context.DeadlineExceeded if the new state
// is still neither Ready nor Idle.
func (g *grpcHealthChecker) HealthCheck(ctx context.Context) error {
	// Snapshot the connection so the lock is not held while waiting.
	g.client.mu.RLock()
	conn := g.client.conn
	g.client.mu.RUnlock()
	if conn == nil {
		return context.Canceled
	}

	state := conn.GetState()
	switch state {
	case connectivity.Ready, connectivity.Idle:
		return nil
	case connectivity.Shutdown:
		// Terminal state: fail immediately instead of waiting on ctx.
		return context.Canceled
	}

	// Try to connect and wait for the state to move.
	conn.Connect()
	if !conn.WaitForStateChange(ctx, state) {
		return ctx.Err()
	}

	switch conn.GetState() {
	case connectivity.Ready, connectivity.Idle:
		return nil
	}
	return context.DeadlineExceeded
}
// NewGRPCClient creates a gRPC client with reconnect support.
//
// It wires a connector and a health checker for the new client into a
// connection manager built from cfg.ReconnectConfig, then performs the
// initial connection via ConnectWithRetry using a background context. If
// that initial connection fails, the error is returned and no client is
// handed back.
func NewGRPCClient(cfg GRPCClientConfig) (*GRPCClient, error) {
	c := &GRPCClient{config: cfg}
	c.manager = NewConnectionManager(
		&grpcConnector{client: c},
		&grpcHealthChecker{client: c},
		cfg.ReconnectConfig,
	)

	// Initial connection.
	err := c.manager.ConnectWithRetry(context.Background())
	if err != nil {
		return nil, err
	}
	return c, nil
}
// GetConn returns the current gRPC connection, read under the client's read
// lock. The result is nil when no connection is stored.
func (c *GRPCClient) GetConn() *grpc.ClientConn {
	c.mu.RLock()
	conn := c.conn
	c.mu.RUnlock()
	return conn
}
// State returns the connection state reported by the connection manager.
func (c *GRPCClient) State() ConnectionState {
	current := c.manager.State()
	return current
}
// Close shuts the client down by closing its connection manager and returns
// the result of that call.
func (c *GRPCClient) Close() error {
	err := c.manager.Close()
	return err
}
// TriggerReconnect manually requests a reconnect by forwarding the request
// to the connection manager.
func (c *GRPCClient) TriggerReconnect() {
	mgr := c.manager
	mgr.TriggerReconnect()
}