diff --git a/cmd/seed/fixtures/fixture_service.go b/cmd/seed/fixtures/fixture_service.go index ba0c692..1c8cd32 100644 --- a/cmd/seed/fixtures/fixture_service.go +++ b/cmd/seed/fixtures/fixture_service.go @@ -109,7 +109,7 @@ func (o *FixtureService) CreateDefaultTask(ctx context.Context, title string, ex db.Task.Type.Set(db.TaskTypeCodeGeneration), db.Task.TaskData.Set(types.JSON(taskDataJSON)), db.Task.Status.Set(db.TaskStatusInProgress), - db.Task.MaxResults.Set(10), + db.Task.MaxResults.Set(1), db.Task.NumResults.Set(0), db.Task.NumCriteria.Set(0), db.Task.TotalReward.Set(101.0), diff --git a/cmd/seed/task_data.json b/cmd/seed/task_data.json index 5e79af1..914484d 100644 --- a/cmd/seed/task_data.json +++ b/cmd/seed/task_data.json @@ -6,17 +6,12 @@ "max": 100, "min": 1, "type": "multi-score", - "options": [ - "459520f8-b654-41a8-9f94-93e003c4ecb5", - "024ffca6-aac0-40d2-959b-1ab7b165b68e", - "4fa05b97-cd66-4df7-bb3a-298d2b04b148", - "1b26f82c-935f-432b-a60d-f21204e580f2" - ] + "options": ["1", "2", "3", "4"] } ], "responses": [ { - "model": "459520f8-b654-41a8-9f94-93e003c4ecb5", + "model": "1", "completion": { "files": [ { @@ -29,7 +24,7 @@ } }, { - "model": "024ffca6-aac0-40d2-959b-1ab7b165b68e", + "model": "2", "completion": { "files": [ { @@ -42,7 +37,7 @@ } }, { - "model": "4fa05b97-cd66-4df7-bb3a-298d2b04b148", + "model": "3", "completion": { "files": [ { @@ -55,7 +50,7 @@ } }, { - "model": "1b26f82c-935f-432b-a60d-f21204e580f2", + "model": "4", "completion": { "files": [ { diff --git a/pkg/api/controllers.go b/pkg/api/controllers.go index 4d8ac65..e926ee9 100644 --- a/pkg/api/controllers.go +++ b/pkg/api/controllers.go @@ -305,6 +305,10 @@ func SubmitTaskResultController(c *gin.Context) { return } + // Remove from cache + cacheInstance := cache.GetCacheInstance() + cacheInstance.Redis.Del(ctx, cache.BuildCacheKey(cache.TaskResultByWorker, worker.ID)) + // Update the metric data with goroutine handleMetricData(taskData, updatedTask) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 8b30d64..a6d9d5b 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string { // TODO - Need to check if this is the correct way without getting spoofing if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" { callerIp := c.Request.Header.Get("X-Original-Forwarded-For") - log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) + log.Debug().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) return callerIp } callerIp := c.ClientIP() - log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp) + log.Debug().Msgf("Got caller IP from ClientIP: %s", callerIp) return callerIp } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 928e3a1..6d4872a 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -41,8 +41,9 @@ const ( TasksByWorker CacheKey = "task:worker" // List of tasks by worker // Task Result cache keys - TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID - TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID + TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID + TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID + TaskResultsTotal CacheKey = "metrics:task_results:total" // Total task results count // Worker cache keys WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address diff --git a/pkg/metric/metrics_service.go b/pkg/metric/metrics_service.go index 7f8d517..2e1f5fa 100644 --- a/pkg/metric/metrics_service.go +++ b/pkg/metric/metrics_service.go @@ -56,16 +56,18 @@ func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context } func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error { - cache := cache.GetCacheInstance() + cacheInstance := cache.GetCacheInstance() metricORM := orm.NewMetricsORM() - cacheKey := "metrics:task_results:total" + cacheKey := string(cache.TaskResultsTotal) // Try to get current count from Redis - _, err := cache.Redis.Get(ctx, cacheKey).Int64() + currentCount, err := cacheInstance.Redis.Get(ctx, cacheKey).Int64() + log.Info().Int64("CurrentCount", currentCount).Msg("Current count") if err == redis.Nil { // Key doesn't exist (e.g., after Redis restart) // Get the last metric from database lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumTaskResults) + log.Info().Interface("LastMetric", lastMetric).Msg("Last metric") if err != nil && !db.IsErrNotFound(err) { return err } @@ -78,7 +80,7 @@ func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Cont } currentCount := int64(lastMetricData.TotalNumTasksResults) // Set the Redis counter to last known value - if err := cache.Redis.Set(ctx, cacheKey, currentCount, 0).Err(); err != nil { + if err := cacheInstance.Redis.Set(ctx, cacheKey, currentCount, 0).Err(); err != nil { return err } } @@ -87,7 +89,7 @@ func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Cont } // Increment the counter - count, err := cache.Redis.Incr(ctx, cacheKey).Result() + count, err := cacheInstance.Redis.Incr(ctx, cacheKey).Result() if err != nil { log.Error().Err(err).Msg("Failed to increment task results count") return err diff --git a/pkg/orm/task.go b/pkg/orm/task.go index b702e5d..cd01a0a 100644 --- a/pkg/orm/task.go +++ b/pkg/orm/task.go @@ -83,27 +83,6 @@ func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, er // Modified GetTasksByWorkerSubscription with caching func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId string, offset, limit int, sortQuery db.TaskOrderByParam, taskTypes []db.TaskType) ([]db.TaskModel, int, error) { - // Convert TaskTypes to strings - typeStrs := make([]string, len(taskTypes)) - for i, t := range taskTypes { - typeStrs[i] = string(t) - } - cacheKey := cache.BuildCacheKey(cache.TasksByWorker, workerId, strconv.Itoa(offset), strconv.Itoa(limit), strings.Join(typeStrs, ",")) - - var tasks []db.TaskModel - cache := cache.GetCacheInstance() - - // Try to get from cache first - if err := cache.GetCacheValue(cacheKey, &tasks); err == nil { - totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, nil) - if err != nil { - log.Error().Err(err).Msgf("Error fetching total tasks for worker ID %v", workerId) - return tasks, 0, err - } - return tasks, totalTasks, nil - } - - // Cache miss, proceed with database query o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() @@ -140,7 +119,7 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str filterParams = append(filterParams, db.Task.Type.In(taskTypes)) } - tasks, err = o.dbClient.Task.FindMany( + tasks, err := o.dbClient.Task.FindMany( filterParams..., ).OrderBy(sortQuery). Skip(offset). @@ -159,11 +138,6 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str log.Info().Int("totalTasks", totalTasks).Msgf("Successfully fetched total tasks fetched for worker ID %v", workerId) - // Store in cache - if err := cache.SetCacheValue(cacheKey, tasks); err != nil { - log.Warn().Err(err).Msg("Failed to set cache") - } - return tasks, totalTasks, nil } diff --git a/pkg/orm/task_result.go b/pkg/orm/task_result.go index 9b7d9d3..be7d18c 100644 --- a/pkg/orm/task_result.go +++ b/pkg/orm/task_result.go @@ -45,17 +45,6 @@ func (t *TaskResultORM) GetTaskResultsByTaskId(ctx context.Context, taskId strin } func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) { - cacheKey := cache.BuildCacheKey(cache.TaskResultByTaskAndWorker, taskId, workerId) - - var results []db.TaskResultModel - cacheInstance := cache.GetCacheInstance() - - // Try to get from cache - if err := cacheInstance.GetCacheValue(cacheKey, &results); err == nil { - return results, nil - } - - // Cache miss, fetch from database t.clientWrapper.BeforeQuery() defer t.clientWrapper.AfterQuery() @@ -68,11 +57,6 @@ func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, return nil, err } - // Set cache - if err := cacheInstance.SetCacheValue(cacheKey, results); err != nil { - log.Warn().Err(err).Msg("Failed to set cache") - } - return results, nil }