Merge #26599

26599: kv: pipeline transactional writes r=nvanbenschoten a=nvanbenschoten

This change pipelines transactional writes using the approach presented
in #16026 (comment).

## Approach

The change introduces transactional pipelining by creating a new
`txnReqInterceptor` called `txnPipeliner` that hooks into the `TxnCoordSender`.
The `txnPipeliner` pipelines transactional writes using asynchronous consensus:
it tracks all writes that have been asynchronously proposed through Raft and
ensures that all interfering requests chain on to them by first proving that
the async writes succeeded. It also ensures that, before a transaction commits,
all outstanding writes that have been proposed but not yet proven to have
succeeded are checked.
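
To make the shape of this concrete, here is a minimal, self-contained Go sketch of the interceptor idea, not the actual CockroachDB code: every name below (`request`, `sender`, `outstanding`, etc.) is invented for illustration, and the real `txnPipeliner` operates on `roachpb` batches inside the `TxnCoordSender`.

```go
// Sketch only: invented types standing in for the real interceptor.
package main

import "fmt"

// request stands in for a batch headed to the KV layer.
type request struct {
	isWrite bool
	key     string
}

// sender abstracts the next layer of the stack (e.g. the DistSender).
type sender func(req request) error

// txnPipeliner tracks writes that were proposed through Raft
// asynchronously but have not yet been proven to have applied.
type txnPipeliner struct {
	wrapped     sender
	outstanding map[string]bool // keys with unproven async writes
}

// send chains interfering requests onto outstanding writes, forwards the
// request, and records new writes as outstanding instead of waiting for
// consensus to finish.
func (p *txnPipeliner) send(req request) error {
	if p.outstanding[req.key] {
		// In the real system this would be a QueryIntent-style request
		// proving that the earlier async write succeeded.
		fmt.Printf("chaining onto outstanding write at %q\n", req.key)
		delete(p.outstanding, req.key)
	}
	if err := p.wrapped(req); err != nil {
		return err
	}
	if req.isWrite {
		// Evaluation happened synchronously; replication is still in
		// flight, so remember the write until it is proven.
		p.outstanding[req.key] = true
	}
	return nil
}

func main() {
	p := &txnPipeliner{
		wrapped:     func(request) error { return nil },
		outstanding: make(map[string]bool),
	}
	p.send(request{isWrite: true, key: "a"})  // pipelined write
	p.send(request{isWrite: false, key: "a"}) // read must chain onto it
}
```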

Chaining on to in-flight async writes is important to the txnPipeliner for two
main reasons (see the sketch following this list):
1. requests proposed to Raft will not necessarily succeed. For any number of
   reasons, a request may make it through Raft and be discarded, or may fail to
   ever be replicated at all. A transaction must therefore check that all of its
   async writes succeeded before committing. However, when these proposals do
   fail, their errors aren't particularly interesting to the transaction,
   because they are not deterministic transaction-domain errors, such as
   conditional-put failures or other symptoms of constraint violations, that a
   transaction must adhere to for correctness. Those kinds of errors are all
   discovered during write *evaluation*, which an async write performs
   synchronously before consensus. Any error during consensus is outside of the
   transaction domain and can always trigger a transaction retry.
2. transport layers beneath the txnPipeliner do not provide strong enough
   ordering guarantees between concurrent requests in the same transaction to
   avoid the need for explicit chaining. For instance, DistSender uses unary
   gRPC requests instead of gRPC streams, so it can't natively expose strong
   ordering guarantees. Perhaps more importantly, even when a command has
   entered the command queue and been evaluated on a Replica, it is not
   guaranteed to be applied before interfering commands. This is because the
   command may be retried outside of the serialization of the command queue for
   any number of reasons, such as leaseholder changes. When the command
   re-enters the command queue, interfering commands may jump ahead of it. To
   combat this, the txnPipeliner uses chaining to throw an error when such a
   re-ordering would have affected the order in which transactional requests
   evaluate.
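
Extending the hypothetical sketch above, the commit path illustrates both points: every outstanding write must be proven before the transaction may commit, and a failed proof surfaces as a retryable error rather than a transaction-domain error. The real code would use QueryIntent-style requests; `proveWrite` here is an invented stand-in.

```go
// proveWrite stands in for a QueryIntent-style request that asks the
// replica whether the async write at key actually applied.
func (p *txnPipeliner) proveWrite(key string) (bool, error) {
	// Hypothetical: a real implementation would consult the replica.
	return true, nil
}

// commit checks every outstanding write before the transaction commits.
func (p *txnPipeliner) commit() error {
	for key := range p.outstanding {
		proven, err := p.proveWrite(key)
		if err != nil {
			return err
		}
		if !proven {
			// The proposal was lost or reordered. This is not a
			// deterministic transaction-domain error (like a
			// conditional-put failure), so the transaction can
			// simply be retried.
			return fmt.Errorf("retryable: async write at %q never applied", key)
		}
		delete(p.outstanding, key)
	}
	return nil // now safe to send the commit request
}
```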

## Testing/Benchmarking

This change will require a lot of testing (warning: very little unit testing is
included so far) and benchmarking. The first batch of benchmarking has been very
promising in terms of reducing transactional latency, but it hasn't shown much
effect on transactional throughput. This is somewhat expected: the approach to
transactional pipelining trades a transaction performing slightly more work (in
parallel) for a significantly smaller contention footprint.

The first sanity-check benchmark was to run the change against a geo-distributed
cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, and
europe-west2-b. The load generator was located in us-east1-b and all leases were
moved to this zone as well. The test first limited all operations to newOrder
transactions, but soon expanded to full TPCC after seeing desirable results.

#### Without txn pipelining
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      198.3 154.2%    916.5    486.5   2818.6   4160.7   5637.1   5905.6
```

#### With txn pipelining
```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      200.0 155.5%    388.2    184.5   1208.0   1946.2   2684.4   2818.6
```

One big caveat is that all foreign key checks needed to be disabled for this
experiment to show much of an improvement. The reason for this is subtle. TPCC's
schema is structured as a large hierarchy of tables, and most of its
transactions insert down a lineage of tables. With FKs enabled, inserting into a
child table immediately triggers an existence check against the parent table
that was just inserted into. This effectively causes a pipeline stall after each
write, which eliminates a lot of the benefit that we expect to see here.
Luckily, this is a problem that can be avoided by being a little smarter about
foreign keys at the SQL level: we should not be performing these scans over all
possible column families for a row just to check for existence (resulting kv
count > 0) when we know that we just inserted into that row. We need some kind
of row-level existence cache in SQL to avoid these scans (a sketch of the idea
follows below). This will be generally useful, but will be especially useful to
avoid these pipeline stalls.
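
As a thought experiment, here is a hypothetical Go sketch of such a row-level existence cache. Nothing like this exists in this patch, and all names are invented; it only illustrates how remembering our own inserts would let an FK check skip the scan.

```go
// rowExistenceCache remembers which rows the current transaction has
// inserted so that a later FK existence check can be answered locally
// instead of issuing a scan that stalls the write pipeline.
type rowExistenceCache struct {
	inserted map[string]struct{} // row keys written by this txn
}

func newRowExistenceCache() *rowExistenceCache {
	return &rowExistenceCache{inserted: make(map[string]struct{})}
}

// noteInsert records a row written by the transaction.
func (c *rowExistenceCache) noteInsert(rowKey string) {
	c.inserted[rowKey] = struct{}{}
}

// provesExistence reports whether an FK check on rowKey can be satisfied
// without scanning the referenced table's column families.
func (c *rowExistenceCache) provesExistence(rowKey string) bool {
	_, ok := c.inserted[rowKey]
	return ok
}
```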

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
craig[bot] and nvanbenschoten committed Jul 13, 2018
2 parents dc9b9d3 + 11ca84e commit c93feab
Showing 27 changed files with 3,105 additions and 680 deletions.
284 changes: 280 additions & 4 deletions c-deps/libroach/protos/roachpb/data.pb.cc


240 changes: 236 additions & 4 deletions c-deps/libroach/protos/roachpb/data.pb.h


4 changes: 3 additions & 1 deletion docs/generated/settings/settings.html
```diff
@@ -32,6 +32,8 @@
 <tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
 <tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track write intents in transactions</td></tr>
 <tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>256000</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
+<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
+<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>0</code></td><td>if non-zero, defines the maximum size batch that will be pipelined through Raft consensus</td></tr>
 <tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
 <tr><td><code>server.clock.forward_jump_check_enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, forward clock jumps > max_offset/2 will cause a panic.</td></tr>
 <tr><td><code>server.clock.persist_upper_bound_interval</code></td><td>duration</td><td><code>0s</code></td><td>the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature.</td></tr>
@@ -66,6 +68,6 @@
 <tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
 <tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
 <tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
-<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-8</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
+<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-9</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
 </tbody>
 </table>
```
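
For reference, settings like the two added above are changed at runtime with ordinary `SET CLUSTER SETTING` statements; for example:

```sql
-- Disable write pipelining, or cap how large a batch may be pipelined
-- (the value 128 is just an example).
SET CLUSTER SETTING kv.transaction.write_pipelining_enabled = false;
SET CLUSTER SETTING kv.transaction.write_pipelining_max_batch_size = 128;
```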
2 changes: 1 addition & 1 deletion pkg/internal/client/txn.go
```diff
@@ -1220,7 +1220,7 @@ func (txn *Txn) GetStrippedTxnCoordMeta() roachpb.TxnCoordMeta {
 		meta.RefreshReads = nil
 		meta.RefreshWrites = nil
 	case LeafTxn:
-		// Nothing yet.
+		meta.OutstandingWrites = nil
 	}
 	return meta
 }
```
