diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 19dff5c..48ac2b7 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -45,7 +45,7 @@ type CacheKeys struct { TaskResultByTaskAndWorker CacheKey TaskResultByWorker CacheKey TaskResultsTotal CacheKey - + CompletedTasksTotal CacheKey // Worker cache keys WorkerByWallet CacheKey WorkerCount CacheKey @@ -65,6 +65,7 @@ var cacheKeys = CacheKeys{ TaskResultByTaskAndWorker: "tr:task:worker", TaskResultByWorker: "tr:worker", TaskResultsTotal: "metrics:tr:total", + CompletedTasksTotal: "metrics:completed_tasks:total", // Worker cache keys WorkerByWallet: "worker:wallet", diff --git a/pkg/metric/metrics_service.go b/pkg/metric/metrics_service.go index bc95328..27372d8 100644 --- a/pkg/metric/metrics_service.go +++ b/pkg/metric/metrics_service.go @@ -40,19 +40,51 @@ func (metricService *MetricService) UpdateDojoWorkerCount(ctx context.Context) e } func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context) error { - taskORM := orm.NewTaskORM() + cache := cache.GetCacheInstance() + cacheKey := string(cache.Keys.CompletedTasksTotal) metricORM := orm.NewMetricsORM() - completedTasksCount, err := taskORM.GetCompletedTaskCount(ctx) + // Try to get current count from Redis + currentCount, err := cache.Redis.Get(ctx, cacheKey).Int64() + log.Info().Int64("CompletedTasksCount", currentCount).Msg("Current completed tasks count") + if err == redis.Nil { // Key doesn't exist + // Get the last metric from database + lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumCompletedTasks) + if err != nil && !db.IsErrNotFound(err) { + return err + } + + // Initialize Redis counter with last known value from database + var initialCount int64 = 0 + if lastMetric != nil { + var lastMetricData MetricCompletedTasksCount + if err := json.Unmarshal(lastMetric.MetricsData, &lastMetricData); err != nil { + return err + } + log.Info().Interface("LastMetricData", lastMetricData).Msg("Last Completed Tasks Count in Metrics") + initialCount = int64(lastMetricData.TotalNumCompletedTasks) + } + + if err := cache.Redis.Set(ctx, cacheKey, initialCount, 0).Err(); err != nil { + return err + } + log.Info().Int64("initial_count", initialCount).Msg("Initialized completed tasks counter") + } else if err != nil { + return err + } + + // Increment the counter + count, err := cache.Redis.Incr(ctx, cacheKey).Result() if err != nil { - log.Error().Err(err).Msg("Failed to get completed tasks") + log.Error().Err(err).Msg("Failed to increment completed tasks count") return err } - newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: completedTasksCount} - log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed task count metric") - err = metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData) - return err + // Store in database + newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: int(count)} + log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed tasks count metric") + + return metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData) } func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error {