diff --git a/embed/config.go b/embed/config.go index a084f0a07d7..70a311c6cb1 100644 --- a/embed/config.go +++ b/embed/config.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "fmt" "io/ioutil" + "math" "net" "net/http" "net/url" @@ -55,6 +56,7 @@ const ( DefaultMaxTxnOps = uint(128) DefaultWarningApplyDuration = 100 * time.Millisecond DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultMaxConcurrentStreams = math.MaxUint32 DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveTimeout = 20 * time.Second @@ -177,6 +179,10 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` + LPUrls, LCUrls []url.URL APUrls, ACUrls []url.URL ClientTLSInfo transport.TLSInfo @@ -274,7 +280,7 @@ type Config struct { AuthToken string `json:"auth-token"` BcryptCost uint `json:"bcrypt-cost"` - //The AuthTokenTTL in seconds of the simple token + // AuthTokenTTL specifies the TTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` @@ -394,6 +400,7 @@ func NewConfig() *Config { MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + MaxConcurrentStreams: DefaultMaxConcurrentStreams, ExperimentalWarningApplyDuration: DefaultWarningApplyDuration, GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime, diff --git a/embed/etcd.go b/embed/etcd.go index 5d924eb8670..477f02e0603 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -188,6 +188,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, AuthToken: cfg.AuthToken, @@ -331,7 +332,10 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), zap.String("initial-cluster-state", ec.ClusterState), zap.String("initial-cluster-token", sc.InitialClusterToken), - zap.Int64("quota-size-bytes", quota), + zap.Int64("quota-backend-bytes", quota), + zap.Uint("max-request-bytes", sc.MaxRequestBytes), + zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams), + zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), diff --git a/embed/serve.go b/embed/serve.go index b76604d8eeb..2dfe567c0e8 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -17,6 +17,7 @@ package embed import ( "context" "fmt" + "golang.org/x/net/http2" "io/ioutil" defaultLog "log" "math" @@ -132,6 +133,10 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } + if err = configureHttpServer(srvhttp, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() @@ -185,6 +190,10 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } + if err = configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err + } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -202,6 +211,13 @@ func (sctx *serveCtx) serve( return m.Serve() } +func configureHttpServer(srv *http.Server, cfg etcdserver.ServerConfig) error { + // todo (ahrtr): should we support configuring other parameters in the future as well? + return http2.ConfigureServer(srv, &http2.Server{ + MaxConcurrentStreams: cfg.MaxConcurrentStreams, + }) +} + // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC // connections or otherHandler otherwise. Given in gRPC docs. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { diff --git a/etcdmain/config.go b/etcdmain/config.go index ecb16475e76..751257c349b 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -163,6 +163,8 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.ec.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).") fs.DurationVar(&cfg.ec.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.ec.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") + // clustering fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""), @@ -336,6 +338,8 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites") + cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams") + // TODO: remove this in v3.5 cfg.ec.DeprecatedLogOutput = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-output") cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index a2688f5aa31..06670033218 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -45,6 +45,7 @@ import ( "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" ) @@ -86,6 +87,8 @@ var ( grpcProxyEnableOrdering bool grpcProxyDebug bool + + maxConcurrentStreams uint32 ) const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024 @@ -146,6 +149,8 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.") + return &cmd } @@ -195,6 +200,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client) + + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + lg.Fatal("Failed to configure the http server", zap.Error(err)) + } + errc := make(chan error) go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() diff --git a/etcdmain/help.go b/etcdmain/help.go index e71ddc609e2..4e0069123b2 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -77,6 +77,8 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. + --max-concurrent-streams 'math.MaxUint32' + Maximum concurrent streams that each client can open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 3332016617d..0cd09d01642 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -31,7 +31,6 @@ import ( const ( grpcOverheadBytes = 512 * 1024 - maxStreams = math.MaxUint32 maxSendBytes = math.MaxInt32 ) @@ -53,7 +52,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOptio ))) opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) - opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) + opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) grpcServer := grpc.NewServer(append(opts, gopts...)...) pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) diff --git a/etcdserver/config.go b/etcdserver/config.go index dff7c885555..70208d2f223 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -119,6 +119,10 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 + WarningApplyDuration time.Duration StrictReconfigCheck bool diff --git a/pkg/flags/uint32.go b/pkg/flags/uint32.go new file mode 100644 index 00000000000..496730a4549 --- /dev/null +++ b/pkg/flags/uint32.go @@ -0,0 +1,45 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "strconv" +) + +type uint32Value uint32 + +// NewUint32Value creates an uint32 instance with the provided value. +func NewUint32Value(v uint32) *uint32Value { + val := new(uint32Value) + *val = uint32Value(v) + return val +} + +// Set parses a command line uint32 value. +// Implements "flag.Value" interface. +func (i *uint32Value) Set(s string) error { + v, err := strconv.ParseUint(s, 0, 32) + *i = uint32Value(v) + return err +} + +func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) } + +// Uint32FromFlag return the uint32 value of a flag with the given name +func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 { + val := *fs.Lookup(name).Value.(*uint32Value) + return uint32(val) +} diff --git a/pkg/flags/uint32_test.go b/pkg/flags/uint32_test.go new file mode 100644 index 00000000000..aa7487a2320 --- /dev/null +++ b/pkg/flags/uint32_test.go @@ -0,0 +1,111 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUint32Value(t *testing.T) { + cases := []struct { + name string + s string + expectedVal uint32 + expectError bool + }{ + { + name: "normal uint32 value", + s: "200", + expectedVal: 200, + }, + { + name: "zero value", + s: "0", + expectedVal: 0, + }, + { + name: "negative int value", + s: "-200", + expectError: true, + }, + { + name: "invalid integer value", + s: "invalid", + expectError: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var val uint32Value + err := val.Set(tc.s) + + if tc.expectError { + if err == nil { + t.Errorf("Expected failure on parsing uint32 value from %s", tc.s) + } + } else { + if err != nil { + t.Errorf("Unexpected error when parsing %s: %v", tc.s, err) + } + assert.Equal(t, uint32(val), tc.expectedVal) + } + }) + } +} + +func TestUint32FromFlag(t *testing.T) { + const flagName = "max-concurrent-streams" + + cases := []struct { + name string + defaultVal uint32 + arguments []string + expectedVal uint32 + }{ + { + name: "only default value", + defaultVal: 15, + arguments: []string{}, + expectedVal: 15, + }, + { + name: "argument has different value from the default one", + defaultVal: 16, + arguments: []string{"--max-concurrent-streams", "200"}, + expectedVal: 200, + }, + { + name: "argument has the same value from the default one", + defaultVal: 105, + arguments: []string{"--max-concurrent-streams", "105"}, + expectedVal: 105, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fs := flag.NewFlagSet("etcd", flag.ContinueOnError) + fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.") + if err := fs.Parse(tc.arguments); err != nil { + t.Fatalf("Unexpected error: %v\n", err) + } + actualMaxStream := Uint32FromFlag(fs, flagName) + assert.Equal(t, actualMaxStream, tc.expectedVal) + }) + } +} diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go index 29c32e4370c..87bc376f16a 100644 --- a/tests/e2e/cluster_test.go +++ b/tests/e2e/cluster_test.go @@ -134,6 +134,8 @@ type etcdProcessClusterConfig struct { enableV2 bool initialCorruptCheck bool authTokenOpts string + + MaxConcurrentStreams uint32 // default is math.MaxUint32 } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -263,6 +265,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro args = append(args, "--auth-token", cfg.authTokenOpts) } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) + } + etcdCfgs[i] = &etcdServerProcessConfig{ execPath: cfg.execPath, args: args, diff --git a/tests/e2e/ctl_v3_auth_no_proxy_test.go b/tests/e2e/ctl_v3_auth_no_proxy_test.go index 06bcc2fd0de..e6d53e2f3dc 100644 --- a/tests/e2e/ctl_v3_auth_no_proxy_test.go +++ b/tests/e2e/ctl_v3_auth_no_proxy_test.go @@ -23,11 +23,11 @@ import ( ) func TestCtlV3AuthCertCN(t *testing.T) { - testCtl(t, authTestCertCN, withCfg(*newConfigClientTLSCertAuth())) + testCtl(t, authTestCertCN, withCfg(configClientTLSCertAuth)) } func TestCtlV3AuthCertCNAndUsername(t *testing.T) { - testCtl(t, authTestCertCNAndUsername, withCfg(*newConfigClientTLSCertAuth())) + testCtl(t, authTestCertCNAndUsername, withCfg(configClientTLSCertAuth)) } func TestCtlV3AuthCertCNAndUsernameNoPassword(t *testing.T) { - testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(*newConfigClientTLSCertAuth())) + testCtl(t, authTestCertCNAndUsernameNoPassword, withCfg(configClientTLSCertAuth)) } diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 04f5a6570bf..2800794840a 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -64,6 +64,7 @@ type ctlCtx struct { envMap map[string]struct{} dialTimeout time.Duration + testTimeout time.Duration quorum bool // if true, set up 3-node cluster and linearizable read interactive bool @@ -94,6 +95,10 @@ func withDialTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.dialTimeout = timeout } } +func withTestTimeout(timeout time.Duration) ctlOption { + return func(cx *ctlCtx) { cx.testTimeout = timeout } +} + func withQuorum() ctlOption { return func(cx *ctlCtx) { cx.quorum = true } } @@ -130,14 +135,26 @@ func withFlagByEnv() ctlOption { return func(cx *ctlCtx) { cx.envMap = make(map[string]struct{}) } } -func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { - defer testutil.AfterTest(t) +// This function must be called after the `withCfg`, otherwise its value +// may be overwritten by `withCfg`. +func withMaxConcurrentStreams(streams uint32) ctlOption { + return func(cx *ctlCtx) { + cx.cfg.MaxConcurrentStreams = streams + } +} - ret := ctlCtx{ +func getDefaultCtlCtx(t *testing.T) ctlCtx { + return ctlCtx{ t: t, cfg: configAutoTLS, dialTimeout: 7 * time.Second, } +} + +func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { + defer testutil.AfterTest(t) + + ret := getDefaultCtlCtx(t) ret.applyOpts(opts) mustEtcdctl(t) @@ -175,10 +192,8 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testFunc(ret) }() - timeout := 2*ret.dialTimeout + time.Second - if ret.dialTimeout == 0 { - timeout = 30 * time.Second - } + timeout := ret.getTestTimeout() + select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) @@ -186,6 +201,17 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { } } +func (cx *ctlCtx) getTestTimeout() time.Duration { + timeout := cx.testTimeout + if timeout == 0 { + timeout = 2*cx.dialTimeout + time.Second + if cx.dialTimeout == 0 { + timeout = 30 * time.Second + } + } + return timeout +} + func (cx *ctlCtx) prefixArgs(eps []string) []string { fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go new file mode 100644 index 00000000000..f1a7d0c1fda --- /dev/null +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -0,0 +1,211 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "encoding/json" + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "go.etcd.io/etcd/pkg/testutil" +) + +// NO TLS +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(configNoTLS), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(configNoTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +/* +// There are lots of "device not configured" errors. I suspect it's an issue +// of the project `github.com/creack/pty`. I manually executed the test case +// with 1000 concurrent streams, and confirmed it's working as expected. +// TODO(ahrtr): investigate the test failure in the future. +func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { + f, err := setRLimit(10240) + if err != nil { + t.Fatal(err) + } + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) + f() +} */ + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(configNoTLS), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(configNoTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +// TLS +func TestV3Curl_MaxStreams_BelowLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(configTLS), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(configTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(configTLS), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(configTLS), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { + // Step 1: generate configuration for creating cluster + t.Log("Generating configuration for creating cluster.") + cx := getDefaultCtlCtx(t) + cx.applyOpts(opts) + // We must set the `ClusterSize` to 1, otherwise different streams may + // connect to different members, accordingly it's difficult to test the + // behavior. + cx.cfg.clusterSize = 1 + + // Step 2: create the cluster + t.Log("Creating an etcd cluster") + epc, err := newEtcdProcessCluster(&cx.cfg) + if err != nil { + t.Fatalf("Failed to start etcd cluster: %v", err) + } + cx.epc = epc + + // Step 3: run test + // (a) generate ${concurrentNumber} concurrent watch streams; + // (b) submit a range request. + var wg sync.WaitGroup + concurrentNumber := cx.cfg.MaxConcurrentStreams - 1 + expectedResponse := `"revision":"` + if reachLimit { + concurrentNumber = cx.cfg.MaxConcurrentStreams + expectedResponse = "Operation timed out" + } + wg.Add(int(concurrentNumber)) + t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", + cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) + errCh := make(chan error, concurrentNumber) + submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) + submitRangeAfterConcurrentWatch(cx, expectedResponse) + + // Step 4: check the watch errors. Note that we only check the watch error + // before closing cluster. Once we close the cluster, the watch must run + // into error, and we should ignore them by then. + t.Log("Checking watch error.") + select { + case werr := <-errCh: + t.Fatal(werr) + default: + } + + // Step 5: Close the cluster + t.Log("Closing test cluster...") + assert.NoError(t, epc.Close()) + t.Log("Closed test cluster") + + // Step 6: Waiting all watch goroutines to exit. + donec := make(chan struct{}) + go func() { + defer close(donec) + wg.Wait() + }() + + timeout := cx.getTestTimeout() + t.Logf("Waiting test case to finish, timeout: %s", timeout) + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + t.Log("All watch goroutines exited.") + } + + t.Log("testV3CurlMaxStream done!") +} + +func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { + wreq, err := json.Marshal(&pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}) + if err != nil { + cx.t.Fatal(err) + } + wstr := `{"create_request" : ` + string(wreq) + "}" + + var wgSchedule sync.WaitGroup + wgSchedule.Add(number) + for i := 0; i < number; i++ { + go func(i int) { + wgSchedule.Done() + defer wgDone.Done() + if err := cURLPost(cx.epc, cURLReq{endpoint: "/v3/watch", value: wstr, expected: `"revision":"`}); err != nil { + werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) + cx.t.Error(werr) + errCh <- werr + } + }(i) + } + // make sure all goroutines have already been scheduled. + wgSchedule.Wait() + // We need to make sure all watch streams have already been created. + // For simplicity, we just sleep 3 second. We may consider improving + // it in the future. + time.Sleep(3 * time.Second) +} + +func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { + rangeData, err := json.Marshal(&pb.RangeRequest{ + Key: []byte("foo"), + }) + if err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("Submitting range request...") + if err := cURLPost(cx.epc, cURLReq{endpoint: "/v3/kv/range", value: string(rangeData), expected: expectedValue, timeout: 5}); err != nil { + cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err) + } + cx.t.Log("range request done") +} + +// setRLimit sets the open file limitation, and return a function which +// is used to reset the limitation. +func setRLimit(nofile uint64) (func() error, error) { + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return nil, fmt.Errorf("failed to get open file limit, error: %v", err) + } + + var wLimit syscall.Rlimit + wLimit.Max = nofile + wLimit.Cur = nofile + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &wLimit); err != nil { + return nil, fmt.Errorf("failed to set max open file limit, %v", err) + } + + return func() error { + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return fmt.Errorf("failed reset max open file limit, %v", err) + } + return nil + }, nil +}