Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 17, 2024
1 parent 998433c commit 7708be8
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 117 deletions.
10 changes: 6 additions & 4 deletions go/cmd/vtcombo/cli/vschema_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ func loadKeyspacesFromDir(ctx context.Context, dir string, ts *topo.Server) {
log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err)
}

keyspace := &vschemapb.Keyspace{}
err = json.Unmarshal(jsonData, keyspace)
keyspace := &topo.KeyspaceVSchemaInfo{
Name: ks.Name,
}
err = json.Unmarshal(jsonData, keyspace.Keyspace)
if err != nil {
log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err)
}

_, err = vindexes.BuildKeyspace(keyspace, env.Parser())
_, err = vindexes.BuildKeyspace(keyspace.Keyspace, env.Parser())
if err != nil {
log.Fatalf("Invalid keyspace definition: %v", err)
}
ts.SaveVSchema(ctx, ks.Name, keyspace)
ts.SaveVSchema(ctx, keyspace)
log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile)
}
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/helpers/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func CopyKeyspaces(ctx context.Context, fromTS, toTS *topo.Server, parser *sqlpa
vs, err := fromTS.GetVSchema(ctx, keyspace)
switch {
case err == nil:
_, err = vindexes.BuildKeyspace(vs, parser)
_, err = vindexes.BuildKeyspace(vs.Keyspace, parser)
if err != nil {
log.Errorf("BuildKeyspace(%v): %v", keyspace, err)
break
}
if err := toTS.SaveVSchema(ctx, keyspace, vs); err != nil {
if err := toTS.SaveVSchema(ctx, vs); err != nil {
log.Errorf("SaveVSchema(%v): %v", keyspace, err)
}
case topo.IsErrType(err, topo.NoNode):
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/srv_vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error {
k, err := ts.GetVSchema(ctx, keyspace)
if IsErrType(err, NoNode) {
err = nil
k = &vschemapb.Keyspace{}
k = &KeyspaceVSchemaInfo{}
}

mu.Lock()
Expand All @@ -184,7 +184,7 @@ func (ts *Server) RebuildSrvVSchema(ctx context.Context, cells []string) error {
finalErr = err
return
}
srvVSchema.Keyspaces[keyspace] = k
srvVSchema.Keyspaces[keyspace] = k.Keyspace
}(keyspace)
}
wg.Wait()
Expand Down
48 changes: 33 additions & 15 deletions go/vt/topo/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,37 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

// KeyspaceVSchemaInfo is a meta struct that contains metadata to give
// the data more context and convenience. This is the main way we
// interact with a keyspace's vschema.
type KeyspaceVSchemaInfo struct {
Name string
*vschemapb.Keyspace
version Version
}

// SaveVSchema saves a Vschema. A valid Vschema should be passed in. It does not verify its correctness.
// If the VSchema is empty, just remove it.
func (ts *Server) SaveVSchema(ctx context.Context, keyspace string, vschema *vschemapb.Keyspace) error {
func (ts *Server) SaveVSchema(ctx context.Context, ks *KeyspaceVSchemaInfo) error {
if err := ctx.Err(); err != nil {
return err
}

nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile)
data, err := vschema.MarshalVT()
nodePath := path.Join(KeyspacesPath, ks.Name, VSchemaFile)
data, err := ks.MarshalVT()
if err != nil {
return err
}

_, err = ts.globalCell.Update(ctx, nodePath, data, nil)
version, err := ts.globalCell.Update(ctx, nodePath, data, ks.version)
if err != nil {
log.Errorf("failed to update vschema for keyspace %s: %v", keyspace, err)
} else {
log.Infof("successfully updated vschema for keyspace %s: %+v", keyspace, vschema)
ve := vterrors.Wrapf(err, "failed to update vschema for keyspace %s", ks.Name)
log.Error(ve)
return ve
}
return err
ks.version = version
log.Infof("successfully updated vschema for keyspace %s: %+v", ks.Name, ks.Keyspace)
return nil
}

// DeleteVSchema delete the keyspace if it exists
Expand All @@ -61,13 +72,13 @@ func (ts *Server) DeleteVSchema(ctx context.Context, keyspace string) error {
}

// GetVSchema fetches the vschema from the topo.
func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.Keyspace, error) {
func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*KeyspaceVSchemaInfo, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

nodePath := path.Join(KeyspacesPath, keyspace, VSchemaFile)
data, _, err := ts.globalCell.Get(ctx, nodePath)
data, version, err := ts.globalCell.Get(ctx, nodePath)
if err != nil {
return nil, err
}
Expand All @@ -76,7 +87,11 @@ func (ts *Server) GetVSchema(ctx context.Context, keyspace string) (*vschemapb.K
if err != nil {
return nil, vterrors.Wrapf(err, "bad vschema data: %q", data)
}
return &vs, nil
return &KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: &vs,
version: version,
}, nil
}

// EnsureVSchema makes sure that a vschema is present for this keyspace or creates a blank one if it is missing
Expand All @@ -86,10 +101,13 @@ func (ts *Server) EnsureVSchema(ctx context.Context, keyspace string) error {
log.Infof("error in getting vschema for keyspace %s: %v", keyspace, err)
}
if vschema == nil || IsErrType(err, NoNode) {
err = ts.SaveVSchema(ctx, keyspace, &vschemapb.Keyspace{
Sharded: false,
Vindexes: make(map[string]*vschemapb.Vindex),
Tables: make(map[string]*vschemapb.Table),
err = ts.SaveVSchema(ctx, &KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: &vschemapb.Keyspace{
Sharded: false,
Vindexes: make(map[string]*vschemapb.Vindex),
Tables: make(map[string]*vschemapb.Table),
},
})
if err != nil {
log.Errorf("could not create blank vschema: %v", err)
Expand Down
7 changes: 2 additions & 5 deletions go/vt/topotools/vschema_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
Expand All @@ -28,11 +29,7 @@ import (

// ApplyVSchemaDDL applies the given DDL statement to the vschema
// keyspace definition and returns the modified keyspace object.
func ApplyVSchemaDDL(ksName string, ks *vschemapb.Keyspace, alterVschema *sqlparser.AlterVschema) (*vschemapb.Keyspace, error) {
if ks == nil {
ks = new(vschemapb.Keyspace)
}

func ApplyVSchemaDDL(ksName string, ks *topo.KeyspaceVSchemaInfo, alterVschema *sqlparser.AlterVschema) (*topo.KeyspaceVSchemaInfo, error) {
if ks.Tables == nil {
ks.Tables = map[string]*vschemapb.Table{}
}
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,11 @@ func CreateKs(
if err != nil {
return 0, fmt.Errorf("BuildKeyspace(%v) failed: %v", keyspace, err)
}
if err := ts.SaveVSchema(ctx, keyspace, formal); err != nil {
ks := &topo.KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: formal,
}
if err := ts.SaveVSchema(ctx, ks); err != nil {
return 0, fmt.Errorf("SaveVSchema(%v) failed: %v", keyspace, err)
}
} else {
Expand Down
44 changes: 24 additions & 20 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,9 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV
return nil, err
}

var vs *vschemapb.Keyspace
ks := &topo.KeyspaceVSchemaInfo{
Name: req.Keyspace,
}

if req.Sql != "" {
span.Annotate("sql_mode", true)
Expand All @@ -355,29 +357,29 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV
return nil, err
}

vs, err = s.ts.GetVSchema(ctx, req.Keyspace)
ks, err = s.ts.GetVSchema(ctx, req.Keyspace)
if err != nil && !topo.IsErrType(err, topo.NoNode) {
err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace)
return nil, err
} // otherwise, we keep the empty vschema object from above

vs, err = topotools.ApplyVSchemaDDL(req.Keyspace, vs, ddl)
ks, err = topotools.ApplyVSchemaDDL(req.Keyspace, ks, ddl)
if err != nil {
err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, vs, ddl)
err = vterrors.Wrapf(err, "ApplyVSchemaDDL(%s,%v,%v)", req.Keyspace, ks, ddl)
return nil, err
}
} else { // "jsonMode"
span.Annotate("sql_mode", false)
vs = req.VSchema
ks.Keyspace = req.VSchema
}

ksVs, err := vindexes.BuildKeyspace(vs, s.ws.SQLParser())
ksVs, err := vindexes.BuildKeyspace(ks.Keyspace, s.ws.SQLParser())
if err != nil {
err = vterrors.Wrapf(err, "BuildKeyspace(%s)", req.Keyspace)
return nil, err
}
response := &vtctldatapb.ApplyVSchemaResponse{
VSchema: vs,
VSchema: ks.Keyspace,
UnknownVindexParams: make(map[string]*vtctldatapb.ApplyVSchemaResponse_ParamList),
}

Expand Down Expand Up @@ -409,7 +411,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV
return response, err
}

if err = s.ts.SaveVSchema(ctx, req.Keyspace, vs); err != nil {
if err = s.ts.SaveVSchema(ctx, ks); err != nil {
err = vterrors.Wrapf(err, "SaveVSchema(%s, %v)", req.Keyspace, req.VSchema)
return nil, err
}
Expand All @@ -425,7 +427,7 @@ func (s *VtctldServer) ApplyVSchema(ctx context.Context, req *vtctldatapb.ApplyV
err = vterrors.Wrapf(err, "GetVSchema(%s)", req.Keyspace)
return nil, err
}
response.VSchema = updatedVS
response.VSchema = updatedVS.Keyspace
return response, nil
}

Expand Down Expand Up @@ -934,27 +936,29 @@ func (s *VtctldServer) CreateKeyspace(ctx context.Context, req *vtctldatapb.Crea
}

if req.Type == topodatapb.KeyspaceType_SNAPSHOT {
var vs *vschemapb.Keyspace
vs, err = s.ts.GetVSchema(ctx, req.BaseKeyspace)
ks, err := s.ts.GetVSchema(ctx, req.BaseKeyspace)
if err != nil {
log.Infof("error from GetVSchema(%v) = %v", req.BaseKeyspace, err)
if topo.IsErrType(err, topo.NoNode) {
log.Infof("base keyspace %v does not exist; continuing with bare, unsharded vschema", req.BaseKeyspace)
vs = &vschemapb.Keyspace{
Sharded: false,
Tables: map[string]*vschemapb.Table{},
Vindexes: map[string]*vschemapb.Vindex{},
ks = &topo.KeyspaceVSchemaInfo{
Name: req.BaseKeyspace,
Keyspace: &vschemapb.Keyspace{
Sharded: false,
Tables: map[string]*vschemapb.Table{},
Vindexes: map[string]*vschemapb.Vindex{},
},
}
} else {
return nil, err
}
}

// SNAPSHOT keyspaces are excluded from global routing.
vs.RequireExplicitRouting = true
ks.RequireExplicitRouting = true

if err = s.ts.SaveVSchema(ctx, req.Name, vs); err != nil {
err = fmt.Errorf("SaveVSchema(%v) = %w", vs, err)
if err = s.ts.SaveVSchema(ctx, ks); err != nil {
err = fmt.Errorf("SaveVSchema(%v) = %w", ks, err)
return nil, err
}
}
Expand Down Expand Up @@ -2656,13 +2660,13 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche

span.Annotate("keyspace", req.Keyspace)

vschema, err := s.ts.GetVSchema(ctx, req.Keyspace)
ks, err := s.ts.GetVSchema(ctx, req.Keyspace)
if err != nil {
return nil, err
}

return &vtctldatapb.GetVSchemaResponse{
VSchema: vschema,
VSchema: ks.Keyspace,
}, nil
}

Expand Down
35 changes: 22 additions & 13 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,11 +1890,14 @@ func commandCreateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags
if err != nil {
wr.Logger().Infof("error from GetVSchema for base_keyspace: %v, %v", *baseKeyspace, err)
if topo.IsErrType(err, topo.NoNode) {
vs = &vschemapb.Keyspace{
Sharded: false,
Tables: make(map[string]*vschemapb.Table),
Vindexes: make(map[string]*vschemapb.Vindex),
RequireExplicitRouting: true,
vs = &topo.KeyspaceVSchemaInfo{
Name: *baseKeyspace,
Keyspace: &vschemapb.Keyspace{
Sharded: false,
Tables: make(map[string]*vschemapb.Table),
Vindexes: make(map[string]*vschemapb.Vindex),
RequireExplicitRouting: true,
},
}
} else {
return err
Expand All @@ -1903,7 +1906,7 @@ func commandCreateKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags
// SNAPSHOT keyspaces are excluded from global routing.
vs.RequireExplicitRouting = true
}
if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil {
if err := wr.TopoServer().SaveVSchema(ctx, vs); err != nil {
wr.Logger().Infof("error from SaveVSchema %v:%v", vs, err)
return err
}
Expand Down Expand Up @@ -3343,7 +3346,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p
}
keyspace := subFlags.Arg(0)

var vs *vschemapb.Keyspace
var vs *topo.KeyspaceVSchemaInfo
var err error

sqlMode := (*sql != "") != (*sqlFile != "")
Expand Down Expand Up @@ -3378,7 +3381,10 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p
vs, err = wr.TopoServer().GetVSchema(ctx, keyspace)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
vs = &vschemapb.Keyspace{}
vs = &topo.KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: &vschemapb.Keyspace{},
}
} else {
return err
}
Expand All @@ -3402,8 +3408,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p
schema = []byte(*vschema)
}

vs = &vschemapb.Keyspace{}
err := json2.UnmarshalPB(schema, vs)
vs = &topo.KeyspaceVSchemaInfo{
Name: keyspace,
Keyspace: &vschemapb.Keyspace{},
}
err := json2.UnmarshalPB(schema, vs.Keyspace)
if err != nil {
return err
}
Expand All @@ -3417,7 +3426,7 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p
}

// Validate the VSchema.
ksVs, err := vindexes.BuildKeyspace(vs, wr.SQLParser())
ksVs, err := vindexes.BuildKeyspace(vs.Keyspace, wr.SQLParser())
if err != nil {
return err
}
Expand Down Expand Up @@ -3449,11 +3458,11 @@ func commandApplyVSchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *p
return err
}

if _, err := vindexes.BuildKeyspace(vs, wr.SQLParser()); err != nil {
if _, err := vindexes.BuildKeyspace(vs.Keyspace, wr.SQLParser()); err != nil {
return err
}

if err := wr.TopoServer().SaveVSchema(ctx, keyspace, vs); err != nil {
if err := wr.TopoServer().SaveVSchema(ctx, vs); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 7708be8

Please sign in to comment.