Skip to content

Commit

Permalink
Merge #33566
Browse files Browse the repository at this point in the history
33566: kv: stop using BeginTransaction requests r=nvanbenschoten a=nvanbenschoten

Based on #33523.
Closes #25437.

This is the final part of addressing #32971.

The change gates the use of BeginTransaction on a cluster version. When a cluster's version is sufficiently high, it will stop sending them.

To make this work, a small change was needed for the detection of 1PC transactions. We now use sequence numbers to determine that a batch includes all of the writes in a transaction. This is in line with the proposal from the RFC's `1PC Transactions` section and will work correctly with parallel commits.

Release note (performance improvement): Reduce the network and storage
overhead of multi-Range transactions.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jan 10, 2019
2 parents 8e55d19 + c5516ef commit f921c46
Show file tree
Hide file tree
Showing 24 changed files with 314 additions and 171 deletions.
2 changes: 1 addition & 1 deletion docs/RFCS/20181209_lazy_txn_record_creation.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
- Feature Name: Lazy Transaction Record Creation (a.k.a Deprecate BeginTransaction)
- Status: in-progress
- Status: completed
- Start Date: 2018-12-09
- Authors: Nathan VanBenschoten
- RFC PR: #32971
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-3</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.1-4</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
3 changes: 2 additions & 1 deletion pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
tcs.interceptorAlloc.txnHeartbeat.init(
&tcs.mu.Mutex,
&tcs.mu.txn,
tcf.st,
tcs.clock,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
Expand Down Expand Up @@ -581,7 +582,7 @@ func (tc *TxnCoordSender) Send(
return nil, pErr
}

if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnHeartbeat.mu.everSentBeginTxn {
if ba.IsSingleEndTransactionRequest() && !tc.interceptorAlloc.txnHeartbeat.mu.everWroteIntents {
return nil, tc.commitReadOnlyTxnLocked(ctx, ba.Requests[0].GetEndTransaction().Deadline)
}

Expand Down
49 changes: 23 additions & 26 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -1263,13 +1264,11 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
txn := ba.Txn.Clone()
br.Txn = &txn

if _, hasBT := ba.GetArg(roachpb.BeginTransaction); hasBT {
if _, ok := ba.Requests[1].GetInner().(*roachpb.PutRequest); !ok {
if _, hasPut := ba.GetArg(roachpb.Put); hasPut {
if _, ok := ba.Requests[0].GetInner().(*roachpb.PutRequest); !ok {
t.Fatalf("expected Put")
}
union := &br.Responses[0] // avoid operating on copy
union.MustSetInner(&roachpb.BeginTransactionResponse{})
union = &br.Responses[1] // avoid operating on copy
union.MustSetInner(&roachpb.PutResponse{})
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
Expand Down Expand Up @@ -1629,8 +1628,8 @@ func TestCommitMutatingTransaction(t *testing.T) {
var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
if !bytes.Equal(ba.Txn.Key, roachpb.Key("a")) {
t.Errorf("expected transaction key to be \"a\"; got %s", ba.Txn.Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok && !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
Expand Down Expand Up @@ -1692,7 +1691,7 @@ func TestCommitMutatingTransaction(t *testing.T) {
if err := db.Txn(ctx, test.f); err != nil {
t.Fatalf("%d: unexpected error on commit: %s", i, err)
}
expectedCalls := []roachpb.Method{roachpb.BeginTransaction, test.expMethod}
expectedCalls := []roachpb.Method{test.expMethod}
if test.pointWrite {
expectedCalls = append(expectedCalls, roachpb.QueryIntent)
}
Expand All @@ -1706,6 +1705,7 @@ func TestCommitMutatingTransaction(t *testing.T) {

// TestTxnInsertBeginTransaction verifies that a begin transaction
// request is inserted just before the first mutating command.
// TODO(nvanbenschoten): Remove in 2.3.
func TestTxnInsertBeginTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1727,9 +1727,12 @@ func TestTxnInsertBeginTransaction(t *testing.T) {
return nil, nil
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
Expand Down Expand Up @@ -1758,6 +1761,7 @@ func TestTxnInsertBeginTransaction(t *testing.T) {

// TestBeginTransactionErrorIndex verifies that the error index is cleared
// when a BeginTransaction command causes an error.
// TODO(nvanbenschoten): Remove in 2.3.
func TestBeginTransactionErrorIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1773,9 +1777,12 @@ func TestBeginTransactionErrorIndex(t *testing.T) {
return nil, pErr
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
Expand Down Expand Up @@ -1885,7 +1892,7 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
sender,
)
db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
expCalls := []roachpb.Method{roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction}
expCalls := []roachpb.Method{roachpb.Put, roachpb.EndTransaction}

testutils.RunTrueAndFalse(t, "success", func(t *testing.T, success bool) {
calls = nil
Expand All @@ -1911,8 +1918,8 @@ func TestEndWriteRestartReadOnlyTransaction(t *testing.T) {
}

// TestTransactionKeyNotChangedInRestart verifies that if the transaction
// already has a key (we're in a restart), the key in the begin transaction
// request is not changed.
// already has a key (we're in a restart), the key in the transaction request is
// not changed.
func TestTransactionKeyNotChangedInRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -1933,13 +1940,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
return nil, nil
}

// Attempt 0 should have a BeginTxnRequest, and a PutRequest.
// Attempt 1 should have a PutRequest.
if attempt == 0 {
if _, ok := ba.GetArg(roachpb.BeginTransaction); !ok {
t.Fatalf("failed to find a begin transaction request: %v", ba)
}
}
// Both attempts should have a PutRequest.
if _, ok := ba.GetArg(roachpb.Put); !ok {
t.Fatalf("failed to find a put request: %v", ba)
}
Expand Down Expand Up @@ -2004,9 +2005,6 @@ func TestSequenceNumbers(t *testing.T) {
expSequence, seq, args)
}
}
if expSequence != ba.Txn.Sequence {
t.Errorf("expected header sequence %d; got %d", expSequence, ba.Txn.Sequence)
}
br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
Expand Down Expand Up @@ -2081,10 +2079,9 @@ func TestConcurrentTxnRequests(t *testing.T) {
}

expectedCallCounts := map[roachpb.Method]int{
roachpb.BeginTransaction: 1,
roachpb.Put: 26,
roachpb.QueryIntent: 26,
roachpb.EndTransaction: 1,
roachpb.Put: 26,
roachpb.QueryIntent: 26,
roachpb.EndTransaction: 1,
}
if !reflect.DeepEqual(expectedCallCounts, callCounts) {
t.Errorf("expected %v, got %v", expectedCallCounts, callCounts)
Expand Down Expand Up @@ -2265,8 +2262,8 @@ func TestTxnCoordSenderPipelining(t *testing.T) {
}

require.Equal(t, []roachpb.Method{
roachpb.BeginTransaction, roachpb.Put, roachpb.QueryIntent, roachpb.EndTransaction,
roachpb.BeginTransaction, roachpb.Put, roachpb.EndTransaction,
roachpb.Put, roachpb.QueryIntent, roachpb.EndTransaction,
roachpb.Put, roachpb.EndTransaction,
}, calls)

for _, action := range []func(ctx context.Context, txn *client.Txn) error{
Expand Down
73 changes: 49 additions & 24 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -50,6 +51,7 @@ type txnHeartbeat struct {
// sends got through `wrapped`, not directly through `gatekeeper`.
gatekeeper lockedSender

st *cluster.Settings
clock *hlc.Clock
heartbeatInterval time.Duration
metrics *TxnMetrics
Expand Down Expand Up @@ -87,16 +89,21 @@ type txnHeartbeat struct {
// successful BeginTxn (in which case we know that there is a txn record)
// but as of May 2018 we don't do that. Note that the server accepts a
// BeginTxn with a higher epoch if a transaction record already exists.
// TODO(nvanbenschoten): Once we stop sending BeginTxn entirely (v2.3)
// we can get rid of this. For now, we keep it to ensure compatibility.
// It can't be collapsed into everWroteIntents because 2.1 nodes expect
// a new BeginTxn request on each epoch (e.g. to detect 1PC txns).
needBeginTxn bool

// everSentBeginTxn is set once a BeginTransactionRequest (out of possibly
// many) was sent to the server. If a BeginTxn was ever sent, then an
// EndTransaction needs to eventually be sent and cannot be elided.
// Note that simply looking at txnEnd == nil to see if a heartbeat loop is
// currently running is not always sufficient for deciding whether an
// EndTransaction can be elided - we want to allow multiple rollback attempts
// to be sent and the first one stops the heartbeat loop.
everSentBeginTxn bool
// everWroteIntents is set once the transaction's first write is sent to
// the server. If a write was ever sent, then an EndTransaction needs to
// eventually be sent and cannot be elided. Note that simply looking at
// txnEnd == nil to see if a heartbeat loop is currently running is not
// always sufficient for deciding whether an EndTransaction can be
// elided - we want to allow multiple rollback attempts to be sent and
// the first one stops the heartbeat loop.
// TODO(nvanbenschoten): Can this be replaced with h.mu.txn.Writing?
everWroteIntents bool
}
}

Expand All @@ -105,6 +112,7 @@ type txnHeartbeat struct {
func (h *txnHeartbeat) init(
mu sync.Locker,
txn *roachpb.Transaction,
st *cluster.Settings,
clock *hlc.Clock,
heartbeatInterval time.Duration,
gatekeeper lockedSender,
Expand All @@ -113,6 +121,7 @@ func (h *txnHeartbeat) init(
asyncAbortCallbackLocked func(context.Context),
) {
h.stopper = stopper
h.st = st
h.clock = clock
h.heartbeatInterval = heartbeatInterval
h.metrics = metrics
Expand Down Expand Up @@ -147,10 +156,11 @@ func (h *txnHeartbeat) SendLocked(
etReq = et.(*roachpb.EndTransactionRequest)
}

addedBeginTxn := false
needBeginTxn := haveTxnWrite && h.mu.needBeginTxn
if needBeginTxn {
h.mu.needBeginTxn = false
h.mu.everSentBeginTxn = true
h.mu.everWroteIntents = true
// From now on, all requests need to be checked against the AbortCache on
// the server side. We also conservatively update the current request,
// although I'm not sure if that's necessary.
Expand All @@ -167,20 +177,25 @@ func (h *txnHeartbeat) SendLocked(
// prepared before we had an anchor.
ba.Txn.Key = anchor
}
// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: h.mu.txn.Key,
},
}

// Inject the new request before the first write position, taking care to
// avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
addedBeginTxn = true

// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: h.mu.txn.Key,
},
}

// Inject the new request before the first write position, taking care to
// avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
}

// Start the heartbeat loop.
// Note that we don't do it for 1PC txns: they only leave intents around on
Expand All @@ -204,7 +219,7 @@ func (h *txnHeartbeat) SendLocked(
var commitTurnedToRollback bool
if haveEndTxn {
// Are we writing now or have we written in the past?
elideEndTxn = !h.mu.everSentBeginTxn
elideEndTxn = !h.mu.everWroteIntents
if elideEndTxn {
ba.Requests = ba.Requests[:lastIndex]
} else if etReq.Commit {
Expand Down Expand Up @@ -234,7 +249,7 @@ func (h *txnHeartbeat) SendLocked(
}

// If we inserted a begin transaction request, remove it here.
if needBeginTxn {
if addedBeginTxn {
if br != nil && br.Responses != nil {
br.Responses = append(br.Responses[:firstWriteIdx], br.Responses[firstWriteIdx+1:]...)
}
Expand Down Expand Up @@ -466,6 +481,16 @@ func (h *txnHeartbeat) heartbeat(ctx context.Context) bool {
return true
}

// TODO(nvanbenschoten): Figure out what to do here. The case we're
// handling is TransactionAbortedErrors without corresponding
// transaction protos attached. @andreimatei any suggestions?
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
h.mu.txn.Status = roachpb.ABORTED
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
h.abortTxnAsyncLocked(ctx)
return false
}

respTxn = pErr.GetTxn()
} else {
respTxn = br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/txn_interceptor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ func (m *txnMetrics) init(txn *roachpb.Transaction, clock *hlc.Clock, metrics *T
func (m *txnMetrics) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
_, hasBegin := ba.GetArg(roachpb.BeginTransaction)
et, hasEnd := ba.GetArg(roachpb.EndTransaction)
m.onePCCommit = hasBegin && hasEnd && et.(*roachpb.EndTransactionRequest).Commit
m.onePCCommit = ba.IsCompleteTransaction()

if m.txnStartNanos == 0 {
m.txnStartNanos = timeutil.Now().UnixNano()
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ func (tp *txnPipeliner) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Fast-path for 1PC transactions.
_, hasBT := ba.GetArg(roachpb.BeginTransaction)
_, hasET := ba.GetArg(roachpb.EndTransaction)
if hasBT && hasET {
if ba.IsCompleteTransaction() {
return tp.wrapped.SendLocked(ctx, ba)
}

Expand Down
Loading

0 comments on commit f921c46

Please sign in to comment.