61600: kvserver: make the StoreRebalancer aware of non-voters r=aayushshah15 a=aayushshah15

This commit teaches the `StoreRebalancer` to rebalance non-voting
replicas.

Release justification: needed for non-voting replicas
Release note: None

62361: roachtest: attempt to handle VM overload under tpccbench r=irfansharif a=tbg

See #62039.

`tpccbench`, by design, pushes CRDB into overload territory. The test
harness handles nodes crashing or tpmc tanking well. However, it was
not prepared to handle the cloud VMs going unresponsive for ~minutes,
which is one common failure mode.

This commit tweaks the line search to be resilient to failures to
communicate with the cloud VM in the one place where it matters
(stopping the cluster at the beginning of a new search attempt).

The hope is that this will allow the search to run to completion,
even in the face of overload-imposed temporary VM outages. It is
not expected to do so reliably, but anecdotally most VMs seem to
come back within a few minutes.
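
As a rough sketch of the idea (not the actual roachtest change; `stopCluster`, the
retry window, and the backoff below are illustrative assumptions), the tolerant
stop step retries for a bounded period instead of failing the attempt outright:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopCluster stands in for the step that stops CRDB on all nodes at the start
// of a new line-search attempt. In this sketch it always fails, simulating a VM
// that is temporarily unreachable due to overload.
func stopCluster(ctx context.Context) error {
	return errors.New("ssh: connect to host: i/o timeout")
}

// stopClusterWithTolerance retries the stop step for up to maxWait, on the
// assumption that an overloaded VM usually becomes reachable again within a
// few minutes. If it never does, the error is surfaced as before.
func stopClusterWithTolerance(ctx context.Context, maxWait, backoff time.Duration) error {
	deadline := time.Now().Add(maxWait)
	for {
		err := stopCluster(ctx)
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("giving up stopping cluster: %w", err)
		}
		fmt.Printf("stop failed (%v); retrying in %s\n", err, backoff)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// In the real test the wait would be on the order of minutes; shortened here.
	if err := stopClusterWithTolerance(ctx, 3*time.Second, time.Second); err != nil {
		fmt.Println("line search attempt aborted:", err)
	}
}
```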

Release note: None


62826: sql: add tests for concurrent add/drop region operations r=ajstorm a=arulajmani

This patch generalizes the setup in what was previously
`TestConcurrentDropRegion` and extends it to all combinations of
add/drop region on a multi-region database. The only change is that
I've added a regional by row table into the test setup mixer, so as to
exercise the repartitioning semantics.

Previously, there was a limitation with concurrent add/drop region
operations, where both were bound to fail in the repartitioning
phase. This limitation was fixed in #60620, but we never had a
regression test for it. Adding a regional by row table during the
test setup serves as one.
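
For illustration, here is a minimal, hypothetical driver (not the test itself;
the connection string and the `lib/pq` driver are assumptions) showing the
statements involved. A REGIONAL BY ROW table is implicitly partitioned by
region, so every ADD/DROP REGION must repartition it, which is where concurrent
operations previously collided:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; CockroachDB speaks the pg protocol
)

func main() {
	// Placeholder connection string for a multi-region cluster whose nodes carry
	// the us-east1 through us-east4 localities.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stmts := []string{
		`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`,
		// A REGIONAL BY ROW table is implicitly partitioned by its hidden region
		// column, one partition per database region.
		`CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`,
		// Adding or dropping a region repartitions db.rbr; this is the phase in
		// which two concurrent region operations used to fail.
		`ALTER DATABASE db ADD REGION "us-east4"`,
		`ALTER DATABASE db DROP REGION "us-east2"`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatalf("%s: %v", stmt, err)
		}
	}

	rows, err := db.Query(`SELECT region FROM [SHOW REGIONS FROM DATABASE db]`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var region string
		if err := rows.Scan(&region); err != nil {
			log.Fatal(err)
		}
		fmt.Println(region) // expect us-east1, us-east3, us-east4
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```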

Closes #62813

Release note: None

Co-authored-by: Aayush Shah <aayush.shah15@gmail.com>
Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: arulajmani <arulajmani@gmail.com>
4 people committed Apr 1, 2021
4 parents a5f2143 + d70e286 + 84c0136 + d83b25c commit 0966b27
Showing 16 changed files with 1,046 additions and 275 deletions.
230 changes: 159 additions & 71 deletions pkg/ccl/multiregionccl/region_test.go
@@ -24,91 +24,179 @@ import (
"github.com/stretchr/testify/require"
)

func TestConcurrentDropRegion(t *testing.T) {
// TestConcurrentAddDropRegions tests all combinations of add/drop as if they
// were executed by two concurrent sessions. The general sketch of the test is
// as follows:
// - First operation is executed and blocks before the enum members are promoted.
// - The second operation starts once the first operation has reached the type
// schema changer. It continues to completion. It may succeed/fail depending
// on the specific test setup.
// - The first operation is resumed and allowed to complete. We expect it to
// succeed.
// - Verify database regions are as expected.
// Operations act on a multi-region database that contains a REGIONAL BY ROW
// table, so as to exercise the repartitioning semantics.
func TestConcurrentAddDropRegions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

firstDropAtSchemaChange := false
secondDropAtSchemaChange := false
waitForFirstDropRegionToReachSchemaChanger := make(chan struct{})
waitForSecondDropRegionToReachSchemaChanger := make(chan struct{})
waitForFirstDropRegionToFinish := make(chan struct{})
var mu syncutil.Mutex
skip.UnderRace(t, "times out under race")

// Pause the first drop region in the type schema changer before it does enum
// member promotion. Then release it when the second drop region gets to the
// same state. This validates that the second drop region can complete without
// requiring an override (something which was not possible before #62354).
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
mu.Lock()
if !firstDropAtSchemaChange {
firstDropAtSchemaChange = true
close(waitForFirstDropRegionToReachSchemaChanger)
mu.Unlock()
<-waitForSecondDropRegionToReachSchemaChanger
} else if !secondDropAtSchemaChange {
secondDropAtSchemaChange = true
// We're the second DROP REGION, close the channel to free the first
// DROP REGION.
close(waitForSecondDropRegionToReachSchemaChanger)
mu.Unlock()
}
},
testCases := []struct {
name string
firstOp string
secondOp string
expectedRegions []string
secondOpErr string
}{
{
"concurrent-drops",
`ALTER DATABASE db DROP REGION "us-east2"`,
`ALTER DATABASE db DROP REGION "us-east3"`,
[]string{"us-east1"},
"",
},
{
"concurrent-adds",
`ALTER DATABASE db ADD REGION "us-east4"`,
`ALTER DATABASE db ADD REGION "us-east5"`,
[]string{"us-east1", "us-east2", "us-east3", "us-east4", "us-east5"},
"",
},
{
"concurrent-add-drop",
`ALTER DATABASE db ADD REGION "us-east4"`,
`ALTER DATABASE db DROP REGION "us-east3"`,
[]string{"us-east1", "us-east2", "us-east4"},
"",
},
{
"concurrent-drop-add",
`ALTER DATABASE db DROP REGION "us-east2"`,
`ALTER DATABASE db ADD REGION "us-east5"`,
[]string{"us-east1", "us-east3", "us-east5"},
"",
},
{
"concurrent-add-same-region",
`ALTER DATABASE db ADD REGION "us-east5"`,
`ALTER DATABASE db ADD REGION "us-east5"`,
[]string{"us-east1", "us-east2", "us-east3", "us-east5"},
`region "us-east5" already added to database`,
},
{
"concurrent-drop-same-region",
`ALTER DATABASE db DROP REGION "us-east2"`,
`ALTER DATABASE db DROP REGION "us-east2"`,
[]string{"us-east1", "us-east3"},
`enum value "us-east2" is already being dropped`,
},
{
"concurrent-add-drop-same-region",
`ALTER DATABASE db ADD REGION "us-east5"`,
`ALTER DATABASE db DROP REGION "us-east5"`,
[]string{"us-east1", "us-east2", "us-east3", "us-east5"},
`enum value "us-east5" is being added, try again later`,
},
{
"concurrent-drop-add-same-region",
`ALTER DATABASE db DROP REGION "us-east2"`,
`ALTER DATABASE db ADD REGION "us-east2"`,
[]string{"us-east1", "us-east3"},
`enum value "us-east2" is being dropped, try again later`,
},
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

_, err := sqlDB.Exec(`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`)
if err != nil {
t.Error(err)
}

// Send the first DROP REGION statement on its way.
go func() {
if _, err := sqlDB.Exec(`ALTER DATABASE db DROP REGION "us-east2"`); err != nil {
t.Error(err)
}
close(waitForFirstDropRegionToFinish)
}()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
firstOp := true
firstOpStarted := make(chan struct{})
secondOpFinished := make(chan struct{})
firstOpFinished := make(chan struct{})
var mu syncutil.Mutex

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
mu.Lock()
if firstOp {
firstOp = false
close(firstOpStarted)
mu.Unlock()
// Don't promote any members before the second operation reaches
// the schema changer as well.
<-secondOpFinished
} else {
mu.Unlock()
}
},
},
}

// Wait for the first DROP REGION to make it to the schema changer.
<-waitForFirstDropRegionToReachSchemaChanger
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 5 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

// Issue the second DROP REGION statement.
_, err = sqlDB.Exec(`ALTER DATABASE db DROP REGION "us-east3"`)
// Create a multi-region database with a REGIONAL BY ROW table inside of it
// which needs to be re-partitioned on add/drop operations.
_, err := sqlDB.Exec(`
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`)
require.NoError(t, err)

if err != nil {
t.Error(err)
}
go func() {
if _, err := sqlDB.Exec(tc.firstOp); err != nil {
t.Error(err)
}
close(firstOpFinished)
}()

// Wait for the first operation to reach the type schema changer.
<-firstOpStarted

// Start the second operation.
_, err = sqlDB.Exec(tc.secondOp)
if tc.secondOpErr == "" {
require.NoError(t, err)
} else {
require.True(t, testutils.IsError(err, tc.secondOpErr))
}
close(secondOpFinished)

<-waitForFirstDropRegionToFinish
<-firstOpFinished

// Validate that both DROP REGION statements completed.
rows, err := sqlDB.Query("SELECT region FROM [SHOW REGIONS FROM DATABASE db]")
if err != nil {
t.Error(err)
}
defer rows.Close()
dbRegions := make([]string, 0, len(tc.expectedRegions))
rows, err := sqlDB.Query("SELECT region FROM [SHOW REGIONS FROM DATABASE db]")
require.NoError(t, err)
for {
done := rows.Next()
if !done {
break
}
var region string
err := rows.Scan(&region)
require.NoError(t, err)

const expectedRegion = "us-east1"
var region string
rows.Next()
if err := rows.Scan(&region); err != nil {
t.Error(err)
}
dbRegions = append(dbRegions, region)
}

if region != expectedRegion {
t.Error(errors.Newf("expected region to be: %q, got %q", expectedRegion, region))
}
if len(dbRegions) != len(tc.expectedRegions) {
t.Fatalf("unexpected number of regions, expected: %v found %v",
tc.expectedRegions,
dbRegions,
)
}

if rows.Next() {
t.Error(errors.New("unexpected number of rows returned"))
for i, expectedRegion := range tc.expectedRegions {
if expectedRegion != dbRegions[i] {
t.Fatalf("unexpected regions, expected: %v found %v",
tc.expectedRegions,
dbRegions,
)
}
}
})
}
}

@@ -164,7 +252,7 @@ ALTER DATABASE db PRIMARY REGION "us-east2";
t.Fatalf("expected error, found nil")
}
if !testutils.IsError(err, `"us-east2" has not been added to the database`) {
t.Fatalf(`expected err, got %v`, err)
t.Fatalf(`expected secondOpErr, got %v`, err)
}

close(waitForPrimaryRegionSwitch)
37 changes: 37 additions & 0 deletions pkg/cmd/roachprod/main.go
@@ -1272,6 +1272,42 @@ var runCmd = &cobra.Command{
}),
}

var resetCmd = &cobra.Command{
	Use:   "reset <cluster>",
	Short: "reset *all* VMs in a cluster",
	Long: `Reset a cloud VM. This may not be implemented for all
environments and will fall back to a no-op.`,
	Args: cobra.ExactArgs(1),
	Run: wrap(func(cmd *cobra.Command, args []string) (retErr error) {
		if numNodes <= 0 || numNodes >= 1000 {
			// Upper limit is just for safety.
			return fmt.Errorf("number of nodes must be in [1..999]")
		}

		clusterName, err := verifyClusterName(args[0])
		if err != nil {
			return err
		}

		if clusterName == config.Local {
			return nil
		}

		cloud, err := cld.ListCloud()
		if err != nil {
			return err
		}
		c, ok := cloud.Clusters[clusterName]
		if !ok {
			return errors.New("cluster not found")
		}

		return vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
			return p.Reset(vms)
		})
	}),
}

var installCmd = &cobra.Command{
Use: "install <cluster> <software>",
Short: "install 3rd party software",
@@ -1737,6 +1773,7 @@ func main() {
cobra.EnableCommandSorting = false
rootCmd.AddCommand(
createCmd,
resetCmd,
destroyCmd,
extendCmd,
listCmd,
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/aws/aws.go
@@ -474,6 +474,11 @@ func (p *Provider) Delete(vms vm.List) error {
return g.Wait()
}

// Reset is part of vm.Provider. It is a no-op.
func (p *Provider) Reset(vms vm.List) error {
	return nil // unimplemented
}

// Extend is part of the vm.Provider interface.
// This will update the Lifetime tag on the instances.
func (p *Provider) Extend(vms vm.List, lifetime time.Duration) error {
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/azure/azure.go
@@ -236,6 +236,11 @@ func (p *Provider) Delete(vms vm.List) error {
return nil
}

// Reset implements the vm.Provider interface. It is a no-op.
func (p *Provider) Reset(vms vm.List) error {
	return nil
}

// DeleteCluster implements the vm.DeleteCluster interface, providing
// a fast-path to tear down all resources associated with a cluster.
func (p *Provider) DeleteCluster(name string) error {
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/flagstub/flagstub.go
@@ -51,6 +51,11 @@ func (p *provider) Delete(vms vm.List) error {
return errors.Newf("%s", p.unimplemented)
}

// Reset implements vm.Provider and is a no-op.
func (p *provider) Reset(vms vm.List) error {
	return nil
}

// Extend implements vm.Provider and returns Unimplemented.
func (p *provider) Extend(vms vm.List, lifetime time.Duration) error {
return errors.Newf("%s", p.unimplemented)
43 changes: 43 additions & 0 deletions pkg/cmd/roachprod/vm/gce/gcloud.go
@@ -514,6 +514,49 @@ func (p *Provider) Delete(vms vm.List) error {
return g.Wait()
}

// Reset implements the vm.Provider interface.
func (p *Provider) Reset(vms vm.List) error {
	// Map from project to map of zone to list of machines in that project/zone.
	projectZoneMap := make(map[string]map[string][]string)
	for _, v := range vms {
		if v.Provider != ProviderName {
			return errors.Errorf("%s received VM instance from %s", ProviderName, v.Provider)
		}
		if projectZoneMap[v.Project] == nil {
			projectZoneMap[v.Project] = make(map[string][]string)
		}

		projectZoneMap[v.Project][v.Zone] = append(projectZoneMap[v.Project][v.Zone], v.Name)
	}

	var g errgroup.Group
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	for project, zoneMap := range projectZoneMap {
		for zone, names := range zoneMap {
			args := []string{
				"compute", "instances", "reset",
			}

			args = append(args, "--project", project)
			args = append(args, "--zone", zone)
			args = append(args, names...)

			g.Go(func() error {
				cmd := exec.CommandContext(ctx, "gcloud", args...)

				output, err := cmd.CombinedOutput()
				if err != nil {
					return errors.Wrapf(err, "Command: gcloud %s\nOutput: %s", args, output)
				}
				return nil
			})
		}
	}

	return g.Wait()
}

// Extend TODO(peter): document
func (p *Provider) Extend(vms vm.List, lifetime time.Duration) error {
// The gcloud command only takes a single instance. Unlike Delete() above, we have to