refactor: update metric api to cumulate task count after db purge (#60)
* refactor: update metric api to cumulate task count after db purge

- increment the task result count every time the API is called instead of fetching it from the TaskResult table
- use Redis INCR for atomic increments that work across all instances

* fix: apply changes from PR feedback

- applied changes from PR feedback
- removed the cache on GetTasksByWorkerSubscription to fix the task list not updating properly

* fix: add more changes for PR, and edge cases

- add back the TasksByWorker cache that was removed because of inconsistent UI
- reset the related cache after a successful task submit or create
- refactor some code in cache.go
- apply some changes from PR feedback

* chore: clean up substrate logs

* fix: bug fixes when checking expired tasks

- use UTC for the current time when checking for expired tasks
- remove the check for expired tasks with no result, since the scheduler already handles it
codebender37 authored Dec 11, 2024
1 parent 2bf5131 commit f2654a6
Showing 14 changed files with 192 additions and 131 deletions.
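
The first change in the list above swaps a count read from the TaskResult table for a cumulative counter, so the metric keeps growing even after old rows are purged from the database. Below is a minimal sketch of that idea, assuming the metrics:tr:total key (Keys.TaskResultsTotal) introduced in pkg/cache/cache.go further down and a go-redis client; the helper name, package, and import path are hypothetical, and the real increment happens inside handleMetricData, whose body is not part of this diff.

package metric

import (
	"context"
	"fmt"

	"dojo-api/pkg/cache" // hypothetical import path; the real module path is not shown in this diff
)

// incrementTaskResultCount is a hypothetical helper: bump one shared Redis
// counter instead of counting TaskResult rows, so the total keeps growing
// even after old rows are purged from the database.
func incrementTaskResultCount(ctx context.Context, c *cache.Cache) (int64, error) {
	total, err := c.Redis.Incr(ctx, string(c.Keys.TaskResultsTotal)).Result()
	if err != nil {
		return 0, fmt.Errorf("failed to increment task result count: %w", err)
	}
	return total, nil
}

Because INCR is a single atomic Redis command, concurrent API instances can all call this without a lock or a read-modify-write race.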
2 changes: 1 addition & 1 deletion cmd/seed/fixtures/fixture_service.go
@@ -109,7 +109,7 @@ func (o *FixtureService) CreateDefaultTask(ctx context.Context, title string, ex
db.Task.Type.Set(db.TaskTypeCodeGeneration),
db.Task.TaskData.Set(types.JSON(taskDataJSON)),
db.Task.Status.Set(db.TaskStatusInProgress),
db.Task.MaxResults.Set(10),
db.Task.MaxResults.Set(1),
db.Task.NumResults.Set(0),
db.Task.NumCriteria.Set(0),
db.Task.TotalReward.Set(101.0),
15 changes: 5 additions & 10 deletions cmd/seed/task_data.json
@@ -6,17 +6,12 @@
"max": 100,
"min": 1,
"type": "multi-score",
"options": [
"459520f8-b654-41a8-9f94-93e003c4ecb5",
"024ffca6-aac0-40d2-959b-1ab7b165b68e",
"4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"1b26f82c-935f-432b-a60d-f21204e580f2"
]
"options": ["1", "2", "3", "4"]
}
],
"responses": [
{
"model": "459520f8-b654-41a8-9f94-93e003c4ecb5",
"model": "1",
"completion": {
"files": [
{
@@ -29,7 +24,7 @@
}
},
{
"model": "024ffca6-aac0-40d2-959b-1ab7b165b68e",
"model": "2",
"completion": {
"files": [
{
@@ -42,7 +37,7 @@
}
},
{
"model": "4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"model": "3",
"completion": {
"files": [
{
@@ -55,7 +50,7 @@
}
},
{
"model": "1b26f82c-935f-432b-a60d-f21204e580f2",
"model": "4",
"completion": {
"files": [
{
16 changes: 16 additions & 0 deletions pkg/api/controllers.go
@@ -193,6 +193,10 @@ func CreateTasksController(c *gin.Context) {
taskIds = append(taskIds, task.ID)
}

// Clean up the cache
cache := cache.GetCacheInstance()
cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*")

c.JSON(http.StatusOK, defaultSuccessResponse(taskIds))
}

@@ -305,6 +309,15 @@ func SubmitTaskResultController(c *gin.Context) {
return
}

// Remove from cache
cache := cache.GetCacheInstance()
cache.DeleteWithSuffix(cache.Keys.TaskResultByWorker, worker.ID)

// Clean all task:worker:* cache entries
if err := cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*"); err != nil {
log.Error().Err(err).Msg("Failed to clean task:worker cache entries")
}

// Update the metric data with goroutine
handleMetricData(taskData, updatedTask)

@@ -472,6 +485,9 @@ func GetTaskByIdController(c *gin.Context) {
taskID := c.Param("task-id")
taskService := task.NewTaskService()

// TODO: Remove this after testing
log.Info().Interface("Headers", c.Request.Header).Msg("Request Headers")

task, err := taskService.GetTaskResponseById(c.Request.Context(), taskID)
if err != nil {
c.JSON(http.StatusInternalServerError, defaultErrorResponse("Internal server error"))
4 changes: 2 additions & 2 deletions pkg/api/utils.go
@@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string {
// TODO - Need to check if this is the correct way without getting spoofing
if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" {
callerIp := c.Request.Header.Get("X-Original-Forwarded-For")
log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
log.Debug().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
return callerIp
}
callerIp := c.ClientIP()
log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp)
log.Debug().Msgf("Got caller IP from ClientIP: %s", callerIp)
return callerIp
}

2 changes: 1 addition & 1 deletion pkg/blockchain/subnet_state.go
@@ -123,7 +123,7 @@ func (s *SubnetStateSubscriber) OnRegisteredFound(hotkey string) {
_, err := minerUserORM.GetUserByHotkey(hotkey)
if err != nil {
if err == db.ErrNotFound {
log.Info().Msg("User not found, continuing...")
log.Debug().Msg("User not found, continuing...")
return
}
log.Error().Err(err).Msg("Error getting user by hotkey")
2 changes: 1 addition & 1 deletion pkg/blockchain/substrate.go
@@ -222,7 +222,7 @@ func (s *SubstrateService) GetAxonInfo(subnetId int, hotkey string) (*AxonInfo,
}

if storageResponse.Value == nil {
log.Warn().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey)
log.Debug().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey)
return nil, errors.New("value is nil")
}

147 changes: 109 additions & 38 deletions pkg/cache/cache.go
@@ -24,6 +24,7 @@ type RedisConfig struct {

type Cache struct {
Redis redis.Client
Keys CacheKeys
}

var (
@@ -32,54 +33,57 @@ var (
mu sync.Mutex
)

// CacheKey type for type-safe cache keys
type CacheKey string

const (
// CacheKeys holds all cache key constants
type CacheKeys struct {
// Task cache keys
TaskById CacheKey = "task" // Single task by ID
TasksByWorker CacheKey = "task:worker" // List of tasks by worker
TaskById CacheKey
TasksByWorker CacheKey

// Task Result cache keys
TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID
TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID
TaskResultByTaskAndWorker CacheKey
TaskResultByWorker CacheKey
TaskResultsTotal CacheKey

// Worker cache keys
WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address
WorkerCount CacheKey = "worker:count" // Total worker count
WorkerByWallet CacheKey
WorkerCount CacheKey

// Subscription cache keys
SubByHotkey CacheKey = "sub:hotkey" // Subscription by hotkey
SubByKey CacheKey = "sub:key" // Subscription by key
)

// CacheConfig defines cache keys and their expiration times
var CacheConfig = map[CacheKey]time.Duration{
TaskById: 5 * time.Minute,
TasksByWorker: 2 * time.Minute,
TaskResultByTaskAndWorker: 10 * time.Minute,
TaskResultByWorker: 10 * time.Minute,
WorkerByWallet: 5 * time.Minute,
WorkerCount: 1 * time.Minute,
SubByHotkey: 5 * time.Minute,
SubByKey: 5 * time.Minute,
SubByHotkey CacheKey
SubByKey CacheKey
}

// GetCacheExpiration returns the expiration time for a given cache key
func GetCacheExpiration(key CacheKey) time.Duration {
if duration, exists := CacheConfig[key]; exists {
return duration
}
return 5 * time.Minute // default expiration
// Default cache keys
var cacheKeys = CacheKeys{
// Task cache keys
TaskById: "task",
TasksByWorker: "task:worker",

// Task Result cache keys
TaskResultByTaskAndWorker: "tr:task:worker",
TaskResultByWorker: "tr:worker",
TaskResultsTotal: "metrics:tr:total",

// Worker cache keys
WorkerByWallet: "worker:wallet",
WorkerCount: "worker:count",

// Subscription cache keys
SubByHotkey: "sub:hotkey",
SubByKey: "sub:key",
}

// BuildCacheKey builds a cache key with the given prefix and components
func BuildCacheKey(prefix CacheKey, components ...string) string {
key := string(prefix)
for _, component := range components {
key += ":" + component
}
return key
var cacheExpirations = map[CacheKey]time.Duration{
cacheKeys.TaskById: 5 * time.Minute,
cacheKeys.TasksByWorker: 2 * time.Minute,
cacheKeys.TaskResultByTaskAndWorker: 10 * time.Minute,
cacheKeys.TaskResultByWorker: 10 * time.Minute,
cacheKeys.WorkerByWallet: 5 * time.Minute,
cacheKeys.WorkerCount: 1 * time.Minute,
cacheKeys.SubByHotkey: 5 * time.Minute,
cacheKeys.SubByKey: 5 * time.Minute,
}

func GetCacheInstance() *Cache {
@@ -119,11 +123,31 @@ func GetCacheInstance() *Cache {
}

log.Info().Msgf("Successfully connected to Redis and ping succeeded")
instance = &Cache{Redis: *redisClient}
instance = &Cache{
Redis: *redisClient,
Keys: cacheKeys,
}
})
return instance
}

// GetCacheExpiration returns the expiration time for a given cache key
func (c *Cache) GetCacheExpiration(key CacheKey) time.Duration {
if duration, exists := cacheExpirations[key]; exists {
return duration
}
return 5 * time.Minute // default expiration
}

// BuildCacheKey builds a cache key with the given prefix and components
func (c *Cache) BuildCacheKey(prefix CacheKey, components ...string) string {
key := string(prefix)
for _, component := range components {
key += ":" + component
}
return key
}

func (c *Cache) SetWithExpire(key string, value interface{}, expiration time.Duration) error {
switch v := value.(type) {
case string:
@@ -148,7 +172,7 @@ func (c *Cache) Get(key string) (string, error) {
// val, err := rc.Redis.Do(ctx, rc.Redis.B().Get().Key(key).Build()).AsBytes()
val, err := c.Redis.Get(ctx, key).Bytes()
if err == redis.Nil {
log.Error().Err(err).Str("key", key).Msg("Key not found in Redis")
log.Debug().Str("key", key).Msg("Key not found in Redis")
return "", err
} else if err != nil {
log.Panic().Err(err).Msg("Failed to get from Redis ...")
@@ -179,11 +203,58 @@ func (c *Cache) SetCacheValue(key string, value interface{}) error {
return fmt.Errorf("failed to marshal data: %w", err)
}

expiration := GetCacheExpiration(CacheKey(key))
expiration := c.GetCacheExpiration(CacheKey(key))
if err := c.SetWithExpire(key, dataBytes, expiration); err != nil {
return fmt.Errorf("failed to set cache: %w", err)
}

log.Info().Msgf("Successfully set cache for key: %s", key)
return nil
}

// Delete removes a single specific cache key
// Example: cache.Delete("task:worker:123")
func (c *Cache) Delete(key string) error {
ctx := context.Background()
deleted, err := c.Redis.Del(ctx, key).Result()
if err != nil {
log.Error().Err(err).Str("key", key).Msg("Failed to clean cache key")
return fmt.Errorf("failed to delete key: %w", err)
}

if deleted > 0 {
log.Info().Str("key", key).Msg("Clean up Cache")
} else {
log.Debug().Str("key", key).Msg("Cache key not found")
}
return nil
}

// DeleteByPattern removes all cache entries matching the given pattern
// Example: cache.DeleteByPattern("task:worker:*") - deletes all worker task caches
// Example: cache.DeleteByPattern("user:123:*") - deletes all user 123's caches
func (c *Cache) DeleteByPattern(pattern string) error {
ctx := context.Background()
keys, err := c.Redis.Keys(ctx, pattern).Result()
if err != nil {
log.Error().Err(err).Str("pattern", pattern).Msg("Failed to get keys")
return fmt.Errorf("failed to get keys: %w", err)
}

if len(keys) > 0 {
deleted, err := c.Redis.Del(ctx, keys...).Result()
if err != nil {
log.Error().Err(err).Msg("Failed to delete keys")
return err
}
log.Info().Int64("deleted", deleted).Str("pattern", pattern).Msg("Clean up Cache")
}
return nil
}

// DeleteWithSuffix removes a single cache key built with prefix and suffix components
// Example: cache.DeleteWithSuffix(cache.TasksByWorker, "123") -> deletes "task:worker:123"
func (c *Cache) DeleteWithSuffix(prefix CacheKey, suffixes ...string) error {
key := c.BuildCacheKey(prefix, suffixes...)
return c.Delete(key)
}
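
The DeleteByPattern helper above resolves the pattern with the Redis KEYS command, which walks the entire keyspace in one blocking call; that is acceptable for a small cache but can stall Redis as the key count grows. Below is a sketch of a SCAN-based variant — not part of this commit, the method name deleteByPatternScan is hypothetical, and it assumes the same go-redis client and imports as cache.go.

// deleteByPatternScan is a hypothetical SCAN-based alternative to DeleteByPattern:
// SCAN iterates the keyspace in small batches instead of blocking like KEYS.
func (c *Cache) deleteByPatternScan(pattern string) error {
	ctx := context.Background()
	var cursor uint64
	for {
		// Fetch up to 100 matching keys per iteration.
		keys, next, err := c.Redis.Scan(ctx, cursor, pattern, 100).Result()
		if err != nil {
			return fmt.Errorf("failed to scan keys: %w", err)
		}
		if len(keys) > 0 {
			if _, err := c.Redis.Del(ctx, keys...).Result(); err != nil {
				return fmt.Errorf("failed to delete keys: %w", err)
			}
		}
		cursor = next
		if cursor == 0 { // cursor 0 means the iteration is complete
			return nil
		}
	}
}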