Skip to content

Commit

Permalink
feat: add retry feature for etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
Skyenought committed Sep 28, 2023
1 parent cab5b29 commit 4d2b580
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 30 deletions.
51 changes: 51 additions & 0 deletions etcd/example/server/retry/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"time"

"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/app/server/registry"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/protocol/consts"
"github.com/hertz-contrib/registry/etcd"
)

func main() {
r, _ := etcd.NewEtcdRegistry(
[]string{"127.0.0.1:2379"},
etcd.WithMaxAttemptTimes(10),
etcd.WithObserveDelay(20*time.Second),
etcd.WithRetryDelay(5*time.Second),
)

addr := "127.0.0.1:8888"
h := server.Default(
server.WithHostPorts(addr),
server.WithRegistry(r, &registry.Info{
ServiceName: "hertz.test.demo",
Addr: utils.NewNetAddr("tcp", addr),
Weight: 10,
Tags: nil,
}),
)
h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"})
})
h.Spin()
}
File renamed without changes.
58 changes: 50 additions & 8 deletions etcd/common.go → etcd/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io/ioutil"
"os"
"strconv"
"time"

"github.com/cloudwego/hertz/pkg/app/server/registry"
"github.com/cloudwego/hertz/pkg/common/hlog"
Expand All @@ -32,6 +33,50 @@ const (
defaultTTL = 60
)

type option struct {
// etcd client config
etcdCfg clientv3.Config
retryCfg *retryCfg
}

type retryCfg struct {
// The maximum number of call attempt times, including the initial call
maxAttemptTimes uint
// observeDelay is the delay time for checking the service status under normal conditions
observeDelay time.Duration
// retryDelay is the delay time for attempting to register the service after disconnecting
retryDelay time.Duration
}

type Option func(o *option)

// WithMaxAttemptTimes sets the maximum number of call attempt times, including the initial call
func WithMaxAttemptTimes(maxAttemptTimes uint) Option {
return func(o *option) {
o.retryCfg.maxAttemptTimes = maxAttemptTimes
}
}

// WithObserveDelay sets the delay time for checking the service status under normal conditions
func WithObserveDelay(observeDelay time.Duration) Option {
return func(o *option) {
o.retryCfg.observeDelay = observeDelay
}
}

// WithRetryDelay sets the delay time of retry
func WithRetryDelay(t time.Duration) Option {
return func(o *option) {
o.retryCfg.retryDelay = t
}
}

func (o *option) apply(opts ...Option) {
for _, opt := range opts {
opt(o)
}
}

// instanceInfo used to stored service basic info in etcd.
type instanceInfo struct {
Network string `json:"network"`
Expand Down Expand Up @@ -74,25 +119,22 @@ func getTTL() int64 {
return ttl
}

// Option sets options such as username, tls etc.
type Option func(cfg *clientv3.Config)

// WithTLSOpt returns a option that authentication by tls/ssl.
func WithTLSOpt(certFile, keyFile, caFile string) Option {
return func(cfg *clientv3.Config) {
return func(o *option) {
tlsCfg, err := newTLSConfig(certFile, keyFile, caFile, "")
if err != nil {
hlog.Errorf("HERTZ: tls failed with err: %v , skipping tls.", err)
}
cfg.TLS = tlsCfg
o.etcdCfg.TLS = tlsCfg
}
}

// WithAuthOpt returns an option that authentication by username and password.
func WithAuthOpt(username, password string) Option {
return func(cfg *clientv3.Config) {
cfg.Username = username
cfg.Password = password
return func(o *option) {
o.etcdCfg.Username = username
o.etcdCfg.Password = password
}
}

Expand Down
66 changes: 66 additions & 0 deletions etcd/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,72 @@ func main() {
}
}
```
## Retry

After the service is registered to `ETCD`, it will regularly check the status of the service. If any abnormal status is found, it will try to register the service again. `observeDelay` is the delay time for checking the service status under normal conditions, and `retryDelay` is the delay time for attempting to register the service after disconnecting.

### Default Retry Config

```go
type retryCfg struct {
// The maximum number of call attempt times, including the initial call
maxAttemptTimes uint
// observeDelay is the delay time for checking the service status under normal conditions
observeDelay time.Duration
// The delay time of observing etcd key
retryDelay time.Duration
}
```

| Config Name | Default Value | Description |
|:--------------------|:-----------------|:------------------------------------------------------------------------------------------|
| WithMaxAttemptTimes | 5 | Used to set the maximum number of attempts, if 0, it means infinite attempts |
| WithObserveDelay | 30 * time.Second | Used to set the delay time for checking service status under normal connection conditions |
| WithRetryDelay | 10 * time.Second | Used to set the retry delay time after disconnecting |

### Example

```go
package main

import (
"context"
"time"

"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/app/server/registry"
"github.com/cloudwego/hertz/pkg/common/utils"
"github.com/cloudwego/hertz/pkg/protocol/consts"
"github.com/hertz-contrib/registry/etcd"
)

func main() {
r, _ := etcd.NewEtcdRegistry(
[]string{"127.0.0.1:2379"},
etcd.WithMaxAttemptTimes(10),
etcd.WithObserveDelay(20*time.Second),
etcd.WithRetryDelay(5*time.Second),
)

addr := "127.0.0.1:8888"
h := server.Default(
server.WithHostPorts(addr),
server.WithRegistry(r, &registry.Info{
ServiceName: "hertz.test.demo",
Addr: utils.NewNetAddr("tcp", addr),
Weight: 10,
Tags: nil,
}),
)
h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"})
})
h.Spin()
}

