Skip to content

Commit

Permalink
kv: delete TestDelayedBeginRetryable, which was replaced by unit tests
Browse files Browse the repository at this point in the history
As of #33523, this test doesn't really make sense. The transaction model
is much more flexible to scenarios like this where a `PushTxn` is issued
before the transaction's record is written. This flexibility required
more targeted testing, which was added in #33523. As a result, we don't
need such a high-level integration test any more.

Instead, the following three subtests of `TestCreateTxnRecord`
sufficiently test that such a situation is handled correctly and
returns a retriable error like we were expecting:
- `TestCreateTxnRecord/begin_transaction_after_end_transaction_(abort)`
- `TestCreateTxnRecord/heartbeat_transaction_after_end_transaction_(abort)`
- `TestCreateTxnRecord/end_transaction_(commit)_after_end_transaction_(abort)`

Release note: None
  • Loading branch information
nvanbenschoten committed Feb 11, 2019
1 parent d4e7883 commit 2e23ae5
Showing 1 changed file with 0 additions and 155 deletions.
155 changes: 0 additions & 155 deletions pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,180 +17,25 @@ package kv_test
import (
"context"
"fmt"
"regexp"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// This file contains contains integration tests that don't fit anywhere else.
// Generally its meant to test scenarios involving both "the client" and "the
// server".

// Test the following scenario:
//
// 1) A client sends a batch with a Begin and other requests
// 2) These requests span ranges, so the DistSender splits the batch and sends
// sub-batches in parallel. Let's say the batch with the Begin is sent to range
// 1 and the rest to r2. The r2 batch executes much quicker than the r1 one, and
// leaves some intents.
// 3) Another txn runs into the intents on r2, tries to push, and succeeds
// because the txn record doesn't exist yet. It writes the txn record as
// Aborted.
// TODO(nvanbenschoten): This test will change when #25437 is fully addressed.
// 4) If the Begin were to execute now, it'd discover the Aborted txn and return
// a retryable TxnAbortedError. But it doesn't execute now; it's still delayed
// somehow.
// 5) The heartbeat loop (which had been started on the client when the Begin
// was sent) has been waiting for 1s, which has now passed, and it sends a
// heartbeat request. This request returns the aborted txn, and so the hb loop
// tries to cleanup - it sends a EndTransaction(commit=false,poison=true). This
// rollback executes and populates the timestamp cache.
// 6) The original BeginTransaction finally executes. It runs into the ts cache
// which generates a TransactionAbortedError
// (ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY).
//
// The point of the test is checking that the client gets a retriable error as a
// result of this convoluted scenario.
func TestDelayedBeginRetryable(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(`Flaky, will be removed when #25437 is addressed`)

// Here's how this test is gonna go:
// - We're going to send a BeginTxn+Put(a)+Put(c). The first two will be split
// from the last one and the sub-batches will run in parallel.
// - We're going to intercept the BeginTxn and block it.
// - We're going to intercept the Put(c) and trigger a Pusher.
// - We're relying on the pusher writing an Aborted txn record and the
// heartbeat loop noticing that, at which point it will send a rollback, which
// we intercept.
// - When we intercept the rollback, we unblock the BeginTxn.

ctx := context.Background()
key := roachpb.Key("a")
key2 := roachpb.Key("c")
unblockBegin := make(chan struct{})
unblockPush := make(chan struct{})
var putCFound int64
var rollbackFound int64
s, _, origDB := serverutils.StartServer(t, base.TestServerArgs{
// We're going to expect a couple of things, in sequence:
// - a BeginTransaction, which we block.
// - a Put("c"), at which point we trigger the pusher
// - a rollback, at which point we unblock the Begin.
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
// We're going to perform manual splits.
DisableMergeQueue: true,
// We use TestingRequestFilter (as opposed to some other filter) in
// order to catch the BeginTransaction before it enters the command
// queue. Otherwise, it would block heartbeats and the rollback.
TestingRequestFilter: func(ba roachpb.BatchRequest) *roachpb.Error {
if btReq, ok := ba.GetArg(roachpb.BeginTransaction); ok {
// We're looking for the batch with BeginTxn, but after it has been
// split in sub-batches (so, when it no longer produces a
// RangeKeyMismatchError). If we find an unsplit one, it's not the
// droid we're looking for.
if len(ba.Requests) == 3 {
return nil
}
bt := btReq.(*roachpb.BeginTransactionRequest)
if bt.Key.Equal(key) {
<-unblockBegin
}
}
return nil
},
TestingResponseFilter: func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error {
if etReq, ok := ba.GetArg(roachpb.EndTransaction); ok {
et := etReq.(*roachpb.EndTransactionRequest)
if !et.Commit && et.Key.Equal(key) && atomic.CompareAndSwapInt64(&rollbackFound, 0, 1) {
close(unblockBegin)
}
return nil
}
if putReq, ok := ba.GetArg(roachpb.Put); ok {
put := putReq.(*roachpb.PutRequest)
if put.Key.Equal(key2) && atomic.CompareAndSwapInt64(&putCFound, 0, 1) {
close(unblockPush)
}
return nil
}
return nil
},
},
},
})
defer s.Stopper().Stop(ctx)

// Create two ranges so that the batch we're about to send gets split.
if err := origDB.AdminSplit(ctx, "b", "b"); err != nil {
t.Fatal(err)
}

// Make a db with a short heartbeat interval.
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
tsf := kv.NewTxnCoordSenderFactory(
kv.TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
// Short heartbeat interval.
HeartbeatInterval: time.Millisecond,
Settings: s.ClusterSettings(),
Clock: s.Clock(),
Stopper: s.Stopper(),
},
s.DistSender(),
)
db := client.NewDB(ambient, tsf, s.Clock())
txn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */, client.RootTxn)

pushErr := make(chan error)
go func() {
<-unblockPush
// Conflicting transaction that pushes the above transaction.
conflictTxn := client.NewTxn(ctx, origDB, 0 /* gatewayNodeID */, client.RootTxn)
// Push through a Put, as opposed to a Get, so that the pushee gets aborted.
if err := conflictTxn.Put(ctx, key2, "pusher was here"); err != nil {
pushErr <- err
return
}
pushErr <- conflictTxn.CommitOrCleanup(ctx)
}()

put1 := roachpb.NewPut(key, roachpb.MakeValueFromString("foo")).(*roachpb.PutRequest)
put2 := roachpb.NewPut(key2, roachpb.MakeValueFromString("foo")).(*roachpb.PutRequest)
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn.Serialize()}
ba.Add(put1, put2)
_, pErr := txn.Send(ctx, ba)

if err := <-pushErr; err != nil {
t.Fatal(pushErr)
}

if _, ok := pErr.GetDetail().(*roachpb.TransactionRetryWithProtoRefreshError); !ok {
t.Fatalf("expected TransactionRetryWithProtoRefreshError, got: %v", pErr)
}
exp := "TransactionAbortedError(ABORT_REASON_ABORT_SPAN)"
if !testutils.IsPError(pErr, regexp.QuoteMeta(exp)) {
t.Fatalf("expected %s, got: %s", exp, pErr)
}
}

// Test that waiters on transactions whose commit command is rejected see the
// transaction as Aborted. This test is a regression test for #30792 which was
// causing pushers in the txn wait queue to consider such a transaction
Expand Down

0 comments on commit 2e23ae5

Please sign in to comment.