Skip to content

Commit

Permalink
ApplySchema: support --batch-size flag in 'direct' strategy (#13693)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Aug 7, 2023
1 parent 9b93106 commit 67bc76e
Show file tree
Hide file tree
Showing 18 changed files with 2,351 additions and 1,909 deletions.
2 changes: 1 addition & 1 deletion go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func initSchema() {
_, err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor("vtctld/schema", wr.TopoServer(), wr.TabletManagerClient(), wr.Logger(), schemaChangeReplicasTimeout),
schemamanager.NewTabletExecutor("vtctld/schema", wr.TopoServer(), wr.TabletManagerClient(), wr.Logger(), schemaChangeReplicasTimeout, 0),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions go/cmd/vtctldclient/command/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var applySchemaOptions = struct {
WaitReplicasTimeout time.Duration
SkipPreflight bool
CallerID string
BatchSize int64
}{}

func commandApplySchema(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -145,6 +146,7 @@ func commandApplySchema(cmd *cobra.Command, args []string) error {
MigrationContext: applySchemaOptions.MigrationContext,
WaitReplicasTimeout: protoutil.DurationToProto(applySchemaOptions.WaitReplicasTimeout),
CallerId: cid,
BatchSize: applySchemaOptions.BatchSize,
})
if err != nil {
return err
Expand Down Expand Up @@ -296,6 +298,7 @@ func init() {
ApplySchema.Flags().StringVar(&applySchemaOptions.CallerID, "caller-id", "", "Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used).")
ApplySchema.Flags().StringArrayVar(&applySchemaOptions.SQL, "sql", nil, "Semicolon-delimited, repeatable SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().StringVar(&applySchemaOptions.SQLFile, "sql-file", "", "Path to a file containing semicolon-delimited SQL commands to apply. Exactly one of --sql|--sql-file is required.")
ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicabel when all queries are CREATE TABLE|VIEW")

Root.AddCommand(ApplySchema)

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type VtctlClientParams struct {
MigrationContext string
UUIDList string
CallerID string
BatchSize int
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand Down Expand Up @@ -87,7 +88,9 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
if params.UUIDList != "" {
args = append(args, "--uuid_list", params.UUIDList)
}

if params.BatchSize > 0 {
args = append(args, "--batch_size", fmt.Sprintf("%d", params.BatchSize))
}
if params.CallerID != "" {
args = append(args, "--caller_id", params.CallerID)
}
Expand Down
21 changes: 18 additions & 3 deletions go/test/endtoend/vtgate/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestSchemaChange(t *testing.T) {
testWithAlterDatabase(t)
testWithDropCreateSchema(t)
testDropNonExistentTables(t)
testApplySchemaBatch(t)
testCreateInvalidView(t)
testCopySchemaShards(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.TabletPath, 2)
testCopySchemaShards(t, fmt.Sprintf("%s/0", keyspaceName), 3)
Expand All @@ -126,7 +127,6 @@ func testWithInitialSchema(t *testing.T) {

// Check if 4 tables are created
checkTables(t, totalTableCount)
checkTables(t, totalTableCount)

// Also match the vschema for those tablets
matchSchema(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].VttabletProcess.TabletPath, clusterInstance.Keyspaces[0].Shards[1].Vttablets[0].VttabletProcess.TabletPath)
Expand All @@ -144,7 +144,7 @@ func testWithAlterSchema(t *testing.T) {
func testWithAlterDatabase(t *testing.T) {
sql := "create database alter_database_test; alter database alter_database_test default character set = utf8mb4; drop database alter_database_test"
err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, sql)
assert.Nil(t, err)
assert.NoError(t, err)
}

// testWithDropCreateSchema , we should be able to drop and create same schema
Expand All @@ -158,7 +158,7 @@ func testWithAlterDatabase(t *testing.T) {
func testWithDropCreateSchema(t *testing.T) {
dropCreateTable := fmt.Sprintf("DROP TABLE vt_select_test_%02d ;", 2) + fmt.Sprintf(createTable, fmt.Sprintf("vt_select_test_%02d", 2))
err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, dropCreateTable)
require.Nil(t, err)
require.NoError(t, err)
checkTables(t, totalTableCount)
}

Expand Down Expand Up @@ -225,6 +225,21 @@ func testCreateInvalidView(t *testing.T) {
}
}

func testApplySchemaBatch(t *testing.T) {
{
sqls := "create table batch1(id int primary key);create table batch2(id int primary key);create table batch3(id int primary key);create table batch4(id int primary key);create table batch5(id int primary key);"
_, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ApplySchema", "--", "--sql", sqls, "--batch_size", "2", keyspaceName)
require.NoError(t, err)
checkTables(t, totalTableCount+5)
}
{
sqls := "drop table batch1; drop table batch2; drop table batch3; drop table batch4; drop table batch5"
_, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ApplySchema", "--", "--sql", sqls, keyspaceName)
require.NoError(t, err)
checkTables(t, totalTableCount)
}
}

// checkTables checks the number of tables in the first two shards.
func checkTables(t *testing.T, count int) {
checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0], count)
Expand Down
Loading

0 comments on commit 67bc76e

Please sign in to comment.