Throttler: Use tmclient pool for CheckThrottler tabletmanager RPC #14979

Merged (2 commits) on Jan 18, 2024
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
@@ -206,7 +206,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
@@ -343,7 +343,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
@@ -139,7 +139,7 @@ Flags:
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
@@ -81,7 +81,7 @@ Flags:
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
@@ -350,7 +350,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttestserver.txt
@@ -119,7 +119,7 @@ Flags:
--tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost")
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
- --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
+ --tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
45 changes: 31 additions & 14 deletions go/vt/vttablet/grpctmclient/client.go
@@ -55,7 +55,7 @@ var (
)

func registerFlags(fs *pflag.FlagSet) {
-	fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App})")
+	fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler)")
fs.StringVar(&cert, "tablet_manager_grpc_cert", cert, "the cert to use to connect")
fs.StringVar(&key, "tablet_manager_grpc_key", key, "the key to use to connect")
fs.StringVar(&ca, "tablet_manager_grpc_ca", ca, "the server ca to use to validate servers when connecting")
@@ -94,10 +94,9 @@ type tmc struct {

// grpcClient implements both dialer and poolDialer.
type grpcClient struct {
-	// This cache of connections is to maximize QPS for ExecuteFetch.
-	// Note we'll keep the clients open and close them upon Close() only.
-	// But that's OK because usually the tasks that use them are
-	// one-purpose only.
+	// This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App} and
+	// CheckThrottler. Note we'll keep the clients open and close them upon Close() only.
+	// But that's OK because usually the tasks that use them are one-purpose only.
// The map is protected by the mutex.
mu sync.Mutex
rpcClientMap map[string]chan *tmc
@@ -115,16 +114,17 @@ type poolDialer interface {
// Client implements tmclient.TabletManagerClient.
//
// Connections are produced by the dialer implementation, which is either the
-// grpcClient implementation, which reuses connections only for ExecuteFetch and
-// otherwise makes single-purpose connections that are closed after use.
+// grpcClient implementation, which reuses connections only for ExecuteFetchAs{Dba,App}
+// and CheckThrottler, otherwise making single-purpose connections that are closed
+// after use.
//
// In order to more efficiently use the underlying tcp connections, you can
// instead use the cachedConnDialer implementation by specifying
//
-	-tablet_manager_protocol "grpc-cached"
+	--tablet_manager_protocol "grpc-cached"
//
-// The cachedConnDialer keeps connections to up to -tablet_manager_grpc_connpool_size distinct
-// tablets open at any given time, for faster per-RPC call time, and less
+// The cachedConnDialer keeps connections to up to --tablet_manager_grpc_connpool_size
+// distinct tablets open at any given time, for faster per-RPC call time, and less
// connection churn.
type Client struct {
dialer dialer
@@ -1002,12 +1002,29 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
}

// CheckThrottler is part of the tmclient.TabletManagerClient interface.
+// It always tries to use a cached client via the dialer pool as this is
+// called very frequently between tablets when the throttler is enabled in
+// a keyspace and the overhead of creating a new gRPC connection/channel
+// and dialing the other tablet every time is not practical.
func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
-	c, closer, err := client.dialer.dial(ctx, tablet)
-	if err != nil {
-		return nil, err
+	var c tabletmanagerservicepb.TabletManagerClient
+	var err error
+	if poolDialer, ok := client.dialer.(poolDialer); ok {
+		c, err = poolDialer.dialPool(ctx, tablet)
+		if err != nil {
+			return nil, err
+		}
Contributor:
I can see the same pattern is used in ExecuteFetchAsDba and in ExecuteFetchAsApp, so I'm assuming this is correct, but I'm still unsure: I don't understand how this gets a pooled connection, and I don't see the pattern of "get from pool; defer return to pool", so this does not read like a way of using a cached connection. Maybe ExecuteFetchAsApp's implementation is likewise incorrect?

Contributor Author:
You can see the implementation within this file:

func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
	opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
	if err != nil {
		return nil, err
	}

	client.mu.Lock()
	if client.rpcClientMap == nil {
		client.rpcClientMap = make(map[string]chan *tmc)
	}
	c, ok := client.rpcClientMap[addr]
	if !ok {
		c = make(chan *tmc, concurrency)
		client.rpcClientMap[addr] = c
		client.mu.Unlock()

		for i := 0; i < cap(c); i++ {
			cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
			if err != nil {
				return nil, err
			}
			c <- &tmc{
				cc:     cc,
				client: tabletmanagerservicepb.NewTabletManagerClient(cc),
			}
		}
	} else {
		client.mu.Unlock()
	}

	result := <-c
	c <- result
	return result.client, nil
}

// Close is part of the tmclient.TabletManagerClient interface.
func (client *grpcClient) Close() {
	client.mu.Lock()
	defer client.mu.Unlock()
	for _, c := range client.rpcClientMap {
		close(c)
		for ch := range c {
			ch.cc.Close()
		}
	}
	client.rpcClientMap = nil
}
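Note the last three lines of dialPool above: a cached tmc is taken off the buffered channel and immediately pushed back on, so successive RPCs to the same tablet address cycle round-robin through the concurrency (default 8) long-lived connections, none of which is closed until Close is called.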

Contributor Author (@mattlord, Jan 18, 2024):
Versus dial, which was previously being used and which creates a new TabletManagerClient with a new gRPC connection each time:

// dial returns a client to use
func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
	opt, err := grpcclient.SecureDialOption(cert, key, ca, crl, name)
	if err != nil {
		return nil, nil, err
	}
	cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
	if err != nil {
		return nil, nil, err
	}
	return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil
}

	}
-	defer closer.Close()

+	if c == nil {
Contributor:
What are the cases where we don’t have a pooled dialer here?

Contributor Author (@mattlord, Jan 18, 2024):
It's not that we don't have a pooled dialer, it's that dialPool returns a nil TabletManagerClient.client. I would not expect it, from reading the function, but it's a fail-safe to prevent RPC failures in unexpected scenarios.

Contributor:
@mattlord but wouldn’t there have been an error then? Can the pooled dialer return nil but also not return an error?

Contributor Author (@mattlord, Jan 18, 2024):
It would be unexpected, yes. But it's a pointer that anyone could set to nil at any time, since it's a shared object. And looking at the code, the TabletManagerClient could also exist in the map while its client member is nil, as we don't check for that there.

What's so concerning about the failsafe here? I could add a comment that it should not generally happen.

Contributor:
It's that it seems very unexpected. In general in Go, you don't nil-check a return value when there is an error value to indicate whether the call succeeded; if we did that everywhere, we would have very noisy code.

So if the fallback can't happen, we should IMHO remove it, since it's dead code. It also means the fallback can never be properly tested or covered.

It's also that the code being like this caused me to ask these questions, which I think is a signal in itself that it's confusing 😃.
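For illustration, a small self-contained sketch of the convention being described here; everything in it (dial, client) is hypothetical and not from the PR:

package main

import (
	"errors"
	"fmt"
)

type client struct{}

// dial follows the usual Go contract: a non-nil error means failure, and
// on success the returned *client is usable without a further nil check.
func dial(ok bool) (*client, error) {
	if !ok {
		return nil, errors.New("dial failed")
	}
	return &client{}, nil
}

func main() {
	c, err := dial(true)
	if err != nil {
		fmt.Println("error:", err) // the error is the only failure signal
		return
	}
	fmt.Printf("got %T, no extra nil check needed\n", c)
}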

Contributor Author (@mattlord, Jan 18, 2024):
@ajm188 what do you think? I see that this pattern came from this PR: #8368

I can only guess that the idea was that we don't want to treat a pooled nil client value as an RPC-level error here, but instead create a new dedicated tmclient and gRPC connection when needed, for any reason, so that the RPC always succeeds even when something odd is going on with the tmclient pool. I wonder, then, if we shouldn't at least do this at the end of the dialPool function, so that the nil value isn't returned yet again in the future:

@@ -164,7 +164,7 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table
                client.rpcClientMap = make(map[string]chan *tmc)
        }
        c, ok := client.rpcClientMap[addr]
-       if !ok {
+       if !ok || len(c) == 0 {
                c = make(chan *tmc, concurrency)
                client.rpcClientMap[addr] = c
                client.mu.Unlock()
@@ -183,8 +183,12 @@ func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Table
                client.mu.Unlock()
        }

+       // Get the least recently used/created cached client.
        result := <-c
-       c <- result
+       // If it's not nil, place it back at the end of the queue for future re-use.
+       if result != nil {
+               c <- result
+       }
        return result.client, nil
 }

I'm OK doing a minor refactor here, although it's unrelated to the issue this PR is meant to address.

Contributor:
@mattlord yeah, I think we can tackle this separately for sure.

Contributor:
@mattlord cached_client and client are two different paradigms (you linked a PR that added the cached client, whereas this PR changes client), so I'm not sure I understand your question.

As to the difference, as I understood them at the time of #8368:

  1. client with a normal (non-pooled) dialer opens a new gRPC connection for each RPC, and tears it down (along with the TCP overhead, etc.) at the end of each and every RPC.
  2. client with a pooled dialer, on first dial to a given host:port, opens N connections to that host, shoves them onto a channel, and cycles through those conns one at a time for each RPC. No connections are ever closed until someone calls Close on the TMC, at which point all connections are closed serially.
  3. cached_client is slightly different from (2), in that it dials at most 1 connection per host:port, and the most recent N connections stay alive. If an N+1th host gets dialed, we close the oldest connection by use-time and add the new connection to the cache.

Hopefully that helps clarify both this thread and @shlomi-noach's question above.
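A minimal, Vitess-independent sketch of paradigm (2), the buffered-channel pool this PR reuses for CheckThrottler. All names here (pool, conn, get) are illustrative, and the locking is simplified relative to the real grpcClient.dialPool, which releases its mutex before dialing:

package main

import (
	"fmt"
	"sync"
)

// conn stands in for a dialed gRPC connection (a *tmc in the real code).
type conn struct{ id int }

type pool struct {
	mu    sync.Mutex
	conns map[string]chan *conn
	size  int // the real code sizes this with --tablet_manager_grpc_concurrency
}

// get opens size connections to addr on first use, parks them on a
// buffered channel, and thereafter cycles through them round-robin.
func (p *pool) get(addr string) *conn {
	p.mu.Lock()
	ch, ok := p.conns[addr]
	if !ok {
		ch = make(chan *conn, p.size)
		p.conns[addr] = ch
		for i := 0; i < p.size; i++ {
			ch <- &conn{id: i} // real code: grpcclient.Dial(addr, ...)
		}
	}
	p.mu.Unlock()
	c := <-ch // take the least recently used connection...
	ch <- c   // ...and immediately requeue it for the next caller
	return c
}

func main() {
	p := &pool{conns: make(map[string]chan *conn), size: 3}
	for i := 0; i < 5; i++ {
		fmt.Printf("RPC %d uses conn %d\n", i, p.get("tablet-1:15999").id)
	}
	// Prints conn ids 0, 1, 2, 0, 1: the pool cycles instead of growing.
}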

Contributor:
To summarize further: this PR is not using (as I would describe it) the cached client. It is updating the plain (non-cached) client to use a pool of connections for the CheckThrottler RPC (which I agree is a good idea).

Separately, I'd like us to consider promoting the cached-client implementation to be the default over the non-cached (or, "oneshot") implementation.

Contributor Author (@mattlord, Jan 18, 2024):
Thanks, @ajm188! For the cached client part, I meant that more generally: the tmclients that are cached in the buffered channel. But I agree that it's not very precise/clear, since we also have the cachedConn implementation in cached_client.go. I updated the wording in the PR from "cached client" to "tmclient pool" to be more clear. The linked PR #8368 is where you added this if c == nil { pattern: https://github.com/vitessio/vitess/pull/8368/files#diff-c0aa163f88048bad1cd140049af3372d8f396784b4c93709fa269f82597a42bbR496-R500, which was being discussed/debated here.

+		var closer io.Closer
+		c, closer, err = client.dialer.dial(ctx, tablet)
+		if err != nil {
+			return nil, err
+		}
+		defer closer.Close()
+	}

	response, err := c.CheckThrottler(ctx, req)
	if err != nil {
		return nil, err