Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admission,kvserver: improved byte token estimation for writes #85059

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
return err
}
}
_, pErr := r.repl.Send(ctx, ba)
_, writeBytes, pErr := r.repl.SendWithWriteBytes(ctx, ba)
defer writeBytes.Release()
if r.admissionController != nil {
r.admissionController.AdmittedKVWorkDone(admissionHandle)
r.admissionController.AdmittedKVWorkDone(admissionHandle, writeBytes)
}
if pErr != nil {
log.VErrEventf(ctx, 2, "%v", pErr.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (r *Replica) evalAndPropose(
writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data))
}
if proposal.command.ReplicatedEvalResult.AddSSTable != nil {
writeBytes.SSTableBytes = int64(len(proposal.command.ReplicatedEvalResult.AddSSTable.Data))
writeBytes.IngestedBytes = int64(len(proposal.command.ReplicatedEvalResult.AddSSTable.Data))
}
// If the request requested that Raft consensus be performed asynchronously,
// return a proposal result immediately on the proposal's done channel.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ var optimisticEvalLimitedScans = settings.RegisterBoolSetting(
func (r *Replica) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br, _, pErr := r.SendWithWriteBytes(ctx, ba)
br, writeBytes, pErr := r.SendWithWriteBytes(ctx, ba)
writeBytes.Release()
return br, pErr
}

Expand Down
39 changes: 30 additions & 9 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
Expand Down Expand Up @@ -3729,7 +3730,7 @@ type KVAdmissionController interface {
) (handle interface{}, err error)
// AdmittedKVWorkDone is called after the admitted KV work is done
// executing.
AdmittedKVWorkDone(handle interface{})
AdmittedKVWorkDone(handle interface{}, writeBytes *StoreWriteBytes)
// SetTenantWeightProvider is used to set the provider that will be
// periodically polled for weights. The stopper should be used to terminate
// the periodic polling.
Expand Down Expand Up @@ -3763,9 +3764,10 @@ type KVAdmissionControllerImpl struct {
kvAdmissionQ *admission.WorkQueue
storeGrantCoords *admission.StoreGrantCoordinators
settings *cluster.Settings
every log.EveryN
}

var _ KVAdmissionController = KVAdmissionControllerImpl{}
var _ KVAdmissionController = &KVAdmissionControllerImpl{}

type admissionHandle struct {
tenantID roachpb.TenantID
Expand All @@ -3781,15 +3783,16 @@ func MakeKVAdmissionController(
storeGrantCoords *admission.StoreGrantCoordinators,
settings *cluster.Settings,
) KVAdmissionController {
return KVAdmissionControllerImpl{
return &KVAdmissionControllerImpl{
kvAdmissionQ: kvAdmissionQ,
storeGrantCoords: storeGrantCoords,
settings: settings,
every: log.Every(10 * time.Second),
}
}

// AdmitKVWork implements the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) AdmitKVWork(
func (n *KVAdmissionControllerImpl) AdmitKVWork(
ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
) (handle interface{}, err error) {
ah := admissionHandle{tenantID: tenantID}
Expand Down Expand Up @@ -3828,7 +3831,6 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
}
admissionEnabled := true
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb WriteBytes for ingest requests.
ah.storeWorkHandle, err = ah.storeAdmissionQ.Admit(
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
if err != nil {
Expand All @@ -3840,11 +3842,16 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
// kvAdmissionQ.Admit, and so callAdmittedWorkDoneOnKVAdmissionQ will
// stay false.
ah.storeAdmissionQ = nil
admissionEnabled = false
}
}
if admissionEnabled {
ah.callAdmittedWorkDoneOnKVAdmissionQ, err = n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
if ah.storeAdmissionQ != nil {
// No bytes were written.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{})
}
return admissionHandle{}, err
}
}
Expand All @@ -3853,19 +3860,33 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
}

// AdmittedKVWorkDone implements the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) AdmittedKVWorkDone(handle interface{}) {
func (n *KVAdmissionControllerImpl) AdmittedKVWorkDone(
handle interface{}, writeBytes *StoreWriteBytes,
) {
ah := handle.(admissionHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb ingestedIntoL0Bytes and handle error return value.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, 0)
var doneInfo admission.StoreWorkDoneInfo
if writeBytes != nil {
doneInfo = admission.StoreWorkDoneInfo(*writeBytes)
}
err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo)
if err != nil {
// This shouldn't be happening.
if util.RaceEnabled {
log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err))
}
if n.every.ShouldLog() {
log.Errorf(context.Background(), "%s", err)
}
}
}
}

// SetTenantWeightProvider implements the KVAdmissionController interface.
func (n KVAdmissionControllerImpl) SetTenantWeightProvider(
func (n *KVAdmissionControllerImpl) SetTenantWeightProvider(
provider TenantWeightProvider, stopper *stop.Stopper,
) {
go func() {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (
func (s *Store) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
br, _, pErr = s.SendWithWriteBytes(ctx, ba)
var writeBytes *StoreWriteBytes
br, writeBytes, pErr = s.SendWithWriteBytes(ctx, ba)
writeBytes.Release()
return br, pErr
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ func (ls *Stores) GetReplicaForRangeID(
func (ls *Stores) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br, _, pErr := ls.SendWithWriteBytes(ctx, ba)
br, writeBytes, pErr := ls.SendWithWriteBytes(ctx, ba)
writeBytes.Release()
return br, pErr
}

Expand All @@ -195,11 +196,16 @@ var storeWriteBytesPool = sync.Pool{
}

func newStoreWriteBytes() *StoreWriteBytes {
return storeWriteBytesPool.Get().(*StoreWriteBytes)
wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
*wb = StoreWriteBytes{}
return wb
}

// Release returns the *StoreWriteBytes to the pool.
func (wb *StoreWriteBytes) Release() {
if wb == nil {
return
}
storeWriteBytesPool.Put(wb)
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,17 +1009,16 @@ func (n *Node) batchInternal(

tStart := timeutil.Now()
handle, err := n.admissionController.AdmitKVWork(ctx, tenID, args)
defer n.admissionController.AdmittedKVWorkDone(handle)
if err != nil {
return nil, err
}
var pErr *roachpb.Error
// TODO(sumeer): plumb *StoreWriteBytes to admission control.
var writeBytes *kvserver.StoreWriteBytes
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, *args)
if writeBytes != nil {
defer func() {
n.admissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
}
}()
var pErr *roachpb.Error
br, writeBytes, pErr = n.stores.SendWithWriteBytes(ctx, *args)
if pErr != nil {
br = &roachpb.BatchResponse{}
log.VErrEventf(ctx, 3, "error from stores.Send: %s", pErr)
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"doc.go",
"granter.go",
"store_token_estimation.go",
"work_queue.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/admission",
Expand All @@ -32,6 +33,7 @@ go_test(
name = "admission_test",
srcs = [
"granter_test.go",
"store_token_estimation_test.go",
"work_queue_test.go",
],
data = glob(["testdata/**"]),
Expand Down
Loading