- 添加防抖机制,防止短时间内重复注册服务 - 引入 watcher 机制,支持动态监控服务续约状态 - 优化 watcher 的上下文管理,确保资源的正确释放 - 改进日志输出,提供更详细的注册和重试信息
339 lines
8.5 KiB
Go
339 lines
8.5 KiB
Go
package reconnect
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"log"
|
||
"sync"
|
||
"time"
|
||
|
||
clientv3 "go.etcd.io/etcd/client/v3"
|
||
)
|
||
|
||
// EtcdRegistryConfig etcd 服务注册配置
|
||
type EtcdRegistryConfig struct {
|
||
// Endpoints etcd 服务地址列表
|
||
Endpoints []string
|
||
// DialTimeout 连接超时时间
|
||
DialTimeout time.Duration
|
||
// ReconnectConfig 重连配置
|
||
ReconnectConfig Config
|
||
}
|
||
|
||
// EtcdRegistry etcd 服务注册(带自动重连)
|
||
type EtcdRegistry struct {
|
||
config EtcdRegistryConfig
|
||
etcdClient *EtcdClient
|
||
cli *clientv3.Client
|
||
key string
|
||
val string
|
||
serviceName string
|
||
ttl int64
|
||
leaseID clientv3.LeaseID
|
||
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
|
||
mu sync.RWMutex
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
watcherCtx context.Context
|
||
watcherCancel context.CancelFunc
|
||
watcherMu sync.Mutex
|
||
lastRegisterTime time.Time
|
||
registerMu sync.Mutex
|
||
}
|
||
|
||
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
|
||
func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
// 创建带重连的 etcd 客户端
|
||
etcdCfg := EtcdClientConfig{
|
||
Endpoints: cfg.Endpoints,
|
||
DialTimeout: cfg.DialTimeout,
|
||
ReconnectConfig: cfg.ReconnectConfig,
|
||
}
|
||
|
||
etcdClient, err := NewEtcdClient(etcdCfg)
|
||
if err != nil {
|
||
cancel()
|
||
return nil, err
|
||
}
|
||
|
||
watcherCtx, watcherCancel := context.WithCancel(context.Background())
|
||
return &EtcdRegistry{
|
||
config: cfg,
|
||
etcdClient: etcdClient,
|
||
cli: etcdClient.GetClient(),
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
watcherCtx: watcherCtx,
|
||
watcherCancel: watcherCancel,
|
||
}, nil
|
||
}
|
||
|
||
// Register 注册服务
|
||
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
|
||
r.mu.Lock()
|
||
r.serviceName = serviceName
|
||
r.ttl = ttl
|
||
r.key = fmt.Sprintf("/%s/%s", serviceName, addr)
|
||
r.val = addr
|
||
r.mu.Unlock()
|
||
|
||
return r.registerWithKV(ttl)
|
||
}
|
||
|
||
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
||
// 防抖:防止短时间内重复注册
|
||
r.registerMu.Lock()
|
||
now := time.Now()
|
||
if !r.lastRegisterTime.IsZero() && now.Sub(r.lastRegisterTime) < 2*time.Second {
|
||
r.registerMu.Unlock()
|
||
return fmt.Errorf("register too frequently, please wait")
|
||
}
|
||
r.lastRegisterTime = now
|
||
r.registerMu.Unlock()
|
||
|
||
r.mu.RLock()
|
||
cli := r.cli
|
||
key := r.key
|
||
val := r.val
|
||
r.mu.RUnlock()
|
||
|
||
// 如果 cli 为 nil 或连接已关闭,尝试重新获取客户端
|
||
if cli == nil {
|
||
if r.etcdClient != nil {
|
||
// 检查连接状态
|
||
state := r.etcdClient.State()
|
||
if state != StateConnected {
|
||
return fmt.Errorf("etcd client is not connected, current state: %s", state)
|
||
}
|
||
r.mu.Lock()
|
||
r.cli = r.etcdClient.GetClient()
|
||
cli = r.cli
|
||
r.mu.Unlock()
|
||
}
|
||
if cli == nil {
|
||
return fmt.Errorf("etcd client is nil")
|
||
}
|
||
} else {
|
||
// 即使 cli 不为 nil,也要检查连接状态,因为可能连接已断开但 cli 引用还在
|
||
if r.etcdClient != nil {
|
||
state := r.etcdClient.State()
|
||
if state != StateConnected {
|
||
// 连接已断开,尝试更新 cli 引用
|
||
r.mu.Lock()
|
||
r.cli = r.etcdClient.GetClient()
|
||
cli = r.cli
|
||
r.mu.Unlock()
|
||
if cli == nil {
|
||
return fmt.Errorf("etcd client is not connected, current state: %s", state)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 使用带超时的 context 来避免长时间阻塞
|
||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer cancel()
|
||
|
||
resp, err := cli.Grant(ctx, ttl)
|
||
if err != nil {
|
||
// 如果 Grant 失败,可能是连接已关闭,尝试更新 client 引用
|
||
if r.etcdClient != nil {
|
||
r.mu.Lock()
|
||
r.cli = r.etcdClient.GetClient()
|
||
cli = r.cli
|
||
r.mu.Unlock()
|
||
if cli != nil {
|
||
// 使用新的 client 重试
|
||
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer cancel2()
|
||
resp, err = cli.Grant(ctx2, ttl)
|
||
}
|
||
}
|
||
if err != nil {
|
||
return fmt.Errorf("failed to grant lease: %w", err)
|
||
}
|
||
}
|
||
|
||
r.mu.Lock()
|
||
r.leaseID = resp.ID
|
||
r.mu.Unlock()
|
||
|
||
ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
|
||
defer cancel3()
|
||
_, err = cli.Put(ctx3, key, val, clientv3.WithLease(resp.ID))
|
||
if err != nil {
|
||
return fmt.Errorf("failed to put key: %w", err)
|
||
}
|
||
|
||
// KeepAlive 需要使用长期有效的 context,不能使用带超时的 context
|
||
keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to start keep-alive: %w", err)
|
||
}
|
||
|
||
r.mu.Lock()
|
||
r.keepAliveCh = keepAliveCh
|
||
r.mu.Unlock()
|
||
|
||
// 停止旧的 watcher(如果存在)
|
||
r.watcherMu.Lock()
|
||
if r.watcherCancel != nil {
|
||
r.watcherCancel()
|
||
}
|
||
// 创建新的 watcher context
|
||
r.watcherCtx, r.watcherCancel = context.WithCancel(context.Background())
|
||
watcherCtx := r.watcherCtx
|
||
r.watcherMu.Unlock()
|
||
|
||
go r.watcher(ttl, watcherCtx)
|
||
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
|
||
return nil
|
||
}
|
||
|
||
// watcher 监听续约
|
||
func (r *EtcdRegistry) watcher(ttl int64, watcherCtx context.Context) {
|
||
for {
|
||
select {
|
||
case <-r.ctx.Done():
|
||
log.Println("[EtcdRegistry] context done, watcher exiting.")
|
||
return
|
||
case <-watcherCtx.Done():
|
||
log.Println("[EtcdRegistry] watcher context done, exiting.")
|
||
return
|
||
case ka, ok := <-r.keepAliveCh:
|
||
if !ok {
|
||
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
|
||
|
||
// 等待连接恢复,而不是立即重试
|
||
maxWaitTime := 60 * time.Second
|
||
checkInterval := 500 * time.Millisecond
|
||
waited := time.Duration(0)
|
||
|
||
for waited < maxWaitTime {
|
||
select {
|
||
case <-r.ctx.Done():
|
||
return
|
||
case <-watcherCtx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
// 检查连接状态
|
||
if r.etcdClient != nil {
|
||
state := r.etcdClient.State()
|
||
if state == StateConnected {
|
||
// 连接已恢复,更新 cli 引用
|
||
r.mu.Lock()
|
||
r.cli = r.etcdClient.GetClient()
|
||
r.mu.Unlock()
|
||
|
||
// 等待一段时间确保连接稳定,避免立即重新注册
|
||
time.Sleep(1 * time.Second)
|
||
|
||
// 尝试重新注册
|
||
if err := r.registerWithKV(ttl); err == nil {
|
||
log.Println("[EtcdRegistry] 服务重新注册成功")
|
||
return // 退出当前 goroutine,让新的 watcher 启动
|
||
} else {
|
||
// 如果是防抖错误,继续等待
|
||
if err.Error() == "register too frequently, please wait" {
|
||
log.Println("[EtcdRegistry] 注册过于频繁,继续等待...")
|
||
time.Sleep(2 * time.Second)
|
||
} else {
|
||
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
time.Sleep(checkInterval)
|
||
waited += checkInterval
|
||
}
|
||
|
||
// 如果等待超时,使用指数退避重试
|
||
log.Println("[EtcdRegistry] 等待连接恢复超时,使用重试机制...")
|
||
delay := 2 * time.Second
|
||
maxDelay := 30 * time.Second
|
||
|
||
for {
|
||
select {
|
||
case <-r.ctx.Done():
|
||
return
|
||
case <-watcherCtx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
// 再次检查连接状态
|
||
if r.etcdClient != nil {
|
||
state := r.etcdClient.State()
|
||
if state == StateConnected {
|
||
r.mu.Lock()
|
||
r.cli = r.etcdClient.GetClient()
|
||
r.mu.Unlock()
|
||
}
|
||
}
|
||
|
||
if err := r.registerWithKV(ttl); err != nil {
|
||
// 如果是防抖错误,等待更长时间
|
||
if err.Error() == "register too frequently, please wait" {
|
||
log.Println("[EtcdRegistry] 注册过于频繁,等待更长时间...")
|
||
time.Sleep(3 * time.Second)
|
||
continue
|
||
}
|
||
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
|
||
time.Sleep(delay)
|
||
delay *= 2
|
||
if delay > maxDelay {
|
||
delay = maxDelay
|
||
}
|
||
} else {
|
||
log.Println("[EtcdRegistry] 服务重新注册成功")
|
||
return // 退出当前 goroutine,让新的 watcher 启动
|
||
}
|
||
}
|
||
}
|
||
// 续约成功
|
||
_ = ka
|
||
}
|
||
}
|
||
}
|
||
|
||
// UnRegister 注销服务
|
||
func (r *EtcdRegistry) UnRegister() {
|
||
r.cancel() // 停止主 context
|
||
|
||
// 停止 watcher
|
||
r.watcherMu.Lock()
|
||
if r.watcherCancel != nil {
|
||
r.watcherCancel()
|
||
}
|
||
r.watcherMu.Unlock()
|
||
|
||
r.mu.RLock()
|
||
cli := r.cli
|
||
leaseID := r.leaseID
|
||
key := r.key
|
||
r.mu.RUnlock()
|
||
|
||
if cli != nil {
|
||
cli.Revoke(context.Background(), leaseID)
|
||
cli.Delete(context.Background(), key)
|
||
log.Printf("[EtcdRegistry] 服务注销成功: %s", key)
|
||
}
|
||
|
||
if r.etcdClient != nil {
|
||
r.etcdClient.Close()
|
||
}
|
||
}
|
||
|
||
// State 获取连接状态
|
||
func (r *EtcdRegistry) State() ConnectionState {
|
||
if r.etcdClient != nil {
|
||
return r.etcdClient.State()
|
||
}
|
||
return StateDisconnected
|
||
}
|