From 7425832dd4b7691d0f27246798e041e5b4237f51 Mon Sep 17 00:00:00 2001 From: Alex Novak Date: Tue, 8 Oct 2024 15:30:12 -0400 Subject: [PATCH 1/4] Adds scheduler platform properties as prometheus labels --- pkg/scheduler/in_memory_build_queue.go | 379 ++++++++++++++++++------- 1 file changed, 271 insertions(+), 108 deletions(-) diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index 0cc1cdf..9173c1c 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -46,57 +46,69 @@ import ( var ( inMemoryBuildQueuePrometheusMetrics sync.Once - inMemoryBuildQueueInFlightDeduplicationsTotal = prometheus.NewCounterVec( + schedulerRegistry = prometheus.NewRegistry() + + inMemoryBuildQueueInFlightDeduplicationsTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_in_flight_deduplications_total", Help: "Number of times an Execute() request of a cacheable action was performed, and whether it was in-flight deduplicated against an existing task.", }, - []string{"instance_name_prefix", "platform", "size_class", "outcome"}) + []string{"outcome"}, + } - inMemoryBuildQueueInvocationsCreatedTotal = prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsCreatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_created_total", Help: "Number of times an invocation object was created by creating a size class queue or scheduling a task through Execute().", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - inMemoryBuildQueueInvocationsActivatedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsActivatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", 
Name: "in_memory_build_queue_invocations_activated_total", Help: "Number of times an invocation object transitioned from being idle to having queued or executing operations.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - inMemoryBuildQueueInvocationsDeactivatedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsDeactivatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_deactivated_total", Help: "Number of times an invocation object transitioned from having queued or executing operations to being idle.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) - inMemoryBuildQueueInvocationsRemovedTotal = prometheus.NewCounterVec( + []string{"depth"}, + } + + inMemoryBuildQueueInvocationsRemovedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_invocations_removed_total", Help: "Number of times an invocation object was removed.", }, - []string{"instance_name_prefix", "platform", "size_class", "depth"}) + []string{"depth"}, + } - inMemoryBuildQueueTasksScheduledTotal = prometheus.NewCounterVec( + inMemoryBuildQueueTasksScheduledTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_tasks_scheduled_total", Help: "Number of times tasks were scheduled, either by calling Execute() or through initial size class selection retries.", }, - []string{"instance_name_prefix", "platform", "size_class", "assignment", "do_not_cache"}) - inMemoryBuildQueueTasksQueuedDurationSeconds = prometheus.NewHistogramVec( + []string{"assignment", "do_not_cache"}, + } + + inMemoryBuildQueueTasksQueuedDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ 
prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -104,8 +116,10 @@ var ( Help: "Time in seconds that tasks were queued before executing.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueTasksExecutingDurationSeconds = prometheus.NewHistogramVec( + []string{}, + } + + inMemoryBuildQueueTasksExecutingDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -113,8 +127,10 @@ var ( Help: "Time in seconds that tasks were executing before completing.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class", "result", "grpc_code"}) - inMemoryBuildQueueTasksExecutingRetries = prometheus.NewHistogramVec( + []string{"result", "grpc_code"}, + } + + inMemoryBuildQueueTasksExecutingRetriesTemplate = metricTemplate[prometheus.HistogramOpts]{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -122,8 +138,10 @@ var ( Help: "Number of times that tasks were retried before completing.", Buckets: prometheus.LinearBuckets(0, 1, 11), }, - []string{"instance_name_prefix", "platform", "size_class", "result", "grpc_code"}) - inMemoryBuildQueueTasksCompletedDurationSeconds = prometheus.NewHistogramVec( + []string{"result", "grpc_code"}, + } + + inMemoryBuildQueueTasksCompletedDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -131,34 +149,40 @@ var ( Help: "Time in seconds that tasks were completed before being removed.", Buckets: util.DecimalExponentialBuckets(-3, 6, 2), }, - []string{"instance_name_prefix", "platform", "size_class"}) + []string{}, + } - inMemoryBuildQueueWorkersCreatedTotal = prometheus.NewCounterVec( + inMemoryBuildQueueWorkersCreatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ 
prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_workers_created_total", Help: "Number of workers created by Synchronize().", }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueWorkersTerminatingTotal = prometheus.NewCounterVec( + []string{}, + } + + inMemoryBuildQueueWorkersTerminatingTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_workers_terminating_total", Help: "Number of workers that have entered the terminating state.", }, - []string{"instance_name_prefix", "platform", "size_class"}) - inMemoryBuildQueueWorkersRemovedTotal = prometheus.NewCounterVec( + []string{}, + } + + inMemoryBuildQueueWorkersRemovedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", Name: "in_memory_build_queue_workers_removed_total", Help: "Number of workers removed due to expiration.", }, - []string{"instance_name_prefix", "platform", "size_class", "state"}) + []string{"state"}, + } - inMemoryBuildQueueWorkerInvocationStickinessRetained = prometheus.NewHistogramVec( + inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate = metricTemplate[prometheus.HistogramOpts]{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -166,9 +190,15 @@ var ( Help: "How many levels of worker invocation stickiness were respected, as configured through worker_invocation_stickiness_limits.", Buckets: prometheus.LinearBuckets(0, 1, 11), }, - []string{"instance_name_prefix", "platform", "size_class"}) + []string{}, + } ) +type metricTemplate[T prometheus.HistogramOpts | prometheus.CounterOpts] struct { + opts T + requiredLabels []string +} + // InMemoryBuildQueueConfiguration contains all the tunable settings of // the InMemoryBuildQueue. 
type InMemoryBuildQueueConfiguration struct { @@ -284,24 +314,7 @@ var inMemoryBuildQueueCapabilitiesProvider = capabilities.NewStaticProvider(&rem // execution requests. All of these are created by sending it RPCs. func NewInMemoryBuildQueue(contentAddressableStorage blobstore.BlobAccess, clock clock.Clock, uuidGenerator util.UUIDGenerator, configuration *InMemoryBuildQueueConfiguration, maximumMessageSizeBytes int, actionRouter routing.ActionRouter, executeAuthorizer, modifyDrainsAuthorizer, killOperationsAuthorizer auth.Authorizer) *InMemoryBuildQueue { inMemoryBuildQueuePrometheusMetrics.Do(func() { - prometheus.MustRegister(inMemoryBuildQueueInFlightDeduplicationsTotal) - - prometheus.MustRegister(inMemoryBuildQueueInvocationsCreatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsActivatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsDeactivatedTotal) - prometheus.MustRegister(inMemoryBuildQueueInvocationsRemovedTotal) - - prometheus.MustRegister(inMemoryBuildQueueTasksScheduledTotal) - prometheus.MustRegister(inMemoryBuildQueueTasksQueuedDurationSeconds) - prometheus.MustRegister(inMemoryBuildQueueTasksExecutingDurationSeconds) - prometheus.MustRegister(inMemoryBuildQueueTasksExecutingRetries) - prometheus.MustRegister(inMemoryBuildQueueTasksCompletedDurationSeconds) - - prometheus.MustRegister(inMemoryBuildQueueWorkersCreatedTotal) - prometheus.MustRegister(inMemoryBuildQueueWorkersTerminatingTotal) - prometheus.MustRegister(inMemoryBuildQueueWorkersRemovedTotal) - - prometheus.MustRegister(inMemoryBuildQueueWorkerInvocationStickinessRetained) + prometheus.MustRegister(schedulerRegistry) }) return &InMemoryBuildQueue{ @@ -330,6 +343,17 @@ var ( _ buildqueuestate.BuildQueueStateServer = (*InMemoryBuildQueue)(nil) ) +func getPlatformProperties(platform *remoteexecution.Platform) map[string]string { + if platform == nil { + return make(map[string]string, 0) + } + properties := make(map[string]string, len(platform.Properties)) + for 
_, property := range platform.Properties { + properties[property.Name] = property.Value + } + return properties +} + // RegisterPredeclaredPlatformQueue adds a platform queue to // InMemoryBuildQueue that remains present, regardless of whether // workers appear. @@ -352,6 +376,7 @@ func (bq *InMemoryBuildQueue) RegisterPredeclaredPlatformQueue(instanceNamePrefi if err != nil { return err } + platformProperties := getPlatformProperties(platformMessage) bq.enter(bq.clock.Now()) defer bq.leave() @@ -360,7 +385,7 @@ func (bq *InMemoryBuildQueue) RegisterPredeclaredPlatformQueue(instanceNamePrefi return status.Error(codes.AlreadyExists, "A queue with the same instance name prefix or platform already exists") } - pq := bq.addPlatformQueue(platformKey, workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority) + pq := bq.addPlatformQueue(platformKey, platformProperties, workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority) for _, sizeClass := range sizeClasses { pq.addSizeClassQueue(bq, sizeClass, false) } @@ -461,7 +486,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re // Task is already associated with the current // invocation. Simply wait on the operation that // already exists. 
- scq.inFlightDeduplicationsSameInvocation.Inc() + scq.metrics.inFlightDeduplicationsSameInvocation.Inc() return o.waitExecution(bq, out) } @@ -479,7 +504,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re default: panic("Task in unexpected stage") } - scq.inFlightDeduplicationsOtherInvocation.Inc() + scq.metrics.inFlightDeduplicationsOtherInvocation.Inc() return o.waitExecution(bq, out) } @@ -529,7 +554,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re } if !action.DoNotCache { bq.inFlightDeduplicationMap[actionDigest] = t - scq.inFlightDeduplicationsNew.Inc() + scq.metrics.inFlightDeduplicationsNew.Inc() } i := scq.getOrCreateInvocation(bq, invocationKeys) o := t.newOperation(bq, in.ExecutionPolicy.GetPriority(), i, false) @@ -578,6 +603,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo if err != nil { return nil, err } + platformProperties := getPlatformProperties(request.Platform) workerKey := newWorkerKey(request.WorkerId) bq.enter(bq.clock.Now()) @@ -619,7 +645,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo // pair has not been observed before. Create a // new platform queue containing a single size // class queue. - pq = bq.addPlatformQueue(platformKey, nil, 0, 0) + pq = bq.addPlatformQueue(platformKey, platformProperties, nil, 0, 0) } scq = pq.addSizeClassQueue(bq, request.SizeClass, true) } @@ -645,7 +671,7 @@ func (bq *InMemoryBuildQueue) Synchronize(ctx context.Context, request *remotewo } i.idleWorkersCount++ scq.workers[workerKey] = w - scq.workersCreatedTotal.Inc() + scq.metrics.workersCreatedTotal.Inc() } // Install cleanup handlers to ensure stale workers and queues @@ -1271,12 +1297,15 @@ func (bq *InMemoryBuildQueue) getIdleSynchronizeResponse() *remoteworker.Synchro } } +// + // addPlatformQueue creates a new platform queue for a given platform. 
-func (bq *InMemoryBuildQueue) addPlatformQueue(platformKey platform.Key, workerInvocationStickinessLimits []time.Duration, maximumQueuedBackgroundLearningOperations int, backgroundLearningOperationPriority int32) *platformQueue { +func (bq *InMemoryBuildQueue) addPlatformQueue(platformKey platform.Key, properties map[string]string, workerInvocationStickinessLimits []time.Duration, maximumQueuedBackgroundLearningOperations int, backgroundLearningOperationPriority int32) *platformQueue { pq := &platformQueue{ - platformKey: platformKey, - instanceNamePatcher: digest.NewInstanceNamePatcher(platformKey.GetInstanceNamePrefix(), digest.EmptyInstanceName), - workerInvocationStickinessLimits: workerInvocationStickinessLimits, + platformKey: platformKey, + properties: properties, + instanceNamePatcher: digest.NewInstanceNamePatcher(platformKey.GetInstanceNamePrefix(), digest.EmptyInstanceName), + workerInvocationStickinessLimits: workerInvocationStickinessLimits, maximumQueuedBackgroundLearningOperations: maximumQueuedBackgroundLearningOperations, backgroundLearningOperationPriority: backgroundLearningOperationPriority, } @@ -1350,6 +1379,7 @@ func (k *sizeClassKey) getSizeClassQueueName() *buildqueuestate.SizeClassQueueNa // instance/platform for which one or more workers exist. 
type platformQueue struct { platformKey platform.Key + properties map[string]string instanceNamePatcher digest.InstanceNamePatcher workerInvocationStickinessLimits []time.Duration maximumQueuedBackgroundLearningOperations int @@ -1374,7 +1404,10 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin "platform": platformStr, "size_class": sizeClassStr, } - tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(platformLabels) + for p, pv := range pq.properties { + platformLabels[p] = pv + } + metrics := newSizeClassQueueMetrics(platformLabels) scq := &sizeClassQueue{ platformQueue: pq, sizeClass: sizeClass, @@ -1389,31 +1422,15 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin drains: map[string]*buildqueuestate.DrainState{}, undrainWakeup: make(chan struct{}), - inFlightDeduplicationsSameInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "SameInvocation"), - inFlightDeduplicationsOtherInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "OtherInvocation"), - inFlightDeduplicationsNew: inMemoryBuildQueueInFlightDeduplicationsTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "New"), - - tasksScheduledWorker: newTasksScheduledCounterVec(tasksScheduledTotal, "Worker"), - tasksScheduledQueue: newTasksScheduledCounterVec(tasksScheduledTotal, "Queue"), - tasksQueuedDurationSeconds: inMemoryBuildQueueTasksQueuedDurationSeconds.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - tasksExecutingDurationSeconds: inMemoryBuildQueueTasksExecutingDurationSeconds.MustCurryWith(platformLabels), - tasksExecutingRetries: inMemoryBuildQueueTasksExecutingRetries.MustCurryWith(platformLabels), - tasksCompletedDurationSeconds: inMemoryBuildQueueTasksCompletedDurationSeconds.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - 
- workersCreatedTotal: inMemoryBuildQueueWorkersCreatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - workersTerminatingTotal: inMemoryBuildQueueWorkersTerminatingTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), - workersRemovedIdleTotal: inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Idle"), - workersRemovedExecutingTotal: inMemoryBuildQueueWorkersRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, "Executing"), - - workerInvocationStickinessRetained: inMemoryBuildQueueWorkerInvocationStickinessRetained.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr), + metrics: metrics, } scq.rootInvocation.sizeClassQueue = scq scq.incrementInvocationsCreatedTotal(0) // Force creation of all metrics associated with this platform // queue to make recording rules work. - scq.tasksExecutingDurationSeconds.WithLabelValues("Success", "") - scq.tasksExecutingRetries.WithLabelValues("Success", "") + scq.metrics.tasksExecutingDurationSeconds.WithLabelValues("Success", "") + scq.metrics.tasksExecutingRetries.WithLabelValues("Success", "") // Insert the new size class queue into the platform queue. 
// Keep the size class queues sorted, so that they are provided @@ -1460,6 +1477,168 @@ type invocationsMetrics struct { removedTotal prometheus.Counter } +type sizeClassQueueMetrics struct { + registry *prometheus.Registry + inFlightDeduplicationsSameInvocation prometheus.Counter + inFlightDeduplicationsOtherInvocation prometheus.Counter + inFlightDeduplicationsNew prometheus.Counter + + tasksScheduledWorker tasksScheduledCounterVec + tasksScheduledQueue tasksScheduledCounterVec + + tasksQueuedDurationSeconds prometheus.Observer + tasksExecutingDurationSeconds prometheus.ObserverVec + tasksExecutingRetries prometheus.ObserverVec + tasksCompletedDurationSeconds prometheus.Observer + + workersCreatedTotal prometheus.Counter + workersTerminatingTotal prometheus.Counter + workersRemovedIdleTotal prometheus.Counter + workersRemovedExecutingTotal prometheus.Counter + + workerInvocationStickinessRetained prometheus.Observer + + invocationsCreatedTotal *prometheus.CounterVec + invocationsActivatedTotal *prometheus.CounterVec + invocationsDeactivatedTotal *prometheus.CounterVec + invocationsRemovedTotal *prometheus.CounterVec +} + +// newSizeClassQueueMetrics creates a new set of sizeClassQueueMetrics and registers all +// newly created metrics with the associated registry. The registry should be deregistered from +// the global registry if the sizeclass is ever deleted. 
+func newSizeClassQueueMetrics(properties map[string]string) sizeClassQueueMetrics { + propertyKeys := make([]string, 0, len(properties)) + for k := range properties { + propertyKeys = append(propertyKeys, k) + } + + update := func(base, updates map[string]string) prometheus.Labels { + ret := make(map[string]string, len(base)) + for k, v := range base { + ret[k] = v + } + for k, v := range updates { + ret[k] = v + } + return ret + } + + registry := prometheus.NewRegistry() + schedulerRegistry.MustRegister(registry) + + inMemoryBuildQueueInFlightDeduplicationsTotal := prometheus.NewCounterVec( + inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.opts, + append(inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueInFlightDeduplicationsTotal) + + inMemoryBuildQueueInvocationsCreatedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsCreatedTotalTemplate.opts, + append(inMemoryBuildQueueInvocationsCreatedTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueInvocationsCreatedTotal) + + inMemoryBuildQueueInvocationsActivatedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsActivatedTotalTemplate.opts, + append(inMemoryBuildQueueInvocationsActivatedTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueInvocationsActivatedTotal) + + inMemoryBuildQueueInvocationsDeactivatedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.opts, + append(inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueInvocationsDeactivatedTotal) + + inMemoryBuildQueueInvocationsRemovedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueInvocationsRemovedTotalTemplate.opts, + append(inMemoryBuildQueueInvocationsRemovedTotalTemplate.requiredLabels, propertyKeys...), + ) + 
registry.MustRegister(inMemoryBuildQueueInvocationsRemovedTotal) + + inMemoryBuildQueueTasksScheduledTotal := prometheus.NewCounterVec( + inMemoryBuildQueueTasksScheduledTotalTemplate.opts, + append(inMemoryBuildQueueTasksScheduledTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueTasksScheduledTotal) + + inMemoryBuildQueueTasksQueuedDurationSeconds := prometheus.NewHistogramVec( + inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.opts, + append(inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueTasksQueuedDurationSeconds) + + inMemoryBuildQueueTasksExecutingDurationSeconds := prometheus.NewHistogramVec( + inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.opts, + append(inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueTasksExecutingDurationSeconds) + + inMemoryBuildQueueTasksExecutingRetries := prometheus.NewHistogramVec( + inMemoryBuildQueueTasksExecutingRetriesTemplate.opts, + append(inMemoryBuildQueueTasksExecutingRetriesTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueTasksExecutingRetries) + + inMemoryBuildQueueTasksCompletedDurationSeconds := prometheus.NewHistogramVec( + inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.opts, + append(inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueTasksCompletedDurationSeconds) + + inMemoryBuildQueueWorkersCreatedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueWorkersCreatedTotalTemplate.opts, + append(inMemoryBuildQueueWorkersCreatedTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueWorkersCreatedTotal) + + inMemoryBuildQueueWorkersTerminatingTotal := prometheus.NewCounterVec( + 
inMemoryBuildQueueWorkersTerminatingTotalTemplate.opts, + append(inMemoryBuildQueueWorkersTerminatingTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueWorkersTerminatingTotal) + + inMemoryBuildQueueWorkersRemovedTotal := prometheus.NewCounterVec( + inMemoryBuildQueueWorkersRemovedTotalTemplate.opts, + append(inMemoryBuildQueueWorkersRemovedTotalTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueWorkersRemovedTotal) + + inMemoryBuildQueueWorkerInvocationStickinessRetained := prometheus.NewHistogramVec( + inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.opts, + append(inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.requiredLabels, propertyKeys...), + ) + registry.MustRegister(inMemoryBuildQueueWorkerInvocationStickinessRetained) + + tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(properties) + return sizeClassQueueMetrics{ + registry: registry, + inFlightDeduplicationsSameInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "SameInvocation"})), + inFlightDeduplicationsOtherInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "OtherInvocation"})), + inFlightDeduplicationsNew: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": "New"})), + + tasksScheduledWorker: newTasksScheduledCounterVec(tasksScheduledTotal, "Worker"), + tasksScheduledQueue: newTasksScheduledCounterVec(tasksScheduledTotal, "Queue"), + tasksQueuedDurationSeconds: inMemoryBuildQueueTasksQueuedDurationSeconds.With(properties), + tasksExecutingDurationSeconds: inMemoryBuildQueueTasksExecutingDurationSeconds.MustCurryWith(properties), + tasksExecutingRetries: inMemoryBuildQueueTasksExecutingRetries.MustCurryWith(properties), + tasksCompletedDurationSeconds: 
inMemoryBuildQueueTasksCompletedDurationSeconds.With(properties), + + workersCreatedTotal: inMemoryBuildQueueWorkersCreatedTotal.With(properties), + workersTerminatingTotal: inMemoryBuildQueueWorkersTerminatingTotal.With(properties), + workersRemovedIdleTotal: inMemoryBuildQueueWorkersRemovedTotal.With(update(properties, map[string]string{"state": "Idle"})), + workersRemovedExecutingTotal: inMemoryBuildQueueWorkersRemovedTotal.With(update(properties, map[string]string{"state": "Executing"})), + + workerInvocationStickinessRetained: inMemoryBuildQueueWorkerInvocationStickinessRetained.With(properties), + + invocationsCreatedTotal: inMemoryBuildQueueInvocationsCreatedTotal.MustCurryWith(properties), + invocationsActivatedTotal: inMemoryBuildQueueInvocationsActivatedTotal.MustCurryWith(properties), + invocationsDeactivatedTotal: inMemoryBuildQueueInvocationsDeactivatedTotal.MustCurryWith(properties), + invocationsRemovedTotal: inMemoryBuildQueueInvocationsRemovedTotal.MustCurryWith(properties), + } +} + type sizeClassQueue struct { platformQueue *platformQueue sizeClass uint32 @@ -1476,25 +1655,9 @@ type sizeClassQueue struct { undrainWakeup chan struct{} // Prometheus metrics. 
- inFlightDeduplicationsSameInvocation prometheus.Counter - inFlightDeduplicationsOtherInvocation prometheus.Counter - inFlightDeduplicationsNew prometheus.Counter + metrics sizeClassQueueMetrics invocationsMetrics []invocationsMetrics - - tasksScheduledWorker tasksScheduledCounterVec - tasksScheduledQueue tasksScheduledCounterVec - tasksQueuedDurationSeconds prometheus.Observer - tasksExecutingDurationSeconds prometheus.ObserverVec - tasksExecutingRetries prometheus.ObserverVec - tasksCompletedDurationSeconds prometheus.Observer - - workersCreatedTotal prometheus.Counter - workersTerminatingTotal prometheus.Counter - workersRemovedIdleTotal prometheus.Counter - workersRemovedExecutingTotal prometheus.Counter - - workerInvocationStickinessRetained prometheus.Observer } func (scq *sizeClassQueue) getKey() sizeClassKey { @@ -1516,6 +1679,7 @@ func (scq *sizeClassQueue) remove(bq *InMemoryBuildQueue) { "Workers for this instance name, platform and size class disappeared while task was queued", ).Proto()) scq.invocationsMetrics[0].removedTotal.Inc() + schedulerRegistry.Unregister(scq.metrics.registry) delete(bq.sizeClassQueues, scq.getKey()) pq := scq.platformQueue @@ -1547,9 +1711,9 @@ func (scq *sizeClassQueue) removeStaleWorker(bq *InMemoryBuildQueue, workerKey w w := scq.workers[workerKey] scq.markWorkerTerminating(w) if t := w.currentTask; t == nil { - scq.workersRemovedIdleTotal.Inc() + scq.metrics.workersRemovedIdleTotal.Inc() } else { - scq.workersRemovedExecutingTotal.Inc() + scq.metrics.workersRemovedExecutingTotal.Inc() t.complete(bq, &remoteexecution.ExecuteResponse{ Status: status.Newf(codes.Unavailable, "Worker %s disappeared while task was executing", workerKey).Proto(), }, false) @@ -1603,16 +1767,15 @@ func (scq *sizeClassQueue) getOrCreateInvocation(bq *InMemoryBuildQueue, invocat // with zero. 
func (scq *sizeClassQueue) incrementInvocationsCreatedTotal(depth int) { if len(scq.invocationsMetrics) == depth { - instanceNamePrefix, platformStr, sizeClassStr := scq.platformQueue.getSizeClassQueueLabels(scq.sizeClass) depthStr := strconv.FormatInt(int64(depth), 10) scq.invocationsMetrics = append( scq.invocationsMetrics, invocationsMetrics{ - createdTotal: inMemoryBuildQueueInvocationsCreatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - activatedTotal: inMemoryBuildQueueInvocationsActivatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - deactivatedTotal: inMemoryBuildQueueInvocationsDeactivatedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), - removedTotal: inMemoryBuildQueueInvocationsRemovedTotal.WithLabelValues(instanceNamePrefix, platformStr, sizeClassStr, depthStr), + createdTotal: scq.metrics.invocationsCreatedTotal.WithLabelValues(depthStr), + activatedTotal: scq.metrics.invocationsActivatedTotal.WithLabelValues(depthStr), + deactivatedTotal: scq.metrics.invocationsDeactivatedTotal.WithLabelValues(depthStr), + removedTotal: scq.metrics.invocationsRemovedTotal.WithLabelValues(depthStr), }) } @@ -1621,7 +1784,7 @@ func (scq *sizeClassQueue) incrementInvocationsCreatedTotal(depth int) { func (scq *sizeClassQueue) markWorkerTerminating(w *worker) { if !w.terminating { - scq.workersTerminatingTotal.Inc() + scq.metrics.workersTerminatingTotal.Inc() w.terminating = true } } @@ -2385,7 +2548,7 @@ func (t *task) schedule(bq *InMemoryBuildQueue) { // TODO: Do we want to provide a histogram // on how far the new invocation is removed // from the original one? 
- t.registerQueuedStageStarted(bq, &scq.tasksScheduledWorker) + t.registerQueuedStageStarted(bq, &scq.metrics.tasksScheduledWorker) i.idleSynchronizingWorkers[0].worker.assignUnqueuedTaskAndWakeUp(bq, t, 0) return } @@ -2397,7 +2560,7 @@ func (t *task) schedule(bq *InMemoryBuildQueue) { // Queue the operation, so that workers // can pick it up when they become // available. - t.registerQueuedStageStarted(bq, &scq.tasksScheduledQueue) + t.registerQueuedStageStarted(bq, &scq.metrics.tasksScheduledQueue) for _, o := range t.operations { o.enqueue() } @@ -2597,7 +2760,7 @@ func (t *task) registerQueuedStageStarted(bq *InMemoryBuildQueue, tasksScheduled // task finishing the QUEUED stage. func (t *task) registerQueuedStageFinished(bq *InMemoryBuildQueue) { scq := t.getCurrentSizeClassQueue() - scq.tasksQueuedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksQueuedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) t.currentStageStartTime = bq.now } @@ -2605,8 +2768,8 @@ func (t *task) registerQueuedStageFinished(bq *InMemoryBuildQueue) { // the task finishing the EXECUTING stage. func (t *task) registerExecutingStageFinished(bq *InMemoryBuildQueue, result, grpcCode string) { scq := t.getCurrentSizeClassQueue() - scq.tasksExecutingDurationSeconds.WithLabelValues(result, grpcCode).Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) - scq.tasksExecutingRetries.WithLabelValues(result, grpcCode).Observe(float64(t.retryCount)) + scq.metrics.tasksExecutingDurationSeconds.WithLabelValues(result, grpcCode).Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksExecutingRetries.WithLabelValues(result, grpcCode).Observe(float64(t.retryCount)) t.currentStageStartTime = bq.now } @@ -2614,7 +2777,7 @@ func (t *task) registerExecutingStageFinished(bq *InMemoryBuildQueue, result, gr // the task finishing the COMPLETED stage, meaning the task got removed. 
func (t *task) registerCompletedStageFinished(bq *InMemoryBuildQueue) { scq := t.getCurrentSizeClassQueue() - scq.tasksCompletedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) + scq.metrics.tasksCompletedDurationSeconds.Observe(bq.now.Sub(t.currentStageStartTime).Seconds()) t.currentStageStartTime = bq.now } @@ -2776,7 +2939,7 @@ func (w *worker) assignNextQueuedTask(bq *InMemoryBuildQueue, scq *sizeClassQueu // One or more operations are enqueued in this // invocation directly. Pick the most preferable // operation. - scq.workerInvocationStickinessRetained.Observe(float64(stickinessRetained)) + scq.metrics.workerInvocationStickinessRetained.Observe(float64(stickinessRetained)) w.assignQueuedTask(bq, i.queuedOperations[0].task, stickinessRetained) return true } else if len(i.queuedChildren) > 0 { From 7666943c20120306c0b335a9dc55867556def93d Mon Sep 17 00:00:00 2001 From: Alex Novak Date: Tue, 8 Oct 2024 16:17:15 -0400 Subject: [PATCH 2/4] Slightly less boilerplate --- pkg/scheduler/in_memory_build_queue.go | 162 ++++++++++--------------- 1 file changed, 63 insertions(+), 99 deletions(-) diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index 9173c1c..c0f479c 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -48,7 +48,7 @@ var ( schedulerRegistry = prometheus.NewRegistry() - inMemoryBuildQueueInFlightDeduplicationsTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueInFlightDeduplicationsTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -58,7 +58,7 @@ var ( []string{"outcome"}, } - inMemoryBuildQueueInvocationsCreatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueInvocationsCreatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -68,7 +68,7 @@ var ( []string{"depth"}, } - 
inMemoryBuildQueueInvocationsActivatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueInvocationsActivatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -78,7 +78,7 @@ var ( []string{"depth"}, } - inMemoryBuildQueueInvocationsDeactivatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueInvocationsDeactivatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -88,7 +88,7 @@ var ( []string{"depth"}, } - inMemoryBuildQueueInvocationsRemovedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueInvocationsRemovedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -98,7 +98,7 @@ var ( []string{"depth"}, } - inMemoryBuildQueueTasksScheduledTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueTasksScheduledTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -108,7 +108,7 @@ var ( []string{"assignment", "do_not_cache"}, } - inMemoryBuildQueueTasksQueuedDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ + inMemoryBuildQueueTasksQueuedDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -119,7 +119,7 @@ var ( []string{}, } - inMemoryBuildQueueTasksExecutingDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ + inMemoryBuildQueueTasksExecutingDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -130,7 +130,7 @@ var ( []string{"result", "grpc_code"}, } - inMemoryBuildQueueTasksExecutingRetriesTemplate = metricTemplate[prometheus.HistogramOpts]{ + inMemoryBuildQueueTasksExecutingRetriesTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -141,7 +141,7 
@@ var ( []string{"result", "grpc_code"}, } - inMemoryBuildQueueTasksCompletedDurationSecondsTemplate = metricTemplate[prometheus.HistogramOpts]{ + inMemoryBuildQueueTasksCompletedDurationSecondsTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -152,7 +152,7 @@ var ( []string{}, } - inMemoryBuildQueueWorkersCreatedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueWorkersCreatedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -162,7 +162,7 @@ var ( []string{}, } - inMemoryBuildQueueWorkersTerminatingTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueWorkersTerminatingTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -172,7 +172,7 @@ var ( []string{}, } - inMemoryBuildQueueWorkersRemovedTotalTemplate = metricTemplate[prometheus.CounterOpts]{ + inMemoryBuildQueueWorkersRemovedTotalTemplate = counterTemplate{ prometheus.CounterOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -182,7 +182,7 @@ var ( []string{"state"}, } - inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate = metricTemplate[prometheus.HistogramOpts]{ + inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate = histogramTemplate{ prometheus.HistogramOpts{ Namespace: "buildbarn", Subsystem: "builder", @@ -194,11 +194,44 @@ var ( } ) -type metricTemplate[T prometheus.HistogramOpts | prometheus.CounterOpts] struct { - opts T +// counterTemplate is a utility struct for generating and registering prometheus.CounterVec +// resources from a set of options, and required labels to appear in the resulting vec. +type counterTemplate struct { + opts prometheus.CounterOpts requiredLabels []string } +// createMetric creates an instance of the counter vec, including whatever additional labels are provided. 
+func (ct *counterTemplate) createMetric(additionalLabels []string) *prometheus.CounterVec { + return prometheus.NewCounterVec(ct.opts, append(ct.requiredLabels, additionalLabels...)) +} + +// createAndRegisterMetric performs the same task as createMetric, but also registers the metric in the provided registry. +func (ct *counterTemplate) createAndRegisterMetric(additionalLabels []string, registry *prometheus.Registry) *prometheus.CounterVec { + metric := ct.createMetric(additionalLabels) + registry.MustRegister(metric) + return metric +} + +// histogramTemplate is a utility struct for generating and registering prometheus.HistogramVec +// resources from a set of options, and required labels to appear in the resulting vec. +type histogramTemplate struct { + opts prometheus.HistogramOpts + requiredLabels []string +} + +// createMetric creates an instance of the histogram vec, including whatever additional labels are provided. +func (ht *histogramTemplate) createMetric(additionalLabels []string) *prometheus.HistogramVec { + return prometheus.NewHistogramVec(ht.opts, append(ht.requiredLabels, additionalLabels...)) +} + +// createAndRegisterMetric performs the same task as createMetric, but also registers the metric in the provided registry. +func (ht *histogramTemplate) createAndRegisterMetric(additionalLabels []string, registry *prometheus.Registry) *prometheus.HistogramVec { + metric := ht.createMetric(additionalLabels) + registry.MustRegister(metric) + return metric +} + // InMemoryBuildQueueConfiguration contains all the tunable settings of // the InMemoryBuildQueue.
type InMemoryBuildQueueConfiguration struct { @@ -1527,89 +1560,20 @@ func newSizeClassQueueMetrics(properties map[string]string) sizeClassQueueMetric registry := prometheus.NewRegistry() schedulerRegistry.MustRegister(registry) - inMemoryBuildQueueInFlightDeduplicationsTotal := prometheus.NewCounterVec( - inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.opts, - append(inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueInFlightDeduplicationsTotal) - - inMemoryBuildQueueInvocationsCreatedTotal := prometheus.NewCounterVec( - inMemoryBuildQueueInvocationsCreatedTotalTemplate.opts, - append(inMemoryBuildQueueInvocationsCreatedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueInvocationsCreatedTotal) - - inMemoryBuildQueueInvocationsActivatedTotal := prometheus.NewCounterVec( - inMemoryBuildQueueInvocationsActivatedTotalTemplate.opts, - append(inMemoryBuildQueueInvocationsActivatedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueInvocationsActivatedTotal) - - inMemoryBuildQueueInvocationsDeactivatedTotal := prometheus.NewCounterVec( - inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.opts, - append(inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueInvocationsDeactivatedTotal) - - inMemoryBuildQueueInvocationsRemovedTotal := prometheus.NewCounterVec( - inMemoryBuildQueueInvocationsRemovedTotalTemplate.opts, - append(inMemoryBuildQueueInvocationsRemovedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueInvocationsRemovedTotal) - - inMemoryBuildQueueTasksScheduledTotal := prometheus.NewCounterVec( - inMemoryBuildQueueTasksScheduledTotalTemplate.opts, - append(inMemoryBuildQueueTasksScheduledTotalTemplate.requiredLabels, propertyKeys...), - ) - 
registry.MustRegister(inMemoryBuildQueueTasksScheduledTotal) - - inMemoryBuildQueueTasksQueuedDurationSeconds := prometheus.NewHistogramVec( - inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.opts, - append(inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueTasksQueuedDurationSeconds) - - inMemoryBuildQueueTasksExecutingDurationSeconds := prometheus.NewHistogramVec( - inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.opts, - append(inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueTasksExecutingDurationSeconds) - - inMemoryBuildQueueTasksExecutingRetries := prometheus.NewHistogramVec( - inMemoryBuildQueueTasksExecutingRetriesTemplate.opts, - append(inMemoryBuildQueueTasksExecutingRetriesTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueTasksExecutingRetries) - - inMemoryBuildQueueTasksCompletedDurationSeconds := prometheus.NewHistogramVec( - inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.opts, - append(inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueTasksCompletedDurationSeconds) - - inMemoryBuildQueueWorkersCreatedTotal := prometheus.NewCounterVec( - inMemoryBuildQueueWorkersCreatedTotalTemplate.opts, - append(inMemoryBuildQueueWorkersCreatedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueWorkersCreatedTotal) - - inMemoryBuildQueueWorkersTerminatingTotal := prometheus.NewCounterVec( - inMemoryBuildQueueWorkersTerminatingTotalTemplate.opts, - append(inMemoryBuildQueueWorkersTerminatingTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueWorkersTerminatingTotal) - - inMemoryBuildQueueWorkersRemovedTotal := prometheus.NewCounterVec( - 
inMemoryBuildQueueWorkersRemovedTotalTemplate.opts, - append(inMemoryBuildQueueWorkersRemovedTotalTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueWorkersRemovedTotal) - - inMemoryBuildQueueWorkerInvocationStickinessRetained := prometheus.NewHistogramVec( - inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.opts, - append(inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.requiredLabels, propertyKeys...), - ) - registry.MustRegister(inMemoryBuildQueueWorkerInvocationStickinessRetained) + inMemoryBuildQueueInFlightDeduplicationsTotal := inMemoryBuildQueueInFlightDeduplicationsTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsCreatedTotal := inMemoryBuildQueueInvocationsCreatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsActivatedTotal := inMemoryBuildQueueInvocationsActivatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsDeactivatedTotal := inMemoryBuildQueueInvocationsDeactivatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueInvocationsRemovedTotal := inMemoryBuildQueueInvocationsRemovedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksQueuedDurationSeconds := inMemoryBuildQueueTasksQueuedDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksExecutingDurationSeconds := inMemoryBuildQueueTasksExecutingDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksExecutingRetries := inMemoryBuildQueueTasksExecutingRetriesTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueTasksCompletedDurationSeconds := 
inMemoryBuildQueueTasksCompletedDurationSecondsTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersCreatedTotal := inMemoryBuildQueueWorkersCreatedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersTerminatingTotal := inMemoryBuildQueueWorkersTerminatingTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkersRemovedTotal := inMemoryBuildQueueWorkersRemovedTotalTemplate.createAndRegisterMetric(propertyKeys, registry) + inMemoryBuildQueueWorkerInvocationStickinessRetained := inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.createAndRegisterMetric(propertyKeys, registry) tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(properties) return sizeClassQueueMetrics{ From f2881eb3b01b9ef5b4ce8f7d6707e249595cc881 Mon Sep 17 00:00:00 2001 From: Alex Novak Date: Tue, 8 Oct 2024 16:49:53 -0400 Subject: [PATCH 3/4] A little cleaner --- pkg/scheduler/in_memory_build_queue.go | 32 ++++++++++++-------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index c0f479c..aebece9 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -5,7 +5,9 @@ import ( "context" "encoding/json" "fmt" + "maps" "math" + "slices" "sort" "strconv" "sync" @@ -376,13 +378,19 @@ var ( _ buildqueuestate.BuildQueueStateServer = (*InMemoryBuildQueue)(nil) ) +// getPlatformProperties returns the list of platform properties from a remoteexecution.Platform +// as a map. Multiple instances of the same property name are concatenated with a comma delimiter.
func getPlatformProperties(platform *remoteexecution.Platform) map[string]string { if platform == nil { return make(map[string]string, 0) } properties := make(map[string]string, len(platform.Properties)) for _, property := range platform.Properties { - properties[property.Name] = property.Value + if currVal, ok := properties[property.Name]; ok { + properties[property.Name] = fmt.Sprintf("%s,%s", currVal, property.Value) + } else { + properties[property.Name] = property.Value + } } return properties } @@ -1541,22 +1549,7 @@ type sizeClassQueueMetrics struct { // newly created metrics with the associated regisry. The registry should be deregistered from // the global registry if the sizeclass is ever deleted. func newSizeClassQueueMetrics(properties map[string]string) sizeClassQueueMetrics { - propertyKeys := make([]string, 0, len(properties)) - for k := range properties { - propertyKeys = append(propertyKeys, k) - } - - update := func(base, updates map[string]string) prometheus.Labels { - ret := make(map[string]string, len(base)) - for k, v := range base { - ret[k] = v - } - for k, v := range updates { - ret[k] = v - } - return ret - } - + propertyKeys := slices.Collect[string](maps.Keys(properties)) registry := prometheus.NewRegistry() schedulerRegistry.MustRegister(registry) @@ -1576,6 +1569,11 @@ func newSizeClassQueueMetrics(properties map[string]string) sizeClassQueueMetric inMemoryBuildQueueWorkerInvocationStickinessRetained := inMemoryBuildQueueWorkerInvocationStickinessRetainedTemplate.createAndRegisterMetric(propertyKeys, registry) tasksScheduledTotal := inMemoryBuildQueueTasksScheduledTotal.MustCurryWith(properties) + update := func(base, updates map[string]string) map[string]string { + clone := maps.Clone(base) + maps.Insert(clone, maps.All(updates)) + return clone + } return sizeClassQueueMetrics{ registry: registry, inFlightDeduplicationsSameInvocation: inMemoryBuildQueueInFlightDeduplicationsTotal.With(update(properties, map[string]string{"outcome": 
"SameInvocation"})), From cc1d2beac841edc5ab3b592b2a130bb616178e29 Mon Sep 17 00:00:00 2001 From: Alex Novak Date: Tue, 8 Oct 2024 17:07:56 -0400 Subject: [PATCH 4/4] One more fix --- pkg/scheduler/in_memory_build_queue.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index aebece9..e90f28e 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -1446,7 +1446,10 @@ func (pq *platformQueue) addSizeClassQueue(bq *InMemoryBuildQueue, sizeClass uin "size_class": sizeClassStr, } for p, pv := range pq.properties { - platformLabels[p] = pv + // do not overwrite existing labels, in case of weirdly named properties. + if _, ok := platformLabels[p]; !ok { + platformLabels[p] = pv + } } metrics := newSizeClassQueueMetrics(platformLabels) scq := &sizeClassQueue{