Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2beefb2491 |
105
etcd_registry.go
105
etcd_registry.go
@@ -22,18 +22,23 @@ type EtcdRegistryConfig struct {
|
|||||||
|
|
||||||
// EtcdRegistry etcd 服务注册(带自动重连)
|
// EtcdRegistry etcd 服务注册(带自动重连)
|
||||||
type EtcdRegistry struct {
|
type EtcdRegistry struct {
|
||||||
config EtcdRegistryConfig
|
config EtcdRegistryConfig
|
||||||
etcdClient *EtcdClient
|
etcdClient *EtcdClient
|
||||||
cli *clientv3.Client
|
cli *clientv3.Client
|
||||||
key string
|
key string
|
||||||
val string
|
val string
|
||||||
serviceName string
|
serviceName string
|
||||||
ttl int64
|
ttl int64
|
||||||
leaseID clientv3.LeaseID
|
leaseID clientv3.LeaseID
|
||||||
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
|
keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
watcherCtx context.Context
|
||||||
|
watcherCancel context.CancelFunc
|
||||||
|
watcherMu sync.Mutex
|
||||||
|
lastRegisterTime time.Time
|
||||||
|
registerMu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
|
// NewEtcdRegistry 创建带重连功能的 etcd 服务注册器
|
||||||
@@ -53,12 +58,15 @@ func NewEtcdRegistry(cfg EtcdRegistryConfig) (*EtcdRegistry, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
watcherCtx, watcherCancel := context.WithCancel(context.Background())
|
||||||
return &EtcdRegistry{
|
return &EtcdRegistry{
|
||||||
config: cfg,
|
config: cfg,
|
||||||
etcdClient: etcdClient,
|
etcdClient: etcdClient,
|
||||||
cli: etcdClient.GetClient(),
|
cli: etcdClient.GetClient(),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
watcherCtx: watcherCtx,
|
||||||
|
watcherCancel: watcherCancel,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,6 +83,16 @@ func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
||||||
|
// 防抖:防止短时间内重复注册
|
||||||
|
r.registerMu.Lock()
|
||||||
|
now := time.Now()
|
||||||
|
if !r.lastRegisterTime.IsZero() && now.Sub(r.lastRegisterTime) < 2*time.Second {
|
||||||
|
r.registerMu.Unlock()
|
||||||
|
return fmt.Errorf("register too frequently, please wait")
|
||||||
|
}
|
||||||
|
r.lastRegisterTime = now
|
||||||
|
r.registerMu.Unlock()
|
||||||
|
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
cli := r.cli
|
cli := r.cli
|
||||||
key := r.key
|
key := r.key
|
||||||
@@ -149,9 +167,8 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
|||||||
return fmt.Errorf("failed to put key: %w", err)
|
return fmt.Errorf("failed to put key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second)
|
// KeepAlive 需要使用长期有效的 context,不能使用带超时的 context
|
||||||
defer cancel4()
|
keepAliveCh, err := cli.KeepAlive(context.Background(), resp.ID)
|
||||||
keepAliveCh, err := cli.KeepAlive(ctx4, resp.ID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to start keep-alive: %w", err)
|
return fmt.Errorf("failed to start keep-alive: %w", err)
|
||||||
}
|
}
|
||||||
@@ -160,18 +177,31 @@ func (r *EtcdRegistry) registerWithKV(ttl int64) error {
|
|||||||
r.keepAliveCh = keepAliveCh
|
r.keepAliveCh = keepAliveCh
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
|
|
||||||
go r.watcher(ttl)
|
// 停止旧的 watcher(如果存在)
|
||||||
|
r.watcherMu.Lock()
|
||||||
|
if r.watcherCancel != nil {
|
||||||
|
r.watcherCancel()
|
||||||
|
}
|
||||||
|
// 创建新的 watcher context
|
||||||
|
r.watcherCtx, r.watcherCancel = context.WithCancel(context.Background())
|
||||||
|
watcherCtx := r.watcherCtx
|
||||||
|
r.watcherMu.Unlock()
|
||||||
|
|
||||||
|
go r.watcher(ttl, watcherCtx)
|
||||||
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
|
log.Printf("[EtcdRegistry] 服务注册成功: %s", key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// watcher 监听续约
|
// watcher 监听续约
|
||||||
func (r *EtcdRegistry) watcher(ttl int64) {
|
func (r *EtcdRegistry) watcher(ttl int64, watcherCtx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-r.ctx.Done():
|
case <-r.ctx.Done():
|
||||||
log.Println("[EtcdRegistry] context done, watcher exiting.")
|
log.Println("[EtcdRegistry] context done, watcher exiting.")
|
||||||
return
|
return
|
||||||
|
case <-watcherCtx.Done():
|
||||||
|
log.Println("[EtcdRegistry] watcher context done, exiting.")
|
||||||
|
return
|
||||||
case ka, ok := <-r.keepAliveCh:
|
case ka, ok := <-r.keepAliveCh:
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
|
log.Println("[EtcdRegistry] keep-alive channel closed, waiting for connection recovery...")
|
||||||
@@ -185,6 +215,8 @@ func (r *EtcdRegistry) watcher(ttl int64) {
|
|||||||
select {
|
select {
|
||||||
case <-r.ctx.Done():
|
case <-r.ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-watcherCtx.Done():
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -197,15 +229,21 @@ func (r *EtcdRegistry) watcher(ttl int64) {
|
|||||||
r.cli = r.etcdClient.GetClient()
|
r.cli = r.etcdClient.GetClient()
|
||||||
r.mu.Unlock()
|
r.mu.Unlock()
|
||||||
|
|
||||||
// 等待一小段时间确保连接稳定
|
// 等待一段时间确保连接稳定,避免立即重新注册
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
// 尝试重新注册
|
// 尝试重新注册
|
||||||
if err := r.registerWithKV(ttl); err == nil {
|
if err := r.registerWithKV(ttl); err == nil {
|
||||||
log.Println("[EtcdRegistry] 服务重新注册成功")
|
log.Println("[EtcdRegistry] 服务重新注册成功")
|
||||||
return // 退出当前 goroutine,让新的 watcher 启动
|
return // 退出当前 goroutine,让新的 watcher 启动
|
||||||
} else {
|
} else {
|
||||||
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
|
// 如果是防抖错误,继续等待
|
||||||
|
if err.Error() == "register too frequently, please wait" {
|
||||||
|
log.Println("[EtcdRegistry] 注册过于频繁,继续等待...")
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
} else {
|
||||||
|
log.Printf("[EtcdRegistry] 重新注册失败: %v, 继续等待连接恢复...", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -223,6 +261,8 @@ func (r *EtcdRegistry) watcher(ttl int64) {
|
|||||||
select {
|
select {
|
||||||
case <-r.ctx.Done():
|
case <-r.ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-watcherCtx.Done():
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -237,6 +277,12 @@ func (r *EtcdRegistry) watcher(ttl int64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := r.registerWithKV(ttl); err != nil {
|
if err := r.registerWithKV(ttl); err != nil {
|
||||||
|
// 如果是防抖错误,等待更长时间
|
||||||
|
if err.Error() == "register too frequently, please wait" {
|
||||||
|
log.Println("[EtcdRegistry] 注册过于频繁,等待更长时间...")
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
|
log.Printf("[EtcdRegistry] failed to re-register service: %v, retrying in %v...", err, delay)
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
delay *= 2
|
delay *= 2
|
||||||
@@ -257,7 +303,14 @@ func (r *EtcdRegistry) watcher(ttl int64) {
|
|||||||
|
|
||||||
// UnRegister 注销服务
|
// UnRegister 注销服务
|
||||||
func (r *EtcdRegistry) UnRegister() {
|
func (r *EtcdRegistry) UnRegister() {
|
||||||
r.cancel() // 停止 watcher
|
r.cancel() // 停止主 context
|
||||||
|
|
||||||
|
// 停止 watcher
|
||||||
|
r.watcherMu.Lock()
|
||||||
|
if r.watcherCancel != nil {
|
||||||
|
r.watcherCancel()
|
||||||
|
}
|
||||||
|
r.watcherMu.Unlock()
|
||||||
|
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
cli := r.cli
|
cli := r.cli
|
||||||
|
|||||||
Reference in New Issue
Block a user