Skip to content

Commit

Permalink
admission,kvserver: improved byte token estimation for writes
Browse files Browse the repository at this point in the history
The existing scheme for byte token estimation simply looked
at the total bytes added to L0 and divided it among the number
of requests. This was because (a) the parameters to provide
better size information for the request were not populated by
kvserver, (b) the basic estimation approach was flawed since
it assumed that regular writes would be roughly equal sized,
and assumed that ingests would tell what fraction went into L0.

The kvserver-side plumbing for improving (a) were done in a
preceding PR. This one completes that plumbing to pass on
admission.StoreWorkDoneInfo to the admission control package.
In this scheme the {WriteBytes,IngestedBytes} are provided
post-proposal evaluation, and the IngestedBytes is for the
whole LSM. This PR makes changes to the plumbing in the
admission package: specifically, the post-work-done token
adjustments are performed via the granterWithStoreWriteDone
interface and the addition to granterWithIOTokens. The former
also returns the token adjustments to StoreWorkQueue so
that the per-tenant fairness accounting in WorkQueue can be
updated.

The main changes in this PR are in the byte token
estimation logic in the admission package, where the
 estimation now uses a linear model a.x + b, where
x is the bytes provided in admission.StoreWorkDoneInfo.
If we consider regular writes, one can expect that even
with many different sized workloads concurrently being
active on a node, we should be able to fit a model where
a is roughly 2 and b is tiny -- this is because x is the
bytes written to the raft log and does not include the
subsequent state machine application. Similarly, one can
imagine being somewhere in the interval [0,1] for ingested
work. The linear model is meant to fix flaw (b) mentioned
earlier. The current linear model fitting in
store_token_estimation.go is very simple and can be
independently improved in the future -- there are code
comments outlining this. Additionally, all the byte token
estimation logic in granter.go has been removed, which
is better from a code readability perspective.

This change was evaluated with a single node that first
saw a kv0 workload that writes 64KB blocks, then
additionally a kv0 workload that writes 4KB blocks, and
finally a third workload that starts doing an index
backfill due to creating an index on the v column in
the kv table.

Here are snippets from a sequence of log statements when
only the first workload (64KB writes) was running:
```
write-model 1.46x+1 B (smoothed 1.50x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 78 KiB
write-model 1.37x+1 B (smoothed 1.36x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 80 KiB
write-model 1.50x+1 B (smoothed 1.43x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 79 KiB
write-model 1.39x+1 B (smoothed 1.30x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 77 KiB
```
Note that the parameter a, in a.x does fluctuate. The
additive value b stays at the minimum of 1 bytes, which
is desirable. There is no change to the starting ingest
model since there are no ingestions.

After both the 4KB and 64KB writes are active the log
statements look like:
```
write-model 1.85x+1 B (smoothed 1.78x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 59 KiB
write-model 1.23x+1 B (smoothed 1.51x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 47 KiB
write-model 1.21x+1 B (smoothed 1.36x+1 B) + ingested-model 0.00x+0 B (smoothed 0.75x+1 B) + at-admission-tokens 40 KiB
```
Note that the b value stays at 1 byte. The tokens consumed
at admission time are evenly divided among requests, so
the value has dropped.

When the index backfill is also running, the sstables are
ingested into L5 and L6, so the x value in the ingested
model is high, but what is ingested into L0 is low, which
means a becomes very small for the ingested-model -- see
the smoothed 0.00x+1 B below. There is choppiness in this
experiment wrt the write model and the at-admission-tokens,
which is caused by a high number of write stalls. This
was not planned for, and is a side-effect of huge Pebble
manifests caused by 64KB keys. So ignore those values in
the following log statements.
```
write-model 1.93x+1 B (smoothed 1.56x+2 B) + ingested-model 0.00x+1 B (smoothed 0.00x+1 B) + at-admission-tokens 120 KiB
write-model 2.34x+1 B (smoothed 1.95x+1 B) + ingested-model 0.00x+1 B (smoothed 0.00x+1 B) + at-admission-tokens 157 KiB
```

Fixes #79092
Informs #82536

Release note: None
  • Loading branch information
