Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update metric api to cumulate task count after db purge #60

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/seed/fixtures/fixture_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (o *FixtureService) CreateDefaultTask(ctx context.Context, title string, ex
db.Task.Type.Set(db.TaskTypeCodeGeneration),
db.Task.TaskData.Set(types.JSON(taskDataJSON)),
db.Task.Status.Set(db.TaskStatusInProgress),
db.Task.MaxResults.Set(10),
db.Task.MaxResults.Set(1),
db.Task.NumResults.Set(0),
db.Task.NumCriteria.Set(0),
db.Task.TotalReward.Set(101.0),
Expand Down
15 changes: 5 additions & 10 deletions cmd/seed/task_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@
"max": 100,
"min": 1,
"type": "multi-score",
"options": [
"459520f8-b654-41a8-9f94-93e003c4ecb5",
"024ffca6-aac0-40d2-959b-1ab7b165b68e",
"4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"1b26f82c-935f-432b-a60d-f21204e580f2"
]
"options": ["1", "2", "3", "4"]
}
],
"responses": [
{
"model": "459520f8-b654-41a8-9f94-93e003c4ecb5",
"model": "1",
"completion": {
"files": [
{
Expand All @@ -29,7 +24,7 @@
}
},
{
"model": "024ffca6-aac0-40d2-959b-1ab7b165b68e",
"model": "2",
"completion": {
"files": [
{
Expand All @@ -42,7 +37,7 @@
}
},
{
"model": "4fa05b97-cd66-4df7-bb3a-298d2b04b148",
"model": "3",
"completion": {
"files": [
{
Expand All @@ -55,7 +50,7 @@
}
},
{
"model": "1b26f82c-935f-432b-a60d-f21204e580f2",
"model": "4",
"completion": {
"files": [
{
Expand Down
16 changes: 16 additions & 0 deletions pkg/api/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ func CreateTasksController(c *gin.Context) {
taskIds = append(taskIds, task.ID)
}

// Clean up the cache
cache := cache.GetCacheInstance()
cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*")

c.JSON(http.StatusOK, defaultSuccessResponse(taskIds))
}

Expand Down Expand Up @@ -305,6 +309,15 @@ func SubmitTaskResultController(c *gin.Context) {
return
}

// Remove from cache
cache := cache.GetCacheInstance()
cache.DeleteWithSuffix(cache.Keys.TaskResultByWorker, worker.ID)

// Clean all task:worker:* cache entries
if err := cache.DeleteByPattern(string(cache.Keys.TasksByWorker) + ":*"); err != nil {
log.Error().Err(err).Msg("Failed to clean task:worker cache entries")
}

// Update the metric data with goroutine
handleMetricData(taskData, updatedTask)

Expand Down Expand Up @@ -472,6 +485,9 @@ func GetTaskByIdController(c *gin.Context) {
taskID := c.Param("task-id")
taskService := task.NewTaskService()

// TODO: Remove this after testing
log.Info().Interface("Headers", c.Request.Header).Msg("Request Headers")

task, err := taskService.GetTaskResponseById(c.Request.Context(), taskID)
if err != nil {
c.JSON(http.StatusInternalServerError, defaultErrorResponse("Internal server error"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ func getCallerIP(c *gin.Context) string {
// TODO - Need to check if this is the correct way without getting spoofing
if runtimeEnv := utils.LoadDotEnv("RUNTIME_ENV"); runtimeEnv == "aws" {
callerIp := c.Request.Header.Get("X-Original-Forwarded-For")
log.Trace().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
log.Debug().Msgf("Got caller IP from X-Original-Forwarded-For header: %s", callerIp)
return callerIp
}
callerIp := c.ClientIP()
log.Trace().Msgf("Got caller IP from ClientIP: %s", callerIp)
log.Debug().Msgf("Got caller IP from ClientIP: %s", callerIp)
return callerIp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/blockchain/subnet_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *SubnetStateSubscriber) OnRegisteredFound(hotkey string) {
_, err := minerUserORM.GetUserByHotkey(hotkey)
if err != nil {
if err == db.ErrNotFound {
log.Info().Msg("User not found, continuing...")
log.Debug().Msg("User not found, continuing...")
return
}
log.Error().Err(err).Msg("Error getting user by hotkey")
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockchain/substrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *SubstrateService) GetAxonInfo(subnetId int, hotkey string) (*AxonInfo,
}

if storageResponse.Value == nil {
log.Warn().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey)
log.Debug().Msgf("Value is nil for hotkey %s, means they are not serving an axon", hotkey)
return nil, errors.New("value is nil")
}

Expand Down
147 changes: 109 additions & 38 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type RedisConfig struct {

type Cache struct {
Redis redis.Client
Keys CacheKeys
}

var (
Expand All @@ -32,54 +33,57 @@ var (
mu sync.Mutex
)

// CacheKey type for type-safe cache keys
type CacheKey string

const (
// CacheKeys holds all cache key constants
type CacheKeys struct {
// Task cache keys
TaskById CacheKey = "task" // Single task by ID
TasksByWorker CacheKey = "task:worker" // List of tasks by worker
TaskById CacheKey
TasksByWorker CacheKey

// Task Result cache keys
TaskResultByTaskAndWorker CacheKey = "tr:task:worker" // Task result by task ID and worker ID
TaskResultByWorker CacheKey = "tr:worker" // Task results by worker ID
TaskResultByTaskAndWorker CacheKey
TaskResultByWorker CacheKey
TaskResultsTotal CacheKey

// Worker cache keys
WorkerByWallet CacheKey = "worker:wallet" // Worker by wallet address
WorkerCount CacheKey = "worker:count" // Total worker count
WorkerByWallet CacheKey
WorkerCount CacheKey

// Subscription cache keys
SubByHotkey CacheKey = "sub:hotkey" // Subscription by hotkey
SubByKey CacheKey = "sub:key" // Subscription by key
)

// CacheConfig defines cache keys and their expiration times
var CacheConfig = map[CacheKey]time.Duration{
TaskById: 5 * time.Minute,
TasksByWorker: 2 * time.Minute,
TaskResultByTaskAndWorker: 10 * time.Minute,
TaskResultByWorker: 10 * time.Minute,
WorkerByWallet: 5 * time.Minute,
WorkerCount: 1 * time.Minute,
SubByHotkey: 5 * time.Minute,
SubByKey: 5 * time.Minute,
SubByHotkey CacheKey
SubByKey CacheKey
}

// GetCacheExpiration returns the expiration time for a given cache key
func GetCacheExpiration(key CacheKey) time.Duration {
if duration, exists := CacheConfig[key]; exists {
return duration
}
return 5 * time.Minute // default expiration
// Default cache keys
var cacheKeys = CacheKeys{
// Task cache keys
TaskById: "task",
TasksByWorker: "task:worker",

// Task Result cache keys
TaskResultByTaskAndWorker: "tr:task:worker",
TaskResultByWorker: "tr:worker",
TaskResultsTotal: "metrics:tr:total",

// Worker cache keys
WorkerByWallet: "worker:wallet",
WorkerCount: "worker:count",

// Subscription cache keys
SubByHotkey: "sub:hotkey",
SubByKey: "sub:key",
}

// BuildCacheKey builds a cache key with the given prefix and components
func BuildCacheKey(prefix CacheKey, components ...string) string {
key := string(prefix)
for _, component := range components {
key += ":" + component
}
return key
var cacheExpirations = map[CacheKey]time.Duration{
cacheKeys.TaskById: 5 * time.Minute,
cacheKeys.TasksByWorker: 2 * time.Minute,
cacheKeys.TaskResultByTaskAndWorker: 10 * time.Minute,
cacheKeys.TaskResultByWorker: 10 * time.Minute,
cacheKeys.WorkerByWallet: 5 * time.Minute,
cacheKeys.WorkerCount: 1 * time.Minute,
cacheKeys.SubByHotkey: 5 * time.Minute,
cacheKeys.SubByKey: 5 * time.Minute,
}

func GetCacheInstance() *Cache {
Expand Down Expand Up @@ -119,11 +123,31 @@ func GetCacheInstance() *Cache {
}

log.Info().Msgf("Successfully connected to Redis and ping succeeded")
instance = &Cache{Redis: *redisClient}
instance = &Cache{
Redis: *redisClient,
Keys: cacheKeys,
}
})
return instance
}

// GetCacheExpiration returns the expiration time for a given cache key
func (c *Cache) GetCacheExpiration(key CacheKey) time.Duration {
if duration, exists := cacheExpirations[key]; exists {
return duration
}
return 5 * time.Minute // default expiration
}

// BuildCacheKey builds a cache key with the given prefix and components
func (c *Cache) BuildCacheKey(prefix CacheKey, components ...string) string {
key := string(prefix)
for _, component := range components {
key += ":" + component
}
return key
}

func (c *Cache) SetWithExpire(key string, value interface{}, expiration time.Duration) error {
switch v := value.(type) {
case string:
Expand All @@ -148,7 +172,7 @@ func (c *Cache) Get(key string) (string, error) {
// val, err := rc.Redis.Do(ctx, rc.Redis.B().Get().Key(key).Build()).AsBytes()
val, err := c.Redis.Get(ctx, key).Bytes()
if err == redis.Nil {
log.Error().Err(err).Str("key", key).Msg("Key not found in Redis")
log.Debug().Str("key", key).Msg("Key not found in Redis")
return "", err
} else if err != nil {
log.Panic().Err(err).Msg("Failed to get from Redis ...")
Expand Down Expand Up @@ -179,11 +203,58 @@ func (c *Cache) SetCacheValue(key string, value interface{}) error {
return fmt.Errorf("failed to marshal data: %w", err)
}

expiration := GetCacheExpiration(CacheKey(key))
expiration := c.GetCacheExpiration(CacheKey(key))
if err := c.SetWithExpire(key, dataBytes, expiration); err != nil {
return fmt.Errorf("failed to set cache: %w", err)
}

log.Info().Msgf("Successfully set cache for key: %s", key)
return nil
}

// Delete removes a single specific cache key
// Example: cache.Delete("task:worker:123")
func (c *Cache) Delete(key string) error {
ctx := context.Background()
deleted, err := c.Redis.Del(ctx, key).Result()
if err != nil {
log.Error().Err(err).Str("key", key).Msg("Failed to clean cache key")
return fmt.Errorf("failed to delete key: %w", err)
}

if deleted > 0 {
log.Info().Str("key", key).Msg("Clean up Cache")
} else {
log.Debug().Str("key", key).Msg("Cache key not found")
}
return nil
}

// DeleteByPattern removes all cache entries matching the given pattern
// Example: cache.DeleteByPattern("task:worker:*") - deletes all worker task caches
// Example: cache.DeleteByPattern("user:123:*") - deletes all user 123's caches
func (c *Cache) DeleteByPattern(pattern string) error {
ctx := context.Background()
keys, err := c.Redis.Keys(ctx, pattern).Result()
if err != nil {
log.Error().Err(err).Str("pattern", pattern).Msg("Failed to get keys")
return fmt.Errorf("failed to get keys: %w", err)
}

if len(keys) > 0 {
deleted, err := c.Redis.Del(ctx, keys...).Result()
if err != nil {
log.Error().Err(err).Msg("Failed to delete keys")
return err
}
log.Info().Int64("deleted", deleted).Str("pattern", pattern).Msg("Clean up Cache")
}
return nil
}

// DeleteWithSuffix removes a single cache key built with prefix and suffix components
// Example: cache.DeleteWithSuffix(cache.TasksByWorker, "123") -> deletes "task:worker:123"
func (c *Cache) DeleteWithSuffix(prefix CacheKey, suffixes ...string) error {
key := c.BuildCacheKey(prefix, suffixes...)
return c.Delete(key)
}
Loading