Skip to content

Commit

Permalink
fix: fixed from PR feedback
Browse files Browse the repository at this point in the history
- added some changes for PR feedback
- removed cache on GetTasksByWorkerSubscription to fix task list not properly updated
  • Loading branch information
codebender37 committed Dec 5, 2024
1 parent 080de18 commit 4d3c375
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cmd/seed/fixtures/fixture_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (o *FixtureService) CreateDefaultTask(ctx context.Context, title string, ex
db.Task.Type.Set(db.TaskTypeCodeGeneration),
db.Task.TaskData.Set(types.JSON(taskDataJSON)),
db.Task.Status.Set(db.TaskStatusInProgress),
db.Task.MaxResults.Set(10),
db.Task.MaxResults.Set(1),
db.Task.NumResults.Set(0),
db.Task.NumCriteria.Set(0),
db.Task.TotalReward.Set(101.0),
Expand Down
15 changes: 5 additions & 10 deletions cmd/seed/task_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@
"max": 100,
"min": 1,
"type": "multi-score",
"options": [
"459520f8-b654-41a8-9f94-93e003c4ecb5",
"024ffca6-aac0-40d2-959b-1ab7b165b68e",
"4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"1b26f82c-935f-432b-a60d-f21204e580f2"
]
"options": ["1", "2", "3", "4"]
}
],
"responses": [
{
"model": "459520f8-b654-41a8-9f94-93e003c4ecb5",
"model": "1",
"completion": {
"files": [
{
Expand All @@ -29,7 +24,7 @@
}
},
{
"model": "024ffca6-aac0-40d2-959b-1ab7b165b68e",
"model": "2",
"completion": {
"files": [
{
Expand All @@ -42,7 +37,7 @@
}
},
{
"model": "4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"model": "3",
"completion": {
"files": [
{
Expand All @@ -55,7 +50,7 @@
}
},
{
"model": "1b26f82c-935f-432b-a60d-f21204e580f2",
"model": "4",
"completion": {
"files": [
{
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func SubmitTaskResultController(c *gin.Context) {
return
}

// Remove from cache
cacheInstance := cache.GetCacheInstance()
cacheInstance.Redis.Del(ctx, cache.BuildCacheKey(cache.TaskResultByWorker, worker.ID))

// Update the metric data with goroutine
handleMetricData(taskData, updatedTask)

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string {
// TODO - Need to check if this is the correct way without getting spoofing
if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" {
callerIp := c.Request.Header.Get("X-Original-Forwarded-For")
log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
log.Debug().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
return callerIp
}
callerIp := c.ClientIP()
log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp)
log.Debug().Msgf("Got caller IP from ClientIP: %s", callerIp)
return callerIp
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ const (
TasksByWorker CacheKey = "task:worker" // List of tasks by worker

// Task Result cache keys
TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID
TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID
TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID
TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID
TaskResultsTotal CacheKey = "metrics:task_results:total" // Total task results count

// Worker cache keys
WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address
Expand Down
12 changes: 7 additions & 5 deletions pkg/metric/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,18 @@ func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context
}

func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error {
cache := cache.GetCacheInstance()
cacheInstance := cache.GetCacheInstance()
metricORM := orm.NewMetricsORM()

cacheKey := "metrics:task_results:total"
cacheKey := string(cache.TaskResultsTotal)

// Try to get current count from Redis
_, err := cache.Redis.Get(ctx, cacheKey).Int64()
currentCount, err := cacheInstance.Redis.Get(ctx, cacheKey).Int64()
log.Info().Int64("CurrentCount", currentCount).Msg("Current count")
if err == redis.Nil { // Key doesn't exist (e.g., after Redis restart)
// Get the last metric from database
lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumTaskResults)
log.Info().Interface("LastMetric", lastMetric).Msg("Last metric")
if err != nil && !db.IsErrNotFound(err) {
return err
}
Expand All @@ -78,7 +80,7 @@ func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Cont
}
currentCount := int64(lastMetricData.TotalNumTasksResults)
// Set the Redis counter to last known value
if err := cache.Redis.Set(ctx, cacheKey, currentCount, 0).Err(); err != nil {
if err := cacheInstance.Redis.Set(ctx, cacheKey, currentCount, 0).Err(); err != nil {
return err
}
}
Expand All @@ -87,7 +89,7 @@ func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Cont
}

// Increment the counter
count, err := cache.Redis.Incr(ctx, cacheKey).Result()
count, err := cacheInstance.Redis.Incr(ctx, cacheKey).Result()
if err != nil {
log.Error().Err(err).Msg("Failed to increment task results count")
return err
Expand Down
28 changes: 1 addition & 27 deletions pkg/orm/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,6 @@ func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, er

// Modified GetTasksByWorkerSubscription with caching
func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId string, offset, limit int, sortQuery db.TaskOrderByParam, taskTypes []db.TaskType) ([]db.TaskModel, int, error) {
// Convert TaskTypes to strings
typeStrs := make([]string, len(taskTypes))
for i, t := range taskTypes {
typeStrs[i] = string(t)
}
cacheKey := cache.BuildCacheKey(cache.TasksByWorker, workerId, strconv.Itoa(offset), strconv.Itoa(limit), strings.Join(typeStrs, ","))

var tasks []db.TaskModel
cache := cache.GetCacheInstance()

// Try to get from cache first
if err := cache.GetCacheValue(cacheKey, &tasks); err == nil {
totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, nil)
if err != nil {
log.Error().Err(err).Msgf("Error fetching total tasks for worker ID %v", workerId)
return tasks, 0, err
}
return tasks, totalTasks, nil
}

// Cache miss, proceed with database query
o.clientWrapper.BeforeQuery()
defer o.clientWrapper.AfterQuery()

Expand Down Expand Up @@ -140,7 +119,7 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str
filterParams = append(filterParams, db.Task.Type.In(taskTypes))
}

tasks, err = o.dbClient.Task.FindMany(
tasks, err := o.dbClient.Task.FindMany(
filterParams...,
).OrderBy(sortQuery).
Skip(offset).
Expand All @@ -159,11 +138,6 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str

log.Info().Int("totalTasks", totalTasks).Msgf("Successfully fetched total tasks fetched for worker ID %v", workerId)

// Store in cache
if err := cache.SetCacheValue(cacheKey, tasks); err != nil {
log.Warn().Err(err).Msg("Failed to set cache")
}

return tasks, totalTasks, nil
}

Expand Down
16 changes: 0 additions & 16 deletions pkg/orm/task_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,6 @@ func (t *TaskResultORM) GetTaskResultsByTaskId(ctx context.Context, taskId strin
}

func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) {
cacheKey := cache.BuildCacheKey(cache.TaskResultByTaskAndWorker, taskId, workerId)

var results []db.TaskResultModel
cacheInstance := cache.GetCacheInstance()

// Try to get from cache
if err := cacheInstance.GetCacheValue(cacheKey, &results); err == nil {
return results, nil
}

// Cache miss, fetch from database
t.clientWrapper.BeforeQuery()
defer t.clientWrapper.AfterQuery()

Expand All @@ -68,11 +57,6 @@ func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context,
return nil, err
}

// Set cache
if err := cacheInstance.SetCacheValue(cacheKey, results); err != nil {
log.Warn().Err(err).Msg("Failed to set cache")
}

return results, nil
}

Expand Down

0 comments on commit 4d3c375

Please sign in to comment.