Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make vttestserver compatible with persistent data directories #7718

Merged
merged 4 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ docker_local:
docker_mini:
${call build_docker_image,docker/mini/Dockerfile,vitess/mini}

DOCKER_VTTESTSERVER_SUFFIX = mysql57 mysql80
DOCKER_VTTESTSERVER_TARGETS = $(addprefix docker_vttestserver_,$(DOCKER_VTTESTSERVER_SUFFIX))
$(DOCKER_VTTESTSERVER_TARGETS): docker_vttestserver_%:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a neat trick

${call build_docker_image,docker/vttestserver/Dockerfile.$*,vitess/vttestserver:$*}

docker_vttestserver: $(DOCKER_VTTESTSERVER_TARGETS)

# This rule loads the working copy of the code into a bootstrap image,
# and then runs the tests inside Docker.
# Example: $ make docker_test flavor=mariadb
Expand Down
16 changes: 15 additions & 1 deletion go/cmd/vttestserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -81,6 +83,16 @@ func init() {
" Also, the output specifies the mysql unix socket"+
" instead of the vtgate port.")

flag.BoolVar(&config.PersistentMode, "persistent_mode", false,
"If this flag is set, the MySQL data directory is not cleaned up"+
" when LocalCluster.TearDown() is called. This is useful for running"+
" vttestserver as a database container in local developer environments. Note"+
" that db migration files (-schema_dir option) and seeding of"+
" random data (-initialize_with_random_data option) will only run during"+
" cluster startup if the data directory does not already exist. vschema"+
" migrations are run every time the cluster starts, since persistence"+
" for the topology server has not been implemented yet")

flag.BoolVar(&doSeed, "initialize_with_random_data", false,
"If this flag is each table-shard will be initialized"+
" with random data. See also the 'rng_seed' and 'min_shard_size'"+
Expand Down Expand Up @@ -229,7 +241,9 @@ func main() {
log.Fatal(err)
}

select {}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
}

func runCluster() (vttest.LocalCluster, error) {
Expand Down
109 changes: 98 additions & 11 deletions go/cmd/vttestserver/vttestserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"strings"
"testing"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/tlstest"

"github.com/stretchr/testify/assert"
Expand All @@ -52,10 +54,12 @@ type columnVindex struct {
}

func TestRunsVschemaMigrations(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster()
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
Expand All @@ -67,12 +71,69 @@ func TestRunsVschemaMigrations(t *testing.T) {
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"})
}

func TestPersistentMode(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

dir, err := ioutil.TempDir("/tmp", "vttestserver_persistent_mode_")
assert.NoError(t, err)
defer os.RemoveAll(dir)

cluster, err := startPersistentCluster(dir)
assert.NoError(t, err)

// basic sanity checks similar to TestRunsVschemaMigrations
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// insert some data to ensure persistence across teardowns
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch("insert into customers (id, name) values (1, 'gopherson')", 1, false)
return err
})
assert.NoError(t, err)

expectedRows := [][]sqltypes.Value{
{sqltypes.NewInt64(1), sqltypes.NewVarChar("gopherson"), sqltypes.NULL},
}

// ensure data was actually inserted
var res *sqltypes.Result
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)

// reboot the persistent cluster
cluster.TearDown()
cluster, err = startPersistentCluster(dir)
defer cluster.TearDown()
assert.NoError(t, err)

// rerun our sanity checks to make sure vschema migrations are run during every startup
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// ensure previous data was successfully persisted
err = execOnCluster(cluster, "app_customer", func(conn *mysql.Conn) (err error) {
res, err = conn.ExecuteFetch("SELECT * FROM customers", 1, false)
return err
})
assert.NoError(t, err)
assert.Equal(t, expectedRows, res.Rows)
}

func TestCanVtGateExecute(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster()
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

client, err := vtctlclient.New(fmt.Sprintf("localhost:%v", cluster.GrpcPort()))
assert.NoError(t, err)
Expand Down Expand Up @@ -109,6 +170,10 @@ Out:
}

func TestMtlsAuth(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
Expand Down Expand Up @@ -141,15 +206,17 @@ func TestMtlsAuth(t *testing.T) {
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
assert.NoError(t, err)
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

// startCluster will apply vschema migrations using vtctl grpc and the clientCert.
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
}

func TestMtlsAuthUnauthorizedFails(t *testing.T) {
args := os.Args
conf := config
defer resetFlags(args, conf)

// Our test root.
root, err := ioutil.TempDir("", "tlstest")
if err != nil {
Expand Down Expand Up @@ -182,13 +249,21 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) {
fmt.Sprintf("-vtctld_grpc_ca=%s", caCert),
fmt.Sprintf("-grpc_auth_mtls_allowed_substrings=%s", "CN=ClientApp"))
defer cluster.TearDown()
args := os.Args
defer resetFlags(args)

assert.Error(t, err)
assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized")
}

func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) {
flags = append(flags, []string{
"-persistent_mode",
// FIXME: if port is not provided, data_dir is not respected
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FIXME can be removed now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still a bug unfortunately. -data_dir= needs to always be passed along with -port= or it won't be respected 😬

It's not really something that concerns us day-to-day but I may give it an hour or two today to make a PR for it anyway, if it's a simple fix

Copy link
Member Author

@hkdsun hkdsun Mar 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we move forward without this fix in this PR and tackle this work separately. I can open an issue.

Edit: opened an issue

fmt.Sprintf("-port=%d", randomPort()),
fmt.Sprintf("-data_dir=%s", dir),
}...)
return startCluster(flags...)
}

