Skip to content

Commit

Permalink
schemachanger/rel: fix race due to failure to clone constraint slots
Browse files Browse the repository at this point in the history
The fundamental race here is that while the slots themselves were being copied
by value, the "any" clauses which are a slice were not. The second bug here is
that the "inline" values were not being properly reset. That bug could lead to
problems when the query was run again in the context of a different element
set. We need to reset those inline values too.

Fixes #88628

Release note: None
  • Loading branch information
ajwerner committed Sep 26, 2022
1 parent 18b15cf commit 8126b7e
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 10 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/schemachanger/rel/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ go_test(
"//pkg/sql/schemachanger/rel/internal/cyclegraphtest",
"//pkg/sql/schemachanger/rel/internal/entitynodetest",
"//pkg/sql/schemachanger/rel/reltest",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
],
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/rel/query_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (p *queryBuilder) processValueExpr(rawValue expr) slotIdx {
if err != nil {
panic(err)
}
return p.fillSlot(slot{not: tv}, false /* isEntity */)
return p.fillSlot(slot{not: &tv}, false /* isEntity */)
case containsExpr:
return p.processValueExpr(v.v)
default:
Expand Down
25 changes: 23 additions & 2 deletions pkg/sql/schemachanger/rel/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type slot struct {
// not holds a value which this slot must not be equal to. Additionally,
// the value which fills this slot must have the same type as the value
// in the not container.
not typedValue
not *typedValue
}

// typedValue is a value in its comparable form, which is to say, it is a
Expand Down Expand Up @@ -75,6 +75,10 @@ func (tv typedValue) toInterface() interface{} {
return tv.toValue().Interface()
}

// inlineValue populates the inline value for the typedValue. The inline
// value is a single scalar which can be used to efficiently compare
// values, but it only has meaning in the context of the current entitySet.
// It must be cleared when moving to a new entity set.
func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) {
if tv.inlineSet {
return tv.inline, nil
Expand All @@ -87,6 +91,11 @@ func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error)
return tv.inline, nil
}

// resetInline clears the inline value.
func (tv *typedValue) resetInline() {
tv.inlineSet, tv.inline = false, 0
}

