roachtest: handle VM overload under tpccbench
See #62039.

`tpccbench`, by design, pushes CRDB into overload territory. However,
tpccbench was not handling the resulting conditions at all: any node
death or stalled VM failed the whole test run rather than letting the
search retry with a lower warehouse count.

This commit makes most errors "recoverable", in the sense that they
mark the current warehouse count as failing while letting the line
search continue. Exceptions are communicated via `t.Fatal`, which
aborts the whole run instead.
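
A stand-alone sketch of this convention (the line-search plumbing and names
below are illustrative stand-ins, not the actual roachtest code):

package main

import (
	"errors"
	"fmt"
	"log"
)

var errOverload = errors.New("cluster overloaded")

// runOneStep stands in for a single tpccbench attempt at a given
// warehouse count. A returned error is "recoverable": the attempt
// counts as failing, but the search keeps going.
func runOneStep(warehouses int) error {
	if warehouses > 2000 {
		return errOverload
	}
	return nil
}

func main() {
	best := 0
	for warehouses := 2500; warehouses >= 1500; warehouses -= 250 {
		err := runOneStep(warehouses)
		if err == nil {
			best = warehouses
			break
		}
		if !errors.Is(err, errOverload) {
			// Unrecoverable (roachtest uses t.Fatal here): abort the run.
			log.Fatalf("aborting run: %v", err)
		}
		fmt.Printf("%d warehouses failed (%v); continuing search\n", warehouses, err)
	}
	fmt.Printf("largest passing warehouse count: %d\n", best)
}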

We also make the `c.Stop()` step at the beginning of each search
step resilient to VMs browning out under memory pressure (from
the previous run), by patiently retrying for a few minutes.
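
A minimal stand-alone sketch of that retry loop (stopCluster below is a
stand-in for roachtest's `c.StopE`; the attempt timeout and pause mirror the
values used in the diff further down):

package main

import (
	"context"
	"log"
	"time"
)

// stopCluster simulates stopping a cluster whose VM is browned out and
// never responds; it simply blocks until the per-attempt timeout fires.
func stopCluster(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx := context.Background()
	var stopped bool
	for i := 0; i < 10; i++ {
		attemptCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
		err := stopCluster(attemptCtx)
		cancel()
		if err == nil {
			stopped = true
			break
		}
		log.Printf("stop attempt %d failed: %v; waiting for VM to recover", i+1, err)
		// Pause between attempts so that even a fast failure mode still
		// spreads the retries out over a few minutes.
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
		}
	}
	if !stopped {
		log.Fatal("VM did not recover; aborting the run")
	}
}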

The hope is that this will allow the search to run to completion even
in the face of overload-induced temporary VM outages. It is not
expected to do so perfectly - after all, VMs don't generally return
from brown-out within any fixed time period - but anecdotally most VMs
seem to come back within a few minutes.

Release note: None
tbg authored and irfansharif committed Apr 1, 2021
1 parent 53425a1 commit 84c0136
Showing 12 changed files with 239 additions and 15 deletions.
37 changes: 37 additions & 0 deletions pkg/cmd/roachprod/main.go
@@ -1264,6 +1264,42 @@ var runCmd = &cobra.Command{
}),
}

var resetCmd = &cobra.Command{
Use: "reset <cluster>",
Short: "reset *all* VMs in a cluster",
Long: `Reset a cloud VM. This may not be implemented for all
environments and will fall back to a no-op.`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) (retErr error) {
if numNodes <= 0 || numNodes >= 1000 {
// Upper limit is just for safety.
return fmt.Errorf("number of nodes must be in [1..999]")
}

clusterName, err := verifyClusterName(args[0])
if err != nil {
return err
}

if clusterName == config.Local {
return nil
}

cloud, err := cld.ListCloud()
if err != nil {
return err
}
c, ok := cloud.Clusters[clusterName]
if !ok {
return errors.New("cluster not found")
}

return vm.FanOut(c.VMs, func(p vm.Provider, vms vm.List) error {
return p.Reset(vms)
})
}),
}

var installCmd = &cobra.Command{
Use: "install <cluster> <software>",
Short: "install 3rd party software",
@@ -1585,6 +1621,7 @@ func main() {
cobra.EnableCommandSorting = false
rootCmd.AddCommand(
createCmd,
resetCmd,
destroyCmd,
extendCmd,
listCmd,
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/aws/aws.go
@@ -454,6 +454,11 @@ func (p *Provider) Delete(vms vm.List) error {
return g.Wait()
}

// Reset is part of vm.Provider. It is a no-op.
func (p *Provider) Reset(vms vm.List) error {
return nil // unimplemented
}

// Extend is part of the vm.Provider interface.
// This will update the Lifetime tag on the instances.
func (p *Provider) Extend(vms vm.List, lifetime time.Duration) error {
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/azure/azure.go
@@ -236,6 +236,11 @@ func (p *Provider) Delete(vms vm.List) error {
return nil
}

// Reset implements the vm.Provider interface. It is a no-op.
func (p *Provider) Reset(vms vm.List) error {
return nil
}

// DeleteCluster implements the vm.DeleteCluster interface, providing
// a fast-path to tear down all resources associated with a cluster.
func (p *Provider) DeleteCluster(name string) error {
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/flagstub/flagstub.go
@@ -51,6 +51,11 @@ func (p *provider) Delete(vms vm.List) error {
return errors.Newf("%s", p.unimplemented)
}

// Reset implements vm.Provider and is a no-op.
func (p *provider) Reset(vms vm.List) error {
return nil
}

// Extend implements vm.Provider and returns Unimplemented.
func (p *provider) Extend(vms vm.List, lifetime time.Duration) error {
return errors.Newf("%s", p.unimplemented)
43 changes: 43 additions & 0 deletions pkg/cmd/roachprod/vm/gce/gcloud.go
@@ -514,6 +514,49 @@ func (p *Provider) Delete(vms vm.List) error {
return g.Wait()
}

// Reset implements the vm.Provider interface.
func (p *Provider) Reset(vms vm.List) error {
// Map from project to map of zone to list of machines in that project/zone.
projectZoneMap := make(map[string]map[string][]string)
for _, v := range vms {
if v.Provider != ProviderName {
return errors.Errorf("%s received VM instance from %s", ProviderName, v.Provider)
}
if projectZoneMap[v.Project] == nil {
projectZoneMap[v.Project] = make(map[string][]string)
}

projectZoneMap[v.Project][v.Zone] = append(projectZoneMap[v.Project][v.Zone], v.Name)
}

var g errgroup.Group
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
for project, zoneMap := range projectZoneMap {
for zone, names := range zoneMap {
args := []string{
"compute", "instances", "reset",
}

args = append(args, "--project", project)
args = append(args, "--zone", zone)
args = append(args, names...)

g.Go(func() error {
cmd := exec.CommandContext(ctx, "gcloud", args...)

output, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "Command: gcloud %s\nOutput: %s", args, output)
}
return nil
})
}
}

return g.Wait()
}

// Extend TODO(peter): document
func (p *Provider) Extend(vms vm.List, lifetime time.Duration) error {
// The gcloud command only takes a single instance. Unlike Delete() above, we have to
5 changes: 5 additions & 0 deletions pkg/cmd/roachprod/vm/local/local.go
@@ -87,6 +87,11 @@ func (p *Provider) Delete(vms vm.List) error {
return nil
}

// Reset is part of the vm.Provider interface. This implementation is a no-op.
func (p *Provider) Reset(vms vm.List) error {
return nil
}

// Extend is part of the vm.Provider interface. This implementation returns an error.
func (p *Provider) Extend(vms vm.List, lifetime time.Duration) error {
return errors.New("local clusters have unlimited lifetime")
1 change: 1 addition & 0 deletions pkg/cmd/roachprod/vm/vm.go
@@ -173,6 +173,7 @@ type Provider interface {
CleanSSH() error
ConfigSSH() error
Create(names []string, opts CreateOpts) error
Reset(vms List) error
Delete(vms List) error
Extend(vms List, lifetime time.Duration) error
// Return the account name associated with the provider
17 changes: 17 additions & 0 deletions pkg/cmd/roachtest/cluster.go
@@ -2221,6 +2221,23 @@ func (c *cluster) Stop(ctx context.Context, opts ...option) {
}
}

func (c *cluster) Reset(ctx context.Context) error {
if c.t.Failed() {
return errors.New("already failed")
}
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "cluster.Reset")
}
args := []string{
roachprod,
"reset",
c.name,
}
c.status("resetting cluster")
defer c.status()
return execCmd(ctx, c.l, args...)
}

// WipeE wipes a subset of the nodes in a cluster. See cluster.Start() for a
// description of the nodes parameter.
func (c *cluster) WipeE(ctx context.Context, l *logger, opts ...option) error {
99 changes: 87 additions & 12 deletions pkg/cmd/roachtest/tpcc.go
@@ -803,8 +803,48 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
if res, err := s.Search(func(warehouses int) (bool, error) {
iteration++
t.l.Printf("initializing cluster for %d warehouses (search attempt: %d)", warehouses, iteration)

// NB: for goroutines in this monitor, handle errors via `t.Fatal` to
// *abort* the line search and whole tpccbench run. Return the errors
// to indicate that the specific warehouse count failed, but that the
// line search ought to continue.
m := newMonitor(ctx, c, roachNodes)
c.Stop(ctx, roachNodes)

// We overload the clusters in tpccbench, which can lead to transient infra
// failures. These are a) really annoying to debug and b) hide the actual
// passing warehouse count, making the line search sensitive to the choice
// of starting warehouses. Do a best-effort at waiting for the cloud VM(s)
// to recover without failing the line search.
if err := c.Reset(ctx); err != nil {
t.Fatal(err)
}
var ok bool
for i := 0; i < 10; i++ {
if err := ctx.Err(); err != nil {
t.Fatal(err)
}
shortCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
if err := c.StopE(shortCtx, roachNodes); err != nil {
cancel()
t.l.Printf("unable to stop cluster; retrying to allow vm to recover: %s", err)
// We usually spend a long time blocking in StopE anyway, but just in case
// of a fast-failure mode, we still want to spend a little bit of time over
// the course of 10 retries to maximize the chances of things going back to
// working.
select {
case <-time.After(30 * time.Second):
case <-ctx.Done():
}
continue
}
cancel()
ok = true
break
}
if !ok {
t.Fatalf("VM is hosed; giving up")
}

c.Start(ctx, t, append(b.startOpts(), roachNodes)...)
time.Sleep(restartWait)

@@ -858,7 +898,8 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
b.partitions(), groupIdx)
activeWarehouses = warehouses / numLoadGroups
default:
panic("unexpected")
// Abort the whole test.
t.Fatalf("unimplemented LoadConfig %v", b.LoadConfig)
}
if b.Chaos {
// For chaos tests, we don't want to use the default method because it
@@ -874,33 +915,67 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
err := c.RunE(ctx, group.loadNodes, cmd)
loadDone <- timeutil.Now()
if err != nil {
// NB: this will let the line search continue at a lower warehouse
// count.
return errors.Wrapf(err, "error running tpcc load generator")
}
roachtestHistogramsPath := filepath.Join(resultsDir, fmt.Sprintf("%d.%d-stats.json", warehouses, groupIdx))
if err := c.Get(
ctx, t.l, histogramsPath, roachtestHistogramsPath, group.loadNodes,
); err != nil {
t.Fatal(err)
// NB: this will let the line search continue. We do this because it's
// conceivable that we made it here, but a VM just froze up on us. The
// next search iteration will handle this state.
return err
}
snapshots, err := histogram.DecodeSnapshots(roachtestHistogramsPath)
if err != nil {
return errors.Wrapf(err, "failed to decode histogram snapshots")
// If we got this far and can't decode the data, it's not a case of
// overload but something that deserves failing the whole test.
t.Fatal(err)
}
result := tpcc.NewResultWithSnapshots(activeWarehouses, 0, snapshots)
resultChan <- result
return nil
})
}
if err = m.WaitE(); err != nil {
return false, err
}
failErr := m.WaitE()
close(resultChan)
var results []*tpcc.Result
for partial := range resultChan {
results = append(results, partial)

var res *tpcc.Result
if failErr != nil {
if t.Failed() {
// Someone called `t.Fatal` in a monitored goroutine,
// meaning that something went sideways in a way that
// indicates a general problem (i.e. not just that the
// current warehouse count overloaded the cluster).
// Abort the whole test.
return false, failErr
}
// A goroutine returned an error, but this means only
// that the given warehouse count did not run to completion,
// presumably because it overloaded the cluster. We thus
// "achieved" zero TpmC, but will continue the search.
//
// Note that it's also possible that we get here due to an
// actual bug in CRDB (for example a node crashing due to
// getting into an invalid state); we cannot distinguish
// those here and so tpccbench isn't a good test to rely
// on to catch crash-causing bugs.
res = &tpcc.Result{
ActiveWarehouses: warehouses,
}
} else {
// We managed to run TPCC, which means that we may or may
// not have "passed" TPCC.
var results []*tpcc.Result
for partial := range resultChan {
results = append(results, partial)
}
res = tpcc.MergeResults(results...)
failErr = res.FailureError()
}
res := tpcc.MergeResults(results...)
failErr := res.FailureError()

// Print the result.
if failErr == nil {
ttycolor.Stdout(ttycolor.Green)
2 changes: 2 additions & 0 deletions pkg/workload/tpcc/BUILD.bazel
@@ -48,11 +48,13 @@ go_test(
size = "small",
srcs = [
"partition_test.go",
"result_test.go",
"stats_test.go",
],
embed = [":tpcc"],
deps = [
"//pkg/testutils",
"//pkg/workload",
"@com_github_stretchr_testify//require",
],
)
9 changes: 6 additions & 3 deletions pkg/workload/tpcc/result.go
@@ -55,7 +55,6 @@ var passing90ThPercentile = map[string]time.Duration{

// Result represents the outcome of a TPCC run.
type Result struct {

// ActiveWarehouses is the number of warehouses used in the TPC-C run.
ActiveWarehouses int

@@ -155,7 +154,11 @@ func NewResultWithSnapshots(
// TpmC returns a tpmC value with a warehouse factor of 12.86.
// TpmC will panic if r does not contain a "newOrder" histogram in Cumulative.
func (r *Result) TpmC() float64 {
return float64(r.Cumulative["newOrder"].TotalCount()) / (r.Elapsed.Seconds() / 60)
no := r.Cumulative["newOrder"]
if no == nil {
return 0
}
return float64(no.TotalCount()) / (r.Elapsed.Seconds() / 60)
}

// Efficiency returns the efficiency of a TPC-C run.
@@ -181,7 +184,7 @@ func (r *Result) FailureError() error {
var err error
if eff := r.Efficiency(); eff < PassingEfficiency {
err = errors.CombineErrors(err,
errors.Errorf("efficiency value of %v is below ppassing threshold of %v",
errors.Errorf("efficiency value of %v is below passing threshold of %v",
eff, PassingEfficiency))
}
for query, max90th := range passing90ThPercentile {
26 changes: 26 additions & 0 deletions pkg/workload/tpcc/result_test.go
@@ -0,0 +1,26 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tpcc

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewResult(t *testing.T) {
// Ensure you don't get panics when calling common methods
// on a trivial Result that doesn't have any data attached.
res := NewResult(1000, 0, 0, nil)
require.Error(t, res.FailureError())
require.Zero(t, res.Efficiency())
require.Zero(t, res.TpmC())
}
