Skip to content

Commit

Permalink
gRPC health server sets serving status to NOT_SERVING on defrag
Browse files Browse the repository at this point in the history
gRPC health server sets serving status to NOT_SERVING on defrag
Backport from 3.6 in #16278

Co-authored-by: Chao Chen <chaochn@amazon.com>
Signed-off-by: Thomas Jungblut <tjungblu@redhat.com>
  • Loading branch information
tjungblu and chaochn47 committed Apr 30, 2024
1 parent 31a87cf commit 750bc0b
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 8 deletions.
3 changes: 3 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ type ServerConfig struct {
// a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
Expand Down
4 changes: 4 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ type Config struct {
// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// V2Deprecation describes phase of API & Storage V2 support
V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`
}
Expand Down Expand Up @@ -515,6 +518,7 @@ func NewConfig() *Config {
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalStopGRPCServiceOnDefrag: false,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
V2Deprecation: cfg.V2DeprecationEffective(),
}
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.BoolVar(&cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "(WARNING: Use this flag with caution!) Number of entries for a slow follower to catch up after compacting the raft storage entries.")

Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ Experimental feature:
Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
--experimental-stop-grpc-service-on-defrag
Enable etcd gRPC service to stop serving client requests on defragmentation.
Unsafe feature:
--force-new-cluster 'false'
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
Expand Down
6 changes: 1 addition & 5 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, NewHealthNotifier(hsrv, s.Logger())))
healthpb.RegisterHealthServer(grpcServer, hsrv)

// set zero values for metrics registered for this grpc server
Expand Down
68 changes: 68 additions & 0 deletions server/etcdserver/api/v3rpc/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
allGRPCServices = ""
)

type HealthNotifier interface {
StartServe()
StopServe(reason string)
}

func NewHealthNotifier(hs *health.Server, lg *zap.Logger) HealthNotifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
if lg == nil {
lg = zap.NewNop()
}
hc := &healthChecker{hs: hs, lg: lg}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.StartServe()
return hc
}

type healthChecker struct {
hs *health.Server
lg *zap.Logger
}

func (hc *healthChecker) StartServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_SERVING.String()),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthChecker) StopServe(reason string) {
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_NOT_SERVING.String()),
zap.String("reason", reason),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_NOT_SERVING)
}
11 changes: 9 additions & 2 deletions server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ type maintenanceServer struct {
hdr header
cs ClusterStatusGetter
d Downgrader
hn HealthNotifier

stopServingOnDefrag bool
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
func NewMaintenanceServer(s *etcdserver.EtcdServer, hn HealthNotifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, hn: hn, stopServingOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
Expand All @@ -88,6 +91,10 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
if ms.stopServingOnDefrag {
ms.hn.StopServe("defrag is active")
defer ms.hn.StartServe()
}
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
Expand Down
157 changes: 157 additions & 0 deletions tests/e2e/failover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
// in sync with how kubernetes uses etcd
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

clientRuntime = 10 * time.Second
requestTimeout = 100 * time.Millisecond
)

func TestFailoverOnDefrag(t *testing.T) {
tcs := []struct {
name string

experimentalStopGRPCServiceOnDefragEnabled bool
gRPCDialOptions []grpc.DialOption

// common assertion
expectedMinTotalRequestsCount int
// happy case assertion
expectedMaxFailedRequestsCount int
// negative case assertion
expectedMinFailedRequestsCount int
}{
{
name: "defrag failover happy case",
experimentalStopGRPCServiceOnDefragEnabled: true,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMaxFailedRequestsCount: 5,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
experimentalStopGRPCServiceOnDefragEnabled: false,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
experimentalStopGRPCServiceOnDefragEnabled: true,
expectedMinTotalRequestsCount: 300,
expectedMinFailedRequestsCount: 90,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
ExperimentalStopGRPCServiceOnDefrag: tc.experimentalStopGRPCServiceOnDefragEnabled,
}
clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
t.Cleanup(func() { clus.Stop() })

endpoints := clus.EndpointsGRPC()

requestVolume, successfulRequestCount := 0, 0
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: tc.gRPCDialOptions,
})
if cerr != nil {
return cerr
}
defer clusterClient.Close()

timeout := time.After(clientRuntime)
for {
select {
case <-timeout:
return lastErr
default:
}
getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := clusterClient.Get(getContext, "health")
cancel()
requestVolume++
if err != nil {
lastErr = err
continue
}
successfulRequestCount++
}
})

triggerDefrag(t, clus.Procs[0])

err = g.Wait()
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
t.Logf("request failure rate is %.2f%%, traffic volume successfulRequestCount %d requests, total %d requests", (1-float64(successfulRequestCount)/float64(requestVolume))*100, successfulRequestCount, requestVolume)

require.GreaterOrEqual(t, requestVolume, tc.expectedMinTotalRequestsCount)
failedRequestCount := requestVolume - successfulRequestCount
if tc.expectedMaxFailedRequestsCount != 0 {
require.LessOrEqual(t, failedRequestCount, tc.expectedMaxFailedRequestsCount)
}
if tc.expectedMinFailedRequestsCount != 0 {
require.GreaterOrEqual(t, failedRequestCount, tc.expectedMinFailedRequestsCount)
}
})
}
}

func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
require.NoError(t, member.Etcdctl(e2e.ClientNonTLS, false, false).Defragment(time.Minute))
}
5 changes: 5 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int

ExperimentalStopGRPCServiceOnDefrag bool
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -329,6 +331,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.InitialCorruptCheck {
args = append(args, "--experimental-initial-corrupt-check")
}
if cfg.ExperimentalStopGRPCServiceOnDefrag {
args = append(args, "--experimental-stop-grpc-service-on-defrag")
}
var murl string
if cfg.MetricsURLScheme != "" {
murl = (&url.URL{
Expand Down
10 changes: 10 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
Expand Down Expand Up @@ -166,6 +167,15 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}

func (ctl *Etcdctl) Defragment(timeout time.Duration) error {
args := append(ctl.cmdArgs(), "defrag")
if timeout != 0 {
args = append(args, fmt.Sprintf("--command-timeout=%s", timeout))
}

return SpawnWithExpectWithEnv(args, ctl.env(), "Finished defragmenting etcd member")
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Expand Down

0 comments on commit 750bc0b

Please sign in to comment.