Skip to content

Commit

Permalink
Support configuring MaxConcurrentStreams for http2
Browse files Browse the repository at this point in the history
Backport etcd-io#14219 to 3.4

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Jul 21, 2022
1 parent 40ccb8b commit 3106c3e
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 11 deletions.
9 changes: 8 additions & 1 deletion embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -55,6 +56,7 @@ const (
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -177,6 +179,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -274,7 +280,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`

//The AuthTokenTTL in seconds of the simple token
// AuthTokenTTL specifies the TTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
Expand Down Expand Up @@ -394,6 +400,7 @@ func NewConfig() *Config {

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
Expand Down
6 changes: 5 additions & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
Expand Down Expand Up @@ -331,7 +332,10 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.String("initial-cluster", sc.InitialPeerURLsMap.String()),
zap.String("initial-cluster-state", ec.ClusterState),
zap.String("initial-cluster-token", sc.InitialClusterToken),
zap.Int64("quota-size-bytes", quota),
zap.Int64("quota-backend-bytes", quota),
zap.Uint("max-request-bytes", sc.MaxRequestBytes),
zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams),

zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
Expand Down
16 changes: 16 additions & 0 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"context"
"fmt"
"golang.org/x/net/http2"
"io/ioutil"
defaultLog "log"
"math"
Expand Down Expand Up @@ -132,6 +133,10 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srvhttp, s.Cfg); err != nil {
sctx.lg.Error("Configure http server failed", zap.Error(err))
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

Expand Down Expand Up @@ -185,6 +190,10 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHttpServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -202,6 +211,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}

func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error {
// todo (ahrtr): should we support configuring other parameters in the future as well?
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
})
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
Expand Down
4 changes: 4 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func newConfig() *config {
fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).")
fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).")

fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.")

// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),
Expand Down Expand Up @@ -336,6 +338,8 @@ func (cfg *config) configFromCmdLine() error {

cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")

cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")

// TODO: remove this in v3.5
cfg.ec.DeprecatedLogOutput = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-output")
cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")
Expand Down
12 changes: 12 additions & 0 deletions etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)
Expand Down Expand Up @@ -86,6 +87,8 @@ var (
grpcProxyEnableOrdering bool

grpcProxyDebug bool

maxConcurrentStreams uint32
)

const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
Expand Down Expand Up @@ -146,6 +149,8 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")

cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.")

return &cmd
}

Expand Down Expand Up @@ -195,6 +200,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)

srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client)

if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
lg.Fatal("Failed to configure the http server", zap.Error(err))
}

errc := make(chan error)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
Expand Down
2 changes: 2 additions & 0 deletions etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams 'math.MaxUint32'
Maximum concurrent streams that each client can open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Expand Down
3 changes: 1 addition & 2 deletions etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

Expand All @@ -53,7 +52,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio
)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))
grpcServer := grpc.NewServer(append(opts, gopts...)...)

pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
Expand Down
4 changes: 4 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxConcurrentStreams specifies the maximum number of concurrent
// streams that each client can open at a time.
MaxConcurrentStreams uint32

WarningApplyDuration time.Duration

StrictReconfigCheck bool
Expand Down
45 changes: 45 additions & 0 deletions pkg/flags/uint32.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"strconv"
)

type uint32Value uint32

// NewUint32Value creates an uint32 instance with the provided value.
func NewUint32Value(v uint32) *uint32Value {
val := new(uint32Value)
*val = uint32Value(v)
return val
}

// Set parses a command line uint32 value.
// Implements "flag.Value" interface.
func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}

func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

// Uint32FromFlag return the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}
111 changes: 111 additions & 0 deletions pkg/flags/uint32_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"testing"

"github.com/stretchr/testify/assert"
)

func TestUint32Value(t *testing.T) {
cases := []struct {
name string
s string
expectedVal uint32
expectError bool
}{
{
name: "normal uint32 value",
s: "200",
expectedVal: 200,
},
{
name: "zero value",
s: "0",
expectedVal: 0,
},
{
name: "negative int value",
s: "-200",
expectError: true,
},
{
name: "invalid integer value",
s: "invalid",
expectError: true,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var val uint32Value
err := val.Set(tc.s)

if tc.expectError {
if err == nil {
t.Errorf("Expected failure on parsing uint32 value from %s", tc.s)
}
} else {
if err != nil {
t.Errorf("Unexpected error when parsing %s: %v", tc.s, err)
}
assert.Equal(t, uint32(val), tc.expectedVal)
}
})
}
}

func TestUint32FromFlag(t *testing.T) {
const flagName = "max-concurrent-streams"

cases := []struct {
name string
defaultVal uint32
arguments []string
expectedVal uint32
}{
{
name: "only default value",
defaultVal: 15,
arguments: []string{},
expectedVal: 15,
},
{
name: "argument has different value from the default one",
defaultVal: 16,
arguments: []string{"--max-concurrent-streams", "200"},
expectedVal: 200,
},
{
name: "argument has the same value from the default one",
defaultVal: 105,
arguments: []string{"--max-concurrent-streams", "105"},
expectedVal: 105,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.")
if err := fs.Parse(tc.arguments); err != nil {
t.Fatalf("Unexpected error: %v\n", err)
}
actualMaxStream := Uint32FromFlag(fs, flagName)
assert.Equal(t, actualMaxStream, tc.expectedVal)
})
}
}
6 changes: 6 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ type etcdProcessClusterConfig struct {
enableV2 bool
initialCorruptCheck bool
authTokenOpts string

MaxConcurrentStreams uint32 // default is math.MaxUint32
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -263,6 +265,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
args = append(args, "--auth-token", cfg.authTokenOpts)
}

if cfg.MaxConcurrentStreams != 0 {
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}

etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
Expand Down
Loading

0 comments on commit 3106c3e

Please sign in to comment.