```

## How to Dynamically specify ip and port

To dynamically specify an IP and port, one should first set the environment variables `HERTZ_IP_TO_REGISTRY` and `HERTZ_PORT_TO_REGISTRY`. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address.
Expand Down
119 changes: 103 additions & 16 deletions etcd/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ const (
)

type etcdRegistry struct {
etcdClient *clientv3.Client
leaseTTL int64
meta *registerMeta
mu sync.Mutex
etcdClient *clientv3.Client
retryConfig *retryCfg

leaseTTL int64
meta *registerMeta
mu sync.Mutex
stop chan struct{}
}

type registerMeta struct {
Expand All @@ -52,19 +55,27 @@ type registerMeta struct {

// NewEtcdRegistry creates a etcd based registry.
func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
}
for _, opt := range opts {
opt(&cfg)
cfg := &option{
etcdCfg: clientv3.Config{
Endpoints: endpoints,
},
retryCfg: &retryCfg{
maxAttemptTimes: 5,
observeDelay: 30 * time.Second,
retryDelay: 10 * time.Second,
},
}
etcdClient, err := clientv3.New(cfg)
cfg.apply(opts...)

etcdClient, err := clientv3.New(cfg.etcdCfg)
if err != nil {
return nil, err
}
return &etcdRegistry{
etcdClient: etcdClient,
leaseTTL: getTTL(),
etcdClient: etcdClient,
leaseTTL: getTTL(),
retryConfig: cfg.retryCfg,
stop: make(chan struct{}, 1),
}, nil
}

Expand All @@ -84,12 +95,13 @@ func (e *etcdRegistry) Register(info *registry.Info) error {
leaseID: leaseID,
}
meta.ctx, meta.cancel = context.WithCancel(context.Background())
if err := e.keepalive(&meta); err != nil {
if err := e.keepalive(meta); err != nil {
return err
}
e.mu.Lock()
e.meta = &meta
e.mu.Unlock()

return nil
}

Expand Down Expand Up @@ -133,7 +145,16 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
return err
if err != nil {
return err
}

// retry start
go func(key, val string) {
e.keepRegister(key, val, e.retryConfig)
}(serviceKey(info.ServiceName, addr), string(val))

return nil
}

func (e *etcdRegistry) deregister(info *registry.Info) error {
Expand All @@ -144,11 +165,15 @@ func (e *etcdRegistry) deregister(info *registry.Info) error {
return err
}
_, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr))
return err
if err != nil {
return err
}
e.stop <- struct{}{}
return nil
}

// keepalive keep the lease alive
func (e *etcdRegistry) keepalive(meta *registerMeta) error {
func (e *etcdRegistry) keepalive(meta registerMeta) error {
keepAlive, err := e.etcdClient.KeepAlive(meta.ctx, meta.leaseID)
if err != nil {
return err
Expand All @@ -168,6 +193,68 @@ func (e *etcdRegistry) keepalive(meta *registerMeta) error {
return nil
}

// keepRegister keep register service by retryConfig
func (e *etcdRegistry) keepRegister(key, val string, retryConfig *retryCfg) {
var failedTimes uint
delay := retryConfig.observeDelay
// if maxAttemptTimes is 0, keep register forever
for retryConfig.maxAttemptTimes == 0 || failedTimes < retryConfig.maxAttemptTimes {
select {
case _, ok := <-e.stop:
if !ok {
close(e.stop)
}
hlog.Infof("stop keep register service %s", key)
return
case <-time.After(delay):
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
resp, err := e.etcdClient.Get(ctx, key)
cancel()

if err != nil {
hlog.Warnf("keep register get %s failed with err: %v", key, err)
delay = retryConfig.retryDelay
failedTimes++
continue
}

if len(resp.Kvs) == 0 {
hlog.Infof("keep register service %s", key)
delay = retryConfig.retryDelay
leaseID, err := e.grantLease()
if err != nil {
hlog.Warnf("keep register grant lease %s failed with err: %v", key, err)
failedTimes++
continue
}

_, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID))
if err != nil {
hlog.Warnf("keep register put %s failed with err: %v", key, err)
failedTimes++
continue
}

meta := registerMeta{
leaseID: leaseID,
}
meta.ctx, meta.cancel = context.WithCancel(context.Background())
if err := e.keepalive(meta); err != nil {
hlog.Warnf("keep register keepalive %s failed with err: %v", key, err)
failedTimes++
continue
}
e.meta.cancel()
e.meta = &meta
delay = retryConfig.observeDelay
}
failedTimes = 0
}
hlog.Errorf("keep register service %s failed times:%d", key, failedTimes)
}

// getAddressOfRegistration returns the address of the service registration.
func (e *etcdRegistry) getAddressOfRegistration(info *registry.Info) (string, error) {
host, port, err := net.SplitHostPort(info.Addr.String())
Expand Down
Loading

0 comments on commit 4d2b580

Please sign in to comment.