From 1df38ff4bcf4bdce188577daea26dc6760c16706 Mon Sep 17 00:00:00 2001 From: ray Date: Thu, 18 Dec 2025 16:51:35 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(etcd):=20=E6=B7=BB=E5=8A=A0=20?= =?UTF-8?q?etcd=20=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=92=8C=20gRPC=20?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 `etcd_registry.go` 文件,实现带自动重连的 etcd 服务注册功能 - 新增 `grpc_etcd.go` 文件,提供基于 etcd 的 gRPC 客户端,支持服务发现和重连机制 - 更新 `go.mod` 文件,添加 `github.com/rabbitmq/amqp091-go` 依赖 - 实现了服务注册、注销、续约及健康检查等功能,增强了连接管理能力 --- etcd_registry.go | 200 ++++++++++++++++++++++++++++++++++ go.mod | 2 +- grpc_etcd.go | 274 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 475 insertions(+), 1 deletion(-) create mode 100644 etcd_registry.go create mode 100644 grpc_etcd.go diff --git a/etcd_registry.go b/etcd_registry.go new file mode 100644 index 0000000..6332907 --- /dev/null +++ b/etcd_registry.go @@ -0,0 +1,200 @@ +package reconnect + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +// EtcdRegistryConfig etcd 服务注册配置 +type EtcdRegistryConfig struct { + // Endpoints etcd 服务地址列表 + Endpoints []string + // DialTimeout 连接超时时间 + DialTimeout time.Duration + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// EtcdRegistry etcd 服务注册(带自动重连) +type EtcdRegistry struct { + config EtcdRegistryConfig + etcdClient *EtcdClient + cli *clientv3.Client + key string + val string + serviceName string + ttl int64 + leaseID clientv3.LeaseID + keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器 +func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // 创建带重连的 etcd 客户端 + etcdCfg := EtcdClientConfig{ + Endpoints: cfg.Endpoints, + DialTimeout: cfg.DialTimeout, + ReconnectConfig: cfg.ReconnectConfig, + } + + etcdClient, err := NewEtcdClient(etcdCfg) + if err != nil { + cancel() + return nil, err + } + + return &EtcdRegistry{ + config: cfg, + etcdClient: etcdClient, + cli: etcdClient.GetClient(), + ctx: ctx, + cancel: cancel, + }, nil +} + +// Register 注册服务 +func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error { + r.mu.Lock() + r.serviceName = serviceName + r.ttl = ttl + r.key = fmt.Sprintf("/%s/%s", serviceName, addr) + r.val = addr + r.mu.Unlock() + + return r.registerWithKV(ttl) +} + +func (r *EtcdRegistry) registerWithKV(ttl int64) error { + r.mu.RLock() + cli := r.cli + key := r.key + val := r.val + r.mu.RUnlock() + + if cli == nil { + // 尝试重新获取客户端 + if r.etcdClient != nil { + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + cli = r.cli + r.mu.Unlock() + } + if cli == nil { + return fmt.Errorf("etcd client is nil") + } + } + + resp, err := cli.Grant(context.Background(), ttl) + if err != nil { + return err + } + + r.mu.Lock() + r.leaseID = resp.ID + r.mu.Unlock() + + _, err = cli.Put(context.Background(), key, val, clientv3.WithLease(resp.ID)) + if err != nil { + return err + } + + keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID) + if err != nil { + return err + } + + r.mu.Lock() + r.keepAliveCh = keepAliveCh + r.mu.Unlock() + + go r.watcher(ttl) + log.Printf("[EtcdRegistry] 服务注册成功: %s", key) + return nil +} + +// watcher 监听续约 +func (r *EtcdRegistry) watcher(ttl int64) { + for { + select { + case <-r.ctx.Done(): + log.Println("[EtcdRegistry] context done, watcher exiting.") + return + case ka, ok := <-r.keepAliveCh: + if !ok { + log.Println("[EtcdRegistry] keep-alive channel closed, attempting to re-register...") + + // 更新 cli 引用 + if r.etcdClient != nil { + r.mu.Lock() + r.cli = r.etcdClient.GetClient() + r.mu.Unlock() + } + + // 使用指数退避重试 + delay := time.Second + maxDelay := 30 * time.Second + + for { + select { + case <-r.ctx.Done(): + return + default: + } + + if err := r.registerWithKV(ttl); err != nil { + log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay) + time.Sleep(delay) + delay *= 2 + if delay > maxDelay { + delay = maxDelay + } + } else { + return // 退出当前 goroutine,让新的 watcher 启动 + } + } + } + // 续约成功 + _ = ka + } + } +} + +// UnRegister 注销服务 +func (r *EtcdRegistry) UnRegister() { + r.cancel() // 停止 watcher + + r.mu.RLock() + cli := r.cli + leaseID := r.leaseID + key := r.key + r.mu.RUnlock() + + if cli != nil { + cli.Revoke(context.Background(), leaseID) + cli.Delete(context.Background(), key) + log.Printf("[EtcdRegistry] 服务注销成功: %s", key) + } + + if r.etcdClient != nil { + r.etcdClient.Close() + } +} + +// State 获取连接状态 +func (r *EtcdRegistry) State() ConnectionState { + if r.etcdClient != nil { + return r.etcdClient.State() + } + return StateDisconnected +} + diff --git a/go.mod b/go.mod index f667b1b..f7b7179 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.0 require ( github.com/go-redis/redis/v8 v8.11.5 + github.com/rabbitmq/amqp091-go v1.10.0 go.etcd.io/etcd/client/v3 v3.6.5 google.golang.org/grpc v1.75.1 gorm.io/gorm v1.31.1 @@ -19,7 +20,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/rabbitmq/amqp091-go v1.10.0 // indirect go.etcd.io/etcd/api/v3 v3.6.5 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.5 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/grpc_etcd.go b/grpc_etcd.go new file mode 100644 index 0000000..32eed0a --- /dev/null +++ b/grpc_etcd.go @@ -0,0 +1,274 @@ +package reconnect + +import ( + "context" + "log" + "sync" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/resolver" +) + +// GRPCEtcdClientConfig 使用 etcd 服务发现的 gRPC 客户端配置 +type GRPCEtcdClientConfig struct { + // EtcdEndpoints etcd 服务地址列表 + EtcdEndpoints []string + // EtcdDialTimeout etcd 连接超时时间 + EtcdDialTimeout time.Duration + // ServiceName 服务名称(在 etcd 中注册的 key 前缀) + ServiceName string + // DialOptions 额外的 gRPC 拨号选项 + DialOptions []grpc.DialOption + // ReconnectConfig 重连配置 + ReconnectConfig Config +} + +// GRPCEtcdClient 带 etcd 服务发现和重连功能的 gRPC 客户端 +type GRPCEtcdClient struct { + config GRPCEtcdClientConfig + etcdClient *clientv3.Client + conn *grpc.ClientConn + manager *ConnectionManager + mu sync.RWMutex +} + +// grpcEtcdConnector gRPC + etcd 连接器 +type grpcEtcdConnector struct { + client *GRPCEtcdClient +} + +func (g *grpcEtcdConnector) Connect(ctx context.Context) error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + cfg := g.client.config + + // 创建 etcd 客户端 + etcdTimeout := cfg.EtcdDialTimeout + if etcdTimeout == 0 { + etcdTimeout = 5 * time.Second + } + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: cfg.EtcdEndpoints, + DialTimeout: etcdTimeout, + }) + if err != nil { + return err + } + + // 注册 etcd resolver + etcdResolver := &etcdResolverBuilder{cli: cli} + resolver.Register(etcdResolver) + + target := "etcd:///" + cfg.ServiceName + "/" + + // 默认 DialOptions + opts := []grpc.DialOption{ + grpc.WithResolvers(etcdResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`), + } + // 追加用户自定义选项 + opts = append(opts, cfg.DialOptions...) + + conn, err := grpc.NewClient(target, opts...) + if err != nil { + cli.Close() + return err + } + + // 等待连接就绪 + connCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + conn.Connect() + for { + state := conn.GetState() + if state == connectivity.Ready { + break + } + if state == connectivity.TransientFailure || state == connectivity.Shutdown { + conn.Close() + cli.Close() + return context.DeadlineExceeded + } + if !conn.WaitForStateChange(connCtx, state) { + conn.Close() + cli.Close() + return connCtx.Err() + } + } + + g.client.etcdClient = cli + g.client.conn = conn + return nil +} + +func (g *grpcEtcdConnector) Close() error { + g.client.mu.Lock() + defer g.client.mu.Unlock() + + if g.client.conn != nil { + g.client.conn.Close() + g.client.conn = nil + } + if g.client.etcdClient != nil { + g.client.etcdClient.Close() + g.client.etcdClient = nil + } + return nil +} + +// grpcEtcdHealthChecker gRPC + etcd 健康检查器 +type grpcEtcdHealthChecker struct { + client *GRPCEtcdClient +} + +func (g *grpcEtcdHealthChecker) HealthCheck(ctx context.Context) error { + g.client.mu.RLock() + conn := g.client.conn + g.client.mu.RUnlock() + + if conn == nil { + return context.Canceled + } + + state := conn.GetState() + if state == connectivity.Ready || state == connectivity.Idle { + return nil + } + + conn.Connect() + if !conn.WaitForStateChange(ctx, state) { + return ctx.Err() + } + + newState := conn.GetState() + if newState != connectivity.Ready && newState != connectivity.Idle { + return context.DeadlineExceeded + } + + return nil +} + +// NewGRPCClientWithEtcd 创建带 etcd 服务发现和重连功能的 gRPC 客户端 +func NewGRPCClientWithEtcd(cfg GRPCEtcdClientConfig) (*GRPCEtcdClient, error) { + client := &GRPCEtcdClient{ + config: cfg, + } + + connector := &grpcEtcdConnector{client: client} + checker := &grpcEtcdHealthChecker{client: client} + + client.manager = NewConnectionManager(connector, checker, cfg.ReconnectConfig) + + // 首次连接 + ctx := context.Background() + if err := client.manager.ConnectWithRetry(ctx); err != nil { + return nil, err + } + + return client, nil +} + +// GetConn 获取 gRPC 连接 +func (c *GRPCEtcdClient) GetConn() *grpc.ClientConn { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// GetEtcdClient 获取 etcd 客户端 +func (c *GRPCEtcdClient) GetEtcdClient() *clientv3.Client { + c.mu.RLock() + defer c.mu.RUnlock() + return c.etcdClient +} + +// State 获取连接状态 +func (c *GRPCEtcdClient) State() ConnectionState { + return c.manager.State() +} + +// Close 关闭客户端 +func (c *GRPCEtcdClient) Close() error { + return c.manager.Close() +} + +// TriggerReconnect 手动触发重连 +func (c *GRPCEtcdClient) TriggerReconnect() { + c.manager.TriggerReconnect() +} + +// ================== etcd resolver 实现 ================== + +// etcdResolverBuilder 实现 resolver.Builder +type etcdResolverBuilder struct { + cli *clientv3.Client +} + +func (b *etcdResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + r := &etcdResolver{ + cli: b.cli, + cc: cc, + target: target, + } + go r.watch(target.Endpoint()) + return r, nil +} + +func (b *etcdResolverBuilder) Scheme() string { + return "etcd" +} + +// etcdResolver 实现 resolver.Resolver +type etcdResolver struct { + cli *clientv3.Client + cc resolver.ClientConn + target resolver.Target +} + +func (r *etcdResolver) watch(keyPrefix string) { + // 首次获取服务列表 + resp, err := r.cli.Get(context.Background(), "/"+keyPrefix, clientv3.WithPrefix()) + if err != nil { + log.Printf("[etcdResolver] 获取服务列表失败: %v", err) + return + } + + addrs := make([]resolver.Address, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + addrs = append(addrs, resolver.Address{Addr: string(kv.Value)}) + } + r.cc.UpdateState(resolver.State{Addresses: addrs}) + + // 监听变化 + watchCh := r.cli.Watch(context.Background(), "/"+keyPrefix, clientv3.WithPrefix()) + for wresp := range watchCh { + for _, ev := range wresp.Events { + switch ev.Type { + case clientv3.EventTypePut: + addrs = append(addrs, resolver.Address{Addr: string(ev.Kv.Value)}) + case clientv3.EventTypeDelete: + newAddrs := make([]resolver.Address, 0) + for _, addr := range addrs { + if addr.Addr != string(ev.Kv.Value) { + newAddrs = append(newAddrs, addr) + } + } + addrs = newAddrs + } + r.cc.UpdateState(resolver.State{Addresses: addrs}) + } + } +} + +func (r *etcdResolver) ResolveNow(opts resolver.ResolveNowOptions) {} + +func (r *etcdResolver) Close() {} +