From a050222b56f9f4db8a65fba9177591ac3e4ff79e Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Wed, 27 Nov 2024 20:50:29 +0630 Subject: [PATCH 1/3] perf: added cache in db layer --- pkg/api/utils.go | 4 +- pkg/cache/cache.go | 33 +++++++++ pkg/orm/dojo_worker.go | 83 ++++++++++++++++++++++- pkg/orm/subscriptionKey.go | 72 +++++++++++++++++++- pkg/orm/task.go | 133 ++++++++++++++++++++++++++++++++----- pkg/orm/task_result.go | 110 ++++++++++++++++++++++++++++-- 6 files changed, 406 insertions(+), 29 deletions(-) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 11ba6ae..8b30d64 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string { // TODO - Need to check if this is the correct way without getting spoofing if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" { callerIp := c.Request.Header.Get("X-Original-Forwarded-For") - log.Info().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) + log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp) return callerIp } callerIp := c.ClientIP() - log.Info().Msgf("Got caller IP from ClientIP: %s", callerIp) + log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp) return callerIp } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 93c88cd..00ee7c0 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -3,6 +3,7 @@ package cache import ( "context" "crypto/tls" + "encoding/json" "fmt" "os" "sync" @@ -25,6 +26,12 @@ type Cache struct { Redis redis.Client } +// CacheableData interface for any data that can be cached +type CacheableData interface { + GetCacheKey() string + GetExpiration() time.Duration +} + var ( instance *Cache once sync.Once @@ -109,3 +116,29 @@ func (c *Cache) Shutdown() { c.Redis.Close() log.Info().Msg("Successfully closed Redis connection") } + +// GetCache attempts to retrieve and unmarshal data from cache +func (c *Cache) GetCache(data CacheableData, value interface{}) error { + cachedData, err := c.Get(data.GetCacheKey()) + if err != nil || cachedData == "" { + return fmt.Errorf("cache miss for key: %s", data.GetCacheKey()) + } + + log.Info().Msgf("Cache hit for key: %s", data.GetCacheKey()) + return json.Unmarshal([]byte(cachedData), value) +} + +// SetCache marshals and stores data in cache +func (c *Cache) SetCache(data CacheableData, value interface{}) error { + dataJSON, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("failed to marshal data: %w", err) + } + + if err := c.SetWithExpire(data.GetCacheKey(), dataJSON, data.GetExpiration()); err != nil { + return fmt.Errorf("failed to set cache: %w", err) + } + + log.Info().Msgf("Successfully set cache for key: %s", data.GetCacheKey()) + return nil +} diff --git a/pkg/orm/dojo_worker.go b/pkg/orm/dojo_worker.go index 2de7adb..294c4b4 100644 --- a/pkg/orm/dojo_worker.go +++ b/pkg/orm/dojo_worker.go @@ -2,14 +2,58 @@ package orm import ( "context" - "dojo-api/db" "errors" "fmt" "strconv" + "time" + + "dojo-api/db" + + "dojo-api/pkg/cache" "github.com/rs/zerolog/log" ) +type DojoWorkerCacheKey string + +const ( + WorkerByWalletCacheKey DojoWorkerCacheKey = "worker_by_wallet" // Short key for worker by wallet + WorkerCountCacheKey DojoWorkerCacheKey = "worker_count" // Short key for worker count +) + +type DojoWorkerCache struct { + key DojoWorkerCacheKey + walletAddress string +} + +func NewDojoWorkerCache(key DojoWorkerCacheKey) *DojoWorkerCache { + return 
&DojoWorkerCache{ + key: key, + } +} + +func (dc *DojoWorkerCache) GetCacheKey() string { + switch dc.key { + case WorkerByWalletCacheKey: + return fmt.Sprintf("%s:%s", dc.key, dc.walletAddress) + case WorkerCountCacheKey: + return string(dc.key) + default: + return fmt.Sprintf("dw:%s", dc.walletAddress) + } +} + +func (dc *DojoWorkerCache) GetExpiration() time.Duration { + switch dc.key { + case WorkerByWalletCacheKey: + return 5 * time.Minute + case WorkerCountCacheKey: + return 1 * time.Minute + default: + return 3 * time.Minute + } +} + type DojoWorkerORM struct { dbClient *db.PrismaClient clientWrapper *PrismaClientWrapper @@ -33,6 +77,18 @@ func (s *DojoWorkerORM) CreateDojoWorker(walletAddress string, chainId string) ( } func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db.DojoWorkerModel, error) { + workerCache := NewDojoWorkerCache(WorkerByWalletCacheKey) + workerCache.walletAddress = walletAddress + + var worker *db.DojoWorkerModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(workerCache, &worker); err == nil { + return worker, nil + } + + // Cache miss, fetch from database s.clientWrapper.BeforeQuery() defer s.clientWrapper.AfterQuery() @@ -47,10 +103,26 @@ func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db. } return nil, err } + + // Store in cache + if err := cache.SetCache(workerCache, worker); err != nil { + log.Warn().Err(err).Msg("Failed to set worker cache") + } + return worker, nil } func (s *DojoWorkerORM) GetDojoWorkers() (int, error) { + workerCache := NewDojoWorkerCache(WorkerCountCacheKey) + var count int + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(workerCache, &count); err == nil { + return count, nil + } + + // Cache miss, fetch from database s.clientWrapper.BeforeQuery() defer s.clientWrapper.AfterQuery() @@ -70,10 +142,15 @@ func (s *DojoWorkerORM) GetDojoWorkers() (int, error) { } workerCountStr := string(result[0].Count) - workerCountInt, err := strconv.Atoi(workerCountStr) + count, err = strconv.Atoi(workerCountStr) if err != nil { return 0, err } - return workerCountInt, nil + // Store in cache + if err := cache.SetCache(workerCache, count); err != nil { + log.Warn().Err(err).Msg("Failed to set worker count cache") + } + + return count, nil } diff --git a/pkg/orm/subscriptionKey.go b/pkg/orm/subscriptionKey.go index 449181b..e9aada4 100644 --- a/pkg/orm/subscriptionKey.go +++ b/pkg/orm/subscriptionKey.go @@ -2,8 +2,12 @@ package orm import ( "context" - "dojo-api/db" "errors" + "fmt" + "time" + + "dojo-api/db" + "dojo-api/pkg/cache" "github.com/rs/zerolog/log" ) @@ -13,14 +17,56 @@ type SubscriptionKeyORM struct { clientWrapper *PrismaClientWrapper } +type SubscriptionKeyCacheKey string + +const ( + SubKeysByHotkeyCacheKey SubscriptionKeyCacheKey = "sk_by_hotkey" + SubKeyByKeyCacheKey SubscriptionKeyCacheKey = "sk_by_key" +) + +type SubscriptionKeyCache struct { + key SubscriptionKeyCacheKey + hotkey string + subKey string +} + +func NewSubscriptionKeyCache(key SubscriptionKeyCacheKey) *SubscriptionKeyCache { + return &SubscriptionKeyCache{ + key: key, + } +} + +func (sc *SubscriptionKeyCache) GetCacheKey() string { + switch sc.key { + case SubKeysByHotkeyCacheKey: + return fmt.Sprintf("%s:%s", sc.key, sc.hotkey) + case SubKeyByKeyCacheKey: + return fmt.Sprintf("%s:%s", sc.key, sc.subKey) + default: + return fmt.Sprintf("sk:%s", sc.subKey) + } +} + +func (sc *SubscriptionKeyCache) GetExpiration() 
time.Duration { + return 5 * time.Minute +} + func NewSubscriptionKeyORM() *SubscriptionKeyORM { clientWrapper := GetPrismaClient() return &SubscriptionKeyORM{dbClient: clientWrapper.Client, clientWrapper: clientWrapper} } func (a *SubscriptionKeyORM) GetSubscriptionKeysByMinerHotkey(hotkey string) ([]db.SubscriptionKeyModel, error) { - a.clientWrapper.BeforeQuery() - defer a.clientWrapper.AfterQuery() + subCache := NewSubscriptionKeyCache(SubKeysByHotkeyCacheKey) + subCache.hotkey = hotkey + + var subKeys []db.SubscriptionKeyModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(subCache, &subKeys); err == nil { + return subKeys, nil + } ctx := context.Background() @@ -41,6 +87,11 @@ func (a *SubscriptionKeyORM) GetSubscriptionKeysByMinerHotkey(hotkey string) ([] return nil, err } + // Cache the result + if err := cache.SetCache(subCache, apiKeys); err != nil { + log.Error().Err(err).Msgf("Error caching subscription keys") + } + return apiKeys, nil } @@ -88,6 +139,16 @@ func (a *SubscriptionKeyORM) DisableSubscriptionKeyByHotkey(hotkey string, subsc } func (a *SubscriptionKeyORM) GetSubscriptionByKey(subScriptionKey string) (*db.SubscriptionKeyModel, error) { + subCache := NewSubscriptionKeyCache(SubKeyByKeyCacheKey) + subCache.subKey = subScriptionKey + + var foundSubscriptionKey *db.SubscriptionKeyModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(subCache, &foundSubscriptionKey); err == nil { + return foundSubscriptionKey, nil + } a.clientWrapper.BeforeQuery() defer a.clientWrapper.AfterQuery() @@ -107,5 +168,10 @@ func (a *SubscriptionKeyORM) GetSubscriptionByKey(subScriptionKey string) (*db.S return nil, err } + // Cache the result + if err := cache.SetCache(subCache, foundSubscriptionKey); err != nil { + log.Error().Err(err).Msgf("Error caching subscription key") + } + return foundSubscriptionKey, nil } diff --git a/pkg/orm/task.go b/pkg/orm/task.go index c8e3048..27b1e8f 100644 --- a/pkg/orm/task.go +++ b/pkg/orm/task.go @@ -9,6 +9,7 @@ import ( "time" "dojo-api/db" + "dojo-api/pkg/cache" sq "github.com/Masterminds/squirrel" @@ -25,6 +26,65 @@ func NewTaskORM() *TaskORM { return &TaskORM{dbClient: clientWrapper.Client, clientWrapper: clientWrapper} } +// Define cache keys as constants similar to rate limiter pattern +type TaskCacheKey string + +const ( + TaskByIdCacheKey TaskCacheKey = "task" + TasksByWorkerCacheKey TaskCacheKey = "tasks_by_worker" +) + +type TaskCache struct { + key TaskCacheKey + id string + workerId string + offset int + limit int + types []db.TaskType +} + +func NewTaskCache(key TaskCacheKey) *TaskCache { + return &TaskCache{ + key: key, + } +} + +func (tc *TaskCache) GetCacheKey() string { + switch tc.key { + case TaskByIdCacheKey: + return fmt.Sprintf("%s:%s", tc.key, tc.id) + case TasksByWorkerCacheKey: + // Create a shorter hash for task types + typeHash := "" + if len(tc.types) > 0 { + typeStrs := make([]string, len(tc.types)) + for i, t := range tc.types { + typeStrs[i] = string(t)[0:2] // Take first 2 chars of each type + } + typeHash = strings.Join(typeStrs, "") + } + return fmt.Sprintf("%s:%s:%d:%d:%s", + tc.key, // "tasks_by_worker" + tc.workerId, // worker id + tc.offset, // offset number + tc.limit, // limit number + typeHash) // shortened type hash + default: + return fmt.Sprintf("task:%s", tc.id) + } +} + +func (tc *TaskCache) GetExpiration() time.Duration { + switch tc.key { + case TaskByIdCacheKey: + return 5 * time.Minute + case 
TasksByWorkerCacheKey: + return 2 * time.Minute + default: + return 3 * time.Minute + } +} + // DOES NOT USE ANY DEFAULT VALUES, SO REMEMBER TO SET THE RIGHT STATUS // CreateTask creates a new task in the database with the provided details. // Ignores `Status` and `NumResults` fields as they are set to default values. @@ -49,45 +109,86 @@ func (o *TaskORM) CreateTask(ctx context.Context, task db.InnerTask, minerUserId return createdTask, err } +// GetById with caching func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, error) { + taskCache := NewTaskCache(TaskByIdCacheKey) + taskCache.id = taskId + + var task *db.TaskModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(taskCache, &task); err == nil { + return task, nil + } + + // Cache miss, fetch from database o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() + task, err := o.dbClient.Task.FindUnique( db.Task.ID.Equals(taskId), ).Exec(ctx) - return task, err + if err != nil { + return nil, err + } + + // Store in cache + if err := cache.SetCache(taskCache, task); err != nil { + log.Warn().Err(err).Msg("Failed to set cache") + } + + return task, nil } -// TODO: Optimization +// Modified GetTasksByWorkerSubscription with caching func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId string, offset, limit int, sortQuery db.TaskOrderByParam, taskTypes []db.TaskType) ([]db.TaskModel, int, error) { + // Initialize cache + taskCache := NewTaskCache(TasksByWorkerCacheKey) + taskCache.workerId = workerId + taskCache.offset = offset + taskCache.limit = limit + taskCache.types = taskTypes + + var tasks []db.TaskModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(taskCache, &tasks); err == nil { + totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, nil) + if err != nil { + return tasks, 0, err + } + return tasks, totalTasks, nil + } + + // Cache miss, proceed with database query o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() - // Fetch all active WorkerPartner records to retrieve MinerUser's subscription keys. + + // Rest of the existing implementation... partners, err := o.dbClient.WorkerPartner.FindMany( db.WorkerPartner.WorkerID.Equals(workerId), db.WorkerPartner.IsDeleteByMiner.Equals(false), db.WorkerPartner.IsDeleteByWorker.Equals(false), ).Exec(ctx) if err != nil { - log.Error().Err(err).Msg("Error in fetching WorkerPartner by WorkerID") return nil, 0, err } - // Collect Subscription keys from the fetched WorkerPartner records var subscriptionKeys []string for _, partner := range partners { subscriptionKeys = append(subscriptionKeys, partner.MinerSubscriptionKey) } if len(subscriptionKeys) == 0 { - log.Error().Err(err).Msg("No WorkerPartner found with the given WorkerID") return nil, 0, err } filterParams := []db.TaskWhereParam{ db.Task.MinerUser.Where( db.MinerUser.SubscriptionKeys.Some( - db.SubscriptionKey.Key.In(subscriptionKeys), // SubscriptionKey should be one of the keys in the subscriptionKeys slice. 
+ db.SubscriptionKey.Key.In(subscriptionKeys), ), ), } @@ -96,27 +197,26 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str filterParams = append(filterParams, db.Task.Type.In(taskTypes)) } - log.Debug().Interface("taskTypes", taskTypes).Msgf("Filter Params: %v", filterParams) - - // Fetch tasks associated with these subscription keys - tasks, err := o.dbClient.Task.FindMany( + tasks, err = o.dbClient.Task.FindMany( filterParams..., ).OrderBy(sortQuery). Skip(offset). Take(limit). Exec(ctx) if err != nil { - log.Error().Err(err).Msg("Error in fetching tasks by WorkerSubscriptionKey") return nil, 0, err } totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, subscriptionKeys) if err != nil { - log.Error().Err(err).Msg("Error in fetching total tasks by WorkerSubscriptionKey") return nil, 0, err } - log.Info().Int("totalTasks", totalTasks).Msgf("Successfully fetched total tasks fetched for worker ID %v", workerId) + // Store in cache + if err := cache.SetCache(taskCache, tasks); err != nil { + log.Warn().Err(err).Msg("Failed to set cache") + } + return tasks, totalTasks, nil } @@ -273,6 +373,7 @@ func (o *TaskORM) UpdateExpiredTasks(ctx context.Context) { } } +// Modify GetCompletedTaskCount to use the new pattern func (o *TaskORM) GetCompletedTaskCount(ctx context.Context) (int, error) { o.clientWrapper.BeforeQuery() defer o.clientWrapper.AfterQuery() @@ -292,12 +393,12 @@ func (o *TaskORM) GetCompletedTaskCount(ctx context.Context) (int, error) { } taskCountStr := string(result[0].Count) - taskCountInt, err := strconv.Atoi(taskCountStr) + count, err := strconv.Atoi(taskCountStr) if err != nil { return 0, err } - return taskCountInt, nil + return count, nil } func (o *TaskORM) GetNextInProgressTask(ctx context.Context, taskId string, workerId string) (*db.TaskModel, error) { diff --git a/pkg/orm/task_result.go b/pkg/orm/task_result.go index a792dcf..77ef43b 100644 --- a/pkg/orm/task_result.go +++ b/pkg/orm/task_result.go @@ -2,12 +2,58 @@ package orm import ( "context" - "dojo-api/db" "fmt" "strconv" "time" + + "dojo-api/db" + "dojo-api/pkg/cache" + + "github.com/rs/zerolog/log" +) + +type TaskResultCacheKey string + +const ( + TrByTaskAndWorkerCacheKey TaskResultCacheKey = "task_result_by_task_and_worker" // Short key for task result by worker + TrByWorkerCacheKey TaskResultCacheKey = "task_result_by_worker" // Short key for task result by worker ) +type TaskResultCache struct { + key TaskResultCacheKey + taskId string + workerId string +} + +func NewTaskResultCache(key TaskResultCacheKey) *TaskResultCache { + return &TaskResultCache{ + key: key, + } +} + +func (tc *TaskResultCache) GetCacheKey() string { + switch tc.key { + case TrByTaskAndWorkerCacheKey: + return fmt.Sprintf("%s:%s:%s", tc.key, tc.taskId, tc.workerId) + case TrByWorkerCacheKey: + return fmt.Sprintf("%s:%s", tc.key, tc.workerId) + + default: + return fmt.Sprintf("task_result:%s", tc.taskId) + } +} + +func (tc *TaskResultCache) GetExpiration() time.Duration { + switch tc.key { + case TrByTaskAndWorkerCacheKey: + return 10 * time.Minute + case TrByWorkerCacheKey: + return 10 * time.Minute + default: + return 1 * time.Minute + } +} + type TaskResultORM struct { client *db.PrismaClient clientWrapper *PrismaClientWrapper @@ -36,22 +82,76 @@ func (t *TaskResultORM) CreateTaskResult(ctx context.Context, taskResult *db.Inn func (t *TaskResultORM) GetTaskResultsByTaskId(ctx context.Context, taskId string) ([]db.TaskResultModel, error) { t.clientWrapper.BeforeQuery() defer 
t.clientWrapper.AfterQuery() + return t.client.TaskResult.FindMany(db.TaskResult.TaskID.Equals(taskId)).Exec(ctx) } -func (orm *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) { - return orm.client.TaskResult.FindMany( +func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) { + // Initialize cache + resultCache := NewTaskResultCache(TrByTaskAndWorkerCacheKey) + resultCache.taskId = taskId + resultCache.workerId = workerId + + var results []db.TaskResultModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(resultCache, &results); err == nil { + return results, nil + } + + // Cache miss, fetch from database + t.clientWrapper.BeforeQuery() + defer t.clientWrapper.AfterQuery() + + results, err := t.client.TaskResult.FindMany( db.TaskResult.TaskID.Equals(taskId), db.TaskResult.WorkerID.Equals(workerId), db.TaskResult.Status.Equals(db.TaskResultStatusCompleted), ).Exec(ctx) + if err != nil { + return nil, err + } + + // Store in cache + if err := cache.SetCache(resultCache, results); err != nil { + log.Warn().Err(err).Msg("Failed to set task result cache") + } + + return results, nil } -func (orm *TaskResultORM) GetCompletedTResultByWorker(ctx context.Context, workerId string) ([]db.TaskResultModel, error) { - return orm.client.TaskResult.FindMany( +func (t *TaskResultORM) GetCompletedTResultByWorker(ctx context.Context, workerId string) ([]db.TaskResultModel, error) { + // Initialize cache + resultCache := NewTaskResultCache(TrByWorkerCacheKey) + resultCache.workerId = workerId + + var results []db.TaskResultModel + cache := cache.GetCacheInstance() + + // Try to get from cache first + if err := cache.GetCache(resultCache, &results); err == nil { + return results, nil + } + + // Cache miss, fetch from database + t.clientWrapper.BeforeQuery() + defer t.clientWrapper.AfterQuery() + + results, err := t.client.TaskResult.FindMany( db.TaskResult.WorkerID.Equals(workerId), db.TaskResult.Status.Equals(db.TaskResultStatusCompleted), ).Exec(ctx) + if err != nil { + return nil, err + } + + // Store in cache + if err := cache.SetCache(resultCache, results); err != nil { + log.Warn().Err(err).Msg("Failed to set task result cache") + } + + return results, nil } func (t *TaskResultORM) CreateTaskResultWithInvalid(ctx context.Context, taskResult *db.InnerTaskResult) (*db.TaskResultModel, error) { From 589c7a302d91b11f8437b52ef2237df7b78b2eda Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Fri, 29 Nov 2024 12:11:06 +0630 Subject: [PATCH 2/3] perf: trying out with msg pack --- go.mod | 2 ++ go.sum | 4 ++++ pkg/cache/cache.go | 12 ++++++------ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index f7555a5..fb0831f 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,8 @@ require ( github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/relvacode/iso8601 v1.4.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/sync v0.7.0 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect ) diff --git a/go.sum b/go.sum index 9ca9303..ba12e34 100644 --- a/go.sum +++ b/go.sum @@ -261,6 +261,10 @@ github.com/ugorji/go/codec v1.2.12 
h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/ulule/limiter/v3 v3.11.2 h1:P4yOrxoEMJbOTfRJR2OzjL90oflzYPPmWg+dvwN2tHA= github.com/ulule/limiter/v3 v3.11.2/go.mod h1:QG5GnFOCV+k7lrL5Y8kgEeeflPH3+Cviqlqa8SVSQxI= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 00ee7c0..1b61b22 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -3,7 +3,6 @@ package cache import ( "context" "crypto/tls" - "encoding/json" "fmt" "os" "sync" @@ -12,6 +11,7 @@ import ( "dojo-api/utils" "github.com/redis/go-redis/v9" + "github.com/vmihailenco/msgpack/v5" "github.com/rs/zerolog/log" ) @@ -117,7 +117,7 @@ func (c *Cache) Shutdown() { log.Info().Msg("Successfully closed Redis connection") } -// GetCache attempts to retrieve and unmarshal data from cache +// GetCache retrieves and unmarshals data from cache using MessagePack func (c *Cache) GetCache(data CacheableData, value interface{}) error { cachedData, err := c.Get(data.GetCacheKey()) if err != nil || cachedData == "" { @@ -125,17 +125,17 @@ func (c *Cache) GetCache(data CacheableData, value interface{}) error { } log.Info().Msgf("Cache hit for key: %s", data.GetCacheKey()) - return json.Unmarshal([]byte(cachedData), value) + return msgpack.Unmarshal([]byte(cachedData), value) } -// SetCache marshals and stores data in cache +// SetCache marshals and stores data in cache using MessagePack func (c *Cache) SetCache(data CacheableData, value interface{}) error { - dataJSON, err := json.Marshal(value) + dataBytes, err := msgpack.Marshal(value) if err != nil { return fmt.Errorf("failed to marshal data: %w", err) } - if err := c.SetWithExpire(data.GetCacheKey(), dataJSON, data.GetExpiration()); err != nil { + if err := c.SetWithExpire(data.GetCacheKey(), dataBytes, data.GetExpiration()); err != nil { return fmt.Errorf("failed to set cache: %w", err) } From 1f0100bdc1cbd981817cfb9836c0ddd62decc51c Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Sun, 1 Dec 2024 17:35:53 +0630 Subject: [PATCH 3/3] fix: added fixed for PR feedbacks --- pkg/cache/cache.go | 75 +++++++++++++++++++++++++------- pkg/orm/dojo_worker.go | 54 +++-------------------- pkg/orm/subscriptionKey.go | 54 ++++------------------- pkg/orm/task.go | 89 ++++++++------------------------------ pkg/orm/task_result.go | 67 +++++----------------------- 5 files changed, 102 insertions(+), 237 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 1b61b22..928e3a1 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -26,18 +26,62 @@ type Cache struct { Redis redis.Client } -// CacheableData interface for any data that can be cached -type CacheableData interface { - GetCacheKey() string - GetExpiration() time.Duration -} - var ( instance *Cache once sync.Once mu sync.Mutex ) +// CacheKey type 
for type-safe cache keys +type CacheKey string + +const ( + // Task cache keys + TaskById CacheKey = "task" // Single task by ID + TasksByWorker CacheKey = "task:worker" // List of tasks by worker + + // Task Result cache keys + TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID + TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID + + // Worker cache keys + WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address + WorkerCount CacheKey = "worker:count" // Total worker count + + // Subscription cache keys + SubByHotkey CacheKey = "sub:hotkey" // Subscription by hotkey + SubByKey CacheKey = "sub:key" // Subscription by key +) + +// CacheConfig defines cache keys and their expiration times +var CacheConfig = map[CacheKey]time.Duration{ + TaskById: 5 * time.Minute, + TasksByWorker: 2 * time.Minute, + TaskResultByTaskAndWorker: 10 * time.Minute, + TaskResultByWorker: 10 * time.Minute, + WorkerByWallet: 5 * time.Minute, + WorkerCount: 1 * time.Minute, + SubByHotkey: 5 * time.Minute, + SubByKey: 5 * time.Minute, +} + +// GetCacheExpiration returns the expiration time for a given cache key +func GetCacheExpiration(key CacheKey) time.Duration { + if duration, exists := CacheConfig[key]; exists { + return duration + } + return 5 * time.Minute // default expiration +} + +// BuildCacheKey builds a cache key with the given prefix and components +func BuildCacheKey(prefix CacheKey, components ...string) string { + key := string(prefix) + for _, component := range components { + key += ":" + component + } + return key +} + func GetCacheInstance() *Cache { once.Do(func() { mu.Lock() @@ -117,28 +161,29 @@ func (c *Cache) Shutdown() { log.Info().Msg("Successfully closed Redis connection") } -// GetCache retrieves and unmarshals data from cache using MessagePack -func (c *Cache) GetCache(data CacheableData, value interface{}) error { - cachedData, err := c.Get(data.GetCacheKey()) +// GetCacheValue retrieves and unmarshals data from cache using MessagePack +func (c *Cache) GetCacheValue(key string, value interface{}) error { + cachedData, err := c.Get(key) if err != nil || cachedData == "" { - return fmt.Errorf("cache miss for key: %s", data.GetCacheKey()) + return fmt.Errorf("cache miss for key: %s", key) } - log.Info().Msgf("Cache hit for key: %s", data.GetCacheKey()) + log.Info().Msgf("Cache hit for key: %s", key) return msgpack.Unmarshal([]byte(cachedData), value) } -// SetCache marshals and stores data in cache using MessagePack -func (c *Cache) SetCache(data CacheableData, value interface{}) error { +// SetCacheValue marshals and stores data in cache using MessagePack +func (c *Cache) SetCacheValue(key string, value interface{}) error { dataBytes, err := msgpack.Marshal(value) if err != nil { return fmt.Errorf("failed to marshal data: %w", err) } - if err := c.SetWithExpire(data.GetCacheKey(), dataBytes, data.GetExpiration()); err != nil { + expiration := GetCacheExpiration(CacheKey(key)) + if err := c.SetWithExpire(key, dataBytes, expiration); err != nil { return fmt.Errorf("failed to set cache: %w", err) } - log.Info().Msgf("Successfully set cache for key: %s", data.GetCacheKey()) + log.Info().Msgf("Successfully set cache for key: %s", key) return nil } diff --git a/pkg/orm/dojo_worker.go b/pkg/orm/dojo_worker.go index 294c4b4..6b5215f 100644 --- a/pkg/orm/dojo_worker.go +++ b/pkg/orm/dojo_worker.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strconv" - "time" "dojo-api/db" @@ -14,46 +13,6 @@ import ( "github.com/rs/zerolog/log" 
) -type DojoWorkerCacheKey string - -const ( - WorkerByWalletCacheKey DojoWorkerCacheKey = "worker_by_wallet" // Short key for worker by wallet - WorkerCountCacheKey DojoWorkerCacheKey = "worker_count" // Short key for worker count -) - -type DojoWorkerCache struct { - key DojoWorkerCacheKey - walletAddress string -} - -func NewDojoWorkerCache(key DojoWorkerCacheKey) *DojoWorkerCache { - return &DojoWorkerCache{ - key: key, - } -} - -func (dc *DojoWorkerCache) GetCacheKey() string { - switch dc.key { - case WorkerByWalletCacheKey: - return fmt.Sprintf("%s:%s", dc.key, dc.walletAddress) - case WorkerCountCacheKey: - return string(dc.key) - default: - return fmt.Sprintf("dw:%s", dc.walletAddress) - } -} - -func (dc *DojoWorkerCache) GetExpiration() time.Duration { - switch dc.key { - case WorkerByWalletCacheKey: - return 5 * time.Minute - case WorkerCountCacheKey: - return 1 * time.Minute - default: - return 3 * time.Minute - } -} - type DojoWorkerORM struct { dbClient *db.PrismaClient clientWrapper *PrismaClientWrapper @@ -77,14 +36,13 @@ func (s *DojoWorkerORM) CreateDojoWorker(walletAddress string, chainId string) ( } func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db.DojoWorkerModel, error) { - workerCache := NewDojoWorkerCache(WorkerByWalletCacheKey) - workerCache.walletAddress = walletAddress + cacheKey := cache.BuildCacheKey(cache.WorkerByWallet, walletAddress) var worker *db.DojoWorkerModel cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(workerCache, &worker); err == nil { + if err := cache.GetCacheValue(cacheKey, &worker); err == nil { return worker, nil } @@ -105,7 +63,7 @@ func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db. } // Store in cache - if err := cache.SetCache(workerCache, worker); err != nil { + if err := cache.SetCacheValue(cacheKey, worker); err != nil { log.Warn().Err(err).Msg("Failed to set worker cache") } @@ -113,12 +71,12 @@ func (s *DojoWorkerORM) GetDojoWorkerByWalletAddress(walletAddress string) (*db. 
} func (s *DojoWorkerORM) GetDojoWorkers() (int, error) { - workerCache := NewDojoWorkerCache(WorkerCountCacheKey) + cacheKey := cache.BuildCacheKey(cache.WorkerCount, "") var count int cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(workerCache, &count); err == nil { + if err := cache.GetCacheValue(cacheKey, &count); err == nil { return count, nil } @@ -148,7 +106,7 @@ func (s *DojoWorkerORM) GetDojoWorkers() (int, error) { } // Store in cache - if err := cache.SetCache(workerCache, count); err != nil { + if err := cache.SetCacheValue(cacheKey, count); err != nil { log.Warn().Err(err).Msg("Failed to set worker count cache") } diff --git a/pkg/orm/subscriptionKey.go b/pkg/orm/subscriptionKey.go index e9aada4..f646285 100644 --- a/pkg/orm/subscriptionKey.go +++ b/pkg/orm/subscriptionKey.go @@ -3,8 +3,6 @@ package orm import ( "context" "errors" - "fmt" - "time" "dojo-api/db" "dojo-api/pkg/cache" @@ -17,54 +15,19 @@ type SubscriptionKeyORM struct { clientWrapper *PrismaClientWrapper } -type SubscriptionKeyCacheKey string - -const ( - SubKeysByHotkeyCacheKey SubscriptionKeyCacheKey = "sk_by_hotkey" - SubKeyByKeyCacheKey SubscriptionKeyCacheKey = "sk_by_key" -) - -type SubscriptionKeyCache struct { - key SubscriptionKeyCacheKey - hotkey string - subKey string -} - -func NewSubscriptionKeyCache(key SubscriptionKeyCacheKey) *SubscriptionKeyCache { - return &SubscriptionKeyCache{ - key: key, - } -} - -func (sc *SubscriptionKeyCache) GetCacheKey() string { - switch sc.key { - case SubKeysByHotkeyCacheKey: - return fmt.Sprintf("%s:%s", sc.key, sc.hotkey) - case SubKeyByKeyCacheKey: - return fmt.Sprintf("%s:%s", sc.key, sc.subKey) - default: - return fmt.Sprintf("sk:%s", sc.subKey) - } -} - -func (sc *SubscriptionKeyCache) GetExpiration() time.Duration { - return 5 * time.Minute -} - func NewSubscriptionKeyORM() *SubscriptionKeyORM { clientWrapper := GetPrismaClient() return &SubscriptionKeyORM{dbClient: clientWrapper.Client, clientWrapper: clientWrapper} } func (a *SubscriptionKeyORM) GetSubscriptionKeysByMinerHotkey(hotkey string) ([]db.SubscriptionKeyModel, error) { - subCache := NewSubscriptionKeyCache(SubKeysByHotkeyCacheKey) - subCache.hotkey = hotkey + cacheKey := cache.BuildCacheKey(cache.SubByHotkey, hotkey) var subKeys []db.SubscriptionKeyModel cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(subCache, &subKeys); err == nil { + if err := cache.GetCacheValue(cacheKey, &subKeys); err == nil { return subKeys, nil } @@ -88,7 +51,7 @@ func (a *SubscriptionKeyORM) GetSubscriptionKeysByMinerHotkey(hotkey string) ([] } // Cache the result - if err := cache.SetCache(subCache, apiKeys); err != nil { + if err := cache.SetCacheValue(cacheKey, apiKeys); err != nil { log.Error().Err(err).Msgf("Error caching subscription keys") } @@ -107,7 +70,7 @@ func (a *SubscriptionKeyORM) CreateSubscriptionKeyByHotkey(hotkey string, subscr return nil, err } - createdApiKey, err := a.dbClient.SubscriptionKey.CreateOne( + createdSubKey, err := a.dbClient.SubscriptionKey.CreateOne( db.SubscriptionKey.Key.Set(subscriptionKey), db.SubscriptionKey.MinerUser.Link( db.MinerUser.ID.Equals(minerUser.ID), @@ -118,7 +81,7 @@ func (a *SubscriptionKeyORM) CreateSubscriptionKeyByHotkey(hotkey string, subscr log.Error().Err(err).Msgf("Error creating subscription key") return nil, err } - return createdApiKey, nil + return createdSubKey, nil } func (a *SubscriptionKeyORM) DisableSubscriptionKeyByHotkey(hotkey string, subscriptionKey string) 
(*db.SubscriptionKeyModel, error) { @@ -139,14 +102,13 @@ func (a *SubscriptionKeyORM) DisableSubscriptionKeyByHotkey(hotkey string, subsc } func (a *SubscriptionKeyORM) GetSubscriptionByKey(subScriptionKey string) (*db.SubscriptionKeyModel, error) { - subCache := NewSubscriptionKeyCache(SubKeyByKeyCacheKey) - subCache.subKey = subScriptionKey + cacheKey := cache.BuildCacheKey(cache.SubByKey, subScriptionKey) var foundSubscriptionKey *db.SubscriptionKeyModel cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(subCache, &foundSubscriptionKey); err == nil { + if err := cache.GetCacheValue(cacheKey, &foundSubscriptionKey); err == nil { return foundSubscriptionKey, nil } a.clientWrapper.BeforeQuery() @@ -169,7 +131,7 @@ func (a *SubscriptionKeyORM) GetSubscriptionByKey(subScriptionKey string) (*db.S } // Cache the result - if err := cache.SetCache(subCache, foundSubscriptionKey); err != nil { + if err := cache.SetCacheValue(cacheKey, foundSubscriptionKey); err != nil { log.Error().Err(err).Msgf("Error caching subscription key") } diff --git a/pkg/orm/task.go b/pkg/orm/task.go index 27b1e8f..b702e5d 100644 --- a/pkg/orm/task.go +++ b/pkg/orm/task.go @@ -26,65 +26,6 @@ func NewTaskORM() *TaskORM { return &TaskORM{dbClient: clientWrapper.Client, clientWrapper: clientWrapper} } -// Define cache keys as constants similar to rate limiter pattern -type TaskCacheKey string - -const ( - TaskByIdCacheKey TaskCacheKey = "task" - TasksByWorkerCacheKey TaskCacheKey = "tasks_by_worker" -) - -type TaskCache struct { - key TaskCacheKey - id string - workerId string - offset int - limit int - types []db.TaskType -} - -func NewTaskCache(key TaskCacheKey) *TaskCache { - return &TaskCache{ - key: key, - } -} - -func (tc *TaskCache) GetCacheKey() string { - switch tc.key { - case TaskByIdCacheKey: - return fmt.Sprintf("%s:%s", tc.key, tc.id) - case TasksByWorkerCacheKey: - // Create a shorter hash for task types - typeHash := "" - if len(tc.types) > 0 { - typeStrs := make([]string, len(tc.types)) - for i, t := range tc.types { - typeStrs[i] = string(t)[0:2] // Take first 2 chars of each type - } - typeHash = strings.Join(typeStrs, "") - } - return fmt.Sprintf("%s:%s:%d:%d:%s", - tc.key, // "tasks_by_worker" - tc.workerId, // worker id - tc.offset, // offset number - tc.limit, // limit number - typeHash) // shortened type hash - default: - return fmt.Sprintf("task:%s", tc.id) - } -} - -func (tc *TaskCache) GetExpiration() time.Duration { - switch tc.key { - case TaskByIdCacheKey: - return 5 * time.Minute - case TasksByWorkerCacheKey: - return 2 * time.Minute - default: - return 3 * time.Minute - } -} - // DOES NOT USE ANY DEFAULT VALUES, SO REMEMBER TO SET THE RIGHT STATUS // CreateTask creates a new task in the database with the provided details. // Ignores `Status` and `NumResults` fields as they are set to default values. 
@@ -111,14 +52,13 @@ func (o *TaskORM) CreateTask(ctx context.Context, task db.InnerTask, minerUserId // GetById with caching func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, error) { - taskCache := NewTaskCache(TaskByIdCacheKey) - taskCache.id = taskId + cacheKey := cache.BuildCacheKey(cache.TaskById, taskId) var task *db.TaskModel cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(taskCache, &task); err == nil { + if err := cache.GetCacheValue(cacheKey, &task); err == nil { return task, nil } @@ -134,7 +74,7 @@ func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, er } // Store in cache - if err := cache.SetCache(taskCache, task); err != nil { + if err := cache.SetCacheValue(cacheKey, task); err != nil { log.Warn().Err(err).Msg("Failed to set cache") } @@ -143,20 +83,21 @@ func (o *TaskORM) GetById(ctx context.Context, taskId string) (*db.TaskModel, er // Modified GetTasksByWorkerSubscription with caching func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId string, offset, limit int, sortQuery db.TaskOrderByParam, taskTypes []db.TaskType) ([]db.TaskModel, int, error) { - // Initialize cache - taskCache := NewTaskCache(TasksByWorkerCacheKey) - taskCache.workerId = workerId - taskCache.offset = offset - taskCache.limit = limit - taskCache.types = taskTypes + // Convert TaskTypes to strings + typeStrs := make([]string, len(taskTypes)) + for i, t := range taskTypes { + typeStrs[i] = string(t) + } + cacheKey := cache.BuildCacheKey(cache.TasksByWorker, workerId, strconv.Itoa(offset), strconv.Itoa(limit), strings.Join(typeStrs, ",")) var tasks []db.TaskModel cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(taskCache, &tasks); err == nil { + if err := cache.GetCacheValue(cacheKey, &tasks); err == nil { totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, nil) if err != nil { + log.Error().Err(err).Msgf("Error fetching total tasks for worker ID %v", workerId) return tasks, 0, err } return tasks, totalTasks, nil @@ -173,6 +114,7 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str db.WorkerPartner.IsDeleteByWorker.Equals(false), ).Exec(ctx) if err != nil { + log.Error().Err(err).Msgf("Error fetching WorkerPartner by WorkerID for worker ID %v", workerId) return nil, 0, err } @@ -182,6 +124,7 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str } if len(subscriptionKeys) == 0 { + log.Error().Msgf("No subscription keys found for worker ID %v", workerId) return nil, 0, err } @@ -204,16 +147,20 @@ func (o *TaskORM) GetTasksByWorkerSubscription(ctx context.Context, workerId str Take(limit). 
Exec(ctx) if err != nil { + log.Error().Err(err).Msgf("Error fetching tasks for worker ID %v", workerId) return nil, 0, err } totalTasks, err := o.countTasksByWorkerSubscription(ctx, taskTypes, subscriptionKeys) if err != nil { + log.Error().Err(err).Msgf("Error fetching total tasks for worker ID %v", workerId) return nil, 0, err } + log.Info().Int("totalTasks", totalTasks).Msgf("Successfully fetched total tasks fetched for worker ID %v", workerId) + // Store in cache - if err := cache.SetCache(taskCache, tasks); err != nil { + if err := cache.SetCacheValue(cacheKey, tasks); err != nil { log.Warn().Err(err).Msg("Failed to set cache") } diff --git a/pkg/orm/task_result.go b/pkg/orm/task_result.go index 77ef43b..9b7d9d3 100644 --- a/pkg/orm/task_result.go +++ b/pkg/orm/task_result.go @@ -12,48 +12,6 @@ import ( "github.com/rs/zerolog/log" ) -type TaskResultCacheKey string - -const ( - TrByTaskAndWorkerCacheKey TaskResultCacheKey = "task_result_by_task_and_worker" // Short key for task result by worker - TrByWorkerCacheKey TaskResultCacheKey = "task_result_by_worker" // Short key for task result by worker -) - -type TaskResultCache struct { - key TaskResultCacheKey - taskId string - workerId string -} - -func NewTaskResultCache(key TaskResultCacheKey) *TaskResultCache { - return &TaskResultCache{ - key: key, - } -} - -func (tc *TaskResultCache) GetCacheKey() string { - switch tc.key { - case TrByTaskAndWorkerCacheKey: - return fmt.Sprintf("%s:%s:%s", tc.key, tc.taskId, tc.workerId) - case TrByWorkerCacheKey: - return fmt.Sprintf("%s:%s", tc.key, tc.workerId) - - default: - return fmt.Sprintf("task_result:%s", tc.taskId) - } -} - -func (tc *TaskResultCache) GetExpiration() time.Duration { - switch tc.key { - case TrByTaskAndWorkerCacheKey: - return 10 * time.Minute - case TrByWorkerCacheKey: - return 10 * time.Minute - default: - return 1 * time.Minute - } -} - type TaskResultORM struct { client *db.PrismaClient clientWrapper *PrismaClientWrapper @@ -87,16 +45,13 @@ func (t *TaskResultORM) GetTaskResultsByTaskId(ctx context.Context, taskId strin } func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, taskId string, workerId string) ([]db.TaskResultModel, error) { - // Initialize cache - resultCache := NewTaskResultCache(TrByTaskAndWorkerCacheKey) - resultCache.taskId = taskId - resultCache.workerId = workerId + cacheKey := cache.BuildCacheKey(cache.TaskResultByTaskAndWorker, taskId, workerId) var results []db.TaskResultModel - cache := cache.GetCacheInstance() + cacheInstance := cache.GetCacheInstance() - // Try to get from cache first - if err := cache.GetCache(resultCache, &results); err == nil { + // Try to get from cache + if err := cacheInstance.GetCacheValue(cacheKey, &results); err == nil { return results, nil } @@ -113,24 +68,22 @@ func (t *TaskResultORM) GetCompletedTResultByTaskAndWorker(ctx context.Context, return nil, err } - // Store in cache - if err := cache.SetCache(resultCache, results); err != nil { - log.Warn().Err(err).Msg("Failed to set task result cache") + // Set cache + if err := cacheInstance.SetCacheValue(cacheKey, results); err != nil { + log.Warn().Err(err).Msg("Failed to set cache") } return results, nil } func (t *TaskResultORM) GetCompletedTResultByWorker(ctx context.Context, workerId string) ([]db.TaskResultModel, error) { - // Initialize cache - resultCache := NewTaskResultCache(TrByWorkerCacheKey) - resultCache.workerId = workerId + cacheKey := cache.BuildCacheKey(cache.TaskResultByWorker, workerId) var results []db.TaskResultModel 
cache := cache.GetCacheInstance() // Try to get from cache first - if err := cache.GetCache(resultCache, &results); err == nil { + if err := cache.GetCacheValue(cacheKey, &results); err == nil { return results, nil } @@ -147,7 +100,7 @@ func (t *TaskResultORM) GetCompletedTResultByWorker(ctx context.Context, workerI } // Store in cache - if err := cache.SetCache(resultCache, results); err != nil { + if err := cache.SetCacheValue(cacheKey, results); err != nil { log.Warn().Err(err).Msg("Failed to set task result cache") }
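
For reviewers, a minimal sketch of the cache-aside flow the ORM methods end up with after PATCH 3, assuming the helpers added in pkg/cache (BuildCacheKey, GetCacheValue, SetCacheValue) and an already-initialised Redis client from GetCacheInstance; getTaskCached and the injected fetch callback are illustrative names only and are not part of the series.

package orm

import (
	"context"

	"dojo-api/db"
	"dojo-api/pkg/cache"

	"github.com/rs/zerolog/log"
)

// getTaskCached illustrates the cache-aside pattern used by TaskORM.GetById
// after this series: build a key, try Redis first, fall back to the injected
// fetch callback on a miss, then write the result back on a best-effort basis.
func getTaskCached(
	ctx context.Context,
	taskId string,
	fetch func(context.Context, string) (*db.TaskModel, error), // stand-in for the Prisma query
) (*db.TaskModel, error) {
	cacheKey := cache.BuildCacheKey(cache.TaskById, taskId)
	c := cache.GetCacheInstance()

	var task *db.TaskModel
	if err := c.GetCacheValue(cacheKey, &task); err == nil {
		return task, nil // cache hit: skip the database entirely
	}

	// Cache miss: load from the database.
	task, err := fetch(ctx, taskId)
	if err != nil {
		return nil, err
	}

	// Best-effort write-back; expiration is resolved inside SetCacheValue via
	// GetCacheExpiration, so a failure here only costs us the cached copy.
	if err := c.SetCacheValue(cacheKey, task); err != nil {
		log.Warn().Err(err).Msg("Failed to set cache")
	}
	return task, nil
}

Note that PATCH 2 switches the cache codec from encoding/json to msgpack, so values passed to SetCacheValue only need to be msgpack-serialisable; call sites are otherwise unchanged by that commit.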