kvserver: track replicate queue metrics by allocator action
The replicate queue previously had metrics that counted the actions it
processed using its own set of categories. This change adds new metrics
that track the success or failure of each replica processed by the
replicate queue, categorized by the allocator action driving the change.
With this categorization, we can track success and error counts during
rebalancing, up-replication in response to a dead node, and
decommissioning. The categorization makes no distinction between actions
relating to voter replicas and non-voter replicas, so counts are
aggregated across these two types.

Release note (ops change): added new metrics:
```
queue.replicate.addreplica.(success|error)
queue.replicate.removereplica.(success|error)
queue.replicate.replacedeadreplica.(success|error)
queue.replicate.removedeadreplica.(success|error)
queue.replicate.replacedecommissioningreplica.(success|error)
queue.replicate.removedecommissioningreplica.(success|error)
```
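
As an illustrative aside, each category exposes a paired `.success` and `.error` counter, so a failure fraction for, say, dead-replica replacements can be derived by combining the two series. A minimal Go sketch; `errorFraction` is a hypothetical helper and the sample values are invented for illustration:
```go
package main

import "fmt"

// errorFraction returns the fraction of attempts in one allocator-action
// category that failed, given the cumulative success and error counts
// (e.g. queue.replicate.replacedeadreplica.success / .error).
func errorFraction(successes, errors int64) float64 {
	total := successes + errors
	if total == 0 {
		return 0
	}
	return float64(errors) / float64(total)
}

func main() {
	// Hypothetical sample values for queue.replicate.replacedeadreplica.*.
	fmt.Printf("replacedeadreplica error fraction: %.2f\n", errorFraction(47, 3))
}
```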
AlexTalks committed Aug 12, 2022
1 parent ff3fc7e commit 62b5e8b
Showing 3 changed files with 244 additions and 8 deletions.
182 changes: 174 additions & 8 deletions pkg/kv/kvserver/replicate_queue.go
@@ -186,6 +186,78 @@ var (
Measurement: "Demotions of Voters to Non Voters",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueAddReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.addreplica.success",
Help: "Number of successful replica additions processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueAddReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.addreplica.error",
Help: "Number of failed replica additions processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.removereplica.success",
Help: "Number of successful replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.removereplica.error",
Help: "Number of failed replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueReplaceDeadReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.replacedeadreplica.success",
Help: "Number of successful dead replica replica replacements processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueReplaceDeadReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.replacedeadreplica.error",
Help: "Number of failed dead replica replica replacements processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueReplaceDecommissioningReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.replacedecommissioningreplica.success",
Help: "Number of successful decommissioning replica replica replacements processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueReplaceDecommissioningReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.replacedecommissioningreplica.error",
Help: "Number of failed decommissioning replica replica replacements processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveDecommissioningReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.removedecommissioningreplica.success",
Help: "Number of successful decommissioning replica replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveDecommissioningReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.removedecommissioningreplica.error",
Help: "Number of failed decommissioning replica replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveDeadReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.removedeadreplica.success",
Help: "Number of successful dead replica replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveDeadReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.removedeadreplica.error",
Help: "Number of failed dead replica replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
)

// quorumError indicates a retryable error condition which sends replicas being
@@ -228,6 +300,23 @@ type ReplicateQueueMetrics struct {
TransferLeaseCount *metric.Counter
NonVoterPromotionsCount *metric.Counter
VoterDemotionsCount *metric.Counter

// Success/error counts by allocator action.
RemoveReplicaSuccessCount *metric.Counter
RemoveReplicaErrorCount *metric.Counter
AddReplicaSuccessCount *metric.Counter
AddReplicaErrorCount *metric.Counter
ReplaceDeadReplicaSuccessCount *metric.Counter
ReplaceDeadReplicaErrorCount *metric.Counter
RemoveDeadReplicaSuccessCount *metric.Counter
RemoveDeadReplicaErrorCount *metric.Counter
ReplaceDecommissioningReplicaSuccessCount *metric.Counter
ReplaceDecommissioningReplicaErrorCount *metric.Counter
RemoveDecommissioningReplicaSuccessCount *metric.Counter
RemoveDecommissioningReplicaErrorCount *metric.Counter
// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
// allocator actions.
}

func makeReplicateQueueMetrics() ReplicateQueueMetrics {
@@ -251,6 +340,19 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount),
NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount),
VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount),

RemoveReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveReplicaSuccessCount),
RemoveReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveReplicaErrorCount),
AddReplicaSuccessCount: metric.NewCounter(metaReplicateQueueAddReplicaSuccessCount),
AddReplicaErrorCount: metric.NewCounter(metaReplicateQueueAddReplicaErrorCount),
ReplaceDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaSuccessCount),
ReplaceDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaErrorCount),
RemoveDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaSuccessCount),
RemoveDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaErrorCount),
ReplaceDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaSuccessCount),
ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),
}
}

@@ -353,6 +455,50 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(
}
}

// trackResultByAllocatorAction increments the corresponding success/error
// count metric for processing a particular allocator action through the
// replicate queue.
func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
action allocatorimpl.AllocatorAction, err error, dryRun bool,
) {
if dryRun {
return
}
switch action {
case allocatorimpl.AllocatorRemoveVoter, allocatorimpl.AllocatorRemoveNonVoter:
if err == nil {
metrics.RemoveReplicaSuccessCount.Inc(1)
} else {
metrics.RemoveReplicaErrorCount.Inc(1)
}
case allocatorimpl.AllocatorAddVoter, allocatorimpl.AllocatorAddNonVoter:
if err == nil {
metrics.AddReplicaSuccessCount.Inc(1)
} else {
metrics.AddReplicaErrorCount.Inc(1)
}
case allocatorimpl.AllocatorReplaceDeadVoter, allocatorimpl.AllocatorReplaceDeadNonVoter:
if err == nil {
metrics.ReplaceDeadReplicaSuccessCount.Inc(1)
} else {
metrics.ReplaceDeadReplicaErrorCount.Inc(1)
}
case allocatorimpl.AllocatorReplaceDecommissioningVoter, allocatorimpl.AllocatorReplaceDecommissioningNonVoter:
if err == nil {
metrics.ReplaceDecommissioningReplicaSuccessCount.Inc(1)
} else {
metrics.ReplaceDecommissioningReplicaErrorCount.Inc(1)
}
case allocatorimpl.AllocatorRemoveDecommissioningVoter, allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
if err == nil {
metrics.RemoveDecommissioningReplicaSuccessCount.Inc(1)
} else {
metrics.RemoveDecommissioningReplicaErrorCount.Inc(1)
}
default:
panic(fmt.Sprintf("unsupported AllocatorAction: %v", action))
}
}
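
A hedged sketch of how this helper behaves, written as an in-package (package kvserver) test that is not part of this commit; it assumes the allocatorimpl import path used by replicate_queue.go and relies only on the counters defined above. Voter and non-voter variants of an action share one counter pair, and dry runs are not counted:
```go
package kvserver

import (
	"errors"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
)

func TestTrackResultByAllocatorAction(t *testing.T) {
	metrics := makeReplicateQueueMetrics()

	// Voter and non-voter additions are aggregated into the same counter pair.
	metrics.trackResultByAllocatorAction(allocatorimpl.AllocatorAddVoter, nil /* err */, false /* dryRun */)
	metrics.trackResultByAllocatorAction(allocatorimpl.AllocatorAddNonVoter, nil /* err */, false /* dryRun */)
	if c := metrics.AddReplicaSuccessCount.Count(); c != 2 {
		t.Fatalf("expected 2 successful additions, got %d", c)
	}

	// A failed removal increments the error counter; a dry run increments nothing.
	metrics.trackResultByAllocatorAction(allocatorimpl.AllocatorRemoveVoter, errors.New("boom"), false /* dryRun */)
	metrics.trackResultByAllocatorAction(allocatorimpl.AllocatorRemoveVoter, nil /* err */, true /* dryRun */)
	if c := metrics.RemoveReplicaErrorCount.Count(); c != 1 {
		t.Fatalf("expected 1 failed removal, got %d", c)
	}
	if c := metrics.RemoveReplicaSuccessCount.Count(); c != 0 {
		t.Fatalf("expected 0 successful removals, got %d", c)
	}
}
```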

// replicateQueue manages a queue of replicas which may need to add an
// additional replica to their range.
type replicateQueue struct {
@@ -641,19 +787,27 @@ func (rq *replicateQueue) processOneChange(

// Add replicas.
case allocatorimpl.AllocatorAddVoter:
return rq.addOrReplaceVoters(
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorAddNonVoter:
return rq.addOrReplaceNonVoters(
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Remove replicas.
case allocatorimpl.AllocatorRemoveVoter:
return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
requeue, err := rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorRemoveNonVoter:
return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
requeue, err := rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Replace dead replicas.
case allocatorimpl.AllocatorReplaceDeadVoter:
@@ -667,8 +821,10 @@
"dead voter %v unexpectedly not found in %v",
deadVoterReplicas[0], voterReplicas)
}
return rq.addOrReplaceVoters(
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorReplaceDeadNonVoter:
if len(deadNonVoterReplicas) == 0 {
// Nothing to do.
@@ -680,8 +836,10 @@
"dead non-voter %v unexpectedly not found in %v",
deadNonVoterReplicas[0], nonVoterReplicas)
}
return rq.addOrReplaceNonVoters(
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Replace decommissioning replicas.
case allocatorimpl.AllocatorReplaceDecommissioningVoter:
@@ -698,6 +856,7 @@
}
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
@@ -715,6 +874,7 @@
}
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
@@ -727,12 +887,14 @@
// AllocatorReplaceDecommissioning{Non}Voter above.
case allocatorimpl.AllocatorRemoveDecommissioningVoter:
requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.VoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil
case allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.NonVoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
@@ -744,9 +906,13 @@
// over-replicated and has dead replicas; in the common case we'll hit
// AllocatorReplaceDead{Non}Voter above.
case allocatorimpl.AllocatorRemoveDeadVoter:
return rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
requeue, err := rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorRemoveDeadNonVoter:
return rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
requeue, err := rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

case allocatorimpl.AllocatorConsiderRebalance:
return rq.considerRebalance(