Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: updated UpdateCompletedTaskCount func to cumulate completed tasks count #74

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type CacheKeys struct {
TaskResultByTaskAndWorker CacheKey
TaskResultByWorker CacheKey
TaskResultsTotal CacheKey

CompletedTasksTotal CacheKey
// Worker cache keys
WorkerByWallet CacheKey
WorkerCount CacheKey
Expand All @@ -65,6 +65,7 @@ var cacheKeys = CacheKeys{
TaskResultByTaskAndWorker: "tr:task:worker",
TaskResultByWorker: "tr:worker",
TaskResultsTotal: "metrics:tr:total",
CompletedTasksTotal: "metrics:completed_tasks:total",

// Worker cache keys
WorkerByWallet: "worker:wallet",
Expand Down
46 changes: 39 additions & 7 deletions pkg/metric/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,51 @@ func (metricService *MetricService) UpdateDojoWorkerCount(ctx context.Context) e
}

func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context) error {
taskORM := orm.NewTaskORM()
cache := cache.GetCacheInstance()
cacheKey := string(cache.Keys.CompletedTasksTotal)
metricORM := orm.NewMetricsORM()

completedTasksCount, err := taskORM.GetCompletedTaskCount(ctx)
// Try to get current count from Redis
currentCount, err := cache.Redis.Get(ctx, cacheKey).Int64()
log.Info().Int64("CompletedTasksCount", currentCount).Msg("Current completed tasks count")
if err == redis.Nil { // Key doesn't exist
// Get the last metric from database
lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumCompletedTasks)
if err != nil && !db.IsErrNotFound(err) {
return err
}

// Initialize Redis counter with last known value from database
var initialCount int64 = 0
if lastMetric != nil {
var lastMetricData MetricCompletedTasksCount
if err := json.Unmarshal(lastMetric.MetricsData, &lastMetricData); err != nil {
return err
}
log.Info().Interface("LastMetricData", lastMetricData).Msg("Last Completed Tasks Count in Metrics")
initialCount = int64(lastMetricData.TotalNumCompletedTasks)
}

if err := cache.Redis.Set(ctx, cacheKey, initialCount, 0).Err(); err != nil {
return err
}
log.Info().Int64("initial_count", initialCount).Msg("Initialized completed tasks counter")
} else if err != nil {
return err
}

// Increment the counter
count, err := cache.Redis.Incr(ctx, cacheKey).Result()
if err != nil {
log.Error().Err(err).Msg("Failed to get completed tasks")
log.Error().Err(err).Msg("Failed to increment completed tasks count")
return err
}
newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: completedTasksCount}
log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed task count metric")

err = metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData)
return err
// Store in database
newMetricData := MetricCompletedTasksCount{TotalNumCompletedTasks: int(count)}
log.Info().Interface("CompletedTaskCount", newMetricData).Msg("Updating completed tasks count metric")

return metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumCompletedTasks, newMetricData)
}

func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error {
Expand Down