diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 071deb99aa9..1a36e71f5d2 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -147,6 +147,7 @@ const ( readyToCompleteHint = "ready_to_complete" databasePoolSize = 3 vreplicationCutOverThreshold = 5 * time.Second + grpcTimeout = 30 * time.Second vreplicationTestSuiteWaitSeconds = 5 ) @@ -721,9 +722,6 @@ func (e *Executor) primaryPosition(ctx context.Context) (pos mysql.Position, err // terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) error { - tmClient := e.tabletManagerClient() - defer tmClient.Close() - tablet, err := e.ts.GetTablet(ctx, e.tabletAlias) if err != nil { return err @@ -870,11 +868,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries) if !bufferQueries { + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() // called after new table is in place. // unbuffer existing queries: bufferingContextCancel() // force re-read of tables - if err := tmClient.RefreshState(ctx, tablet.Tablet); err != nil { + if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil { return err } } @@ -3799,7 +3799,10 @@ func (e *Executor) vreplicationExec(ctx context.Context, tablet *topodatapb.Tabl tmClient := e.tabletManagerClient() defer tmClient.Close() - return tmClient.VReplicationExec(ctx, tablet, query) + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.VReplicationExec(grpcCtx, tablet, query) } // reloadSchema issues a ReloadSchema on this tablet @@ -3811,7 +3814,11 @@ func (e *Executor) reloadSchema(ctx context.Context) error { if err != nil { return err } - return tmClient.ReloadSchema(ctx, tablet.Tablet, "") + + grpcCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + defer cancel() + + return tmClient.ReloadSchema(grpcCtx, tablet.Tablet, "") } // deleteVReplicationEntry cleans up a _vt.vreplication entry; this function is called as part of