perf: pipeline transactional writes at the KV layer #16026
This is kind of related to the existing parallelism work (cc @nvanbenschoten) and the use of streaming interfaces in the SQL layer (#7775, cc @tristan-ohlson). We already parallelize SQL queries that have no results (`RETURNING NOTHING`).
@nvanbenschoten Can you describe your recent experiments in this area? They seem very promising and something we should investigate doing for 2.1.
Pipelining transactional writes between SQL statements and batching transactional writes between SQL statements are two alternatives that allow us to achieve a more general goal: lifting consensus out of the synchronous path for SQL writes.

To understand why this is important, let's first make the assumption that a write is always more expensive than a read, usually by orders of magnitude. We'll also assume that a SQL gateway is always collocated in the same datacenter as the leaseholder of the data it is trying to access [1]. This supports the first assumption because it means that reads can be served without inter-DC communication, while writes do require inter-DC coordination.

The key insight (which is discussed above) is that in order to satisfy the contract for a SQL write, we only need to guarantee two things: that any constraint violation is detected and returned with the statement, and that all future operations in the transaction observe the write's effect. The durable consensus write itself does not need to complete before the statement returns.
To convince oneself that this is true, it's useful to distinguish SQL-domain errors from KV-domain errors. A SQL-domain error is one which is expected by SQL when a constraint is violated by an operation. Examples are unique constraint violations, referential integrity constraint violations, and check constraint violations. SQL mandates that these are detected and returned to the SQL client when statements are issued; a statement cannot succeed if these constraints are violated. A KV-domain error is one which arises due to a failure "beneath" SQL and is not expected by SQL. Examples of these errors are network failures that prevent writes from succeeding, transaction retry errors, and disk corruption errors. This class of error must prevent a transaction from committing if it prevents the transaction's effects from being applied, but such errors are not bound to a specific SQL statement. In fact, as long as neither of the requirements listed above is broken, they can always be delayed until the end of a transaction and returned as the result of the commit.

So, with this insight in mind, we can begin rethinking how a SQL-level write is processed in Cockroach. Instead of a SQL-level write resulting in a synchronous KV-level write, we can break it up into a synchronous "dry-run" KV-read operation and an asynchronous KV-write operation. The read operation will perform all constraint validation and determine what the effect of the write will be, but importantly will not actually perform the KV-level write. The KV-write will then be performed at some later time, with the only constraint being that it happens before the transaction is committed. This restructuring is a huge improvement for transactional latency because it means that each SQL-write operation only needs to wait for a KV-read instead of a KV-write, which we assumed above will be much faster.
The individual latencies required for each KV-write can then be combined into the latency of only a single consensus write, either by performing the writes concurrently or by batching them together [2]. The main complication of this restructuring is that we need to enforce the second requirement for transactional writes: that all future operations observe their effect. This is where the main difference between the pipelining proposal and the deferred batch proposal comes into effect.

In the pipelining proposal, the dry-run KV-read and the KV-write are sent out at the same time by a transaction coordinator. The coordinator waits for the KV-read to return before returning to the SQL client but does not wait for the KV-write. Future reads or writes will see the effect of the first KV-write because the proposal assumes that the KV protocol is streaming and that all KV-streams are strongly ordered, such that any request necessarily observes the full result of all previous requests sent on that stream. With this property, it follows that any future read or write in the transaction will be ordered behind the asynchronous write and therefore observe its effect. However, it remains unclear how difficult it would be to introduce the properties necessary for this approach into the KV protocol. This also means that reading immediately after a write forces the next read to wait for consensus on the previous KV-write, which undermines the benefit to some degree.

In the deferred write batch proposal, the dry-run KV-read is sent out immediately by the transaction coordinator but the KV-write is not. Instead, the KV-write is deferred until strictly necessary, where "strictly necessary" can mean different things. In the current prototype of this proposal, a write is deferred until a future read or write overlaps the same keyspace or until a commit is issued, at which point the deferred write is added to the front of the new batch and flushed from the deferred write set.
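The deferred write batch mechanics above can be sketched in a few lines of Go. This is a toy model (all names hypothetical, not the actual CockroachDB API): writes are buffered after their dry-run read, an overlapping read forces a flush so it observes the write's effect, and commit flushes everything left in one batch.

```go
package main

import "fmt"

// txnBuffer is a toy sketch of the deferred write batch: after its
// dry-run read, a KV write is buffered rather than sent, and is flushed
// only when a later operation overlaps its key or at commit time.
type txnBuffer struct {
	buf map[string]string // buffered key -> value
}

func newTxnBuffer() *txnBuffer {
	return &txnBuffer{buf: make(map[string]string)}
}

// write buffers the KV write (the dry-run constraint validation is
// elided here).
func (t *txnBuffer) write(k, v string) { t.buf[k] = v }

// read flushes a buffered write that overlaps the read's key, returning
// the writes that must be prepended to the outgoing batch so the read
// observes their effect.
func (t *txnBuffer) read(k string) map[string]string {
	if v, ok := t.buf[k]; ok {
		delete(t.buf, k)
		return map[string]string{k: v}
	}
	return nil
}

// commit flushes everything left in one batch, maximizing the chance of
// hitting the 1PC fast path.
func (t *txnBuffer) commit() map[string]string {
	out := t.buf
	t.buf = make(map[string]string)
	return out
}

func main() {
	tb := newTxnBuffer()
	tb.write("a", "1")
	tb.write("b", "2")
	fmt.Println(tb.read("a")) // overlapping read forces a flush of "a"
	fmt.Println(tb.commit())  // remaining writes go out as one batch
}
```

Note how alternating overlapping reads and writes degenerate to flushing every write immediately, which is exactly the pathological workload described below.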
This could be improved. For instance, it should be possible for future reads (or "dry-run" reads) to read directly from the deferred write set without flushing it. This will transparently improve foreign key validation and help with #15157, because it means that for a transaction that writes into two tables that have an FK constraint between them, the FK validation for the second write could be fielded directly from the deferred write batch. Another benefit of this change is that it naturally creates larger write batches, which reduces the amount of network traffic and disk writes. It also allows us to hit the 1PC fast path more often.

cc @tschottdorf @andy-kimball @spencerkimball

[1] These assumptions do not always hold, but because they're so important for high performance in CockroachDB, it's appropriate to optimize for them.

[2] At the moment, our transactional model does not allow transaction commits to be in-flight concurrently with other writes for the same transaction on other ranges. This means that transactions that span ranges must always perform at least two serialized consensus writes. This restriction could be lifted, but that is an orthogonal concern.
Excellent write-up! I think this can be a big performance win for 2.1.
Most SQL write operations are conditional puts which are implemented internally as a read-and-compare followed by a write. Separating the read and write phases of the transaction makes a lot of sense for multi-statement transactions, but that extra round-trip will be a performance hit for simple transactions that insert using a single statement. I haven't looked at your prototype, so perhaps you're already taking this into consideration.
I'd add the mild complications around the parallel DistSQL machinery: we need to figure out the key spans touched by a DistSQL plan and then flush the overlapping key ranges for in-flight writes (or send the writes along with DistSQL, but then the act of waiting for them becomes an awkward poll, so it's probably not worth even considering). Also (and this is probably implicit anyway) we don't have to defer the writes all the way to the end; we can also send batches out as we see fit. For example, a transaction that performs lots of writes would have chunks flushed out periodically. We can also satisfy reads from the in-flight write set in some situations, though I'm not sure it's useful enough to introduce the complexity for.
We should play through this with some examples, but if the commit/release is passed in a batch, it would make sense to disable dry running for it. Hopefully that covers most of these cases (and if it doesn't, it seems that SQL should produce better batches).
KV writes have multiple internal phases: they are first evaluated on the leaseholder, then submitted to Raft and applied on all replicas. We currently send the response to the gateway after the command has been applied, but the response is actually fully determined after the evaluation phase (modulo re-evaluations and re-proposals). We could add a KV option to return the response as soon as evaluation completes, along with a token that can be used to check later (just before the commit, analogous to RefreshSpans) whether the request eventually applied. (Instead of this token, a streaming response protocol would allow us to do this as two responses to the same request.)
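The two-phase response described here can be sketched with a channel standing in for the token. This is a hypothetical shape, not the actual KV API: the write's outcome is returned as soon as evaluation completes, and the caller waits on the token later (e.g. just before commit) to learn whether the command eventually applied.

```go
package main

import "fmt"

// evaluateAndPropose sketches the proposed KV option (names
// hypothetical): the response is returned once leaseholder evaluation
// completes, together with a token the client can wait on later to
// learn whether the command applied through Raft.
func evaluateAndPropose(eval func() string, apply func() error) (string, <-chan error) {
	result := eval() // outcome fully determined here (modulo re-proposals)
	done := make(chan error, 1)
	go func() { done <- apply() }() // consensus + apply proceed asynchronously
	return result, done
}

func main() {
	res, tok := evaluateAndPropose(
		func() string { return "cput ok" }, // stand-in for evaluation
		func() error { return nil },        // stand-in for Raft apply
	)
	fmt.Println(res) // returned to the gateway without waiting for apply
	if err := <-tok; err != nil { // checked just before commit
		fmt.Println("write failed to apply:", err)
	}
}
```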
This has the nice advantage that the txn reads its own writes for free, but the disadvantage that a failed write isn't noticed until the end of the txn, which might result in confusing error messages.
The prototype doesn't separate the read and the write phase of a kv-write if a later operation in the same batch depends on the write. So for simple transactions that send a commit in the same batch as a write, the write will not be split and we will still achieve a 1PC txn. For multi-statement txns, the read phase and the write phase will be split up, which can actually help us achieve a 1PC txn in more cases. That said, there still are some workloads that would perform relatively poorly with this change. For instance, a series of alternating writes and reads that overlap would result in each write being immediately flushed, making the read-phase of a kv-write essentially useless added latency. The optimization to allow for reading directly from the deferred write batch would be a big help here as it would mean that the deferred writes don't all need to be flushed immediately after the next read comes in.
The prototype takes the approach of just flushing the entire deferred write batch before allowing DistSQL reads. At a minimum, we should be smarter about only flushing exactly what the DistSQL flow plans on reading. I'm not sure if we know this all beforehand on the SQL gateway, but I expect we have a pretty good idea.
Yes, this would be a very important optimization for two reasons:
Both of these reasons are very important when it comes to tables with FK relations to one another. Writing to a parent table and a child table that references the parent table in the same txn is very common. It would be a shame if FK validation when writing to the second table forced us to flush the deferred write batch. On the other hand, if we could read from our own deferred write batch then we could actually improve FK validation in this situation by avoiding communication with the leaseholder altogether!
Exactly, this is how the prototype works.
This is a good alternative proposal and one I've also been thinking about a lot. I think the biggest benefit of it is that it wouldn't cause us to delay the writing of intents until late in the transaction. As is, the deferred write proposal has the potential to dramatically alter contended workload behavior because transactions won't begin writing intents until very late. In practice, I'm not actually sure how this would play out. On one hand, waiting until late in the transaction to lay down intents could essentially undermine their role in serializing contending transactions.

Another benefit of this alternative is that it pipelines the execution of the write consensus with the rest of the txn instead of deferring that cost until the end. This means that it should never hurt performance compared to what we see today. The flipside is that it pays for this by giving up any potential batching effect that the first proposal could achieve, which had the potential of reducing disk writes, reducing Raft overhead, and increasing 1PC txn opportunities.
I've been thinking more about @bdarnell's alternative because there is a lot I like about it. My one hesitation is the idea of a token and all the extra state on a replica that it would require us to hold. A recent discussion with @tschottdorf about idempotency and intent resolution got me thinking in this area, and I now don't think we need any of this extra state at all. Any future request that needs to check for the success of a proposal only needs to check that an intent exists with an expected sequence number at an expected key for it to know whether the request succeeded. With this in mind, we could create a `QueryIntent` request that performs exactly this check.

However, unlike the current proposal, a future request in the same txn that overlaps an outstanding write would need to chain on to it by first proving that the write's intent exists.
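The intent check described above is simple enough to sketch directly. This is a toy model (the `intent` struct and map are stand-ins, not the real MVCC layer): a pipelined write is known to have succeeded iff an intent with the expected transaction ID and sequence number exists at the expected key, so no extra per-replica token state is required.

```go
package main

import "fmt"

// intent is a toy model of the provisional value a txn lays down for a
// write at a key.
type intent struct {
	txnID string
	seq   int
}

// queryIntent sketches the proposed check: it reports whether an intent
// with the expected txn ID and sequence number exists at the key, which
// is exactly the evidence needed to prove an async write succeeded.
func queryIntent(intents map[string]intent, key, txnID string, seq int) bool {
	in, ok := intents[key]
	return ok && in.txnID == txnID && in.seq == seq
}

func main() {
	intents := map[string]intent{"k1": {txnID: "t1", seq: 3}}
	fmt.Println(queryIntent(intents, "k1", "t1", 3)) // write proven: true
	fmt.Println(queryIntent(intents, "k2", "t1", 4)) // still outstanding: false
}
```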
There is one case where this logic doesn't hold up.

Just like with the original prototype here, we would want to be able to read directly from the set of outstanding writes.

The major roadblock I see here is re-evaluations. It's my understanding that these take the request out of the CommandQueue, which would be an issue because it would mean that interfering requests could jump ahead of a re-evaluated write.
That's a good idea! How does this work with DistSQL? Waiting to flush as before, or sending the tree along so it can inject the right `QueryIntent` requests?
You know this, but just to clarify the writeup: once it's in the tree, you can handle the next client command. Btw, are there cases in which you wouldn't even have to wait for the evaluation to conclude, i.e. a kind of fully asynchronous write? Perhaps instead of injecting the checks explicitly?
👍 👍 👍 let's put in the elbow grease to make this all transparent and architecturally sound. I like how well confined this is so far, btw!
There's actually an interesting observation here. In #23942, you point out that we sometimes propose no-op writes to Raft. That's just saying that we have commands that are often writes but sometimes act as reads, and yet we send them through Raft! Thinking this through, we should understand a read as a no-op write. If we get the overhead for that down enough, we have basically succeeded in removing the read/write distinction at the replica level (and think of all the flags we could delete 😉). Besides, conceptually this fits in well with predicate push down and transformations (aggregations etc) which really will do more than just "read" -- they'll transform the data.
#24194 🔍
With a streaming KV API, we could use the stream itself in place of an explicit token. There wouldn't be any more state here than in the current implementation, which holds the KV request open across the entire commit+apply time. We might still need the token (or the suggested `QueryIntent` request) as a fallback, though.
We'd still have some concept of read-only vs read/write at the replica level (it's important for the command queue), but I think you're right that we could get rid of the per-method flags and use the "write" path for everything, and bail out if it turns out to be a no-op.
Sending the tree along seems possible, although to start it's probably easiest to just flush before launching distributed flows. In this new proposal, "flush" means that we resolve all outstanding writes by sending out a batch consisting of a `QueryIntent` request for each of them.
If we had strongly ordered streams between gateways and ranges then probably, but I think the idea is to get away from relying on that kind of ordering.
The reason why I'm convinced that a discrete RPC is the best approach is that there are cases where we'll want to perform this intent check on its own. It's also convenient that the new RPC can be shared between this proposal and #24194.
Conceptually I think you're right, but there are practical differences, like how the commands are treated in the CommandQueue and the kind of `engine.Batch` used during evaluation.
I agree with everything you're saying here, but I don't want this idea to broaden the scope of this proposal to the point where it becomes predicated on switching to a streaming KV API. That said, I do think it's interesting to consider how the two proposals interact and what we can build now that can later be improved by a streaming KV API if we ever move in that direction. Most of the structures proposed in my most recent comment would be necessary regardless of the KV API. The only difference would be "proposal validation". As you pointed out, a streaming KV API could allow us to validate proposal completion without any extra RPCs. Meanwhile, without a streaming KV API we'll need to introduce a `QueryIntent` request to do so.
I'd like to make it explicit how these two proposals fit together with one another. The interesting part where they interact is when a transaction has outstanding writes at the point of a commit. In that case, we need to make sure that the writes succeed before committing the transaction. However, just like with writes in the same batch as an EndTransactionRequest, we don't want to have to run this step before sending off the EndTransactionRequest. Using the same trick as we use in #24194, this should be possible! To achieve this, we'll need to treat the outstanding writes the same as we do with parallel writes by adding them to the "promised write set". We'll then need to add a `QueryIntent` request for each of them to the batch containing the EndTransactionRequest.
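The composition of the two proposals at commit time can be sketched as batch construction. This is a hypothetical shape, not the actual batch API: each outstanding pipelined write contributes a `QueryIntent`-style check ahead of the EndTransaction request in the same batch, so resolution runs in parallel with the commit and the transaction can only commit once every async write has laid down its intent.

```go
package main

import "fmt"

// req is a toy stand-in for a KV request in a batch.
type req struct {
	method string
	key    string
}

// buildCommitBatch sketches how outstanding writes are resolved in
// parallel with the commit: one QueryIntent per promised write, then
// the EndTransaction, all in a single batch.
func buildCommitBatch(outstanding []string) []req {
	batch := make([]req, 0, len(outstanding)+1)
	for _, k := range outstanding {
		batch = append(batch, req{method: "QueryIntent", key: k})
	}
	return append(batch, req{method: "EndTransaction"})
}

func main() {
	for _, r := range buildCommitBatch([]string{"a", "b"}) {
		fmt.Println(r.method, r.key)
	}
}
```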
This doesn't completely eliminate the usefulness of RETURNING NOTHING. An INSERT today requires a round trip to the leaseholder(s) (plural if there are secondary indexes) followed by a round of consensus. INSERT RETURNING NOTHING defers/parallelizes both of those steps. With pipelining post-evaluation, the consensus is parallelized but you still have to talk to the leaseholder(s) to discover whether there is a conflict. Partitioning and follow the workload can help increase the odds that you have a local leaseholder, but sometimes you'll still have to go remote and RETURNING NOTHING can still help in those cases. (For example, consider a partitioned users table with a global index on the email column. The global index will likely have a remote leaseholder). Maybe it's not worth maintaining at that point given its low adoption and reduced impact, but it's not completely unnecessary.
The command queue doesn't know about the read or write flags on the RPC methods. It only knows about the declared key accesses (write commands are read-only on many of the keys they touch). OTOH, the engine.Batch issue is a trickier one to resolve, especially if we start using RocksDB snapshots instead of raw engine access for reads. (This would allow us to remove the read from the command queue and update the tscache as soon as we've grabbed the snapshot instead of waiting for evaluation to complete.)
Agreed.
I think we can avoid exposing the token.
26335: roachpb: introduce QueryIntent request r=bdarnell a=nvanbenschoten

This change introduces a new request method called QueryIntent. The request type draws parallels to the QueryTxn method, but it visits an intent instead of a transaction record and returns whether the intent exists or not. The request type also includes an "if missing" behavior, which allows clients to optionally ensure that the request prevents a missing intent from ever being written, or return an error if the intent is found to be missing.

This request type was proposed/discussed in both #24194 and #16026. It is a prerequisite for either proposal.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This change pipelines transactional writes using the approach presented in cockroachdb#16026 (comment).

The change introduces transactional pipelining by creating a new `txnReqInterceptor` that hooks into the `TxnCoordSender`, called `txnPipeliner`. txnPipeliner is a txnInterceptor that pipelines transactional writes by using asynchronous consensus. The interceptor tracks all writes that have been asynchronously proposed through Raft and ensures that all interfering requests chain on to them by first proving that the async writes succeeded. The interceptor also ensures that when committing a transaction, all writes that have been proposed but not proven to have succeeded are first checked before committing. These async writes are referred to as "outstanding writes", and the process of proving that an outstanding write succeeded is called "resolving" the write.

Chaining on to in-flight async writes is important to txnPipeliner for two main reasons:

1. Requests proposed to Raft will not necessarily succeed. For any number of reasons, the request may make it through Raft and be discarded, or fail to ever even be replicated. A transaction must check that all async writes succeeded before committing. However, when these proposals do fail, their errors aren't particularly interesting to a transaction. This is because these errors are not deterministic Transaction-domain errors that a transaction must adhere to for correctness, such as conditional-put errors or other symptoms of constraint violations. These kinds of errors are all discovered during write *evaluation*, which an async write performs synchronously before consensus. Any error during consensus is outside of the Transaction-domain and can always trigger a transaction retry.

2. Transport layers beneath the txnPipeliner do not provide strong enough ordering guarantees between concurrent requests in the same transaction to avoid needing explicit chaining. For instance, DistSender uses unary gRPC requests instead of gRPC streams, so it can't natively expose strong ordering guarantees. Perhaps more importantly, even when a command has entered the command queue and been evaluated on a Replica, it is not guaranteed to be applied before interfering commands. This is because the command may be retried outside of the serialization of the command queue for any number of reasons, such as leaseholder changes. When the command re-enters the command queue, it's possible that interfering commands may jump ahead of it. To combat this, the txnPipeliner uses chaining to throw an error when these re-orderings would have affected the order in which transactional requests evaluate.

The first sanity-check benchmark was to run the change against a geo-distributed cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, and europe-west2-b. The load generator was located in us-east1-b and all leases were moved to this zone as well. The test first limited all operations to newOrders, but soon expanded to full TPCC after seeing desirable results.

Without txn pipelining:

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      198.3 154.2%    916.5    486.5   2818.6   4160.7   5637.1   5905.6
```

With txn pipelining:

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      200.0 155.5%    388.2    184.5   1208.0   1946.2   2684.4   2818.6
```

One big caveat is that all foreign key checks needed to be disabled for this experiment to show a significant improvement. The reason for this is subtle. TPCC's schema is structured as a large hierarchy of tables, and most of its transactions insert down a lineage of tables. With FKs enabled, we see a query to a table immediately after it is inserted into, when its child table is inserted into next. This effectively causes a pipeline stall immediately after each write, which eliminated a lot of the benefit that we expect to see here.

Luckily, this is a problem that can be avoided by being a little smarter about foreign keys at the SQL level. We should not be performing these scans over all possible column families for a row just to check for existence (resulting kv count > 0) if we know that we just inserted into that row. We need some kind of row-level existence cache in SQL to avoid these scans. This will be generally useful, but will be especially useful for avoiding these pipeline stalls.

Release note: None
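The suggested row-level existence cache can be sketched as follows. This is a hypothetical design, not the actual SQL-layer code: once a txn has inserted a row, a later FK existence check for that row is answered locally instead of issuing a KV scan, which is exactly the pipeline stall described above.

```go
package main

import "fmt"

// rowCache is a toy sketch of the suggested row-level existence cache:
// it remembers which rows this txn has inserted.
type rowCache map[string]bool

// markInserted records that the txn wrote the row.
func (c rowCache) markInserted(rowKey string) { c[rowKey] = true }

// fkExists answers an FK existence check from the cache when possible,
// and only falls back to a KV scan (supplied by the caller) on a miss.
func (c rowCache) fkExists(rowKey string, kvScan func(string) bool) bool {
	if c[rowKey] {
		return true // row written in this txn: no scan, no pipeline stall
	}
	return kvScan(rowKey)
}

func main() {
	scans := 0
	scan := func(k string) bool { scans++; return false }
	c := rowCache{}
	c.markInserted("district/1")
	fmt.Println(c.fkExists("district/1", scan), scans) // served from cache
	fmt.Println(c.fkExists("district/2", scan), scans) // falls back to KV
}
```

In the TPCC scenario above, the insert into a parent table would populate the cache, so the FK check triggered by the child-table insert would never touch KV and never wait on the pipelined parent write.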
This change pipelines transactional writes using the approach presented in cockroachdb#16026 (comment). The change introduces transactional pipelining by creating a new `txnReqInterceptor` that hooks into the `TxnCoordSender` called `txnPipeliner`. txnPipeliner is a txnInterceptor that pipelines transactional writes by using asynchronous consensus. The interceptor then tracks all writes that have been asynchronously proposed through Raft and ensures that all interfering requests chain on to them by first proving that the async writes succeeded. The interceptor also ensures that when committing a transaction all writes that have been proposed but not proven to have succeeded are first checked before committing. These async writes are referred to as "outstanding writes" and this process of proving that an outstanding write succeeded is called "resolving" the write. Chaining on to in-flight async writes is important for two main reasons to txnPipeliner: 1. requests proposed to Raft will not necessarily succeed. For any number of reasons, the request may make it through Raft and be discarded or fail to ever even be replicated. A transaction must check that all async writes succeeded before committing. However, when these proposals do fail, their errors aren't particularly interesting to a transaction. This is because these errors are not deterministic Transaction-domain errors that a transaction must adhere to for correctness such as conditional-put errors or other symptoms of constraint violations. These kinds of errors are all discovered during write *evaluation*, which an async write will perform synchronously before consensus. Any error during consensus is outside of the Transaction-domain and can always trigger a transaction retry. 2. transport layers beneath the txnPipeliner do not provide strong enough ordering guarantees between concurrent requests in the same transaction to avoid needing explicit chaining. 
For instance, DistSender uses unary gRPC requests instead of gRPC streams, so it can't natively expose strong ordering guarantees. Perhaps more importantly, even when a command has entered the command queue and evaluated on a Replica, it is not guaranteed to be applied before interfering commands. This is because the command may be retried outside of the serialization of the command queue for any number of reasons, such as leaseholder changes. When the command re-enters the command queue, it's possible that interfering commands may jump ahead of it. To combat this, the txnPipeliner uses chaining to throw an error when these re-orderings would have affected the order that transactional requests evaluate in. The first sanity check benchmark was to run the change against a geo-distributed cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, europe-west2-b. The load generator was located in us-east1-b and all leases were moved to this zone as well. The test first limited all operations to newOrders, but soon expanded to full TPCC after seeing desirable results. Without txn pipelining ``` _elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms) 180.0s 198.3 154.2% 916.5 486.5 2818.6 4160.7 5637.1 5905.6 ``` With txn pipelining ``` _elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms) 180.0s 200.0 155.5% 388.2 184.5 1208.0 1946.2 2684.4 2818.6 ``` One big caveat is that all foreign key checks needed to be disabled for this experiment to show a significant improvement. The reason for this is subtle. TPCC's schema is structured as a large hierarchy of tables, and most of its transactions insert down a lineage of tables. With FKs enabled, we see a query to a table immediately after it is inserted into when its child table is inserted into next. This effectively causes a pipeline stall immediately after each write, which eliminated a lot of the benefit that we expect to see here. 
Luckily, this is a problem that can be avoided by being a little smarter about foreign keys at the SQL-level. We should not be performing these scans over all possible column families for a row just to check for existence (resulting kv count > 0) if we know that we just inserted into that row. We need some kind of row-level existence cache in SQL to avoid these scans. This will be generally useful, but will be especially useful to avoid these pipeline stalls. Release note: None
This change pipelines transactional writes using the approach presented in cockroachdb#16026 (comment).

The change introduces transactional pipelining by creating a new `txnReqInterceptor` called `txnPipeliner` that hooks into the `TxnCoordSender`. `txnPipeliner` is a `txnReqInterceptor` that pipelines transactional writes by using asynchronous consensus. The interceptor tracks all writes that have been asynchronously proposed through Raft and ensures that all interfering requests chain on to them by first proving that the async writes succeeded. It also ensures that, when committing a transaction, all writes that have been proposed but not yet proven to have succeeded are checked first. These async writes are referred to as "outstanding writes", and the process of proving that an outstanding write succeeded is called "resolving" the write.

Chaining on to in-flight async writes is important to the `txnPipeliner` for two main reasons:

1. Requests proposed to Raft will not necessarily succeed. For any number of reasons, the request may make it through Raft and be discarded, or fail to ever even be replicated. A transaction must check that all async writes succeeded before committing. However, when these proposals do fail, their errors aren't particularly interesting to a transaction. This is because these errors are not deterministic Transaction-domain errors that a transaction must adhere to for correctness, such as conditional-put errors or other symptoms of constraint violations. These kinds of errors are all discovered during write *evaluation*, which an async write performs synchronously before consensus. Any error during consensus is outside of the Transaction-domain and can always trigger a transaction retry.

2. Transport layers beneath the `txnPipeliner` do not provide strong enough ordering guarantees between concurrent requests in the same transaction to avoid needing explicit chaining. For instance, DistSender uses unary gRPC requests instead of gRPC streams, so it can't natively expose strong ordering guarantees. Perhaps more importantly, even when a command has entered the command queue and been evaluated on a Replica, it is not guaranteed to be applied before interfering commands. This is because the command may be retried outside of the serialization of the command queue for any number of reasons, such as leaseholder changes. When the command re-enters the command queue, interfering commands may jump ahead of it. To combat this, the `txnPipeliner` uses chaining to throw an error when these re-orderings would have affected the order in which transactional requests evaluate.

The first sanity-check benchmark was to run the change against a geo-distributed cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, and europe-west2-b. The load generator was located in us-east1-b, and all leases were moved to this zone as well. The test first limited all operations to newOrders, but soon expanded to full TPCC after seeing desirable results.

Without txn pipelining:

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      198.3 154.2%    916.5    486.5   2818.6   4160.7   5637.1   5905.6
```

With txn pipelining:

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      200.0 155.5%    388.2    184.5   1208.0   1946.2   2684.4   2818.6
```

One big caveat is that all foreign key checks needed to be disabled for this experiment to show a significant improvement. The reason for this is subtle. TPCC's schema is structured as a large hierarchy of tables, and most of its transactions insert down a lineage of tables. With FKs enabled, we see a query to a table immediately after it is inserted into, when its child table is inserted into next. This effectively causes a pipeline stall immediately after each write, which eliminated a lot of the benefit that we expect to see here.

Luckily, this is a problem that can be avoided by being a little smarter about foreign keys at the SQL level. We should not be performing these scans over all possible column families for a row just to check for existence (resulting kv count > 0) if we know that we just inserted into that row. We need some kind of row-level existence cache in SQL to avoid these scans. This will be generally useful, but will be especially useful to avoid these pipeline stalls.

Release note: None
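The outstanding-write bookkeeping described above can be sketched as follows. This is a hypothetical, heavily simplified model: the `pipeliner` type, the `prove` callback, and the string keys are illustrative inventions, not the real `txnPipeliner` API; the real interceptor deals in KV batches, sequence numbers, and `QueryIntent` requests.

```go
package main

import "fmt"

// outstandingWrite records a write that has been proposed through
// consensus but not yet proven to have succeeded.
type outstandingWrite struct {
	key      string
	sequence int
}

// pipeliner is a toy stand-in for the txnPipeliner interceptor: it
// tracks outstanding async writes and "resolves" them before any
// interfering request or the commit may proceed.
type pipeliner struct {
	outstanding map[string]outstandingWrite
}

func newPipeliner() *pipeliner {
	return &pipeliner{outstanding: make(map[string]outstandingWrite)}
}

// asyncWrite proposes a write without waiting for consensus and records
// it as outstanding.
func (p *pipeliner) asyncWrite(key string, seq int) {
	p.outstanding[key] = outstandingWrite{key: key, sequence: seq}
}

// maybeChain resolves any outstanding write on the same key before an
// interfering request is allowed to evaluate. prove models the "prove
// the async write succeeded" step.
func (p *pipeliner) maybeChain(key string, prove func(outstandingWrite) bool) error {
	w, ok := p.outstanding[key]
	if !ok {
		return nil // no in-flight write on this key; no chaining needed
	}
	if !prove(w) {
		// Consensus-level failures are not Transaction-domain errors;
		// they can always surface as a transaction retry.
		return fmt.Errorf("outstanding write on %q failed; retry txn", key)
	}
	delete(p.outstanding, key)
	return nil
}

// beforeCommit resolves every remaining outstanding write; a transaction
// may not commit until all async writes are proven to have succeeded.
func (p *pipeliner) beforeCommit(prove func(outstandingWrite) bool) error {
	for key := range p.outstanding {
		if err := p.maybeChain(key, prove); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	p := newPipeliner()
	p.asyncWrite("a", 1)
	p.asyncWrite("b", 2)
	ok := func(outstandingWrite) bool { return true }
	// A read of "a" must first prove the outstanding write on "a".
	if err := p.maybeChain("a", ok); err != nil {
		panic(err)
	}
	// Commit resolves whatever is still outstanding ("b").
	if err := p.beforeCommit(ok); err != nil {
		panic(err)
	}
	fmt.Println("outstanding after commit:", len(p.outstanding))
}
```

The design point this illustrates is that resolution is lazy: nothing waits on consensus unless a later request actually interferes, or the transaction tries to commit.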
26599: kv: pipeline transactional writes r=nvanbenschoten a=nvanbenschoten

This change pipelines transactional writes using the approach presented in #16026 (comment).

## Approach

The change introduces transactional pipelining by creating a new `txnReqInterceptor` called `txnPipeliner` that hooks into the `TxnCoordSender`. `txnPipeliner` is a `txnReqInterceptor` that pipelines transactional writes by using asynchronous consensus. The interceptor tracks all writes that have been asynchronously proposed through Raft and ensures that all interfering requests chain on to them by first proving that the async writes succeeded. It also ensures that, when committing a transaction, all outstanding writes that have been proposed but not yet proven to have succeeded are checked first.

Chaining on to in-flight async writes is important to the `txnPipeliner` for two main reasons:

1. Requests proposed to Raft will not necessarily succeed. For any number of reasons, the request may make it through Raft and be discarded, or fail to ever even be replicated. A transaction must check that all async writes succeeded before committing. However, when these proposals do fail, their errors aren't particularly interesting to a transaction. This is because these errors are not deterministic Transaction-domain errors that a transaction must adhere to for correctness, such as conditional-put errors or other symptoms of constraint violations. These kinds of errors are all discovered during write *evaluation*, which an async write performs synchronously before consensus. Any error during consensus is outside of the Transaction-domain and can always trigger a transaction retry.

2. Transport layers beneath the `txnPipeliner` do not provide strong enough ordering guarantees between concurrent requests in the same transaction to avoid needing explicit chaining. For instance, DistSender uses unary gRPC requests instead of gRPC streams, so it can't natively expose strong ordering guarantees. Perhaps more importantly, even when a command has entered the command queue and been evaluated on a Replica, it is not guaranteed to be applied before interfering commands. This is because the command may be retried outside of the serialization of the command queue for any number of reasons, such as leaseholder changes. When the command re-enters the command queue, interfering commands may jump ahead of it. To combat this, the `txnPipeliner` uses chaining to throw an error when these re-orderings would have affected the order in which transactional requests evaluate.

## Testing/Benchmarking

This change will require a lot of testing (warning: very little unit testing is included so far) and benchmarking. The first batch of benchmarking has been very promising in terms of reducing transactional latency, but it hasn't shown much effect on transactional throughput. This is somewhat expected, as the approach to transactional pipelining hits a tradeoff: a transaction performs slightly more work (in parallel) while also having a significantly smaller contention footprint.

The first sanity-check benchmark was to run the change against a geo-distributed cluster running TPCC. A node was located in each of us-east1-b, us-west1-b, and europe-west2-b. The load generator was located in us-east1-b, and all leases were moved to this zone as well. The test first limited all operations to newOrders, but soon expanded to full TPCC after seeing desirable results.

#### Without txn pipelining

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      198.3 154.2%    916.5    486.5   2818.6   4160.7   5637.1   5905.6
```

#### With txn pipelining

```
_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  180.0s      200.0 155.5%    388.2    184.5   1208.0   1946.2   2684.4   2818.6
```

One big caveat is that all foreign key checks needed to be disabled for this experiment to show much of an improvement. The reason for this is subtle. TPCC's schema is structured as a large hierarchy of tables, and most of its transactions insert down a lineage of tables. With FKs enabled, we see a query to a table immediately after it is inserted into, when its child table is inserted into next. This effectively causes a pipeline stall immediately after each write, which eliminated a lot of the benefit that we expect to see here.

Luckily, this is a problem that can be avoided by being a little smarter about foreign keys at the SQL level. We should not be performing these scans over all possible column families for a row just to check for existence (resulting kv count > 0) if we know that we just inserted into that row. We need some kind of row-level existence cache in SQL to avoid these scans. This will be generally useful, but will be especially useful to avoid these pipeline stalls.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
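The row-level existence cache proposed for avoiding FK-induced pipeline stalls might look roughly like the sketch below. All names here (`rowExistenceCache`, `noteInsert`, `checkExists`) are hypothetical; a real implementation would live in the SQL layer and key on encoded primary keys and column families.

```go
package main

import "fmt"

// rowExistenceCache remembers rows the current transaction has already
// inserted, so that a foreign-key existence check on such a row can be
// answered locally instead of issuing the KV scan that would otherwise
// stall the write pipeline.
type rowExistenceCache struct {
	inserted map[string]bool // encoded primary key -> known to exist
}

func newRowExistenceCache() *rowExistenceCache {
	return &rowExistenceCache{inserted: make(map[string]bool)}
}

// noteInsert records that this transaction wrote the given row.
func (c *rowExistenceCache) noteInsert(pk string) { c.inserted[pk] = true }

// checkExists consults the cache first and only falls back to the
// (expensive) KV scan for rows this transaction has not touched.
func (c *rowExistenceCache) checkExists(pk string, kvScan func(string) bool) bool {
	if c.inserted[pk] {
		return true // satisfied without a scan: no pipeline stall
	}
	return kvScan(pk)
}

func main() {
	c := newRowExistenceCache()
	scans := 0
	scan := func(pk string) bool { scans++; return false }

	// TPCC-style pattern: insert a parent row, then FK-check it while
	// inserting into the child table.
	c.noteInsert("warehouse/1")
	fmt.Println(c.checkExists("warehouse/1", scan), scans) // cache hit, no scan
	fmt.Println(c.checkExists("district/9", scan), scans)  // untouched row, real scan
}
```

For the TPCC insert-down-a-hierarchy pattern, every FK check after the first insert would hit this cache, which is exactly the case where the stall currently eats the pipelining benefit.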
Consider an application which executes the following statements one-by-one (yes, this could be done more efficiently, but an ORM is likely to produce SQL like this). This transaction is moving $1 from account `11` to account `10`. Our current SQL implementation imposes unnecessary latency on the `UPDATE` operations. Internally, each `UPDATE` is a select for the matching rows followed by a consistent (replicated) write of the new balance. After the write completes, we return the number of rows updated. We suffer the latency of the consistent write even though lower layers of the system will serialize access to a particular key.

Translating into KV operations, notice that we know the number of rows that will be updated as soon as the Scan operation completes. We can return to the client at that point and asynchronously send the Put. When the next statement arrives, we don't have to wait for any pending mutations, as the KV layer will serialize the operations [*] for a particular key. We would only have to wait for the outstanding mutations to complete before performing the commit.
There is a similar win to be had for `DELETE` operations, which involve a Scan followed by a DelRange. `INSERT` is somewhat more complicated, as it is translated into a `ConditionalPut` operation, which is internally a read (on the leader) followed by a write. We could potentially return from the `ConditionalPut` operation as soon as the write is proposed, but we'd have to leave the operation in the CommandQueue until the write is applied. There are likely lots of dragons here, though perhaps the approach of allowing a transactional write operation to return as soon as it is proposed could handle all of these cases. The `TxnCoordSender` would then need a facility for waiting for the transactional writes to be applied before allowing the transaction to be committed.

[*] While a replica will serialize operations for a particular key, multiple operations sent via separate `DistSender`s will not. We'd need to make sure that the operations sent for a transaction are somehow pipelined within the `DistSender`/`TxnCoordSender`. We wouldn't want a Scan operation to get reordered in front of a Put operation.

Cc @tschottdorf, @bdarnell