From 43b72d23838aa79292d4a244a58e89046d9ee3b8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 5 Oct 2023 16:23:18 +0200 Subject: [PATCH] Address review comments. Add Show/Progress commands to Migrate Signed-off-by: Rohit Nayak --- .../command/vreplication/migrate/migrate.go | 9 +- .../command/vreplication/mount/mount.go | 110 ++++++++---------- go/test/endtoend/vreplication/migrate_test.go | 14 +++ go/vt/vtctl/grpcvtctldserver/server.go | 12 +- go/vt/vtctl/workflow/mount.go | 8 +- proto/vtctlservice.proto | 8 +- 6 files changed, 86 insertions(+), 75 deletions(-) diff --git a/go/cmd/vtctldclient/command/vreplication/migrate/migrate.go b/go/cmd/vtctldclient/command/vreplication/migrate/migrate.go index dbf2ff0603f..c7b491ecc2d 100644 --- a/go/cmd/vtctldclient/command/vreplication/migrate/migrate.go +++ b/go/cmd/vtctldclient/command/vreplication/migrate/migrate.go @@ -33,7 +33,7 @@ import ( var ( // migrate is the base command for all actions related to the migrate command. migrate = &cobra.Command{ - Use: "Migrate [command] [command-flags]", + Use: "Migrate --workflow --keyspace [command] [command-flags]", Short: "Migrate is used to import data from an external cluster into the current cluster.", DisableFlagsInUseLine: true, Aliases: []string{"migrate"}, @@ -112,7 +112,6 @@ func commandCreate(cmd *cobra.Command, args []string) error { func addCreateFlags(cmd *cobra.Command) { cmd.PersistentFlags().StringVar(&createOptions.SourceKeyspace, "source-keyspace", "", "Keyspace where the tables are being moved from.") - cmd.MarkPersistentFlagRequired("source-keyspace") cmd.Flags().StringVar(&createOptions.SourceTimeZone, "source-time-zone", "", "Specifying this causes any DATETIME fields to be converted from the given time zone into UTC.") cmd.Flags().BoolVar(&createOptions.AllTables, "all-tables", false, "Copy all tables from the source.") cmd.Flags().StringSliceVar(&createOptions.IncludeTables, "tables", nil, "Source tables to copy.") @@ -130,7 +129,7 @@ func addCreateFlags(cmd *cobra.Command) { cmd.Flags().BoolVar(&createOptions.StopAfterCopy, "stop-after-copy", false, "Stop the MoveTables workflow after it's finished copying the existing rows and before it starts replicating changes.") } -func registerMigrateCommands(root *cobra.Command) { +func registerCommands(root *cobra.Command) { common.AddCommonFlags(migrate) root.AddCommand(migrate) addCreateFlags(createCommand) @@ -141,8 +140,10 @@ func registerMigrateCommands(root *cobra.Command) { } migrate.AddCommand(common.GetCompleteCommand(opts)) migrate.AddCommand(common.GetCancelCommand(opts)) + migrate.AddCommand(common.GetShowCommand(opts)) + migrate.AddCommand(common.GetStatusCommand(opts)) } func init() { - common.RegisterCommandHandler("Migrate", registerMigrateCommands) + common.RegisterCommandHandler("Migrate", registerCommands) } diff --git a/go/cmd/vtctldclient/command/vreplication/mount/mount.go b/go/cmd/vtctldclient/command/vreplication/mount/mount.go index 4c06a47e2ec..bb57f713d9a 100644 --- a/go/cmd/vtctldclient/command/vreplication/mount/mount.go +++ b/go/cmd/vtctldclient/command/vreplication/mount/mount.go @@ -31,39 +31,36 @@ var ( // mount is the base command for all actions related to the mount action. mount = &cobra.Command{ Use: "Mount [command] [command-flags]", - Short: "Mount is used to link external Vitess clusters to the current cluster to migrate data into.", + Short: "Mount is used to link an external Vitess cluster in order to migrate data from it.", DisableFlagsInUseLine: true, Aliases: []string{"mount"}, Args: cobra.ExactArgs(1), } ) -var MountOptions struct { +var mountOptions struct { TopoType string TopoServer string TopoRoot string } -func GetRegisterCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "register", - Short: "Register a mount", - Example: `vtctldclient --server localhost:15999 Mount Register --topo-type etcd2 --topo-server localhost:12379 --topo-root /vitess/global ext1`, - DisableFlagsInUseLine: true, - Aliases: []string{"Register"}, - Args: cobra.ExactArgs(1), - RunE: commandRegister, - } - return cmd +var register = &cobra.Command{ + Use: "register", + Short: "Register a mount", + Example: `vtctldclient --server localhost:15999 Mount Register --topo-type etcd2 --topo-server localhost:12379 --topo-root /vitess/global ext1`, + DisableFlagsInUseLine: true, + Aliases: []string{"Register"}, + Args: cobra.ExactArgs(1), + RunE: commandRegister, } func commandRegister(cmd *cobra.Command, args []string) error { cli.FinishedParsing(cmd) req := &vtctldatapb.MountRegisterRequest{ - TopoType: MountOptions.TopoType, - TopoServer: MountOptions.TopoServer, - TopoRoot: MountOptions.TopoRoot, + TopoType: mountOptions.TopoType, + TopoServer: mountOptions.TopoServer, + TopoRoot: mountOptions.TopoRoot, Name: cmd.Flags().Arg(0), } _, err := common.GetClient().MountRegister(common.GetCommandCtx(), req) @@ -74,17 +71,14 @@ func commandRegister(cmd *cobra.Command, args []string) error { return nil } -func GetUnregisterCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "unregister", - Short: "Unregister a mount", - Example: `vtctldclient --server localhost:15999 Mount Unregister ext1`, - DisableFlagsInUseLine: true, - Aliases: []string{"Unregister"}, - Args: cobra.ExactArgs(1), - RunE: commandUnregister, - } - return cmd +var unregister = &cobra.Command{ + Use: "unregister", + Short: "Unregister a mount", + Example: `vtctldclient --server localhost:15999 Mount Unregister ext1`, + DisableFlagsInUseLine: true, + Aliases: []string{"Unregister"}, + Args: cobra.ExactArgs(1), + RunE: commandUnregister, } func commandUnregister(cmd *cobra.Command, args []string) error { @@ -101,17 +95,14 @@ func commandUnregister(cmd *cobra.Command, args []string) error { return nil } -func GetShowCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "show", - Short: "Show attributes of a mount", - Example: `vtctldclient --server localhost:15999 Mount Show ext1`, - DisableFlagsInUseLine: true, - Aliases: []string{"Show"}, - Args: cobra.ExactArgs(1), - RunE: commandShow, - } - return cmd +var show = &cobra.Command{ + Use: "show", + Short: "Show attributes of a mount", + Example: `vtctldclient --server localhost:15999 Mount Show ext1`, + DisableFlagsInUseLine: true, + Aliases: []string{"Show"}, + Args: cobra.ExactArgs(1), + RunE: commandShow, } func commandShow(cmd *cobra.Command, args []string) error { @@ -132,17 +123,14 @@ func commandShow(cmd *cobra.Command, args []string) error { return nil } -func GetListCommand() *cobra.Command { - cmd := &cobra.Command{ - Use: "list", - Short: "List all mounts", - Example: `vtctldclient --server localhost:15999 Mount List`, - DisableFlagsInUseLine: true, - Aliases: []string{"List"}, - Args: cobra.NoArgs, - RunE: commandList, - } - return cmd +var list = &cobra.Command{ + Use: "list", + Short: "List all mounts", + Example: `vtctldclient --server localhost:15999 Mount List`, + DisableFlagsInUseLine: true, + Aliases: []string{"List"}, + Args: cobra.NoArgs, + RunE: commandList, } func commandList(cmd *cobra.Command, args []string) error { @@ -164,22 +152,24 @@ func commandList(cmd *cobra.Command, args []string) error { return nil } -func registerMountCommands(root *cobra.Command) { +func registerCommands(root *cobra.Command) { root.AddCommand(mount) - registerCommand := GetRegisterCommand() - addRegisterFlags(registerCommand) - mount.AddCommand(registerCommand) - mount.AddCommand(GetUnregisterCommand()) - mount.AddCommand(GetShowCommand()) - mount.AddCommand(GetListCommand()) + addRegisterFlags(register) + mount.AddCommand(register) + mount.AddCommand(unregister) + mount.AddCommand(show) + mount.AddCommand(list) } func addRegisterFlags(cmd *cobra.Command) { - cmd.Flags().StringVar(&MountOptions.TopoType, "topo-type", "", "Topo server implementation to use") - cmd.Flags().StringVar(&MountOptions.TopoServer, "topo-server", "", "Topo server address") - cmd.Flags().StringVar(&MountOptions.TopoRoot, "topo-root", "", "Topo server root path") + cmd.Flags().StringVar(&mountOptions.TopoType, "topo-type", "", "Topo server implementation to use") + cmd.Flags().StringVar(&mountOptions.TopoServer, "topo-server", "", "Topo server address") + cmd.Flags().StringVar(&mountOptions.TopoRoot, "topo-root", "", "Topo server root path") + for _, flag := range []string{"topo-type", "topo-server", "topo-root"} { + cmd.MarkFlagRequired(flag) + } } func init() { - common.RegisterCommandHandler("Mount", registerMountCommands) + common.RegisterCommandHandler("Mount", registerCommands) } diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 6233be6cded..f235d293cf3 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -258,6 +258,20 @@ func TestVtctldMigrate(t *testing.T) { waitForRowCount(t, vtgateConn, "product:0", "review", 4) vdiffSideBySide(t, ksWorkflow, "extcell1") + if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", + "--target-keyspace", "product", "--workflow", "e1", "Show"); err != nil { + t.Fatalf("Migrate command failed with %+v : %s\n", err, output) + } + wf := gjson.Get(output, "workflows").Array()[0] + require.Equal(t, "e1", wf.Get("name").String()) + require.Equal(t, "Migrate", wf.Get("workflow_type").String()) + + if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", + "--target-keyspace", "product", "--workflow", "e1", "Progress"); err != nil { + t.Fatalf("Migrate command failed with %+v : %s\n", err, output) + } + require.Equal(t, "Running", gjson.Get(output, "shard_streams.product/0.streams.0.status").String()) + if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", "--target-keyspace", "product", "--workflow", "e1", "Complete"); err != nil { t.Fatalf("Migrate command failed with %+v : %s\n", err, output) diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 1b5de713c4a..6a8e69a2a47 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -2566,10 +2566,10 @@ func (s *VtctldServer) MountRegister(ctx context.Context, req *vtctldatapb.Mount defer panicHandler(&err) - span.Annotate("topoType", req.TopoType) - span.Annotate("topoServer", req.TopoServer) - span.Annotate("topoRoot", req.TopoRoot) - span.Annotate("mountName", req.Name) + span.Annotate("topo_type", req.TopoType) + span.Annotate("topo_server", req.TopoServer) + span.Annotate("topo_root", req.TopoRoot) + span.Annotate("mount_name", req.Name) resp, err = s.ws.MountRegister(ctx, req) return resp, err @@ -2582,7 +2582,7 @@ func (s *VtctldServer) MountUnregister(ctx context.Context, req *vtctldatapb.Mou defer panicHandler(&err) - span.Annotate("mountName", req.Name) + span.Annotate("mount_name", req.Name) resp, err = s.ws.MountUnregister(ctx, req) return resp, err @@ -2606,7 +2606,7 @@ func (s *VtctldServer) MountShow(ctx context.Context, req *vtctldatapb.MountShow defer panicHandler(&err) - span.Annotate("mountName", req.Name) + span.Annotate("mount_name", req.Name) resp, err = s.ws.MountShow(ctx, req) return resp, err diff --git a/go/vt/vtctl/workflow/mount.go b/go/vt/vtctl/workflow/mount.go index b0df74bc329..ce287f549b8 100644 --- a/go/vt/vtctl/workflow/mount.go +++ b/go/vt/vtctl/workflow/mount.go @@ -33,7 +33,7 @@ func notExistsError(name string) error { func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegisterRequest) (*vtctldatapb.MountRegisterResponse, error) { vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name) if err != nil { - return &vtctldatapb.MountRegisterResponse{}, err + return &vtctldatapb.MountRegisterResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountRegister") } if vci != nil { return &vtctldatapb.MountRegisterResponse{}, notExistsError(req.Name) @@ -51,7 +51,7 @@ func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegist func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnregisterRequest) (*vtctldatapb.MountUnregisterResponse, error) { vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name) if err != nil { - return &vtctldatapb.MountUnregisterResponse{}, err + return &vtctldatapb.MountUnregisterResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountUnregister") } if vci == nil { return &vtctldatapb.MountUnregisterResponse{}, notExistsError(req.Name) @@ -62,7 +62,7 @@ func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnre func (s *Server) MountList(ctx context.Context, req *vtctldatapb.MountListRequest) (*vtctldatapb.MountListResponse, error) { vciList, err := s.ts.GetExternalVitessClusters(ctx) if err != nil { - return &vtctldatapb.MountListResponse{}, err + return &vtctldatapb.MountListResponse{}, vterrors.Wrap(err, "failed to get external vitess clusters in MountList") } return &vtctldatapb.MountListResponse{Names: vciList}, nil } @@ -70,7 +70,7 @@ func (s *Server) MountList(ctx context.Context, req *vtctldatapb.MountListReques func (s *Server) MountShow(ctx context.Context, req *vtctldatapb.MountShowRequest) (*vtctldatapb.MountShowResponse, error) { vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name) if err != nil { - return &vtctldatapb.MountShowResponse{}, err + return &vtctldatapb.MountShowResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountShow") } if vci == nil { return &vtctldatapb.MountShowResponse{}, notExistsError(req.Name) diff --git a/proto/vtctlservice.proto b/proto/vtctlservice.proto index ad687f5cc48..16a96453ca9 100644 --- a/proto/vtctlservice.proto +++ b/proto/vtctlservice.proto @@ -174,11 +174,17 @@ service Vtctld { rpc LookupVindexCreate(vtctldata.LookupVindexCreateRequest) returns (vtctldata.LookupVindexCreateResponse) {}; rpc LookupVindexExternalize(vtctldata.LookupVindexExternalizeRequest) returns (vtctldata.LookupVindexExternalizeResponse) {}; + // MigrateCreate creates a workflow which migrates one or more tables from an + // external cluster into Vitess. rpc MigrateCreate(vtctldata.MigrateCreateRequest) returns (vtctldata.WorkflowStatusResponse) {}; - + + // MountRegister registers a new external Vitess cluster. rpc MountRegister(vtctldata.MountRegisterRequest) returns (vtctldata.MountRegisterResponse) {}; + // MountUnregister unregisters an external Vitess cluster. rpc MountUnregister(vtctldata.MountUnregisterRequest) returns (vtctldata.MountUnregisterResponse) {}; + // MountShow returns information about an external Vitess cluster. rpc MountShow(vtctldata.MountShowRequest) returns (vtctldata.MountShowResponse) {}; + // MountList lists all registered external Vitess clusters. rpc MountList(vtctldata.MountListRequest) returns (vtctldata.MountListResponse) {}; // MoveTablesCreate creates a workflow which moves one or more tables from a