Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84471: sql: implement CREATE FUNCTION in legacy schema changer. r=chengxiong-ruan a=chengxiong-ruan

Release note (sql change): This commit implements the `CREATE FUNCTION`
statement in legacy schema changer. It allows user to create a
function descriptor in data store with all cross references properly
tracked, but function descriptor cannot be used yet. Follwing commits
will add more components to fullfill the user defined function
feature.

Co-authored-by: Chengxiong Ruan <chengxiongruan@gmail.com>
  • Loading branch information
craig[bot] and chengxiong-ruan committed Jul 28, 2022
2 parents 7c7baeb + 0e8c744 commit 3ed3138
Show file tree
Hide file tree
Showing 140 changed files with 4,133 additions and 432 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ ALL_TESTS = [
"//pkg/sql/catalog/dbdesc:dbdesc_test",
"//pkg/sql/catalog/descpb:descpb_test",
"//pkg/sql/catalog/descs:descs_test",
"//pkg/sql/catalog/funcdesc:funcdesc_test",
"//pkg/sql/catalog/hydratedtables:hydratedtables_test",
"//pkg/sql/catalog/internal/validate:validate_test",
"//pkg/sql/catalog/lease:lease_test",
Expand Down Expand Up @@ -1286,6 +1287,8 @@ GO_TARGETS = [
"//pkg/sql/catalog/descs:descs",
"//pkg/sql/catalog/descs:descs_test",
"//pkg/sql/catalog/desctestutils:desctestutils",
"//pkg/sql/catalog/funcdesc:funcdesc",
"//pkg/sql/catalog/funcdesc:funcdesc_test",
"//pkg/sql/catalog/hydratedtables:hydratedtables",
"//pkg/sql/catalog/hydratedtables:hydratedtables_test",
"//pkg/sql/catalog/ingesting:ingesting",
Expand Down Expand Up @@ -2442,6 +2445,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/catalog/descpb:get_x_data",
"//pkg/sql/catalog/descs:get_x_data",
"//pkg/sql/catalog/desctestutils:get_x_data",
"//pkg/sql/catalog/funcdesc:get_x_data",
"//pkg/sql/catalog/hydratedtables:get_x_data",
"//pkg/sql/catalog/ingesting:get_x_data",
"//pkg/sql/catalog/internal/catkv:get_x_data",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func backup(

pkIDs := make(map[uint64]bool)
for i := range backupManifest.Descriptors {
if t, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); t != nil {
if t, _, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); t != nil {
pkIDs[roachpb.BulkOpSummaryID(uint64(t.ID), uint64(t.PrimaryIndex.ID))] = true
}
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func backup(
}
var tableStatistics []*stats.TableStatisticProto
for i := range backupManifest.Descriptors {
if tbl, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tbl != nil {
if tbl, _, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tbl != nil {
tableDesc := tabledesc.NewBuilder(tbl).BuildImmutableTable()
// Collect all the table stats for this table.
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func spansForAllTableIndexes(
// at least 2 revisions, and the first one should have the table in a PUBLIC
// state. We want (and do) ignore tables that have been dropped for the
// entire interval. DROPPED tables should never later become PUBLIC.
rawTbl, _, _, _ := descpb.FromDescriptor(rev.Desc)
rawTbl, _, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTbl != nil && rawTbl.Public() {
forEachPublicIndexTableSpan(rawTbl, added, execCfg.Codec, insertSpan)
}
Expand Down Expand Up @@ -882,7 +882,7 @@ func getReintroducedSpans(
for _, desc := range lastBackup.Descriptors {
// TODO(pbardea): Also check that lastWriteTime is set once those are
// populated on the table descriptor.
if table, _, _, _ := descpb.FromDescriptor(&desc); table != nil && table.Offline() {
if table, _, _, _, _ := descpb.FromDescriptor(&desc); table != nil && table.Offline() {
offlineInLastBackup[table.GetID()] = struct{}{}
}
}
Expand All @@ -902,7 +902,7 @@ func getReintroducedSpans(
// the time of the current backup, but may have been PUBLIC at some time in
// between.
for _, rev := range revs {
rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc)
rawTable, _, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTable == nil {
continue
}
Expand All @@ -917,7 +917,7 @@ func getReintroducedSpans(
// considered.
allRevs := make([]backuppb.BackupManifest_DescriptorRevision, 0, len(revs))
for _, rev := range revs {
rawTable, _, _, _ := descpb.FromDescriptor(rev.Desc)
rawTable, _, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTable == nil {
continue
}
Expand Down Expand Up @@ -959,7 +959,7 @@ func getProtectedTimestampTargetForBackup(backupManifest backuppb.BackupManifest
// timestamp record on each table being backed up.
tableIDs := make(descpb.IDs, 0)
for _, desc := range backupManifest.Descriptors {
t, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, hlc.Timestamp{})
t, _, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, hlc.Timestamp{})
if t != nil {
tableIDs = append(tableIDs, t.GetID())
}
Expand Down Expand Up @@ -1326,7 +1326,7 @@ func createBackupManifest(
dbsInPrev := make(map[descpb.ID]struct{})
rawDescs := prevBackups[len(prevBackups)-1].Descriptors
for i := range rawDescs {
if t, _, _, _ := descpb.FromDescriptor(&rawDescs[i]); t != nil {
if t, _, _, _, _ := descpb.FromDescriptor(&rawDescs[i]); t != nil {
tablesInPrev[t.ID] = struct{}{}
}
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func writeDescsToMetadata(
k := encodeDescSSTKey(i.ID)
var b []byte
if i.Desc != nil {
t, _, _, _ := descpb.FromDescriptor(i.Desc)
t, _, _, _, _ := descpb.FromDescriptor(i.Desc)
if t == nil || !t.Dropped() {
bytes, err := protoutil.Marshal(i.Desc)
if err != nil {
Expand Down Expand Up @@ -326,7 +326,7 @@ func writeNamesToMetadata(
for i, rev := range revs {
names[i].id = rev.ID
names[i].ts = rev.Time
tb, db, typ, sc := descpb.FromDescriptor(rev.Desc)
tb, db, typ, sc, fn := descpb.FromDescriptor(rev.Desc)
if db != nil {
names[i].name = db.Name
} else if sc != nil {
Expand All @@ -346,6 +346,10 @@ func writeNamesToMetadata(
names[i].name = typ.Name
names[i].parent = typ.ParentID
names[i].parentSchema = typ.ParentSchemaID
} else if fn != nil {
names[i].name = fn.Name
names[i].parent = fn.ParentID
names[i].parentSchema = fn.ParentSchemaID
}
}
sort.Sort(names)
Expand Down Expand Up @@ -1025,8 +1029,8 @@ func (di *DescIterator) Next(desc *descpb.Descriptor) bool {
return false
}

tbl, db, typ, sc := descpb.FromDescriptor(desc)
if tbl != nil || db != nil || typ != nil || sc != nil {
tbl, db, typ, sc, fn := descpb.FromDescriptor(desc)
if tbl != nil || db != nil || typ != nil || sc != nil || fn != nil {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func MakeKeyRewriterFromRekeys(
if err := protoutil.Unmarshal(rekey.NewDesc, &desc); err != nil {
return nil, errors.Wrapf(err, "unmarshalling rekey descriptor for old table id %d", rekey.OldID)
}
table, _, _, _ := descpb.FromDescriptor(&desc)
table, _, _, _, _ := descpb.FromDescriptor(&desc)
if table == nil {
return nil, errors.New("expected a table descriptor")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func spansForAllRestoreTableIndexes(
// entire interval. DROPPED tables should never later become PUBLIC.
// TODO(pbardea): Consider and test the interaction between revision_history
// backups and OFFLINE tables.
rawTbl, _, _, _ := descpb.FromDescriptor(rev.Desc)
rawTbl, _, _, _, _ := descpb.FromDescriptor(rev.Desc)
if rawTbl != nil && !rawTbl.Dropped() {
tbl := tabledesc.NewBuilder(rawTbl).BuildImmutableTable()
// We only import spans for physical tables.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestRestoreWithDroppedSchemaCorruption(t *testing.T) {
if err != nil {
return err
}
_, dbDesc, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, res.Value.Timestamp)
_, dbDesc, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, res.Value.Timestamp)
require.NotNil(t, dbDesc)
for name := range dbDesc.Schemas {
if name == dbName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ func doRestorePlan(
for _, m := range mainBackupManifests {
spans := roachpb.Spans(m.Spans)
for i := range m.Descriptors {
table, _, _, _ := descpb.FromDescriptor(&m.Descriptors[i])
table, _, _, _, _ := descpb.FromDescriptor(&m.Descriptors[i])
if table == nil {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ func backupShowerDefault(
schemaIDToName := make(map[descpb.ID]string)
schemaIDToName[keys.PublicSchemaIDForBackup] = catconstants.PublicSchemaName
for i := range manifest.Descriptors {
_, db, _, schema := descpb.FromDescriptor(&manifest.Descriptors[i])
_, db, _, schema, _ := descpb.FromDescriptor(&manifest.Descriptors[i])
if db != nil {
if _, ok := dbIDToName[db.ID]; !ok {
dbIDToName[db.ID] = db.Name
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func getAllDescChanges(
// descriptors to use during restore.
// Note that the modification time of descriptors on disk is usually 0.
// See the comment on MaybeSetDescriptorModificationTime... for more.
t, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(r.Desc, rev.Timestamp)
t, _, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(r.Desc, rev.Timestamp)
if priorIDs != nil && t != nil && t.ReplacementOf.ID != descpb.InvalidID {
priorIDs[t.ID] = t.ReplacementOf.ID
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,7 @@ func fetchDescVersionModificationTime(
if err := value.GetProto(&desc); err != nil {
t.Fatal(err)
}
if tableDesc, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, k.Timestamp); tableDesc != nil {
if tableDesc, _, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, k.Timestamp); tableDesc != nil {
if int(tableDesc.Version) == version {
return tableDesc.ModificationTime
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/cliccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/funcdesc",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/row",
"//pkg/sql/rowenc",
Expand Down
15 changes: 14 additions & 1 deletion pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand Down Expand Up @@ -766,6 +767,7 @@ func (b backupMetaDisplayMsg) MarshalJSON() ([]byte, error) {
TableDescriptors map[descpb.ID]string
TypeDescriptors map[descpb.ID]string
SchemaDescriptors map[descpb.ID]string
FunctionDescriptors map[descpb.ID]string
}{
StartTime: timeutil.Unix(0, b.StartTime.WallTime).Format(time.RFC3339),
EndTime: timeutil.Unix(0, b.EndTime.WallTime).Format(time.RFC3339),
Expand All @@ -782,18 +784,20 @@ func (b backupMetaDisplayMsg) MarshalJSON() ([]byte, error) {
TableDescriptors: make(map[descpb.ID]string),
TypeDescriptors: make(map[descpb.ID]string),
SchemaDescriptors: make(map[descpb.ID]string),
FunctionDescriptors: make(map[descpb.ID]string),
}

dbIDToName := make(map[descpb.ID]string)
schemaIDToFullyQualifiedName := make(map[descpb.ID]string)
schemaIDToFullyQualifiedName[keys.PublicSchemaIDForBackup] = catconstants.PublicSchemaName
typeIDToFullyQualifiedName := make(map[descpb.ID]string)
tableIDToFullyQualifiedName := make(map[descpb.ID]string)
funcIDToFullyQualifiedName := make(map[descpb.ID]string)

for i := range b.Descriptors {
d := &b.Descriptors[i]
id := descpb.GetDescriptorID(d)
tableDesc, databaseDesc, typeDesc, schemaDesc := descpb.FromDescriptor(d)
tableDesc, databaseDesc, typeDesc, schemaDesc, functionDesc := descpb.FromDescriptor(d)
if databaseDesc != nil {
dbIDToName[id] = descpb.GetDescriptorName(d)
} else if schemaDesc != nil {
Expand All @@ -815,12 +819,21 @@ func (b backupMetaDisplayMsg) MarshalJSON() ([]byte, error) {
}
tableName := descpb.GetDescriptorName(d)
tableIDToFullyQualifiedName[id] = parentSchema + "." + tableName
} else if functionDesc != nil {
fnDesc := funcdesc.NewBuilder(functionDesc).BuildImmutable()
parentSchema := schemaIDToFullyQualifiedName[fnDesc.GetParentSchemaID()]
if parentSchema == catconstants.PublicSchemaName {
parentSchema = dbIDToName[functionDesc.GetParentID()] + "." + parentSchema
}
fnName := descpb.GetDescriptorName(d)
funcIDToFullyQualifiedName[id] = parentSchema + "." + fnName
}
}
displayMsg.DatabaseDescriptors = dbIDToName
displayMsg.TableDescriptors = tableIDToFullyQualifiedName
displayMsg.SchemaDescriptors = schemaIDToFullyQualifiedName
displayMsg.TypeDescriptors = typeIDToFullyQualifiedName
displayMsg.FunctionDescriptors = funcIDToFullyQualifiedName

return json.Marshal(displayMsg)
}
2 changes: 2 additions & 0 deletions pkg/ccl/cliccl/debug_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ FROM
"EndTime": "${end_time}",
"DataSize": "0 B",
"Rows": 0,
"FunctionDescriptors": {},
"IndexEntries": 0,
"FormatVersion": 1,
"ClusterID": "${cluster_id}",
Expand Down Expand Up @@ -153,6 +154,7 @@ FROM
"EndTime": "${end_time}",
"DataSize": "21 B",
"Rows": 1,
"FunctionDescriptors": {},
"IndexEntries": 0,
"FormatVersion": 1,
"ClusterID": "${cluster_id}",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/reports/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func visitAncestors(
if err := descVal.GetProto(&desc); err != nil {
return false, err
}
tableDesc, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, descVal.Timestamp)
tableDesc, _, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, descVal.Timestamp)
// If it's a database, the parent is the default zone.
if tableDesc == nil {
return visitDefaultZone(ctx, cfg, visitor), nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func SynthesizeClusterVersionFromEngines(
cv := clusterversion.ClusterVersion{
Version: minStoreVersion.Version,
}
log.Eventf(ctx, "read ClusterVersion %+v", cv)
log.Eventf(ctx, "read clusterVersion %+v", cv)

// Avoid running a binary too new for this store. This is what you'd catch
// if, say, you restarted directly from 1.0 into 1.2 (bumping the min
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/diagnostics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (r *Reporter) collectSchemaInfo(ctx context.Context) ([]descpb.TableDescrip
if err := kv.ValueProto(&desc); err != nil {
return nil, errors.Wrapf(err, "%s: unable to unmarshal SQL descriptor", kv.Key)
}
t, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, kv.Value.Timestamp)
t, _, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, kv.Value.Timestamp)
if t != nil && t.ParentID != keys.SystemDatabaseID {
if err := reflectwalk.Walk(t, redactor); err != nil {
panic(err) // stringRedactor never returns a non-nil err
Expand Down
2 changes: 1 addition & 1 deletion pkg/settings/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/stretchr/testify/require"
)

// dummyVersion mocks out the dependency on the ClusterVersion type. It has a
// dummyVersion mocks out the dependency on the clusterVersion type. It has a
// msg1 prefix, and a growsbyone component that grows by one character on each
// update (which is internally validated and asserted against). They're
// separated by a '.' in string form. Neither component can contain a '.'
Expand Down
8 changes: 4 additions & 4 deletions pkg/settings/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ Setting = &VersionSetting{}
// pkg/clusterversion. See VersionSetting for additional commentary.
type VersionSettingImpl interface {
// Decode takes in an encoded cluster version and returns it as the native
// type (the ClusterVersion proto). Except it does it through the
// type (the clusterVersion proto). Except it does it through the
// ClusterVersionImpl to avoid circular dependencies.
Decode(val []byte) (ClusterVersionImpl, error)

Expand All @@ -62,9 +62,9 @@ type VersionSettingImpl interface {
SettingsListDefault() string
}

// ClusterVersionImpl is used to stub out the dependency on the ClusterVersion
// ClusterVersionImpl is used to stub out the dependency on the clusterVersion
// type (in pkg/clusterversion). The VersionSetting below is used to set
// ClusterVersion values, but we can't import the type directly due to the
// clusterVersion values, but we can't import the type directly due to the
// cyclical dependency structure.
type ClusterVersionImpl interface {
ClusterVersionImpl()
Expand All @@ -81,7 +81,7 @@ func MakeVersionSetting(impl VersionSettingImpl) VersionSetting {
}

// Decode takes in an encoded cluster version and returns it as the native
// type (the ClusterVersion proto). Except it does it through the
// type (the clusterVersion proto). Except it does it through the
// ClusterVersionImpl to avoid circular dependencies.
func (v *VersionSetting) Decode(val []byte) (ClusterVersionImpl, error) {
return v.impl.Decode(val)
Expand Down
5 changes: 4 additions & 1 deletion pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
return
}

table, database, typ, schema := descpb.FromDescriptorWithMVCCTimestamp(&descriptor, ev.Value.Timestamp)
table, database, typ, schema, function := descpb.FromDescriptorWithMVCCTimestamp(&descriptor, ev.Value.Timestamp)

var id descpb.ID
var descType catalog.DescriptorType
Expand All @@ -235,6 +235,9 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
case schema != nil:
id = schema.GetID()
descType = catalog.Schema
case function != nil:
id = function.GetID()
descType = catalog.Function
default:
logcrash.ReportOrPanic(ctx, &s.settings.SV, "unknown descriptor unmarshalled %v", descriptor)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"create_database.go",
"create_extension.go",
"create_external_connection.go",
"create_function.go",
"create_index.go",
"create_role.go",
"create_schema.go",
Expand Down Expand Up @@ -314,6 +315,7 @@ go_library(
"//pkg/sql/catalog/descidgen",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/funcdesc",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/multiregion",
"//pkg/sql/catalog/nstree",
Expand Down Expand Up @@ -524,6 +526,7 @@ go_test(
"copy_in_test.go",
"copy_test.go",
"crdb_internal_test.go",
"create_function_test.go",
"create_stats_test.go",
"create_test.go",
"database_test.go",
Expand Down
Loading

0 comments on commit 3ed3138

Please sign in to comment.