From f2654a6b259b9d0852a3c0c423a52b96166dc4fa Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Wed, 11 Dec 2024 10:29:48 +0700 Subject: [PATCH] refactor: update metric api to cumulate task count after db purge (#60) * refactor: update metric api to cumulate task count after db purge - increment the task result count every time the api is called instead of fetch from TaskResult table - use Redis INCR for atomic increments that work across all instances * fix: fixed from PR feedback - added some changes for PR feedback - removed cache on GetTasksByWorkerSubscription to fix task list not properly updated * fix: add more changes for PR, and edge cases - add TasksByWorker cache back that was removed because of inconsistent UI - reset the related cache after successful task submit, or create - refactor some code in cache.go - add some changes from PR feedback * chore: clean up substrate logs * fix: bug fixed on changing expired time - use UTC on current time when checking on expired task - remove the check for expired task with no result, since scheduler handling it already --- cmd/seed/fixtures/fixture_service.go | 2 +- cmd/seed/task_data.json | 15 +-- pkg/api/controllers.go | 16 +++ pkg/api/utils.go | 4 +- pkg/blockchain/subnet_state.go | 2 +- pkg/blockchain/substrate.go | 2 +- pkg/cache/cache.go | 147 ++++++++++++++++++++------- pkg/metric/metrics_service.go | 50 +++++++-- pkg/orm/dojo_worker.go | 5 +- pkg/orm/metrics.go | 3 +- pkg/orm/miner_user.go | 2 +- pkg/orm/subscriptionKey.go | 6 +- pkg/orm/task.go | 50 ++------- pkg/orm/task_result.go | 19 +--- 14 files changed, 192 insertions(+), 131 deletions(-) diff --git a/cmd/seed/fixtures/fixture_service.go b/cmd/seed/fixtures/fixture_service.go index ba0c692..1c8cd32 100644 --- a/cmd/seed/fixtures/fixture_service.go +++ b/cmd/seed/fixtures/fixture_service.go @@ -109,7 +109,7 @@ func (o *FixtureService) CreateDefaultTask(ctx context.Context, title string, ex 
db.Task.Type.Set(db.TaskTypeCodeGeneration), db.Task.TaskData.Set(types.JSON(taskDataJSON)), db.Task.Status.Set(db.TaskStatusInProgress), - db.Task.MaxResults.Set(10), + db.Task.MaxResults.Set(1), db.Task.NumResults.Set(0), db.Task.NumCriteria.Set(0), db.Task.TotalReward.Set(101.0), diff --git a/cmd/seed/task_data.json b/cmd/seed/task_data.json index 5e79af1..914484d 100644 --- a/cmd/seed/task_data.json +++ b/cmd/seed/task_data.json @@ -6,17 +6,12 @@ "max": 100, "min": 1, "type": "multi-score", - "options": [ - "459520f8-b654-41a8-9f94-93e003c4ecb5", - "024ffca6-aac0-40d2-959b-1ab7b165b68e", - "4fa05b97-cd66-4df7-bb3a-298d2b04b148", - "1b26f82c-935f-432b-a60d-f21204e580f2" - ] + "options": ["1", "2", "3", "4"] } ], "responses": [ { - "model": "459520f8-b654-41a8-9f94-93e003c4ecb5", + "model": "1", "completion": { "files": [ { @@ -29,7 +24,7 @@ } }, { - "model": "024ffca6-aac0-40d2-959b-1ab7b165b68e", + "model": "2", "completion": { "files": [ { @@ -42,7 +37,7 @@ } }, { - "model": "4fa05b97-cd66-4df7-bb3a-298d2b04b148", + "model": "3", "completion": { "files": [ { @@ -55,7 +50,7 @@ } }, { - "model": "1b26f82c-935f-432b-a60d-f21204e580f2", + "model": "4", "completion": { "files": [ { diff --git a/pkg/api/controllers.go b/pkg/api/controllers.go index 4d8ac65..ce36a6a 100644 --- a/pkg/api/controllers.go +++ b/pkg/api/controllers.go @@ -193,6 +193,10 @@ func CreateTasksController(c *gin.Context) { taskIds = append(taskIds, task.ID) } + // Clean up the cache + cache := cache.GetCacheInstance() + cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*") + c.JSON(http.StatusOK, defaultSuccessResponse(taskIds)) } @@ -305,6 +309,15 @@ func SubmitTaskResultController(c *gin.Context) { return } + // Remove from cache + cache := cache.GetCacheInstance() + cache.DeleteWithSuffix(cache.Keys.TaskResultByWorker, worker.ID) + + // Clean all task:worker:* cache entries + if err := cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*"); err != nil { + 
log.Error().Err(err).Msg("Failed to clean task:worker cache entries") + } + // Update the metric data with goroutine handleMetricData(taskData, updatedTask) @@ -472,6 +485,9 @@ func GetTaskByIdController(c *gin.Context) { taskID := c.Param("task-id") taskService := task.NewTaskService() + // TODO: Remove this after testing + log.Info().Interface("Headers", c.Request.Header).Msg("Request Headers") + task, err := taskService.GetTaskResponseById(c.Request.Context(), taskID) if err != nil { c.JSON(http.StatusInternalServerError, defaultErrorResponse("Internal server error")) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 8b30d64..a6d9d5b 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string { // TODO - Need to check if this is the correct way without getting spoofing if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" { callerIp := c.Request.Header.Get("X-Original-Forwarded-For") - log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) + log.Debug().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) return callerIp } callerIp := c.ClientIP() - log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp) + log.Debug().Msgf("Got caller IP from ClientIP: %s", callerIp) return callerIp } diff --git a/pkg/blockchain/subnet_state.go b/pkg/blockchain/subnet_state.go index 7d2a8b4..2053c74 100644 --- a/pkg/blockchain/subnet_state.go +++ b/pkg/blockchain/subnet_state.go @@ -123,7 +123,7 @@ func (s *SubnetStateSubscriber) OnRegisteredFound(hotkey string) { _, err := minerUserORM.GetUserByHotkey(hotkey) if err != nil { if err == db.ErrNotFound { - log.Info().Msg("User not found, continuing...") + log.Debug().Msg("User not found, continuing...") return } log.Error().Err(err).Msg("Error getting user by hotkey") diff --git a/pkg/blockchain/substrate.go b/pkg/blockchain/substrate.go index 2c27053..a3a1978 100644 --- a/pkg/blockchain/substrate.go 
+++ b/pkg/blockchain/substrate.go @@ -222,7 +222,7 @@ func (s *SubstrateService) GetAxonInfo(subnetId int, hotkey string) (*AxonInfo, } if storageResponse.Value == nil { - log.Warn().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey) + log.Debug().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey) return nil, errors.New("value is nil") } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 928e3a1..19dff5c 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -24,6 +24,7 @@ type RedisConfig struct { type Cache struct { Redis redis.Client + Keys CacheKeys } var ( @@ -32,54 +33,57 @@ var ( mu sync.Mutex ) -// CacheKey type for type-safe cache keys type CacheKey string -const ( +// CacheKeys holds all cache key constants +type CacheKeys struct { // Task cache keys - TaskById CacheKey = "task" // Single task by ID - TasksByWorker CacheKey = "task:worker" // List of tasks by worker + TaskById CacheKey + TasksByWorker CacheKey // Task Result cache keys - TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID - TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID + TaskResultByTaskAndWorker CacheKey + TaskResultByWorker CacheKey + TaskResultsTotal CacheKey // Worker cache keys - WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address - WorkerCount CacheKey = "worker:count" // Total worker count + WorkerByWallet CacheKey + WorkerCount CacheKey // Subscription cache keys - SubByHotkey CacheKey = "sub:hotkey" // Subscription by hotkey - SubByKey CacheKey = "sub:key" // Subscription by key -) - -// CacheConfig defines cache keys and their expiration times -var CacheConfig = map[CacheKey]time.Duration{ - TaskById: 5 * time.Minute, - TasksByWorker: 2 * time.Minute, - TaskResultByTaskAndWorker: 10 * time.Minute, - TaskResultByWorker: 10 * time.Minute, - WorkerByWallet: 5 * time.Minute, - WorkerCount: 1 * time.Minute, - SubByHotkey: 5 * 
time.Minute, - SubByKey: 5 * time.Minute, + SubByHotkey CacheKey + SubByKey CacheKey } -// GetCacheExpiration returns the expiration time for a given cache key -func GetCacheExpiration(key CacheKey) time.Duration { - if duration, exists := CacheConfig[key]; exists { - return duration - } - return 5 * time.Minute // default expiration +// Default cache keys +var cacheKeys = CacheKeys{ + // Task cache keys + TaskById: "task", + TasksByWorker: "task:worker", + + // Task Result cache keys + TaskResultByTaskAndWorker: "tr:task:worker", + TaskResultByWorker: "tr:worker", + TaskResultsTotal: "metrics:tr:total", + + // Worker cache keys + WorkerByWallet: "worker:wallet", + WorkerCount: "worker:count", + + // Subscription cache keys + SubByHotkey: "sub:hotkey", + SubByKey: "sub:key", } -// BuildCacheKey builds a cache key with the given prefix and components -func BuildCacheKey(prefix CacheKey, components ...string) string { - key := string(prefix) - for _, component := range components { - key += ":" + component - } - return key +var cacheExpirations = map[CacheKey]time.Duration{ + cacheKeys.TaskById: 5 * time.Minute, + cacheKeys.TasksByWorker: 2 * time.Minute, + cacheKeys.TaskResultByTaskAndWorker: 10 * time.Minute, + cacheKeys.TaskResultByWorker: 10 * time.Minute, + cacheKeys.WorkerByWallet: 5 * time.Minute, + cacheKeys.WorkerCount: 1 * time.Minute, + cacheKeys.SubByHotkey: 5 * time.Minute, + cacheKeys.SubByKey: 5 * time.Minute, } func GetCacheInstance() *Cache { @@ -119,11 +123,31 @@ func GetCacheInstance() *Cache { } log.Info().Msgf("Successfully connected to Redis and ping succeeded") - instance = &Cache{Redis: *redisClient} + instance = &Cache{ + Redis: *redisClient, + Keys: cacheKeys, + } }) return instance } +// GetCacheExpiration returns the expiration time for a given cache key +func (c *Cache) GetCacheExpiration(key CacheKey) time.Duration { + if duration, exists := cacheExpirations[key]; exists { + return duration + } + return 5 * time.Minute // default 
expiration +} + +// BuildCacheKey builds a cache key with the given prefix and components +func (c *Cache) BuildCacheKey(prefix CacheKey, components ...string) string { + key := string(prefix) + for _, component := range components { + key += ":" + component + } + return key +} + func (c *Cache) SetWithExpire(key string, value interface{}, expiration time.Duration) error { switch v := value.(type) { case string: @@ -148,7 +172,7 @@ func (c *Cache) Get(key string) (string, error) { // val, err := rc.Redis.Do(ctx, rc.Redis.B().Get().Key(key).Build()).AsBytes() val, err := c.Redis.Get(ctx, key).Bytes() if err == redis.Nil { - log.Error().Err(err).Str("key", key).Msg("Key not found in Redis") + log.Debug().Str("key", key).Msg("Key not found in Redis") return "", err } else if err != nil { log.Panic().Err(err).Msg("Failed to get from Redis ...") @@ -179,7 +203,7 @@ func (c *Cache) SetCacheValue(key string, value interface{}) error { return fmt.Errorf("failed to marshal data: %w", err) } - expiration := GetCacheExpiration(CacheKey(key)) + expiration := c.GetCacheExpiration(CacheKey(key)) if err := c.SetWithExpire(key, dataBytes, expiration); err != nil { return fmt.Errorf("failed to set cache: %w", err) } @@ -187,3 +211,50 @@ func (c *Cache) SetCacheValue(key string, value interface{}) error { log.Info().Msgf("Successfully set cache for key: %s", key) return nil } + +// Delete removes a single specific cache key +// Example: cache.Delete("task:worker:123") +func (c *Cache) Delete(key string) error { + ctx := context.Background() + deleted, err := c.Redis.Del(ctx, key).Result() + if err != nil { + log.Error().Err(err).Str("key", key).Msg("Failed to clean cache key") + return fmt.Errorf("failed to delete key: %w", err) + } + + if deleted > 0 { + log.Info().Str("key", key).Msg("Clean up Cache") + } else { + log.Debug().Str("key", key).Msg("Cache key not found") + } + return nil +} + +// DeleteByPattern removes all cache entries matching the given pattern +// Example: 
cache.DeleteByPattern("task:worker:*") - deletes all worker task caches +// Example: cache.DeleteByPattern("user:123:*") - deletes all user 123's caches +func (c *Cache) DeleteByPattern(pattern string) error { + ctx := context.Background() + keys, err := c.Redis.Keys(ctx, pattern).Result() + if err != nil { + log.Error().Err(err).Str("pattern", pattern).Msg("Failed to get keys") + return fmt.Errorf("failed to get keys: %w", err) + } + + if len(keys) > 0 { + deleted, err := c.Redis.Del(ctx, keys...).Result() + if err != nil { + log.Error().Err(err).Msg("Failed to delete keys") + return err + } + log.Info().Int64("deleted", deleted).Str("pattern", pattern).Msg("Clean up Cache") + } + return nil +} + +// DeleteWithSuffix removes a single cache key built with prefix and suffix components +// Example: cache.DeleteWithSuffix(cache.TasksByWorker, "123") -> deletes "task:worker:123" +func (c *Cache) DeleteWithSuffix(prefix CacheKey, suffixes ...string) error { + key := c.BuildCacheKey(prefix, suffixes...) 
+ return c.Delete(key) +} diff --git a/pkg/metric/metrics_service.go b/pkg/metric/metrics_service.go index dcd030e..bc95328 100644 --- a/pkg/metric/metrics_service.go +++ b/pkg/metric/metrics_service.go @@ -2,11 +2,14 @@ package metric import ( "context" + "encoding/json" + "dojo-api/db" + "dojo-api/pkg/cache" "dojo-api/pkg/event" "dojo-api/pkg/orm" - "encoding/json" + "github.com/redis/go-redis/v9" "github.com/rs/zerolog/log" ) @@ -53,19 +56,52 @@ func (metricService *MetricService) UpdateCompletedTaskCount(ctx context.Context } func (metricService *MetricService) UpdateTotalTaskResultsCount(ctx context.Context) error { - taskResultORM := orm.NewTaskResultORM() + cache := cache.GetCacheInstance() + cacheKey := string(cache.Keys.TaskResultsTotal) metricORM := orm.NewMetricsORM() - completedTResultCount, err := taskResultORM.GetCompletedTResultCount(ctx) + // Try to get current count from Redis + currentCount, err := cache.Redis.Get(ctx, cacheKey).Int64() + log.Info().Int64("TaskResultsCount", currentCount).Msg("Current task results count") + if err == redis.Nil { // Key doesn't exist (e.g., after Redis restart) + // Get the last metric from database + lastMetric, err := metricORM.GetMetricsDataByMetricType(ctx, db.MetricsTypeTotalNumTaskResults) + if err != nil && !db.IsErrNotFound(err) { + return err + } + + // Initialize Redis counter with last known value from database + // If no metric is found, counter will start from 0 + var initialCount int64 = 0 + if lastMetric != nil { + var lastMetricData MetricTaskResultsCount + if err := json.Unmarshal(lastMetric.MetricsData, &lastMetricData); err != nil { + return err + } + log.Info().Interface("LastMetricData", lastMetricData).Msg("Last Task Results Count in Metrics") + initialCount = int64(lastMetricData.TotalNumTasksResults) + } + + if err := cache.Redis.Set(ctx, cacheKey, initialCount, 0).Err(); err != nil { + return err + } + log.Info().Int64("initial_count", initialCount).Msg("Initialized task results counter") + 
} else if err != nil { + return err + } + + // Increment the counter + count, err := cache.Redis.Incr(ctx, cacheKey).Result() if err != nil { - log.Error().Err(err).Msg("Failed to get completed task result count") + log.Error().Err(err).Msg("Failed to increment task results count") return err } - newMetricData := MetricTaskResultsCount{TotalNumTasksResults: completedTResultCount} + + // Store in database + newMetricData := MetricTaskResultsCount{TotalNumTasksResults: int(count)} log.Info().Interface("TotalTaskResultsCount", newMetricData).Msg("Updating total task results count metric") - err = metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumTaskResults, newMetricData) - return err + return metricORM.CreateNewMetric(ctx, db.MetricsTypeTotalNumTaskResults, newMetricData) } func (metricService *MetricService) UpdateAvgTaskCompletionTime(ctx context.Context) error { diff --git a/pkg/orm/dojo_worker.go b/pkg/orm/dojo_worker.go index 6b5215f..ab3fb53 100644 --- a/pkg/orm/dojo_worker.go +++ b/pkg/orm/dojo_worker.go @@ -36,10 +36,9 @@ func (s *DojoWorkerORM) CreateDojoWorker(walletAddress string, chainId string) ( } func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db.DojoWorkerModel, error) { - cacheKey := cache.BuildCacheKey(cache.WorkerByWallet, walletAddress) - var worker *db.DojoWorkerModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.WorkerByWallet, walletAddress) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &worker); err == nil { @@ -71,9 +70,9 @@ func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db. 
} func (s *DojoWorkerORM) GetDojoWorkers() (int, error) { - cacheKey := cache.BuildCacheKey(cache.WorkerCount, "") var count int cache := cache.GetCacheInstance() + cacheKey := string(cache.Keys.WorkerCount) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &count); err == nil { diff --git a/pkg/orm/metrics.go b/pkg/orm/metrics.go index 03b924e..c60e4b2 100644 --- a/pkg/orm/metrics.go +++ b/pkg/orm/metrics.go @@ -2,10 +2,11 @@ package orm import ( "context" - "dojo-api/db" "encoding/json" "time" + "dojo-api/db" + "github.com/rs/zerolog/log" ) diff --git a/pkg/orm/miner_user.go b/pkg/orm/miner_user.go index 14ccd35..c3c354c 100644 --- a/pkg/orm/miner_user.go +++ b/pkg/orm/miner_user.go @@ -68,7 +68,7 @@ func (s *MinerUserORM) GetUserByHotkey(hotkey string) (*db.MinerUserModel, error user, err := s.dbClient.MinerUser.FindFirst( db.MinerUser.Hotkey.Equals(hotkey), ).Exec(ctx) - if err != nil { + if err != nil && err != db.ErrNotFound { log.Error().Err(err).Msg("Error retrieving user by hotkey") return nil, err } diff --git a/pkg/orm/subscriptionKey.go b/pkg/orm/subscriptionKey.go index f646285..6df619e 100644 --- a/pkg/orm/subscriptionKey.go +++ b/pkg/orm/subscriptionKey.go @@ -21,10 +21,9 @@ func NewSubscriptionKeyORM() *SubscriptionKeyORM { } func (a *SubscriptionKeyORM) GetSubscriptionKeysByMinerHotkey(hotkey string) ([]db.SubscriptionKeyModel, error) { - cacheKey := cache.BuildCacheKey(cache.SubByHotkey, hotkey) - var subKeys []db.SubscriptionKeyModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.SubByHotkey, hotkey) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &subKeys); err == nil { @@ -102,10 +101,9 @@ func (a *SubscriptionKeyORM) DisableSubscriptionKeyByHotkey(hotkey string, subsc } func (a *SubscriptionKeyORM) GetSubscriptionByKey(subScriptionKey string) (*db.SubscriptionKeyModel, error) { - cacheKey := cache.BuildCacheKey(cache.SubByKey, subScriptionKey) - var 
foundSubscriptionKey *db.SubscriptionKeyModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.SubByKey, subScriptionKey) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &foundSubscriptionKey); err == nil { diff --git a/pkg/orm/task.go b/pkg/orm/task.go index b702e5d..ffeeffb 100644 --- a/pkg/orm/task.go +++ b/pkg/orm/task.go @@ -52,10 +52,9 @@ func (o *TaskORM) CreateTask(ctx context.Context, task db.InnerTask, minerUserId // GetById with caching func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, error) { - cacheKey := cache.BuildCacheKey(cache.TaskById, taskId) - var task *db.TaskModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.TaskById, taskId) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &task); err == nil { @@ -88,10 +87,10 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str for i, t := range taskTypes { typeStrs[i] = string(t) } - cacheKey := cache.BuildCacheKey(cache.TasksByWorker, workerId, strconv.Itoa(offset), strconv.Itoa(limit), strings.Join(typeStrs, ",")) var tasks []db.TaskModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.TasksByWorker, workerId, strconv.Itoa(offset), strconv.Itoa(limit), strings.Join(typeStrs, ",")) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &tasks); err == nil { @@ -239,54 +238,17 @@ func (o *TaskORM) countTasksByWorkerSubscription(ctx context.Context, taskTypes return totalTasks, nil } -// check every 10 mins for expired tasks +// Check every 10 mins for expired tasks func (o *TaskORM) UpdateExpiredTasks(ctx context.Context) { - for range time.Tick(3 * time.Minute) { + for range time.Tick(10 * time.Minute) { log.Info().Msg("Checking for expired tasks") o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() - currentTime := time.Now() + currentTime := time.Now().UTC() batchSize := 100 // 
Adjust batch size based on database performance - - // Step 1: Delete expired tasks without TaskResults in batches batchNumber := 0 - startTime := time.Now() // Start timing for delete operation - for { - batchNumber++ - deleteQuery := ` - DELETE FROM "Task" - WHERE "id" IN ( - SELECT "id" FROM "Task" - WHERE "expire_at" <= $1 - AND "status" IN ($2::"TaskStatus", $3::"TaskStatus") - AND "id" NOT IN (SELECT DISTINCT "task_id" FROM "TaskResult") - LIMIT $4 - ) - ` - - // has to include TaskStatusInProgress, to handle Task with in-progress with no results - params := []interface{}{currentTime, db.TaskStatusInProgress, db.TaskStatusExpired, batchSize} - - execResult, err := o.dbClient.Prisma.ExecuteRaw(deleteQuery, params...).Exec(ctx) - if err != nil { - log.Error().Err(err).Msg("Error deleting tasks without TaskResults") - break - } - - if execResult.Count == 0 { - log.Info().Msg("No more expired tasks to delete without TaskResults") - break - } - - log.Info().Msgf("Deleted %v expired tasks without associated TaskResults in batch %d", execResult.Count, batchNumber) - } - deleteDuration := time.Since(startTime) // Calculate total duration for delete operation - log.Info().Msgf("Total time taken to delete expired tasks without TaskResults: %s", deleteDuration) - - // Step 2: Update expired tasks with TaskResults to 'expired' status in batches - batchNumber = 0 - startTime = time.Now() // Start timing for update operation + startTime := time.Now().UTC() // Start timing for update operation for { batchNumber++ updateQuery := ` diff --git a/pkg/orm/task_result.go b/pkg/orm/task_result.go index 9b7d9d3..aa10657 100644 --- a/pkg/orm/task_result.go +++ b/pkg/orm/task_result.go @@ -45,17 +45,6 @@ func (t *TaskResultORM) GetTaskResultsByTaskId(ctx context.Context, taskId strin } func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) { - cacheKey := 
cache.BuildCacheKey(cache.TaskResultByTaskAndWorker, taskId, workerId) - - var results []db.TaskResultModel - cacheInstance := cache.GetCacheInstance() - - // Try to get from cache - if err := cacheInstance.GetCacheValue(cacheKey, &results); err == nil { - return results, nil - } - - // Cache miss, fetch from database t.clientWrapper.BeforeQuery() defer t.clientWrapper.AfterQuery() @@ -68,19 +57,13 @@ func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, return nil, err } - // Set cache - if err := cacheInstance.SetCacheValue(cacheKey, results); err != nil { - log.Warn().Err(err).Msg("Failed to set cache") - } - return results, nil } func (t *TaskResultORM) GetCompletedTResultByWorker(ctx context.Context, workerId string) ([]db.TaskResultModel, error) { - cacheKey := cache.BuildCacheKey(cache.TaskResultByWorker, workerId) - var results []db.TaskResultModel cache := cache.GetCacheInstance() + cacheKey := cache.BuildCacheKey(cache.Keys.TaskResultByWorker, workerId) // Try to get from cache first if err := cache.GetCacheValue(cacheKey, &results); err == nil {