kv: pipeline transactional writes
This change pipelines transactional writes using the approach presented
in #16026 (comment).

The change introduces transactional pipelining by creating a new
`txnInterceptor`, called `txnPipeliner`, that hooks into the `TxnCoordSender`.
txnPipeliner is a txnInterceptor that pipelines transactional writes by using
asynchronous consensus. The interceptor then tracks all writes that have been
asynchronously proposed through Raft and ensures that all interfering
requests chain on to them by first proving that the async writes succeeded.
The interceptor also ensures that, before a transaction commits, all writes
that have been proposed but not yet proven to have succeeded are checked. These
async writes are referred to as "outstanding writes"
and this process of proving that an outstanding write succeeded is called
"resolving" the write.

Chaining on to in-flight async writes is important to the txnPipeliner for two
main reasons:
1. requests proposed to Raft will not necessarily succeed. For any number of
   reasons, the request may make it through Raft and be discarded or fail to
   ever even be replicated. A transaction must check that all async writes
   succeeded before committing. However, when these proposals do fail, their
   errors aren't particularly interesting to a transaction. This is because
   these errors are not deterministic Transaction-domain errors that a
   transaction must adhere to for correctness such as conditional-put errors or
   other symptoms of constraint violations. These kinds of errors are all
   discovered during write *evaluation*, which an async write will perform
   synchronously before consensus. Any error during consensus is outside of the
   Transaction-domain and can always trigger a transaction retry.
2. transport layers beneath the txnPipeliner do not provide strong enough
   ordering guarantees between concurrent requests in the same transaction to
   avoid needing explicit chaining. For instance, DistSender uses unary gRPC
   requests instead of gRPC streams, so it can't natively expose strong ordering
   guarantees. Perhaps more importantly, even when a command has entered the
   command queue and been evaluated on a Replica, it is not guaranteed to be applied
   before interfering commands. This is because the command may be retried
   outside of the serialization of the command queue for any number of reasons,
   such as leaseholder changes. When the command re-enters the command queue,
   it's possible that interfering commands may jump ahead of it. To combat
   this, the txnPipeliner uses chaining to throw an error when these
   re-orderings would have affected the order in which transactional requests
   evaluate (see the chaining sketch just after this list).
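
As a rough illustration of the chaining itself, the sketch below prefixes a
batch with existence checks for any outstanding write it interferes with, in
the spirit of the QueryIntent requests that DistSender now accepts (see the
dist_sender.go diff below). The `request` type and `chainOnOutstanding` helper
are hypothetical simplifications, not the real kv API.

```go
package main

import "fmt"

// request is a simplified stand-in for a single KV request in a batch.
type request struct {
	op  string // e.g. "Put", "Get", "QueryIntent"
	key string
}

// chainOnOutstanding prefixes a batch with a QueryIntent-style check for each
// outstanding async write the batch interferes with. If a check fails during
// evaluation (the write never applied, or was reordered), the whole batch
// fails and the transaction retries rather than missing a write it depends on.
func chainOnOutstanding(outstanding map[string]bool, batch []request) []request {
	var checks []request
	seen := make(map[string]bool)
	for _, r := range batch {
		if outstanding[r.key] && !seen[r.key] {
			checks = append(checks, request{op: "QueryIntent", key: r.key})
			seen[r.key] = true
		}
	}
	return append(checks, batch...)
}

func main() {
	outstanding := map[string]bool{"a": true} // async write to "a" not yet proven
	batch := []request{{op: "Get", key: "a"}, {op: "Put", key: "c"}}
	fmt.Println(chainOnOutstanding(outstanding, batch))
	// Output: [{QueryIntent a} {Get a} {Put c}]
}
```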

The first sanity-check benchmark was to run the change against a geo-distributed
cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, and
europe-west2-b. The load generator was located in us-east1-b and all leases were
moved to this zone as well. The test first limited all operations to
newOrders, but soon expanded to full TPCC after seeing desirable results.

Without txn pipelining
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      198.3 154.2%    916.5    486.5   2818.6   4160.7   5637.1   5905.6
```

With txn pipelining
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      200.0 155.5%    388.2    184.5   1208.0   1946.2   2684.4   2818.6
```

One big caveat is that all foreign key checks needed to be disabled for this
experiment to show a significant improvement. The reason for this is subtle.
TPCC's schema is structured as a large hierarchy of tables, and most of its
transactions insert down a lineage of tables. With FKs enabled, we see a query
to a table immediately after it is inserted into, issued when its child table is
inserted into next. This effectively causes a pipeline stall immediately after
each write, which eliminates much of the benefit we expect to see here.
Luckily, this is a problem that can be avoided by being a little smarter about
foreign keys at the SQL level. We should not be performing these scans over all
possible column families for a row just to check for its existence (i.e., that
the resulting KV count is > 0) when we know that we just inserted into that row.
We need some kind of
row-level existence cache in SQL to avoid these scans. Such a cache will be
generally useful, and especially so for avoiding these pipeline stalls.
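
As one possible shape for that cache, here is a hypothetical sketch of a
per-transaction row-existence cache; nothing like `rowExistenceCache` exists in
the SQL layer today, and the keying scheme is purely illustrative.

```go
package main

import "fmt"

// rowExistenceCache is a hypothetical per-transaction cache of rows the
// transaction has already written, so a later foreign-key existence check can
// be answered locally instead of issuing a KV scan that stalls the pipeline.
type rowExistenceCache struct {
	written map[string]bool // keyed by "table/primary-key"
}

func newRowExistenceCache() *rowExistenceCache {
	return &rowExistenceCache{written: make(map[string]bool)}
}

// recordInsert notes that the transaction inserted the given row.
func (c *rowExistenceCache) recordInsert(table, pk string) {
	c.written[table+"/"+pk] = true
}

// knownToExist reports whether an FK check on the row can be satisfied
// without a scan. A false return means the SQL layer still has to go to KV.
func (c *rowExistenceCache) knownToExist(table, pk string) bool {
	return c.written[table+"/"+pk]
}

func main() {
	c := newRowExistenceCache()
	c.recordInsert("district", "d1")

	// Inserting a child row can now skip the parent-row existence scan.
	fmt.Println(c.knownToExist("district", "d1")) // true
	fmt.Println(c.knownToExist("district", "d2")) // false: scan required
}
```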

Release note: None
nvanbenschoten committed Jul 13, 2018
1 parent a853c0c commit b9b7de1
Showing 17 changed files with 2,451 additions and 311 deletions.
284 changes: 280 additions & 4 deletions c-deps/libroach/protos/roachpb/data.pb.cc

Large diffs are not rendered by default.

240 changes: 236 additions & 4 deletions c-deps/libroach/protos/roachpb/data.pb.h

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
```diff
@@ -32,6 +32,7 @@
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
 <tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
+<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
 <tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
 <tr><td><code>server.clock.forward_jump_check_enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, forward clock jumps > max_offset/2 will cause a panic.</td></tr>
 <tr><td><code>server.clock.persist_upper_bound_interval</code></td><td>duration</td><td><code>0s</code></td><td>the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature.</td></tr>
```
2 changes: 1 addition & 1 deletion pkg/internal/client/txn.go
```diff
@@ -1220,7 +1220,7 @@ func (txn *Txn) GetStrippedTxnCoordMeta() roachpb.TxnCoordMeta {
 		meta.RefreshReads = nil
 		meta.RefreshWrites = nil
 	case LeafTxn:
-		// Nothing yet.
+		meta.OutstandingWrites = nil
 	}
 	return meta
 }
```
23 changes: 14 additions & 9 deletions pkg/kv/dist_sender.go
```diff
@@ -483,7 +483,7 @@ func (ds *DistSender) initAndVerifyBatch(
 				return roachpb.NewErrorf("batch with limit contains both forward and reverse scans")
 			}

-		case *roachpb.ResolveIntentRangeRequest:
+		case *roachpb.QueryIntentRequest, *roachpb.ResolveIntentRangeRequest:
 			continue

 		case *roachpb.BeginTransactionRequest, *roachpb.EndTransactionRequest, *roachpb.ReverseScanRequest:
@@ -633,11 +633,8 @@ func (ds *DistSender) Send(
 				panic("EndTransaction not in last chunk of batch")
 			}
 			parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
-			if len(parts) != 2 {
-				panic("split of final EndTransaction chunk resulted in != 2 parts")
-			}
-			// Restart transaction of the last chunk as two parts
-			// with EndTransaction in the second part.
+			// Restart transaction of the last chunk as multiple parts
+			// with EndTransaction in the last part.
 			continue
 		}
 		if pErr != nil {
@@ -741,7 +738,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 			// If we're in the middle of a panic, don't wait on responseChs.
 			panic(r)
 		}
-		var hadSuccess bool
+		var hadSuccessWriting bool
 		for _, responseCh := range responseChs {
 			resp := <-responseCh
 			if resp.pErr != nil {
@@ -750,7 +747,15 @@
 				}
 				continue
 			}
-			hadSuccess = true
+			if !hadSuccessWriting {
+				for _, i := range resp.positions {
+					req := ba.Requests[i].GetInner()
+					if !roachpb.IsReadOnly(req) {
+						hadSuccessWriting = true
+						break
+					}
+				}
+			}

 			// Combine the new response with the existing one (including updating
 			// the headers).
@@ -775,7 +780,7 @@
 		// If this is a write batch with any successful responses, but
 		// we're ultimately returning an error, wrap the error with a
 		// MixedSuccessError.
-		if hadSuccess && ba.IsWrite() {
+		if hadSuccessWriting {
 			pErr = roachpb.NewError(&roachpb.MixedSuccessError{Wrapped: pErr})
 		}
 	} else if couldHaveSkippedResponses {
```
