[3.5] gRPC health server sets serving status to NOT_SERVING on defrag #17914

Merged 2 commits on May 7, 2024
3 changes: 3 additions & 0 deletions server/config/config.go
@@ -187,6 +187,9 @@ type ServerConfig struct {
// a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
4 changes: 4 additions & 0 deletions server/embed/config.go
@@ -414,6 +414,9 @@ type Config struct {
// ExperimentalTxnModeWriteWithSharedBuffer enables write transaction to use a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// V2Deprecation describes phase of API & Storage V2 support
V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`
}
@@ -515,6 +518,7 @@ func NewConfig() *Config {
ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
ExperimentalMemoryMlock: false,
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalStopGRPCServiceOnDefrag: false,

ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,
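For embedded etcd, the new option is a plain field on embed.Config. A minimal sketch of enabling it (the data directory and logging setup are illustrative assumptions, not part of this PR):

```go
package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/etcd-data" // illustrative data dir
	// Opt in: report NOT_SERVING on the gRPC health service while defrag runs.
	cfg.ExperimentalStopGRPCServiceOnDefrag = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd is ready")
}
```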
1 change: 1 addition & 0 deletions server/embed/etcd.go
@@ -222,6 +222,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
V2Deprecation: cfg.V2DeprecationEffective(),
}
1 change: 1 addition & 0 deletions server/etcdmain/config.go
@@ -301,6 +301,7 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
fs.BoolVar(&cfg.ec.ExperimentalMemoryMlock, "experimental-memory-mlock", cfg.ec.ExperimentalMemoryMlock, "Enable to enforce etcd pages (in particular bbolt) to stay in RAM.")
fs.BoolVar(&cfg.ec.ExperimentalTxnModeWriteWithSharedBuffer, "experimental-txn-mode-write-with-shared-buffer", true, "Enable the write transaction to use a shared buffer in its readonly check operations.")
fs.BoolVar(&cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
Member commented:
Adding the experimental prefix and then removing it later is a breaking change, as we discussed in #17657.

Are we still happy to add the experimental prefix to a new flag?

I know this PR just backports changes from main to 3.5, so the suggestion (adding "experimental" in the description only) is really aimed at the main branch (update main first). Do we want to follow the suggestion for now, or do we want to reach agreement on #17657 before we add any new flags? @fuweid @jmhbnz @serathius @siyuanfoundation @spzala

Suggested change
fs.BoolVar(&cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "experimental-stop-grpc-service-on-defrag", cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "Enable etcd gRPC service to stop serving client requests on defragmentation.")
fs.BoolVar(&cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "stop-grpc-service-on-defrag", cfg.ec.ExperimentalStopGRPCServiceOnDefrag, "(Experimental) Enable etcd gRPC service to stop serving client requests on defragmentation.")

Member replied:
As part of #17657 we will still need to support migration from the old flag naming scheme (with the prefix) to the new one. One more flag should not change much, since we already have other flags to migrate, so I don't think we need to block on this.

Member replied:
I'm happy for this to proceed using the older approach. We are still working on the KEP for feature flags, so that will be a while off, and I agree with @serathius above that we don't want to block this work.

fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "(WARNING: Use this flag with caution!) Number of entries for a slow follower to catch up after compacting the raft storage entries.")

2 changes: 2 additions & 0 deletions server/etcdmain/help.go
@@ -282,6 +282,8 @@ Experimental feature:
Enable the write transaction to use a shared buffer in its readonly check operations.
--experimental-bootstrap-defrag-threshold-megabytes
Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.
--experimental-stop-grpc-service-on-defrag
Enable etcd gRPC service to stop serving client requests on defragmentation.

Unsafe feature:
--force-new-cluster 'false'
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3client/v3client.go
@@ -39,7 +39,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client {
wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s))
c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)}

mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s))
mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s, nil))
c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c)

clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s))
7 changes: 2 additions & 5 deletions server/etcdserver/api/v3rpc/grpc.go
@@ -76,13 +76,10 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))

// server should register all the services manually
// use empty service name for all etcd services' health status,
// see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthNotifier := newHealthNotifier(hsrv, s)
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s, healthNotifier))
healthpb.RegisterHealthServer(grpcServer, hsrv)

// set zero values for metrics registered for this grpc server
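The health service registered above can be queried directly with the standard grpc_health_v1 client. A sketch under assumptions (plaintext endpoint on 127.0.0.1:2379; TLS omitted):

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// The empty service name matches the server-wide status set in Server().
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("health status:", resp.Status) // NOT_SERVING while defrag is active (flag enabled)
}
```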
77 changes: 77 additions & 0 deletions server/etcdserver/api/v3rpc/health.go
@@ -0,0 +1,77 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
"go.etcd.io/etcd/server/v3/etcdserver"
"go.uber.org/zap"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
allGRPCServices = ""
)

type notifier interface {
defragStarted()
defragFinished()
}

func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier {
if hs == nil {
panic("unexpected nil gRPC health server")
}
hc := &healthNotifier{hs: hs, lg: s.Logger(), stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag}
// set grpc health server as serving status blindly since
// the grpc server will serve iff s.ReadyNotify() is closed.
hc.startServe()
return hc
}

type healthNotifier struct {
hs *health.Server
lg *zap.Logger

stopGRPCServiceOnDefrag bool
}

func (hc *healthNotifier) defragStarted() {
if !hc.stopGRPCServiceOnDefrag {
return
}
hc.stopServe("defrag is active")
}

func (hc *healthNotifier) defragFinished() { hc.startServe() }

func (hc *healthNotifier) startServe() {
hc.lg.Info(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_SERVING.String()),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_SERVING)
}

func (hc *healthNotifier) stopServe(reason string) {
hc.lg.Warn(
"grpc service status changed",
zap.String("service", allGRPCServices),
zap.String("status", healthpb.HealthCheckResponse_NOT_SERVING.String()),
zap.String("reason", reason),
)
hc.hs.SetServingStatus(allGRPCServices, healthpb.HealthCheckResponse_NOT_SERVING)
}
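Setting the status is only half of the story: a clientv3 client fails over during defrag only if it enables client-side gRPC health checking. The e2e test below does this via the gRPC service config; extracted here as a standalone sketch:

```go
import (
	"google.golang.org/grpc"
	_ "google.golang.org/grpc/health" // registers the client-side health-check implementation

	clientv3 "go.etcd.io/etcd/client/v3"
)

// newFailoverClient returns a client that watches each member's health
// service ("" matches allGRPCServices) and routes around NOT_SERVING members.
func newFailoverClient(endpoints []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints: endpoints,
		DialOptions: []grpc.DialOption{
			grpc.WithDisableServiceConfig(),
			grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
		},
	})
}
```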
8 changes: 6 additions & 2 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -76,10 +76,12 @@ type maintenanceServer struct {
hdr header
cs ClusterStatusGetter
d Downgrader

healthNotifier notifier
}

func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
func NewMaintenanceServer(s *etcdserver.EtcdServer, healthNotifier notifier) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s, healthNotifier: healthNotifier}
if srv.lg == nil {
srv.lg = zap.NewNop()
}
@@ -88,6 +90,8 @@ func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {

func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
ms.lg.Info("starting defragment")
ms.healthNotifier.defragStarted()
defer ms.healthNotifier.defragFinished()
err := ms.bg.Backend().Defrag()
if err != nil {
ms.lg.Warn("failed to defragment", zap.Error(err))
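With the notifier wired in, a defrag issued through the maintenance API is now bracketed by defragStarted/defragFinished. Triggering it from a client, as a sketch (the endpoint URL is an assumption):

```go
// assumes an existing *clientv3.Client named cli
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// While this member defragments, its health service reports NOT_SERVING
// (when --experimental-stop-grpc-service-on-defrag is enabled), so
// health-checking clients shift traffic to the remaining members.
if _, err := cli.Defragment(ctx, "http://127.0.0.1:2379"); err != nil {
	log.Printf("defrag failed: %v", err)
}
```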
159 changes: 159 additions & 0 deletions tests/e2e/failover_test.go
@@ -0,0 +1,159 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
_ "google.golang.org/grpc/health"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

const (
// in sync with how kubernetes uses etcd
// https://github.com/kubernetes/kubernetes/blob/release-1.28/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go#L59-L71
keepaliveTime = 30 * time.Second
keepaliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

clientRuntime = 10 * time.Second
requestTimeout = 100 * time.Millisecond
)

func TestFailoverOnDefrag(t *testing.T) {
tcs := []struct {
name string

experimentalStopGRPCServiceOnDefragEnabled bool
gRPCDialOptions []grpc.DialOption

// common assertion
expectedMinQPS float64
// happy case assertion
expectedMaxFailureRate float64
// negative case assertion
expectedMinFailureRate float64
}{
{
name: "defrag failover happy case",
experimentalStopGRPCServiceOnDefragEnabled: true,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMaxFailureRate: 0.01,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to false",
experimentalStopGRPCServiceOnDefragEnabled: false,
gRPCDialOptions: []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`),
},
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
{
name: "defrag blocks one-third of requests with stopGRPCServiceOnDefrag set to true and client health check disabled",
experimentalStopGRPCServiceOnDefragEnabled: true,
expectedMinQPS: 20,
expectedMinFailureRate: 0.25,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
e2e.BeforeTest(t)
cfg := e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
ExperimentalStopGRPCServiceOnDefrag: tc.experimentalStopGRPCServiceOnDefragEnabled,
}
clus, err := e2e.NewEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
t.Cleanup(func() { clus.Stop() })

endpoints := clus.EndpointsGRPC()

requestVolume, successfulRequestCount := 0, 0
start := time.Now()
g := new(errgroup.Group)
g.Go(func() (lastErr error) {
clusterClient, cerr := clientv3.New(clientv3.Config{
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
Endpoints: endpoints,
DialOptions: tc.gRPCDialOptions,
})
if cerr != nil {
return cerr
}
defer clusterClient.Close()

timeout := time.After(clientRuntime)
for {
select {
case <-timeout:
return lastErr
default:
}
getContext, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := clusterClient.Get(getContext, "health")
cancel()
requestVolume++
if err != nil {
lastErr = err
continue
}
successfulRequestCount++
}
})

triggerDefrag(t, clus.Procs[0])

err = g.Wait()
if err != nil {
t.Logf("etcd client failed to fail over, error (%v)", err)
}
qps := float64(requestVolume) / float64(time.Since(start)) * float64(time.Second)
failureRate := 1 - float64(successfulRequestCount)/float64(requestVolume)
t.Logf("request failure rate is %.2f%%, qps is %.2f requests/second", failureRate*100, qps)

require.GreaterOrEqual(t, qps, tc.expectedMinQPS)
if tc.expectedMaxFailureRate != 0.0 {
require.LessOrEqual(t, failureRate, tc.expectedMaxFailureRate)
}
if tc.expectedMinFailureRate != 0.0 {
require.GreaterOrEqual(t, failureRate, tc.expectedMinFailureRate)
}
})
}
}

func triggerDefrag(t *testing.T, member e2e.EtcdProcess) {
require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragBeforeCopy", `sleep("10s")`))
require.NoError(t, member.Etcdctl(e2e.ClientNonTLS, false, false).Defragment(time.Minute))
}
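triggerDefrag arms the gofail failpoint defragBeforeCopy so the defrag pass stalls for 10s while the client loop measures failures. Outside the test framework, the same failpoint can be armed over gofail's HTTP endpoint; a sketch assuming the server was built with failpoints and started with GOFAIL_HTTP="127.0.0.1:22381" (the address is an assumption):

```go
import (
	"net/http"
	"strings"
)

// armDefragSleep makes the next defrag sleep for 10s at defragBeforeCopy.
func armDefragSleep() error {
	req, err := http.NewRequest(http.MethodPut,
		"http://127.0.0.1:22381/defragBeforeCopy",
		strings.NewReader(`sleep("10s")`))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}
```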
5 changes: 5 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -191,6 +191,8 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int

ExperimentalStopGRPCServiceOnDefrag bool
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
@@ -329,6 +331,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
if cfg.InitialCorruptCheck {
args = append(args, "--experimental-initial-corrupt-check")
}
if cfg.ExperimentalStopGRPCServiceOnDefrag {
args = append(args, "--experimental-stop-grpc-service-on-defrag")
}
var murl string
if cfg.MetricsURLScheme != "" {
murl = (&url.URL{
10 changes: 10 additions & 0 deletions tests/framework/e2e/etcdctl.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
Expand Down Expand Up @@ -166,6 +167,15 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}

func (ctl *Etcdctl) Defragment(timeout time.Duration) error {
args := append(ctl.cmdArgs(), "defrag")
if timeout != 0 {
args = append(args, fmt.Sprintf("--command-timeout=%s", timeout))
}

return SpawnWithExpectWithEnv(args, ctl.env(), "Finished defragmenting etcd member")
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string