Skip to content

Commit

Permalink
refactor: updated UpdateCompletedTaskCount func to cumulate completed…
Browse files Browse the repository at this point in the history
… tasks count
  • Loading branch information
codebender37 committed Dec 23, 2024
1 parent 5141e1f commit 240161f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type CacheKeys struct {
TaskResultByTaskAndWorker CacheKey
TaskResultByWorker CacheKey
TaskResultsTotal CacheKey

CompletedTasksTotal CacheKey
// Worker cache keys
WorkerByWallet CacheKey
WorkerCount CacheKey
Expand All @@ -65,6 +65,7 @@ var cacheKeys = CacheKeys{
TaskResultByTaskAndWorker: "tr:task:worker",
TaskResultByWorker: "tr:worker",
TaskResultsTotal: "metrics:tr:total",
CompletedTasksTotal: "metrics:completed_tasks:total",

// Worker cache keys
WorkerByWallet: "worker:wallet",
Expand Down
46 changes: 39 additions & 7 deletions pkg/metric/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,51 @@ func (metricService *MetricService) UpdateDojoWorkerCount(ctx context.Context) e
}

func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context) error {
taskORM := orm.NewTaskORM()
cache := cache.GetCacheInstance()
cacheKey := string(cache.Keys.CompletedTasksTotal)
metricORM := orm.NewMetricsORM()

completedTasksCount, err := taskORM.GetCompletedTaskCount(ctx)
// Try to get current count from Redis
currentCount, err := cache.Redis.Get(ctx, cacheKey).Int64()
log.Info().Int64("CompletedTasksCount", currentCount).Msg("Current completed tasks count")
if err == redis.Nil { // Key doesn't exist
// Get the last metric from database
lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumCompletedTasks)
if err != nil && !db.IsErrNotFound(err) {
return err
}

// Initialize Redis counter with last known value from database
var initialCount int64 = 0
if lastMetric != nil {
var lastMetricData MetricCompletedTasksCount
if err := json.Unmarshal(lastMetric.MetricsData, &lastMetricData); err != nil {
return err
}
log.Info().Interface("LastMetricData", lastMetricData).Msg("Last Completed Tasks Count in Metrics")
initialCount = int64(lastMetricData.TotalNumCompletedTasks)
}

if err := cache.Redis.Set(ctx, cacheKey, initialCount, 0).Err(); err != nil {
return err
}
log.Info().Int64("initial_count", initialCount).Msg("Initialized completed tasks counter")
} else if err != nil {
return err
}

// Increment the counter
count, err := cache.Redis.Incr(ctx, cacheKey).Result()
if err != nil {
log.Error().Err(err).Msg("Failed to get completed tasks")
log.Error().Err(err).Msg("Failed to increment completed tasks count")
return err
}
newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: completedTasksCount}
log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed task count metric")

err = metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData)
return err
// Store in database
newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: int(count)}
log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed tasks count metric")

return metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData)
}

func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error {
Expand Down

0 comments on commit 240161f

Please sign in to comment.