func startCluster(flags ...string) (vttest.LocalCluster, error) {
schemaDirArg := "-schema_dir=data/schema"
tabletHostname := "-tablet_hostname=localhost"
Expand All @@ -201,6 +276,13 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) {
}

func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigration string) error {
return execOnCluster(cluster, keyspace, func(conn *mysql.Conn) error {
_, err := conn.ExecuteFetch(vschemaMigration, 1, false)
return err
})
}

func execOnCluster(cluster vttest.LocalCluster, keyspace string, f func(*mysql.Conn) error) error {
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Expand All @@ -213,8 +295,7 @@ func addColumnVindex(cluster vttest.LocalCluster, keyspace string, vschemaMigrat
return err
}
defer conn.Close()
_, err = conn.ExecuteFetch(vschemaMigration, 1, false)
return err
return f(conn)
}

func assertColumnVindex(t *testing.T, cluster vttest.LocalCluster, expected columnVindex) {
Expand Down Expand Up @@ -243,6 +324,12 @@ func assertEqual(t *testing.T, actual string, expected string, message string) {
}
}

func resetFlags(args []string) {
func resetFlags(args []string, conf vttest.Config) {
os.Args = args
config = conf
}

func randomPort() int {
v := rand.Int31n(20000)
return int(v + 10000)
}
62 changes: 43 additions & 19 deletions go/test/endtoend/vtcombo/vttest_sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestMain(m *testing.M) {
cfg.Topology = topology
cfg.SchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema"
cfg.DefaultSchemaDir = os.Getenv("VTROOT") + "/test/vttest_schema/default"
cfg.PersistentMode = true

localCluster = &vttest.LocalCluster{
Config: cfg,
Expand Down Expand Up @@ -116,27 +117,24 @@ func TestStandalone(t *testing.T) {
conn, err := vtgateconn.Dial(ctx, grpcAddress)
require.Nil(t, err)
defer conn.Close()
cur := conn.Session(ks1+":-80@master", nil)

idStart, rowCount := 1000, 500
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, "begin", nil)
require.Nil(t, err)
insertManyRows(ctx, t, conn, idStart, rowCount)
assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertCanInsertRow(ctx, t, conn)
assertTablesPresent(t)

for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}

_, err = cur.Execute(ctx, "commit", nil)
err = localCluster.TearDown()
require.Nil(t, err)
err = localCluster.Setup()
require.Nil(t, err)

assertInsertedRowsExist(ctx, t, conn, idStart, rowCount)
assertTablesPresent(t)
}

cur = conn.Session(ks1+":-80@rdonly", nil)
func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@rdonly", nil)
bindVariables := map[string]*querypb.BindVariable{
"id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))},
}
Expand All @@ -153,23 +151,49 @@ func TestStandalone(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 1, len(res.Rows))
assert.Equal(t, "VARCHAR(\"test1000\")", res.Rows[0][1].String())
}

cur = conn.Session(ks1+":80-@master", nil)
_, err = cur.Execute(ctx, "begin", nil)
func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn) {
cur := conn.Session(ks1+":80-@master", nil)
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)

i := 0x810000000000000
bindVariables = map[string]*querypb.BindVariable{
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)

_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}

func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) {
cur := conn.Session(ks1+":-80@master", nil)

query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)"
_, err := cur.Execute(ctx, "begin", nil)
require.Nil(t, err)

for i := idStart; i < idStart+rowCount; i++ {
bindVariables := map[string]*querypb.BindVariable{
"id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
"msg": {Type: querypb.Type_VARCHAR, Value: []byte("test" + strconv.FormatInt(int64(i), 10))},
"keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))},
}
_, err = cur.Execute(ctx, query, bindVariables)
require.Nil(t, err)
}

_, err = cur.Execute(ctx, "commit", nil)
require.Nil(t, err)
}

func assertTablesPresent(t *testing.T) {
tmpCmd := exec.Command("vtctlclient", "-vtctl_client_protocol", "grpc", "-server", grpcAddress, "-stderrthreshold", "0", "ListAllTablets", "test")

log.Infof("Running vtctlclient with command: %v", tmpCmd.Args)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/mysqlctl/mycnf_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func TabletDir(uid uint32) string {
if *tabletDir != "" {
return fmt.Sprintf("%s/%s", env.VtDataRoot(), *tabletDir)
}
return fmt.Sprintf("%s/vt_%010d", env.VtDataRoot(), uid)
return DefaultTabletDirAtRoot(env.VtDataRoot(), uid)
}

// DefaultTabletDirAtRoot returns the default directory for a tablet given a UID and a VtDataRoot variable
func DefaultTabletDirAtRoot(dataRoot string, uid uint32) string {
return fmt.Sprintf("%s/vt_%010d", dataRoot, uid)
}

// MycnfFile returns the default location of the my.cnf file.
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vttest/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (env *LocalTestEnv) MySQLManager(mycnf []string, snapshot string) (MySQLMan
Port: env.PortForProtocol("mysql", ""),
MyCnf: append(env.DefaultMyCnf, mycnf...),
Env: env.EnvVars(),
UID: 1,
}, nil
}

Expand Down Expand Up @@ -241,9 +242,11 @@ func NewLocalTestEnv(flavor string, basePort int) (*LocalTestEnv, error) {
// NewLocalTestEnvWithDirectory returns a new instance of the default test
// environment with a directory explicitly specified.
func NewLocalTestEnvWithDirectory(flavor string, basePort int, directory string) (*LocalTestEnv, error) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
if _, err := os.Stat(directory); os.IsNotExist(err) {
err := os.Mkdir(path.Join(directory, "logs"), 0700)
if err != nil {
return nil, err
}
}

flavor, mycnf, err := GetMySQLOptions(flavor)
Expand Down
Loading