sumeerbhola committed Jul 26, 2022
1 parent 48ffa80 commit 5d1fa1e
Show file tree
Hide file tree
Showing 17 changed files with 1,117 additions and 562 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,12 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
return err
}
}
_, pErr := r.repl.Send(ctx, ba)
_, writeBytes, pErr := r.repl.SendWithWriteBytes(ctx, ba)
if r.admissionController != nil {
r.admissionController.AdmittedKVWorkDone(admissionHandle)
r.admissionController.AdmittedKVWorkDone(admissionHandle, writeBytes)
if writeBytes != nil {
writeBytes.Release()
}
}
if pErr != nil {
log.VErrEventf(ctx, 2, "%v", pErr.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *Replica) evalAndPropose(
writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data))
}
if proposal.command.ReplicatedEvalResult.AddSSTable != nil {
writeBytes.SSTableBytes = int64(len(proposal.command.ReplicatedEvalResult.AddSSTable.Data))
writeBytes.IngestedBytes = int64(len(proposal.command.ReplicatedEvalResult.AddSSTable.Data))
}
// If the request requested that Raft consensus be performed asynchronously,
// return a proposal result immediately on the proposal's done channel.
Expand Down
23 changes: 18 additions & 5 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3724,7 +3724,7 @@ type KVAdmissionController interface {
) (handle interface{}, err error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing.
AdmittedKVWorkDone(handle interface{})
AdmittedKVWorkDone(handle interface{}, writeBytes *StoreWriteBytes)
// SetTenantWeightProvider is used to set the provider that will be
// periodically polled for weights. The stopper should be used to terminate
// the periodic polling.
Expand Down Expand Up @@ -3823,7 +3823,6 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
}
admissionEnabled := true
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb WriteBytes for ingest requests.
ah.storeWorkHandle, err = ah.storeAdmissionQ.Admit(
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
if err != nil {
Expand All @@ -3835,11 +3834,16 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
// kvAdmissionQ.Admit, and so callAdmittedWorkDoneOnKVAdmissionQ will
// stay false.
ah.storeAdmissionQ = nil
admissionEnabled = false
}
}
if admissionEnabled {
ah.callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
if ah.storeAdmissionQ != nil {
// No bytes were written.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{})
}
return admissionHandle{}, err
}
}
Expand All @@ -3848,14 +3852,23 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
}

// AdmittedKVWorkDone implements the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) AdmittedKVWorkDone(handle interface{}) {
func (n KVAdmissionControllerImpl) AdmittedKVWorkDone(
handle interface{}, writeBytes *StoreWriteBytes,
) {
ah := handle.(admissionHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb ingestedIntoL0Bytes and handle error return value.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, 0)
var doneInfo admission.StoreWorkDoneInfo
if writeBytes != nil {
doneInfo = admission.StoreWorkDoneInfo(*writeBytes)
}
err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo)
if err != nil {
// This shouldn't be happening, so log.
log.Errorf(context.Background(), "%s", err)
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ var storeWriteBytesPool = sync.Pool{
}

func newStoreWriteBytes() *StoreWriteBytes {
return storeWriteBytesPool.Get().(*StoreWriteBytes)
wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
*wb = StoreWriteBytes{}
return wb
}

// Release returns the *StoreWriteBytes to the pool.
Expand Down
13 changes: 7 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,17 +1008,18 @@ func (n *Node) batchInternal(

tStart := timeutil.Now()
handle, err := n.admissionController.AdmitKVWork(ctx, tenID, args)
defer n.admissionController.AdmittedKVWorkDone(handle)
if err != nil {
return nil, err
}
var pErr *roachpb.Error
// TODO(sumeer): plumb *StoreWriteBytes to admission control.
var writeBytes *kvserver.StoreWriteBytes
defer func() {
n.admissionController.AdmittedKVWorkDone(handle, writeBytes)
if writeBytes != nil {
writeBytes.Release()
}
}()
var pErr *roachpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, *args)
if writeBytes != nil {
writeBytes.Release()
}
if pErr != nil {
br = &roachpb.BatchResponse{}
log.VErrEventf(ctx, 3, "error from stores.Send: %s", pErr)
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"doc.go",
"granter.go",
"store_token_estimation.go",
"work_queue.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/admission",
Expand All @@ -32,6 +33,7 @@ go_test(
name = "admission_test",
srcs = [
"granter_test.go",
"store_token_estimation_test.go",
"work_queue_test.go",
],
data = glob(["testdata/**"]),
Expand Down
Loading

0 comments on commit 5d1fa1e

Please sign in to comment.