func (s *slot) eq(other slot) bool {
// TODO(ajwerner): Deal with types. We may have two slots which both have
// nil values but they differ in terms of types.
Expand All @@ -107,6 +116,18 @@ func (s *slot) empty() bool {
return s.value == nil
}

func (s *slot) reset() {
s.typedValue = typedValue{}
if s.any != nil {
for i := 0; i < len(s.any); i++ {
s.any[i].resetInline()
}
}
if s.not != nil {
s.not.resetInline()
}
}

func maybeSet(
slots []slot, idx slotIdx, tv typedValue, set *util.FastIntSet,
) (foundContradiction bool) {
Expand All @@ -131,7 +152,7 @@ func maybeSet(
}
return false, false
}
if s.not.typ != nil {
if s.not != nil {
if tv.typ != s.not.typ || eqNotNil(s.not.value, tv.value) {
return false, true
}
Expand Down
29 changes: 24 additions & 5 deletions pkg/sql/schemachanger/rel/query_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,32 @@ func newEvalContext(q *Query) *evalContext {
return &evalContext{
q: q,
depth: queryDepth(len(q.entities)),
slots: append(make([]slot, 0, len(q.slots)), q.slots...),
slots: cloneSlots(q.slots),
facts: q.facts,
}
}

// cloneSlots clones the slots of a query for use in an evalContext.
func cloneSlots(slots []slot) []slot {
clone := append(make([]slot, 0, len(slots)), slots...)
for i := range clone {
// If there are any slots which map to a set of allowed values, we need
// to clone those values because during query evaluation, we'll fill in
// inline values in the context of the current entity set. This matters
// in particular for constraints related to entities or strings; their
// inline values depend on the entitySet.
if clone[i].any != nil {
vals := clone[i].any
clone[i].any = append(make([]typedValue, 0, len(vals)), vals...)
}
if clone[i].not != nil {
cloned := *clone[i].not
clone[i].not = &cloned
}
}
return clone
}

type evalResult evalContext

func (ec *evalResult) Var(name Var) interface{} {
Expand Down Expand Up @@ -135,9 +156,7 @@ func (ec *evalContext) visit(e entity) error {
// evaluation and then unset them when we pop out of this stack frame.
var slotsFilled util.FastIntSet
defer func() {
slotsFilled.ForEach(func(i int) {
ec.slots[i].typedValue = typedValue{}
})
slotsFilled.ForEach(func(i int) { ec.slots[i].reset() })
}()

// Fill in the slot corresponding to this entity. It should not be filled
Expand Down Expand Up @@ -408,7 +427,7 @@ func (ec *evalContext) visitSubquery(query int) (done bool, _ error) {
defer sub.query.putEvalContext(sec)
defer func() { // reset the slots populated to run the subquery
sub.inputSlotMappings.ForEach(func(_, subSlot int) {
sec.slots[subSlot].typedValue = typedValue{}
sec.slots[subSlot].reset()
})
}()
if err := ec.bindSubQuerySlots(sub.inputSlotMappings, sec); err != nil {
Expand Down
106 changes: 104 additions & 2 deletions pkg/sql/schemachanger/rel/rel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rel_test

import (
"fmt"
"math/rand"
"reflect"
"testing"

Expand All @@ -20,7 +21,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/cyclegraphtest"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/entitynodetest"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/reltest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestRel(t *testing.T) {
Expand Down Expand Up @@ -337,12 +341,12 @@ func TestTooManyAttributesInValues(t *testing.T) {
}

func TestRuleValidation(t *testing.T) {
type tooManyAttrs struct {
type entity struct {
F1, F2 *uint32
}
a1, a2 := stringAttr("a1"), stringAttr("a2")
sc := rel.MustSchema("rules",
rel.EntityMapping(reflect.TypeOf((*tooManyAttrs)(nil)),
rel.EntityMapping(reflect.TypeOf((*entity)(nil)),
rel.EntityAttr(a1, "F1"),
rel.EntityAttr(a2, "F2"),
),
Expand Down Expand Up @@ -439,3 +443,101 @@ func TestEmbeddedFieldsWork(t *testing.T) {
"embedded pointer outer")
})
}

// TestConcurrentQueryInDifferentDatabases stresses some logic of the
// evalContext pooling to ensure that the state is properly reset between
// queries. An important property of this test is that it uses an any
// clause over entity pointers. When these pointers are inlined in the
// context of different databases, they will have different values. By
// randomizing the insertion order, we ensure that the inline values for
// the different entities differ.
//
// This test exercises the code which resets the inline values of slots in
// in the evalContext corresponding to the "not" and "any" constraints on that
// slot. These fields are pointers and need to be reset explicitly. The bug
// which motivated this test was that the slots were only being reset by value.
func TestConcurrentQueryInDifferentDatabases(t *testing.T) {
defer leaktest.AfterTest(t)()

type entity struct {
Str string
Other *entity
}
var str, other stringAttr = "str", "other"
schema := rel.MustSchema("test",
rel.EntityMapping(
reflect.TypeOf((*entity)(nil)),
rel.EntityAttr(str, "Str"),
rel.EntityAttr(other, "Other"),
),
)
newDB := func() *rel.Database {
db, err := rel.NewDatabase(schema, rel.Index{Attrs: []rel.Attr{other}})
require.NoError(t, err)
return db
}
const (
numDBs = 3
numEntities = 5
numContainsVals = 3
)
makeEntities := func() (ret []*entity) {
for i := 0; i < numEntities; i++ {
ret = append(ret, &entity{Str: fmt.Sprintf("s%d", i)})
}
for i := 0; i < numEntities; i++ {
ret[i].Other = ret[(i+1)%numEntities]
}
return ret
}
makeDBs := func() (ret []*rel.Database) {
for i := 0; i < numDBs; i++ {
ret = append(ret, newDB())
}
return ret
}
addEntitiesToDB := func(db *rel.Database, entities []*entity) {
for _, i := range rand.Perm(len(entities)) {
require.NoError(t, db.Insert(entities[i]))
}
}
addEntitiesToDBs := func(dbs []*rel.Database, entities []*entity) {
for _, db := range dbs {
addEntitiesToDB(db, entities)
}
}

dbs, entities := makeDBs(), makeEntities()
addEntitiesToDBs(dbs, entities)
assert.Less(t, numContainsVals, numEntities)
makeContainsVals := func(entities []*entity) (ret []interface{}) {
for i := 0; i < numContainsVals; i++ {
ret = append(ret, entities[i+1])
}
return ret
}
type v = rel.Var
q, err := rel.NewQuery(schema,
v("e").AttrIn(other, makeContainsVals(entities)...),
v("e").AttrNeq(rel.Self, entities[0]), // exclude the first entity
)
require.NoError(t, err)
var N = 8
exp := entities[1:numContainsVals] // the first entity is excluded
run := func(i int) func() error {
return func() error {
var got []*entity
assert.NoError(t, q.Iterate(dbs[i%len(dbs)], func(r rel.Result) error {
got = append(got, r.Var("e").(*entity))
return nil
}))
assert.EqualValues(t, exp, got)
return nil
}
}
var g errgroup.Group
for i := 0; i < N; i++ {
g.Go(run(i))
}
require.NoError(t, g.Wait())
}
1 change: 1 addition & 0 deletions pkg/sql/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
"rename_column_test.go",
"repair_test.go",
"rsg_test.go",
"schema_changes_in_parallel_test.go",
"split_test.go",
"system_table_test.go",
"table_split_test.go",
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/tests/schema_changes_in_parallel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// TestSchemaChangesInParallel exists to try to shake out races in the
// declarative schema changer infrastructure. At its time of writing, it
// effectively reproduced a race in the rules engine's object pooling.
func TestSchemaChangesInParallel(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
GCJob: &sql.GCJobTestingKnobs{
SkipWaitingForMVCCGC: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer s.Stopper().Stop(ctx)

const N = 4
run := func(i int) func() (retErr error) {
return func() (retErr error) {
conn, err := sqlDB.Conn(ctx)
if err != nil {
return err
}
defer func() {
retErr = errors.CombineErrors(retErr, conn.Close())
}()
for _, stmt := range []string{
fmt.Sprintf("CREATE DATABASE db%d", i),
fmt.Sprintf("USE db%d", i),
"CREATE TABLE t (i INT PRIMARY KEY, k INT)",
"ALTER TABLE t ADD COLUMN j INT DEFAULT 42",
"ALTER TABLE t DROP COLUMN k",
"CREATE SEQUENCE s",
"ALTER TABLE t ADD COLUMN l INT DEFAULT nextval('s')",
fmt.Sprintf("DROP DATABASE db%d", i),
} {
if _, err := conn.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
}
}
var g errgroup.Group
for i := 0; i < N; i++ {
g.Go(run(i))
}
require.NoError(t, g.Wait())
}

0 comments on commit 8126b7e

Please sign in to comment.