Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual cherry pick of #12448 on release 3.4 #12740

Merged
merged 1 commit into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
Expand Down Expand Up @@ -285,6 +286,9 @@ type Config struct {
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
Expand Down Expand Up @@ -388,8 +392,9 @@ func NewConfig() *Config {
SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
Expand Down
1 change: 1 addition & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ Experimental feature:
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.

Unsafe feature:
--force-new-cluster 'false'
Expand Down
6 changes: 1 addition & 5 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ import (
"go.uber.org/zap"
)

const (
warnApplyDuration = 100 * time.Millisecond
)

type applyResult struct {
resp proto.Message
err error
Expand Down Expand Up @@ -115,7 +111,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
stringer: r,
alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
}
defer warnOfExpensiveRequest(s.getLogger(), time.Now(), stringer, nil, nil)
defer warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, time.Now(), stringer, nil, nil)

switch r.Method {
case "POST":
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

WarningApplyDuration time.Duration

StrictReconfigCheck bool

// ClientCertAuthEnabled is true when cert has been signed by the client CA.
Expand Down
18 changes: 9 additions & 9 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ func (nc *notifier) notify(err error) {
close(nc.c)
}

func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
func warnOfExpensiveRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "", resp, err)
}

func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
Expand All @@ -130,7 +130,7 @@ func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer
}
}

func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
Expand All @@ -145,25 +145,25 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
d := time.Since(now)
if d > warnApplyDuration {
if d > warningApplyDuration {
if lg != nil {
lg.Warn(
"apply request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.Duration("expected-duration", warningApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
if resp != nil {
trace.AddField(
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}

defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err)
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
}(time.Now())

get := func() { resp, err = s.applyV3Base.Txn(r) }
Expand Down