Skip to content

Commit

Permalink
Address review comments. Add Show/Progress commands to Migrate
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Oct 5, 2023
1 parent c5ab335 commit 43b72d2
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 75 deletions.
9 changes: 5 additions & 4 deletions go/cmd/vtctldclient/command/vreplication/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
var (
// migrate is the base command for all actions related to the migrate command.
migrate = &cobra.Command{
Use: "Migrate [command] [command-flags]",
Use: "Migrate --workflow <workflow> --keyspace <keyspace> [command] [command-flags]",
Short: "Migrate is used to import data from an external cluster into the current cluster.",
DisableFlagsInUseLine: true,
Aliases: []string{"migrate"},
Expand Down Expand Up @@ -112,7 +112,6 @@ func commandCreate(cmd *cobra.Command, args []string) error {

func addCreateFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&createOptions.SourceKeyspace, "source-keyspace", "", "Keyspace where the tables are being moved from.")
cmd.MarkPersistentFlagRequired("source-keyspace")
cmd.Flags().StringVar(&createOptions.SourceTimeZone, "source-time-zone", "", "Specifying this causes any DATETIME fields to be converted from the given time zone into UTC.")
cmd.Flags().BoolVar(&createOptions.AllTables, "all-tables", false, "Copy all tables from the source.")
cmd.Flags().StringSliceVar(&createOptions.IncludeTables, "tables", nil, "Source tables to copy.")
Expand All @@ -130,7 +129,7 @@ func addCreateFlags(cmd *cobra.Command) {
cmd.Flags().BoolVar(&createOptions.StopAfterCopy, "stop-after-copy", false, "Stop the MoveTables workflow after it's finished copying the existing rows and before it starts replicating changes.")
}

func registerMigrateCommands(root *cobra.Command) {
func registerCommands(root *cobra.Command) {
common.AddCommonFlags(migrate)
root.AddCommand(migrate)
addCreateFlags(createCommand)
Expand All @@ -141,8 +140,10 @@ func registerMigrateCommands(root *cobra.Command) {
}
migrate.AddCommand(common.GetCompleteCommand(opts))
migrate.AddCommand(common.GetCancelCommand(opts))
migrate.AddCommand(common.GetShowCommand(opts))
migrate.AddCommand(common.GetStatusCommand(opts))
}

func init() {
common.RegisterCommandHandler("Migrate", registerMigrateCommands)
common.RegisterCommandHandler("Migrate", registerCommands)
}
110 changes: 50 additions & 60 deletions go/cmd/vtctldclient/command/vreplication/mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,36 @@ var (
// mount is the base command for all actions related to the mount action.
mount = &cobra.Command{
Use: "Mount [command] [command-flags]",
Short: "Mount is used to link external Vitess clusters to the current cluster to migrate data into.",
Short: "Mount is used to link an external Vitess cluster in order to migrate data from it.",
DisableFlagsInUseLine: true,
Aliases: []string{"mount"},
Args: cobra.ExactArgs(1),
}
)

var MountOptions struct {
var mountOptions struct {
TopoType string
TopoServer string
TopoRoot string
}

func GetRegisterCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "register",
Short: "Register a mount",
Example: `vtctldclient --server localhost:15999 Mount Register --topo-type etcd2 --topo-server localhost:12379 --topo-root /vitess/global ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Register"},
Args: cobra.ExactArgs(1),
RunE: commandRegister,
}
return cmd
var register = &cobra.Command{
Use: "register",
Short: "Register a mount",
Example: `vtctldclient --server localhost:15999 Mount Register --topo-type etcd2 --topo-server localhost:12379 --topo-root /vitess/global ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Register"},
Args: cobra.ExactArgs(1),
RunE: commandRegister,
}

func commandRegister(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.MountRegisterRequest{
TopoType: MountOptions.TopoType,
TopoServer: MountOptions.TopoServer,
TopoRoot: MountOptions.TopoRoot,
TopoType: mountOptions.TopoType,
TopoServer: mountOptions.TopoServer,
TopoRoot: mountOptions.TopoRoot,
Name: cmd.Flags().Arg(0),
}
_, err := common.GetClient().MountRegister(common.GetCommandCtx(), req)
Expand All @@ -74,17 +71,14 @@ func commandRegister(cmd *cobra.Command, args []string) error {
return nil
}

func GetUnregisterCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "unregister",
Short: "Unregister a mount",
Example: `vtctldclient --server localhost:15999 Mount Unregister ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Unregister"},
Args: cobra.ExactArgs(1),
RunE: commandUnregister,
}
return cmd
var unregister = &cobra.Command{
Use: "unregister",
Short: "Unregister a mount",
Example: `vtctldclient --server localhost:15999 Mount Unregister ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Unregister"},
Args: cobra.ExactArgs(1),
RunE: commandUnregister,
}

func commandUnregister(cmd *cobra.Command, args []string) error {
Expand All @@ -101,17 +95,14 @@ func commandUnregister(cmd *cobra.Command, args []string) error {
return nil
}

func GetShowCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "show",
Short: "Show attributes of a mount",
Example: `vtctldclient --server localhost:15999 Mount Show ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Show"},
Args: cobra.ExactArgs(1),
RunE: commandShow,
}
return cmd
var show = &cobra.Command{
Use: "show",
Short: "Show attributes of a mount",
Example: `vtctldclient --server localhost:15999 Mount Show ext1`,
DisableFlagsInUseLine: true,
Aliases: []string{"Show"},
Args: cobra.ExactArgs(1),
RunE: commandShow,
}

func commandShow(cmd *cobra.Command, args []string) error {
Expand All @@ -132,17 +123,14 @@ func commandShow(cmd *cobra.Command, args []string) error {
return nil
}

func GetListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List all mounts",
Example: `vtctldclient --server localhost:15999 Mount List`,
DisableFlagsInUseLine: true,
Aliases: []string{"List"},
Args: cobra.NoArgs,
RunE: commandList,
}
return cmd
var list = &cobra.Command{
Use: "list",
Short: "List all mounts",
Example: `vtctldclient --server localhost:15999 Mount List`,
DisableFlagsInUseLine: true,
Aliases: []string{"List"},
Args: cobra.NoArgs,
RunE: commandList,
}

func commandList(cmd *cobra.Command, args []string) error {
Expand All @@ -164,22 +152,24 @@ func commandList(cmd *cobra.Command, args []string) error {
return nil
}

func registerMountCommands(root *cobra.Command) {
func registerCommands(root *cobra.Command) {
root.AddCommand(mount)
registerCommand := GetRegisterCommand()
addRegisterFlags(registerCommand)
mount.AddCommand(registerCommand)
mount.AddCommand(GetUnregisterCommand())
mount.AddCommand(GetShowCommand())
mount.AddCommand(GetListCommand())
addRegisterFlags(register)
mount.AddCommand(register)
mount.AddCommand(unregister)
mount.AddCommand(show)
mount.AddCommand(list)
}

func addRegisterFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&MountOptions.TopoType, "topo-type", "", "Topo server implementation to use")
cmd.Flags().StringVar(&MountOptions.TopoServer, "topo-server", "", "Topo server address")
cmd.Flags().StringVar(&MountOptions.TopoRoot, "topo-root", "", "Topo server root path")
cmd.Flags().StringVar(&mountOptions.TopoType, "topo-type", "", "Topo server implementation to use")
cmd.Flags().StringVar(&mountOptions.TopoServer, "topo-server", "", "Topo server address")
cmd.Flags().StringVar(&mountOptions.TopoRoot, "topo-root", "", "Topo server root path")
for _, flag := range []string{"topo-type", "topo-server", "topo-root"} {
cmd.MarkFlagRequired(flag)
}
}

func init() {
common.RegisterCommandHandler("Mount", registerMountCommands)
common.RegisterCommandHandler("Mount", registerCommands)
}
14 changes: 14 additions & 0 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ func TestVtctldMigrate(t *testing.T) {
waitForRowCount(t, vtgateConn, "product:0", "review", 4)
vdiffSideBySide(t, ksWorkflow, "extcell1")

if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Show"); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
wf := gjson.Get(output, "workflows").Array()[0]
require.Equal(t, "e1", wf.Get("name").String())
require.Equal(t, "Migrate", wf.Get("workflow_type").String())

if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Progress"); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
require.Equal(t, "Running", gjson.Get(output, "shard_streams.product/0.streams.0.status").String())

if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Complete"); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2566,10 +2566,10 @@ func (s *VtctldServer) MountRegister(ctx context.Context, req *vtctldatapb.Mount

defer panicHandler(&err)

span.Annotate("topoType", req.TopoType)
span.Annotate("topoServer", req.TopoServer)
span.Annotate("topoRoot", req.TopoRoot)
span.Annotate("mountName", req.Name)
span.Annotate("topo_type", req.TopoType)
span.Annotate("topo_server", req.TopoServer)
span.Annotate("topo_root", req.TopoRoot)
span.Annotate("mount_name", req.Name)

resp, err = s.ws.MountRegister(ctx, req)
return resp, err
Expand All @@ -2582,7 +2582,7 @@ func (s *VtctldServer) MountUnregister(ctx context.Context, req *vtctldatapb.Mou

defer panicHandler(&err)

span.Annotate("mountName", req.Name)
span.Annotate("mount_name", req.Name)

resp, err = s.ws.MountUnregister(ctx, req)
return resp, err
Expand All @@ -2606,7 +2606,7 @@ func (s *VtctldServer) MountShow(ctx context.Context, req *vtctldatapb.MountShow

defer panicHandler(&err)

span.Annotate("mountName", req.Name)
span.Annotate("mount_name", req.Name)

resp, err = s.ws.MountShow(ctx, req)
return resp, err
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtctl/workflow/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func notExistsError(name string) error {
func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegisterRequest) (*vtctldatapb.MountRegisterResponse, error) {
vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name)
if err != nil {
return &vtctldatapb.MountRegisterResponse{}, err
return &vtctldatapb.MountRegisterResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountRegister")
}
if vci != nil {
return &vtctldatapb.MountRegisterResponse{}, notExistsError(req.Name)
Expand All @@ -51,7 +51,7 @@ func (s *Server) MountRegister(ctx context.Context, req *vtctldatapb.MountRegist
func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnregisterRequest) (*vtctldatapb.MountUnregisterResponse, error) {
vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name)
if err != nil {
return &vtctldatapb.MountUnregisterResponse{}, err
return &vtctldatapb.MountUnregisterResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountUnregister")
}
if vci == nil {
return &vtctldatapb.MountUnregisterResponse{}, notExistsError(req.Name)
Expand All @@ -62,15 +62,15 @@ func (s *Server) MountUnregister(ctx context.Context, req *vtctldatapb.MountUnre
func (s *Server) MountList(ctx context.Context, req *vtctldatapb.MountListRequest) (*vtctldatapb.MountListResponse, error) {
vciList, err := s.ts.GetExternalVitessClusters(ctx)
if err != nil {
return &vtctldatapb.MountListResponse{}, err
return &vtctldatapb.MountListResponse{}, vterrors.Wrap(err, "failed to get external vitess clusters in MountList")
}
return &vtctldatapb.MountListResponse{Names: vciList}, nil
}

func (s *Server) MountShow(ctx context.Context, req *vtctldatapb.MountShowRequest) (*vtctldatapb.MountShowResponse, error) {
vci, err := s.ts.GetExternalVitessCluster(ctx, req.Name)
if err != nil {
return &vtctldatapb.MountShowResponse{}, err
return &vtctldatapb.MountShowResponse{}, vterrors.Wrap(err, "failed to get external vitess cluster in MountShow")
}
if vci == nil {
return &vtctldatapb.MountShowResponse{}, notExistsError(req.Name)
Expand Down
8 changes: 7 additions & 1 deletion proto/vtctlservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,17 @@ service Vtctld {
rpc LookupVindexCreate(vtctldata.LookupVindexCreateRequest) returns (vtctldata.LookupVindexCreateResponse) {};
rpc LookupVindexExternalize(vtctldata.LookupVindexExternalizeRequest) returns (vtctldata.LookupVindexExternalizeResponse) {};

// MigrateCreate creates a workflow which migrates one or more tables from an
// external cluster into Vitess.
rpc MigrateCreate(vtctldata.MigrateCreateRequest) returns (vtctldata.WorkflowStatusResponse) {};


// MountRegister registers a new external Vitess cluster.
rpc MountRegister(vtctldata.MountRegisterRequest) returns (vtctldata.MountRegisterResponse) {};
// MountUnregister unregisters an external Vitess cluster.
rpc MountUnregister(vtctldata.MountUnregisterRequest) returns (vtctldata.MountUnregisterResponse) {};
// MountShow returns information about an external Vitess cluster.
rpc MountShow(vtctldata.MountShowRequest) returns (vtctldata.MountShowResponse) {};
// MountList lists all registered external Vitess clusters.
rpc MountList(vtctldata.MountListRequest) returns (vtctldata.MountListResponse) {};

// MoveTablesCreate creates a workflow which moves one or more tables from a
Expand Down

0 comments on commit 43b72d2

Please sign in to comment.