diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index f4178dedbe81..0d8123d9a15b 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -186,6 +186,78 @@ var (
 		Measurement: "Demotions of Voters to Non Voters",
 		Unit:        metric.Unit_COUNT,
 	}
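+	// The success/error count pairs below track the outcomes of replicate
+	// queue processing, bucketed by the allocator action that was attempted.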
replicate queue", + Measurement: "Replicas", + Unit: metric.Unit_COUNT, + } ) // quorumError indicates a retryable error condition which sends replicas being @@ -228,6 +300,23 @@ type ReplicateQueueMetrics struct { TransferLeaseCount *metric.Counter NonVoterPromotionsCount *metric.Counter VoterDemotionsCount *metric.Counter + + // Success/error counts by allocator action. + RemoveReplicaSuccessCount *metric.Counter + RemoveReplicaErrorCount *metric.Counter + AddReplicaSuccessCount *metric.Counter + AddReplicaErrorCount *metric.Counter + ReplaceDeadReplicaSuccessCount *metric.Counter + ReplaceDeadReplicaErrorCount *metric.Counter + RemoveDeadReplicaSuccessCount *metric.Counter + RemoveDeadReplicaErrorCount *metric.Counter + ReplaceDecommissioningReplicaSuccessCount *metric.Counter + ReplaceDecommissioningReplicaErrorCount *metric.Counter + RemoveDecommissioningReplicaSuccessCount *metric.Counter + RemoveDecommissioningReplicaErrorCount *metric.Counter + // TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner, + // AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange + // allocator actions. } func makeReplicateQueueMetrics() ReplicateQueueMetrics { @@ -251,6 +340,19 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics { TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount), VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount), + + RemoveReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveReplicaSuccessCount), + RemoveReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveReplicaErrorCount), + AddReplicaSuccessCount: metric.NewCounter(metaReplicateQueueAddReplicaSuccessCount), + AddReplicaErrorCount: metric.NewCounter(metaReplicateQueueAddReplicaErrorCount), + ReplaceDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaSuccessCount), + ReplaceDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaErrorCount), + RemoveDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaSuccessCount), + RemoveDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaErrorCount), + ReplaceDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaSuccessCount), + ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount), + RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount), + RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount), } } @@ -353,6 +455,50 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount( } } +// trackProcessResult increases the corresponding success/error count metric for +// processing a particular allocator action through the replicate queue. 
+func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
+	action allocatorimpl.AllocatorAction, err error, dryRun bool,
+) {
+	if dryRun {
+		return
+	}
+	switch action {
+	case allocatorimpl.AllocatorRemoveVoter, allocatorimpl.AllocatorRemoveNonVoter:
+		if err == nil {
+			metrics.RemoveReplicaSuccessCount.Inc(1)
+		} else {
+			metrics.RemoveReplicaErrorCount.Inc(1)
+		}
+	case allocatorimpl.AllocatorAddVoter, allocatorimpl.AllocatorAddNonVoter:
+		if err == nil {
+			metrics.AddReplicaSuccessCount.Inc(1)
+		} else {
+			metrics.AddReplicaErrorCount.Inc(1)
+		}
+	case allocatorimpl.AllocatorReplaceDeadVoter, allocatorimpl.AllocatorReplaceDeadNonVoter:
+		if err == nil {
+			metrics.ReplaceDeadReplicaSuccessCount.Inc(1)
+		} else {
+			metrics.ReplaceDeadReplicaErrorCount.Inc(1)
+		}
+	case allocatorimpl.AllocatorReplaceDecommissioningVoter, allocatorimpl.AllocatorReplaceDecommissioningNonVoter:
+		if err == nil {
+			metrics.ReplaceDecommissioningReplicaSuccessCount.Inc(1)
+		} else {
+			metrics.ReplaceDecommissioningReplicaErrorCount.Inc(1)
+		}
+	case allocatorimpl.AllocatorRemoveDecommissioningVoter, allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
+		if err == nil {
+			metrics.RemoveDecommissioningReplicaSuccessCount.Inc(1)
+		} else {
+			metrics.RemoveDecommissioningReplicaErrorCount.Inc(1)
+		}
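+	// Reaching this point with any other action is a programming error in the
+	// caller; only the actions handled above are instrumented.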
+	default:
+		panic(fmt.Sprintf("unsupported AllocatorAction: %v", action))
+	}
+}
+
 // replicateQueue manages a queue of replicas which may need to add an
 // additional replica to their range.
 type replicateQueue struct {
@@ -641,19 +787,27 @@ func (rq *replicateQueue) processOneChange(
 
 	// Add replicas.
 	case allocatorimpl.AllocatorAddVoter:
-		return rq.addOrReplaceVoters(
+		requeue, err := rq.addOrReplaceVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
 		)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 	case allocatorimpl.AllocatorAddNonVoter:
-		return rq.addOrReplaceNonVoters(
+		requeue, err := rq.addOrReplaceNonVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
 		)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 
 	// Remove replicas.
 	case allocatorimpl.AllocatorRemoveVoter:
-		return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
+		requeue, err := rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 	case allocatorimpl.AllocatorRemoveNonVoter:
-		return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
+		requeue, err := rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 
 	// Replace dead replicas.
 	case allocatorimpl.AllocatorReplaceDeadVoter:
@@ -667,8 +821,10 @@ func (rq *replicateQueue) processOneChange(
 				"dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas)
 		}
-		return rq.addOrReplaceVoters(
+		requeue, err := rq.addOrReplaceVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 	case allocatorimpl.AllocatorReplaceDeadNonVoter:
 		if len(deadNonVoterReplicas) == 0 {
 			// Nothing to do.
@@ -680,8 +836,10 @@ func (rq *replicateQueue) processOneChange(
 				"dead non-voter %v unexpectedly not found in %v", deadNonVoterReplicas[0], nonVoterReplicas)
 		}
-		return rq.addOrReplaceNonVoters(
+		requeue, err := rq.addOrReplaceNonVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 
 	// Replace decommissioning replicas.
 	case allocatorimpl.AllocatorReplaceDecommissioningVoter:
@@ -698,6 +856,7 @@ func (rq *replicateQueue) processOneChange(
 		}
 		requeue, err := rq.addOrReplaceVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
@@ -715,6 +874,7 @@ func (rq *replicateQueue) processOneChange(
 		}
 		requeue, err := rq.addOrReplaceNonVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
@@ -727,12 +887,14 @@ func (rq *replicateQueue) processOneChange(
 	// AllocatorReplaceDecommissioning{Non}Voter above.
 	case allocatorimpl.AllocatorRemoveDecommissioningVoter:
 		requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.VoterTarget, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
 		return requeue, nil
 	case allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
 		requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.NonVoterTarget, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
 		if err != nil {
 			return requeue, decommissionPurgatoryError{err}
 		}
@@ -744,9 +906,13 @@ func (rq *replicateQueue) processOneChange(
 	// over-replicated and has dead replicas; in the common case we'll hit
 	// AllocatorReplaceDead{Non}Voter above.
 	case allocatorimpl.AllocatorRemoveDeadVoter:
-		return rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
+		requeue, err := rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 	case allocatorimpl.AllocatorRemoveDeadNonVoter:
-		return rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
+		requeue, err := rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
+		rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
+		return requeue, err
 
 	case allocatorimpl.AllocatorConsiderRebalance:
 		return rq.considerRebalance(
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index b2fd78760175..6c5827963b0b 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -480,6 +480,8 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 		previousRemovalCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		previousDecommRemovals :=
 			store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count()
+		previousDecommReplacementSuccesses :=
+			store.ReplicateQueueMetrics().ReplaceDecommissioningReplicaSuccessCount.Count()
 
 		// Decommission each of the two nodes that have the non-voters and make sure
 		// that those non-voters are upreplicated elsewhere.
@@ -515,6 +517,8 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 		currentRemoveCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		currentDecommRemovals :=
 			store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count()
+		currentDecommReplacementSuccesses :=
+			store.ReplicateQueueMetrics().ReplaceDecommissioningReplicaSuccessCount.Count()
 		require.GreaterOrEqualf(
 			t, currentAddCount, previousAddCount+2,
@@ -528,6 +532,9 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 			t, currentDecommRemovals, previousDecommRemovals+2,
 			"expected decommissioning replica removals to increase by at least 2",
 		)
+		require.GreaterOrEqualf(t, currentDecommReplacementSuccesses, previousDecommReplacementSuccesses+2,
+			"expected decommissioning replica replacement successes to increase by at least 2",
+		)
 	})
 
 	// Check that when we have more non-voters than needed and some of those
@@ -558,9 +565,22 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
+
+		// Ensure leaseholder has updated span config with 0 non-voters.
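+		// Otherwise the replicate queue may plan a replacement, rather than a
+		// removal, of the decommissioning non-voters.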
+		require.Eventually(t, func() bool {
+			repl, err := store.GetReplica(scratchRange.RangeID)
+			if err != nil {
+				t.Fatal(err)
+			}
+			_, conf := repl.DescAndSpanConfig()
+			return conf.GetNumNonVoters() == 0
+		}, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond)
+
 		previousRemovalCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		previousDecommRemovals :=
 			store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count()
+		previousDecommRemovalSuccesses :=
+			store.ReplicateQueueMetrics().RemoveDecommissioningReplicaSuccessCount.Count()
 
 		require.NoError(t,
 			tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, nonVoterNodeIDs))
@@ -581,6 +601,8 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 		currentRemoveCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		currentDecommRemovals :=
 			store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count()
+		currentDecommRemovalSuccesses :=
+			store.ReplicateQueueMetrics().RemoveDecommissioningReplicaSuccessCount.Count()
 		require.GreaterOrEqualf(
 			t, currentRemoveCount, previousRemovalCount+2,
 			"expected replica removals to increase by at least 2",
@@ -589,6 +611,9 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 			t, currentDecommRemovals, previousDecommRemovals+2,
 			"expected replica removals to increase by at least 2",
 		)
+		require.GreaterOrEqualf(t, currentDecommRemovalSuccesses, previousDecommRemovalSuccesses+2,
+			"expected decommissioning replica removal successes to increase by at least 2",
+		)
 	})
 }
@@ -765,6 +790,7 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 		prevAdditions := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count()
 		prevRemovals := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		prevDeadRemovals := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count()
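+		// At the target non-voter count, dead non-voters should be replaced
+		// rather than removed outright, so capture the replacement successes.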
+		prevDeadReplacementSuccesses := store.ReplicateQueueMetrics().ReplaceDeadReplicaSuccessCount.Count()
 
 		beforeNodeIDs := getNonVoterNodeIDs(scratchRange)
 		markDead(beforeNodeIDs)
@@ -794,6 +820,7 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 		addCount := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count()
 		removeNonVoterCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		removeDeadNonVoterCount := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count()
+		replaceDeadSuccesses := store.ReplicateQueueMetrics().ReplaceDeadReplicaSuccessCount.Count()
 		require.GreaterOrEqualf(
 			t, addCount, prevAdditions+2,
@@ -807,6 +834,10 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 			t, removeDeadNonVoterCount, prevDeadRemovals+2,
 			"expected replica removals to increase by at least 2",
 		)
+		require.GreaterOrEqualf(
+			t, replaceDeadSuccesses, prevDeadReplacementSuccesses+2,
+			"expected dead replica replacement successes to increase by at least 2",
+		)
 	})
 
 	// This subtest checks that when we have more non-voters than needed and some
@@ -838,9 +869,21 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
+
+		// Ensure leaseholder has updated span config with 0 non-voters.
+		require.Eventually(t, func() bool {
+			repl, err := store.GetReplica(scratchRange.RangeID)
+			if err != nil {
+				t.Fatal(err)
+			}
+			_, conf := repl.DescAndSpanConfig()
+			return conf.GetNumNonVoters() == 0
+		}, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond)
+
 		prevRemovals := store.ReplicateQueueMetrics().RemoveReplicaCount.Count()
 		prevNonVoterRemovals := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		prevDeadRemovals := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count()
+		prevDeadRemovalSuccesses := store.ReplicateQueueMetrics().RemoveDeadReplicaSuccessCount.Count()
 
 		beforeNodeIDs := getNonVoterNodeIDs(scratchRange)
 		markDead(beforeNodeIDs)
@@ -858,6 +901,7 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 		removeCount := store.ReplicateQueueMetrics().RemoveReplicaCount.Count()
 		removeNonVoterCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count()
 		removeDeadNonVoterCount := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count()
+		removeDeadSuccesses := store.ReplicateQueueMetrics().RemoveDeadReplicaSuccessCount.Count()
 		require.GreaterOrEqualf(
 			t, removeCount, prevRemovals+2,
 			"expected replica removals to increase by at least 2",
@@ -870,6 +914,10 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) {
 			t, removeDeadNonVoterCount, prevDeadRemovals+2,
 			"expected replica removals to increase by at least 2",
 		)
+		require.GreaterOrEqualf(
+			t, removeDeadSuccesses, prevDeadRemovalSuccesses+2,
+			"expected dead replica removal successes to increase by at least 2",
+		)
 	})
 }
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 3be998121eaf..f37b4e6d758b 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -2020,6 +2020,28 @@ var charts = []sectionDescription{
 				"queue.replicate.removedecommissioningnonvoterreplica",
 			},
 		},
+		{
+			Title: "Successes by Action",
+			Metrics: []string{
+				"queue.replicate.addreplica.success",
+				"queue.replicate.removereplica.success",
+				"queue.replicate.replacedeadreplica.success",
+				"queue.replicate.removedeadreplica.success",
+				"queue.replicate.replacedecommissioningreplica.success",
+				"queue.replicate.removedecommissioningreplica.success",
+			},
+		},
+		{
+			Title: "Errors by Action",
+			Metrics: []string{
+				"queue.replicate.addreplica.error",
+				"queue.replicate.removereplica.error",
+				"queue.replicate.replacedeadreplica.error",
+				"queue.replicate.removedeadreplica.error",
+				"queue.replicate.replacedecommissioningreplica.error",
+				"queue.replicate.removedecommissioningreplica.error",
+			},
+		},
 		{
 			Title: "Successes",
 			Metrics: []string{