diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index cf9017f9dbf..90ebcf30ab1 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -33,6 +33,7 @@
kv.transaction.max_intents_bytes | integer | 256000 | maximum number of bytes used to track write intents in transactions |
kv.transaction.max_refresh_spans_bytes | integer | 256000 | maximum number of bytes used to track refresh spans in serializable transactions |
kv.transaction.write_pipelining_enabled | boolean | true | if enabled, transactional writes are pipelined through Raft consensus |
+kv.transaction.write_pipelining_max_batch_size | integer | 0 | if non-zero, defines that maximum size batch that will be pipelined through Raft consensus |
rocksdb.min_wal_sync_interval | duration | 0s | minimum duration between syncs of the RocksDB WAL |
server.clock.forward_jump_check_enabled | boolean | false | if enabled, forward clock jumps > max_offset/2 will cause a panic. |
server.clock.persist_upper_bound_interval | duration | 0s | the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature. |
diff --git a/pkg/kv/txn_interceptor_pipeliner.go b/pkg/kv/txn_interceptor_pipeliner.go
index 56309fb01bc..97d0805496a 100644
--- a/pkg/kv/txn_interceptor_pipeliner.go
+++ b/pkg/kv/txn_interceptor_pipeliner.go
@@ -33,6 +33,11 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
+var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting(
+ "kv.transaction.write_pipelining_max_batch_size",
+ "if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
+ 0,
+)
// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
@@ -173,16 +178,28 @@ func (tp *txnPipeliner) SendLocked(
// the list of writes we proved to exist that are no longer "outstanding" in
// updateOutstandingWrites.
//
-// TODO(nvanbenschoten): the number of writes that we permit to use async
-// consensus in a given Batch should be bound. We'll have to prove each one
-// using a QueryIntent, so there's a point where it makes more sense to just
-// perform consensus for the entire batch synchronously and avoid all of the
-// overhead of pipelining. We might also want to bound the size of the
-// outstandingWrites tree.
+// TODO(nvanbenschoten): Consider placing an upper bound on the size of the
+// outstandingWrites tree. Once this limit is hit, we'll either need to
+// proactively prove outstanding writes or stop pipelining new writes.
func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
asyncConsensus := tp.st.Version.IsActive(cluster.VersionAsyncConsensus) &&
pipelinedWritesEnabled.Get(&tp.st.SV)
+ // We provide a setting to bound the number of writes we permit in a batch
+ // that uses async consensus. This is useful because we'll have to prove
+ // each write that uses async consensus using a QueryIntent, so there's a
+ // point where it makes more sense to just perform consensus for the entire
+ // batch synchronously and avoid all of the overhead of pipelining.
+ if maxBatch := pipelinedWritesMaxBatchSize.Get(&tp.st.SV); maxBatch > 0 {
+ batchSize := int64(len(ba.Requests))
+ if _, hasBT := ba.GetArg(roachpb.BeginTransaction); hasBT {
+ batchSize--
+ }
+ if batchSize > maxBatch {
+ asyncConsensus = false
+ }
+ }
+
forked := false
oldReqs := ba.Requests
// TODO(nvanbenschoten): go 1.11 includes an optimization to quickly clear
diff --git a/pkg/kv/txn_interceptor_pipeliner_test.go b/pkg/kv/txn_interceptor_pipeliner_test.go
index 535c9b596ab..e2f98953d86 100644
--- a/pkg/kv/txn_interceptor_pipeliner_test.go
+++ b/pkg/kv/txn_interceptor_pipeliner_test.go
@@ -858,3 +858,82 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Nil(t, pErr)
require.Equal(t, 0, tp.outstandingWritesLen())
}
+
+// TestTxnPipelinerMaxBatchSize tests that batches that contain more requests
+// than allowed by the maxBatchSize setting will not be pipelined.
+func TestTxnPipelinerMaxBatchSize(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+ tp, mockSender := makeMockTxnPipeliner()
+
+ // Set maxBatchSize limit to 1.
+ pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 1)
+
+ txn := makeTxnProto()
+ keyA, keyC := roachpb.Key("a"), roachpb.Key("c")
+
+ // Batch below limit.
+ var ba roachpb.BatchRequest
+ ba.Header = roachpb.Header{Txn: &txn}
+ ba.Add(&roachpb.BeginTransactionRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
+ ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
+
+ mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
+ require.Equal(t, 2, len(ba.Requests))
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &roachpb.BeginTransactionRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
+
+ br := ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ })
+
+ br, pErr := tp.SendLocked(ctx, ba)
+ require.NotNil(t, br)
+ require.Nil(t, pErr)
+ require.Equal(t, 1, tp.outstandingWritesLen())
+
+ // Batch above limit.
+ ba.Requests = nil
+ ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
+ ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})
+
+ mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
+ require.Equal(t, 3, len(ba.Requests))
+ require.False(t, ba.AsyncConsensus)
+ require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
+ require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.NotNil(t, br)
+ require.Nil(t, pErr)
+ require.Equal(t, 0, tp.outstandingWritesLen())
+
+ // Increase maxBatchSize limit to 2.
+ pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 2)
+
+ // Same batch now below limit.
+ mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
+ require.Equal(t, 2, len(ba.Requests))
+ require.True(t, ba.AsyncConsensus)
+ require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
+ require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
+
+ br = ba.CreateReply()
+ br.Txn = ba.Txn
+ return br, nil
+ })
+
+ br, pErr = tp.SendLocked(ctx, ba)
+ require.NotNil(t, br)
+ require.Nil(t, pErr)
+ require.Equal(t, 2, tp.outstandingWritesLen())
+}
diff --git a/pkg/settings/float.go b/pkg/settings/float.go
index f38e7177537..90449379898 100644
--- a/pkg/settings/float.go
+++ b/pkg/settings/float.go
@@ -87,6 +87,16 @@ func RegisterFloatSetting(key, desc string, defaultValue float64) *FloatSetting
return RegisterValidatedFloatSetting(key, desc, defaultValue, nil)
}
+// RegisterNonNegativeFloatSetting defines a new setting with type float.
+func RegisterNonNegativeFloatSetting(key, desc string, defaultValue float64) *FloatSetting {
+ return RegisterValidatedFloatSetting(key, desc, defaultValue, func(v float64) error {
+ if v < 0 {
+ return errors.Errorf("cannot set %s to a negative value: %f", key, v)
+ }
+ return nil
+ })
+}
+
// RegisterValidatedFloatSetting defines a new setting with type float.
func RegisterValidatedFloatSetting(
key, desc string, defaultValue float64, validateFn func(float64) error,
@@ -103,13 +113,3 @@ func RegisterValidatedFloatSetting(
register(key, desc, setting)
return setting
}
-
-// RegisterNonNegativeFloatSetting defines a new setting with type float.
-func RegisterNonNegativeFloatSetting(key, desc string, defaultValue float64) *FloatSetting {
- return RegisterValidatedFloatSetting(key, desc, defaultValue, func(v float64) error {
- if v < 0 {
- return errors.Errorf("cannot set %s to a negative value: %f", key, v)
- }
- return nil
- })
-}
diff --git a/pkg/settings/int.go b/pkg/settings/int.go
index dc6816814c4..18df923222c 100644
--- a/pkg/settings/int.go
+++ b/pkg/settings/int.go
@@ -83,6 +83,16 @@ func RegisterIntSetting(key, desc string, defaultValue int64) *IntSetting {
return RegisterValidatedIntSetting(key, desc, defaultValue, nil)
}
+// RegisterNonNegativeIntSetting defines a new setting with type int.
+func RegisterNonNegativeIntSetting(key, desc string, defaultValue int64) *IntSetting {
+ return RegisterValidatedIntSetting(key, desc, defaultValue, func(v int64) error {
+ if v < 0 {
+ return errors.Errorf("cannot set %s to a negative value: %d", key, v)
+ }
+ return nil
+ })
+}
+
// RegisterValidatedIntSetting defines a new setting with type int with a
// validation function.
func RegisterValidatedIntSetting(