Skip to content

Commit

Permalink
[Release 16.0]: Online DDL: timeouts for all gRPC calls (#14182) (#14191
Browse files Browse the repository at this point in the history
)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Oct 11, 2023
1 parent 95c3a5d commit 745ed3f
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ const (
readyToCompleteHint = "ready_to_complete"
databasePoolSize = 3
vreplicationCutOverThreshold = 5 * time.Second
grpcTimeout = 30 * time.Second
vreplicationTestSuiteWaitSeconds = 5
)

Expand Down Expand Up @@ -721,9 +722,6 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos mysql.Position, err

// terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error {
tmClient := e.tabletManagerClient()
defer tmClient.Close()

tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
Expand Down Expand Up @@ -870,11 +868,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries)
if !bufferQueries {
grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
defer cancel()
// called after new table is in place.
// unbuffer existing queries:
bufferingContextCancel()
// force re-read of tables
if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil {
if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil {
return err
}
}
Expand Down Expand Up @@ -3799,7 +3799,10 @@ func (e *Executor) vreplicationExec(ctx context.Context, tablet *topodatapb.Tabl
tmClient := e.tabletManagerClient()
defer tmClient.Close()

return tmClient.VReplicationExec(ctx, tablet, query)
grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
defer cancel()

return tmClient.VReplicationExec(grpcCtx, tablet, query)
}

// reloadSchema issues a ReloadSchema on this tablet
Expand All @@ -3811,7 +3814,11 @@ func (e *Executor) reloadSchema(ctx context.Context) error {
if err != nil {
return err
}
return tmClient.ReloadSchema(ctx, tablet.Tablet, "")

grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
defer cancel()

return tmClient.ReloadSchema(grpcCtx, tablet.Tablet, "")
}

// deleteVReplicationEntry cleans up a _vt.vreplication entry; this function is called as part of
Expand Down

0 comments on commit 745ed3f

Please sign in to comment.