Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit concurrent creation of healthcheck gRPC connections #15053

Merged
merged 7 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Flags:
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--health_check_interval duration Interval between health checks (default 20s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
--heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Flags:
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
-h, --help help for vtctld
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Flags:
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
-h, --help help for vtgate
Expand Down
10 changes: 9 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"github.com/google/safehtml/template"
"github.com/google/safehtml/template/uncheckedconversions"
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -87,6 +88,9 @@
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand Down Expand Up @@ -168,6 +172,7 @@
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")

Check warning on line 175 in go/vt/discovery/healthcheck.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/healthcheck.go#L175

Added line #L175 was not covered by tests
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -287,6 +292,8 @@
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewHealthCheck creates a new HealthCheck object.
Expand Down Expand Up @@ -321,6 +328,7 @@
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -850,7 +858,7 @@
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(), nil
return thc.Connection(hc), nil

Check warning on line 861 in go/vt/discovery/healthcheck.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/healthcheck.go#L861

Added line #L861 was not covered by tests
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
32 changes: 26 additions & 6 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
Expand All @@ -33,6 +34,7 @@
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -122,8 +124,8 @@
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection()
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(hc)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -136,14 +138,32 @@
return err
}

func (thc *tabletHealthCheck) Connection() queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked()
return thc.connectionLocked(hc)
}

func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService {
func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If other parts of the code could benefit from a similar limit, perhaps this should be moved to a helper/util library or baked into grpcclient? 🤔 cc @deepthi (and anyone else) for thoughts

return func(ctx context.Context, addr string) (net.Conn, error) {

Check warning on line 148 in go/vt/discovery/tablet_health_check.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/tablet_health_check.go#L147-L148

Added lines #L147 - L148 were not covered by tests
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err

Check warning on line 154 in go/vt/discovery/tablet_health_check.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/tablet_health_check.go#L153-L154

Added lines #L153 - L154 were not covered by tests
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)

Check warning on line 158 in go/vt/discovery/tablet_health_check.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/tablet_health_check.go#L156-L158

Added lines #L156 - L158 were not covered by tests
}
}

func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService {
if thc.Conn == nil {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})

Check warning on line 166 in go/vt/discovery/tablet_health_check.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/tablet_health_check.go#L165-L166

Added lines #L165 - L166 were not covered by tests
conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -272,7 +292,7 @@
}()

// Read stream health responses.
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
6 changes: 6 additions & 0 deletions go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpcclient
import (
"context"
"crypto/tls"
"sync"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -39,6 +40,7 @@ import (
)

var (
grpcDialOptionsMu sync.Mutex
keepaliveTime = 10 * time.Second
keepaliveTimeout = 10 * time.Second
initialConnWindowSize int
Expand Down Expand Up @@ -86,6 +88,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error)

// RegisterGRPCDialOptions registers an implementation of AuthServer.
func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) {
grpcDialOptionsMu.Lock()
defer grpcDialOptionsMu.Unlock()
grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc)
}

Expand Down Expand Up @@ -134,12 +138,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ...

newopts = append(newopts, opts...)
var err error
grpcDialOptionsMu.Lock()
for _, grpcDialOptionInitializer := range grpcDialOptions {
newopts, err = grpcDialOptionInitializer(newopts)
if err != nil {
log.Fatalf("There was an error initializing client grpc.DialOption: %v", err)
}
}
grpcDialOptionsMu.Unlock()

newopts = append(newopts, interceptors()...)

Expand Down
Loading