kv: add new kv.transaction.write_pipelining_max_batch_size setting
This commit adds a new `kv.transaction.write_pipelining_max_batch_size`
setting which can be used to bound the number of writes we permit in a
batch that uses async consensus. This is useful because we'll have to
prove each write that uses async consensus with a QueryIntent request,
so there's a point where it makes more sense to just perform consensus
for the entire batch synchronously and avoid all of the overhead of
pipelining.

This crossover point is hard to determine experimentally and depends on
many factors, so it's better to expose it as an option.

Release note: None
nvanbenschoten committed Jul 13, 2018
1 parent b9b7de1 commit 11ca84e
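In rough terms, the gate this commit adds behaves like the following sketch (simplified names; the real change is in pkg/kv/txn_interceptor_pipeliner.go below). A cap of 0 keeps the current unbounded behavior, and a BeginTransaction request is not counted because it is not a pipelined write:

package main

import "fmt"

// canUseAsyncConsensus is a simplified stand-in for the new check in the
// txnPipeliner: a batch is only pipelined if its write count (excluding a
// BeginTransaction request) does not exceed the configured cap; a cap of 0
// means no limit.
func canUseAsyncConsensus(numRequests int, hasBeginTxn bool, maxBatch int64) bool {
	batchSize := int64(numRequests)
	if hasBeginTxn {
		batchSize-- // BeginTransaction is not a pipelined write.
	}
	return maxBatch == 0 || batchSize <= maxBatch
}

func main() {
	fmt.Println(canUseAsyncConsensus(2, true, 1))  // true: 1 write is within a cap of 1
	fmt.Println(canUseAsyncConsensus(2, false, 1)) // false: 2 writes exceed a cap of 1
	fmt.Println(canUseAsyncConsensus(9, false, 0)) // true: a cap of 0 disables the bound
}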
Showing 5 changed files with 123 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -33,6 +33,7 @@
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>0</code></td><td>if non-zero, defines the maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>server.clock.forward_jump_check_enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, forward clock jumps > max_offset/2 will cause a panic.</td></tr>
<tr><td><code>server.clock.persist_upper_bound_interval</code></td><td>duration</td><td><code>0s</code></td><td>the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature.</td></tr>
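For context on how an operator would use the new row above: cluster settings are changed with a SET CLUSTER SETTING statement. A hedged Go example, assuming a reachable cluster; the connection string and the cap of 128 are illustrative choices, not part of this commit:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks the same protocol.
)

func main() {
	// The connection string below is an assumption for illustration only.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Cap batches that may use async consensus at 128 requests; setting the
	// value back to 0 restores the unbounded default documented above.
	if _, err := db.Exec(
		"SET CLUSTER SETTING kv.transaction.write_pipelining_max_batch_size = 128",
	); err != nil {
		log.Fatal(err)
	}
}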
29 changes: 23 additions & 6 deletions pkg/kv/txn_interceptor_pipeliner.go
@@ -33,6 +33,11 @@ var pipelinedWritesEnabled = settings.RegisterBoolSetting(
"if enabled, transactional writes are pipelined through Raft consensus",
true,
)
var pipelinedWritesMaxBatchSize = settings.RegisterNonNegativeIntSetting(
"kv.transaction.write_pipelining_max_batch_size",
"if non-zero, defines that maximum size batch that will be pipelined through Raft consensus",
0,
)

// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
@@ -173,16 +178,28 @@ func (tp *txnPipeliner) SendLocked(
// the list of writes we proved to exist that are no longer "outstanding" in
// updateOutstandingWrites.
//
// TODO(nvanbenschoten): the number of writes that we permit to use async
// consensus in a given Batch should be bound. We'll have to prove each one
// using a QueryIntent, so there's a point where it makes more sense to just
// perform consensus for the entire batch synchronously and avoid all of the
// overhead of pipelining. We might also want to bound the size of the
// outstandingWrites tree.
// TODO(nvanbenschoten): Consider placing an upper bound on the size of the
// outstandingWrites tree. Once this limit is hit, we'll either need to
// proactively prove outstanding writes or stop pipelining new writes.
func (tp *txnPipeliner) chainToOutstandingWrites(ba roachpb.BatchRequest) roachpb.BatchRequest {
asyncConsensus := tp.st.Version.IsActive(cluster.VersionAsyncConsensus) &&
pipelinedWritesEnabled.Get(&tp.st.SV)

// We provide a setting to bound the number of writes we permit in a batch
// that uses async consensus. This is useful because we'll have to prove
// each write that uses async consensus using a QueryIntent, so there's a
// point where it makes more sense to just perform consensus for the entire
// batch synchronously and avoid all of the overhead of pipelining.
if maxBatch := pipelinedWritesMaxBatchSize.Get(&tp.st.SV); maxBatch > 0 {
batchSize := int64(len(ba.Requests))
if _, hasBT := ba.GetArg(roachpb.BeginTransaction); hasBT {
batchSize--
}
if batchSize > maxBatch {
asyncConsensus = false
}
}

forked := false
oldReqs := ba.Requests
// TODO(nvanbenschoten): go 1.11 includes an optimization to quickly clear
79 changes: 79 additions & 0 deletions pkg/kv/txn_interceptor_pipeliner_test.go
@@ -858,3 +858,82 @@ func TestTxnPipelinerEnableDisableMixTxn(t *testing.T) {
require.Nil(t, pErr)
require.Equal(t, 0, tp.outstandingWritesLen())
}

// TestTxnPipelinerMaxBatchSize tests that batches that contain more requests
// than allowed by the maxBatchSize setting will not be pipelined.
func TestTxnPipelinerMaxBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Set maxBatchSize limit to 1.
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 1)

txn := makeTxnProto()
keyA, keyC := roachpb.Key("a"), roachpb.Key("c")

// Batch below limit.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.BeginTransactionRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 2, len(ba.Requests))
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.BeginTransactionRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, 1, tp.outstandingWritesLen())

// Batch above limit.
ba.Requests = nil
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyC}})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 3, len(ba.Requests))
require.False(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.QueryIntentRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[2].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
br.Responses[0].GetInner().(*roachpb.QueryIntentResponse).FoundIntent = true
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, 0, tp.outstandingWritesLen())

// Increase maxBatchSize limit to 2.
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 2)

// Same batch now below limit.
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Equal(t, 2, len(ba.Requests))
require.True(t, ba.AsyncConsensus)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[1].GetInner())

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tp.SendLocked(ctx, ba)
require.NotNil(t, br)
require.Nil(t, pErr)
require.Equal(t, 2, tp.outstandingWritesLen())
}
20 changes: 10 additions & 10 deletions pkg/settings/float.go
@@ -87,6 +87,16 @@ func RegisterFloatSetting(key, desc string, defaultValue float64) *FloatSetting
return RegisterValidatedFloatSetting(key, desc, defaultValue, nil)
}

// RegisterNonNegativeFloatSetting defines a new setting with type float.
func RegisterNonNegativeFloatSetting(key, desc string, defaultValue float64) *FloatSetting {
return RegisterValidatedFloatSetting(key, desc, defaultValue, func(v float64) error {
if v < 0 {
return errors.Errorf("cannot set %s to a negative value: %f", key, v)
}
return nil
})
}

// RegisterValidatedFloatSetting defines a new setting with type float.
func RegisterValidatedFloatSetting(
key, desc string, defaultValue float64, validateFn func(float64) error,
@@ -103,13 +113,3 @@ func RegisterValidatedFloatSetting(
register(key, desc, setting)
return setting
}

// RegisterNonNegativeFloatSetting defines a new setting with type float.
func RegisterNonNegativeFloatSetting(key, desc string, defaultValue float64) *FloatSetting {
return RegisterValidatedFloatSetting(key, desc, defaultValue, func(v float64) error {
if v < 0 {
return errors.Errorf("cannot set %s to a negative value: %f", key, v)
}
return nil
})
}
10 changes: 10 additions & 0 deletions pkg/settings/int.go
@@ -83,6 +83,16 @@ func RegisterIntSetting(key, desc string, defaultValue int64) *IntSetting {
return RegisterValidatedIntSetting(key, desc, defaultValue, nil)
}

// RegisterNonNegativeIntSetting defines a new setting with type int.
func RegisterNonNegativeIntSetting(key, desc string, defaultValue int64) *IntSetting {
return RegisterValidatedIntSetting(key, desc, defaultValue, func(v int64) error {
if v < 0 {
return errors.Errorf("cannot set %s to a negative value: %d", key, v)
}
return nil
})
}

// RegisterValidatedIntSetting defines a new setting with type int with a
// validation function.
func RegisterValidatedIntSetting(
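The new RegisterNonNegativeIntSetting (and its float counterpart above) is just RegisterValidatedIntSetting with a validator that rejects values below zero. A standalone sketch of that pattern, using illustrative names rather than the real pkg/settings types:

package main

import "fmt"

// validatedInt pairs a value with a validation function, in the spirit of a
// validated integer setting. The type and helpers here are illustrative only.
type validatedInt struct {
	value      int64
	validateFn func(int64) error
}

func (s *validatedInt) set(v int64) error {
	if s.validateFn != nil {
		if err := s.validateFn(v); err != nil {
			return err
		}
	}
	s.value = v
	return nil
}

// nonNegative builds the kind of validator RegisterNonNegativeIntSetting installs.
func nonNegative(key string) func(int64) error {
	return func(v int64) error {
		if v < 0 {
			return fmt.Errorf("cannot set %s to a negative value: %d", key, v)
		}
		return nil
	}
}

func main() {
	s := &validatedInt{validateFn: nonNegative("kv.transaction.write_pipelining_max_batch_size")}
	fmt.Println(s.set(16)) // <nil>
	fmt.Println(s.set(-1)) // cannot set kv.transaction.write_pipelining_max_batch_size to a negative value: -1
}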
