From de3ac65223905e9c6aa00527fcdc96fa373218c4 Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:53:58 +0700 Subject: [PATCH 1/4] perf: add retry mechanism on subtensor state (#66) * perf: added retry mechanism on substrate request * perf: added retry logic to remaining api request to sidecar * chore: removed deadcode - removed cmd/blockchain/main.go - removed block request on substrate.go --- cmd/blockchain/main.go | 56 ---------- pkg/api/controllers.go | 7 -- pkg/api/middlewares.go | 1 + pkg/blockchain/substrate.go | 205 +++++++++++------------------------- 4 files changed, 63 insertions(+), 206 deletions(-) delete mode 100644 cmd/blockchain/main.go diff --git a/cmd/blockchain/main.go b/cmd/blockchain/main.go deleted file mode 100644 index 14e031e..0000000 --- a/cmd/blockchain/main.go +++ /dev/null @@ -1,56 +0,0 @@ -package main - -import ( - "fmt" - "os" - "os/signal" - "syscall" - - "dojo-api/pkg/blockchain" - - "github.com/rs/zerolog/log" -) - -func main() { - substrateService := blockchain.NewSubstrateService() - testUid := 13 - testSubnetId := 52 - testHotkey, err := substrateService.GetHotkeyByUid(testSubnetId, testUid) - if err != nil { - log.Error().Err(err).Msg("Error getting hotkey") - } - log.Info().Msgf("Hotkey: %s", testHotkey) - substrateService.CheckIsRegistered(testSubnetId, testHotkey) - stake, err := substrateService.TotalHotkeyStake(testHotkey) - if err != nil { - log.Error().Err(err).Msg("Error getting hotkey stake") - } - log.Info().Msgf("Stake: %f", stake) - - blockchain.GetSubnetStateSubscriberInstance().GetSubnetState(testSubnetId) - - // Fetch the latest finalized block - substrateService.GetLatestFinalizedBlock() - - // Fetch the latest unfinalized block starting from a specific block ID - const initialBlockId = 1_923_000 - substrateService.GetLatestUnFinalizedBlock(initialBlockId) - - // TODO: write unit testing?? - // Initialize a SubnetStateSubscriber instance - subnetSubscriber := blockchain.GetSubnetStateSubscriberInstance() - - // Example usage of SubnetStateSubscriber - validatorHotkey := "***REMOVED***" - fmt.Println(subnetSubscriber.FindValidatorHotkeyIndex(validatorHotkey)) - - // Handling non-registered found case - subnetSubscriber.OnNonRegisteredFound(validatorHotkey) - fmt.Println(subnetSubscriber.FindValidatorHotkeyIndex(validatorHotkey)) - - // Wait for interrupt signal to gracefully shutdown the program - quit := make(chan os.Signal, 1) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - sig := <-quit - log.Info().Msgf("Received signal: %s. Shutting down...", sig) -} diff --git a/pkg/api/controllers.go b/pkg/api/controllers.go index 963f2a6..bf7ecad 100644 --- a/pkg/api/controllers.go +++ b/pkg/api/controllers.go @@ -117,11 +117,6 @@ func CreateTasksController(c *gin.Context) { // Log the headers of the request headers := c.Request.Header log.Info().Interface("headers", headers).Msg("Request headers") - - // Log the size of the request - requestSize := c.Request.ContentLength - log.Info().Int64("requestSize", requestSize).Msg("Request size") - log.Debug().Interface("request body", c.Request.Body).Msg("Creating tasks with request body") minerUserInterface, exists := c.Get("minerUser") @@ -157,8 +152,6 @@ func CreateTasksController(c *gin.Context) { return } - log.Info().Str("minerUser", fmt.Sprintf("%+v", minerUser)).Msg("Miner user found") - // Here we will handle file upload // Parse files from the form form, err := c.MultipartForm() diff --git a/pkg/api/middlewares.go b/pkg/api/middlewares.go index 58486e7..41646c2 100644 --- a/pkg/api/middlewares.go +++ b/pkg/api/middlewares.go @@ -336,6 +336,7 @@ func MinerAuthMiddleware() gin.HandlerFunc { subnetState := blockchain.GetSubnetStateSubscriberInstance() _, isFound := subnetState.FindMinerHotkeyIndex(foundApiKey.MinerUser().Hotkey) + if !isFound { log.Error().Msg("Miner hotkey is deregistered") c.AbortWithStatusJSON(http.StatusUnauthorized, defaultErrorResponse("Unauthorized")) diff --git a/pkg/blockchain/substrate.go b/pkg/blockchain/substrate.go index a3a1978..7d9c8ae 100644 --- a/pkg/blockchain/substrate.go +++ b/pkg/blockchain/substrate.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "math" + "math/rand" "net/http" "net/url" "os" @@ -28,6 +29,9 @@ const ( CacheKeyHotkeyTemplate string = "worker_api:sn%d_uid%d_hotkey" CacheKeyAxonInfoTemplate string = "worker_api:sn%d_hotkey%s_axon_info" CacheKeyTotalStakeTemplate string = "worker_api:hotkey%s_total_stake" + maxRetries = 5 + baseDelay = 2 * time.Second + maxDelay = 10 * time.Second ) type StorageResponse struct { @@ -117,23 +121,62 @@ func getCachedData[T any](cache *cache.Cache, cacheKey string) (*T, error) { return nil, err } -func (s *SubstrateService) DoGetRequest(path string, params url.Values) (*StorageResponse, error) { +func (s *SubstrateService) GetStorageRequest(path string, params url.Values) (*StorageResponse, error) { + var lastErr error + + // Exponential backoff retry + for attempt := 0; attempt <= maxRetries; attempt++ { + // Calculate base delay with exponential backoff, capped at maxDelay + delay := time.Duration(math.Min( + float64(baseDelay)*math.Pow(2, float64(attempt)), + float64(maxDelay), + )) + + // Add random jitter between 0 and 3 seconds + jitter := time.Duration(float64(3*time.Second) * rand.Float64()) + totalDelay := delay + jitter + + response, err := s.executeStorageRequest(path, params) + if err == nil { + return response, nil + } + + lastErr = err + + // Don't sleep on the last attempt + if attempt < maxRetries { + log.Warn(). + Err(err). + Int("attempt", attempt+1). + Float64("totalDelay_seconds", totalDelay.Seconds()). + Str("path", path). + Msg("Request failed, retrying...") + + time.Sleep(totalDelay) + } + } + + return nil, fmt.Errorf("all retry attempts failed after %d attempts: %w", maxRetries, lastErr) +} + +func (s *SubstrateService) executeStorageRequest(path string, params url.Values) (*StorageResponse, error) { req, err := http.NewRequest("GET", path, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create request: %w", err) } + req.URL.RawQuery = params.Encode() + resp, err := s.httpClient.Do(req) if err != nil { - return nil, err + return nil, fmt.Errorf("request failed: %w", err) } // Ensure the response body is closed defer resp.Body.Close() - // close connection after request - req.Close = true + body, err := io.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read response: %w", err) } // check if the body is empty @@ -142,11 +185,16 @@ func (s *SubstrateService) DoGetRequest(path string, params url.Values) (*Storag return nil, fmt.Errorf("empty response body") } + // Check status code + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body)) + } + var storageResponse StorageResponse - err = json.Unmarshal(body, &storageResponse) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) + if err := json.Unmarshal(body, &storageResponse); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w, body: %s", err, string(body)) } + return &storageResponse, nil } @@ -159,7 +207,7 @@ func (s *SubstrateService) GetMaxUID(subnetId int) (int, error) { path := fmt.Sprintf("%s/pallets/subtensorModule/storage/SubnetworkN", s.substrateApiUrl) params := url.Values{} params.Add("keys[]", strconv.Itoa(subnetId)) - storageResponse, err := s.DoGetRequest(path, params) + storageResponse, err := s.GetStorageRequest(path, params) if err != nil { log.Error().Err(err).Msgf("Error getting max UID for subnet %d", subnetId) return 0, err @@ -186,7 +234,7 @@ func (s *SubstrateService) GetHotkeyByUid(subnetId int, uid int) (string, error) params := url.Values{} params.Add("keys[]", strconv.Itoa(subnetId)) params.Add("keys[]", strconv.Itoa(uid)) - storageResponse, err := s.DoGetRequest(path, params) + storageResponse, err := s.GetStorageRequest(path, params) if err != nil { log.Error().Err(err).Msgf("Error getting hotkey for uid %d", uid) return "", err @@ -215,7 +263,7 @@ func (s *SubstrateService) GetAxonInfo(subnetId int, hotkey string) (*AxonInfo, params := url.Values{} params.Add("keys[]", strconv.Itoa(subnetId)) params.Add("keys[]", hotkey) - storageResponse, err := s.DoGetRequest(path, params) + storageResponse, err := s.GetStorageRequest(path, params) if err != nil { log.Error().Err(err).Msgf("Error getting axon info for hotkey %s", hotkey) return nil, err @@ -302,7 +350,7 @@ func (s *SubstrateService) CheckIsRegistered(subnetUid int, hotkey string) (bool params := url.Values{} params.Add("keys[]", hotkey) params.Add("keys[]", strconv.Itoa(subnetUid)) - storageResponse, err := s.DoGetRequest(path, params) + storageResponse, err := s.GetStorageRequest(path, params) if err != nil { log.Error().Err(err).Msgf("Error checking if hotkey %s is registered", hotkey) return false, err @@ -325,7 +373,7 @@ func (s *SubstrateService) TotalHotkeyStake(hotkey string) (float64, error) { path := fmt.Sprintf("%s/pallets/subtensorModule/storage/TotalHotkeyStake", s.substrateApiUrl) params := url.Values{} params.Add("keys[]", hotkey) - storageResponse, err := s.DoGetRequest(path, params) + storageResponse, err := s.GetStorageRequest(path, params) if err != nil { log.Error().Err(err).Msgf("Error getting total hotkey stake for hotkey %s", hotkey) return 0, err @@ -421,132 +469,3 @@ func (s *SubstrateService) RuntimeSpec() (*RuntimeSpec, error) { s.cache.SetWithExpire(CacheKeyRuntimeSpec, string(body), 24*time.Hour) return &runtimeSpec, nil } - -type BlockErrorResponse struct { - Code int `json:"code"` - Message string `json:"message"` - Stack string `json:"stack"` - Level string `json:"level"` -} - -type BlockResponse struct { - Number string `json:"number"` - Hash string `json:"hash"` - ParentHash string `json:"parentHash"` - StateRoot string `json:"stateRoot"` - ExtrinsicsRoot string `json:"extrinsicsRoot"` - Logs []struct { - Type string `json:"type"` - Index string `json:"index"` - Value []string `json:"value"` - } `json:"logs"` - OnFinalize struct { - Events []interface{} `json:"events"` - } `json:"onFinalize"` - Finalized bool `json:"finalized"` -} - -func (s *SubstrateService) getBlockById(blockId int) (*BlockResponse, error) { - log.Info().Msgf("Fetching block with ID: %d", blockId) - path := fmt.Sprintf("%s/blocks/%d", s.substrateApiUrl, blockId) - req, err := http.NewRequest("GET", path, nil) - if err != nil { - log.Error().Err(err).Msgf("Failed to create request for block ID: %d", blockId) - return nil, err - } - resp, err := s.httpClient.Do(req) - if err != nil { - log.Error().Err(err).Msgf("Failed to fetch block ID: %d", blockId) - return nil, err - } - // Ensure the response body is closed - defer resp.Body.Close() - // close connection after request - req.Close = true - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Error().Err(err).Msgf("Failed to read response body for block ID: %d", blockId) - return nil, err - } - - var block BlockResponse - err = json.Unmarshal(body, &block) - if err != nil || reflect.DeepEqual(block, BlockResponse{}) { - log.Error().Err(err).Msgf("Failed to unmarshal block response for block ID: %d, trying to unmarshal block error response", blockId) - var blockError BlockErrorResponse - err = json.Unmarshal(body, &blockError) - if err != nil { - log.Error().Err(err).Msgf("Failed to unmarshal block error response for block ID: %d", blockId) - return nil, err - } - return nil, fmt.Errorf("block error message: %s, stack: %s", blockError.Message, blockError.Stack) - } - - log.Info().Msgf("Successfully fetched block with ID: %d, data: %v", blockId, block) - return &block, nil -} - -// Use binary search to get the latest block since substrate API's -func (s *SubstrateService) GetLatestUnFinalizedBlock(low int) (*BlockResponse, error) { - // try to get latest block, assume an initial block that's way too large - const blocksPeryear = (24 * 3600 * 360) / 12 - high := low + blocksPeryear - log.Info().Msgf("Searching for the latest block between %d and %d", low, high) - var latestBlock *BlockResponse - for low <= high { - mid := (low + high) / 2 - block, err := s.getBlockById(mid) - if err != nil { - log.Warn().Msgf("Block ID: %d not found, adjusting search range", mid) - high = mid - 1 - } else { - log.Info().Msgf("Block ID: %d found, updating latest block and adjusting search range", mid) - latestBlock = block - low = mid + 1 - } - } - - if latestBlock != nil { - log.Info().Msgf("Latest block number: %+v", latestBlock.Number) - return latestBlock, nil - } - - log.Error().Msg("Failed to find the latest block") - return nil, fmt.Errorf("failed to find the latest block") -} - -func (s *SubstrateService) GetLatestFinalizedBlock() (*BlockResponse, error) { - path := fmt.Sprintf("%s/blocks/head", s.substrateApiUrl) - - req, err := http.NewRequest("GET", path, nil) - if err != nil { - log.Error().Err(err).Msg("Failed to create new HTTP request") - return nil, err - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Error().Err(err).Msg("Failed to execute HTTP request") - return nil, err - } - // Ensure the response body is closed - defer resp.Body.Close() - // close connection after request - req.Close = true - - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Error().Err(err).Msg("Failed to read response body") - return nil, err - } - - var block BlockResponse - err = json.Unmarshal(body, &block) - if err != nil { - log.Error().Err(err).Msg("Failed to unmarshal response body into BlockResponse") - return nil, err - } - - log.Info().Msgf("Successfully fetched latest finalized block: %+v", block) - return &block, nil -} From 3c793a4751731235bc227f8812c621a8f1890c0e Mon Sep 17 00:00:00 2001 From: jarvis8x7b <157810922+jarvis8x7b@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:50:32 +0800 Subject: [PATCH 2/4] fix: add token runtime spec for testnet (#68) --- pkg/blockchain/substrate.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/blockchain/substrate.go b/pkg/blockchain/substrate.go index 7d9c8ae..413ec57 100644 --- a/pkg/blockchain/substrate.go +++ b/pkg/blockchain/substrate.go @@ -1,6 +1,8 @@ package blockchain import ( + "dojo-api/pkg/cache" + "dojo-api/utils" "encoding/json" "errors" "fmt" @@ -15,9 +17,6 @@ import ( "sync" "time" - "dojo-api/pkg/cache" - "dojo-api/utils" - "github.com/joho/godotenv" "github.com/rs/zerolog/log" ) @@ -391,7 +390,7 @@ func (s *SubstrateService) TotalHotkeyStake(hotkey string) (float64, error) { } for i, tokenSymbol := range runtimeSpec.Properties.TokenSymbol { - if tokenSymbol == "TAO" { + if tokenSymbol == "TAO" || tokenSymbol == "testTAO" { tokenDecimals, err := strconv.Atoi(runtimeSpec.Properties.TokenDecimals[i]) if err != nil { log.Error().Err(err).Msg("Error converting token decimals to int") From 65846893882b9301911a57f2e6fd3fb3b3c4dd2f Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:09:07 +0700 Subject: [PATCH 3/4] chore: added header log in custom gin log (#69) --- pkg/api/controllers.go | 6 ------ pkg/api/utils.go | 1 + 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/api/controllers.go b/pkg/api/controllers.go index bf7ecad..cf5e65b 100644 --- a/pkg/api/controllers.go +++ b/pkg/api/controllers.go @@ -114,9 +114,6 @@ func WorkerLoginController(c *gin.Context) { func CreateTasksController(c *gin.Context) { log.Info().Msg("Creating Tasks") - // Log the headers of the request - headers := c.Request.Header - log.Info().Interface("headers", headers).Msg("Request headers") log.Debug().Interface("request body", c.Request.Body).Msg("Creating tasks with request body") minerUserInterface, exists := c.Get("minerUser") @@ -469,9 +466,6 @@ func GetTaskByIdController(c *gin.Context) { taskID := c.Param("task-id") taskService := task.NewTaskService() - // TODO: Remove this after testing - log.Info().Interface("Headers", c.Request.Header).Msg("Request Headers") - task, err := taskService.GetTaskResponseById(c.Request.Context(), taskID) if err != nil { c.JSON(http.StatusInternalServerError, defaultErrorResponse("Internal server error")) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 5a57ea4..1b3280e 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -192,6 +192,7 @@ func CustomGinLogger(logger *zerolog.Logger) gin.HandlerFunc { Str("path", param.Path). Int("resp_size", param.BodySize). Int("req_size", len(body)). + Interface("headers", c.Request.Header). Msg("") } } From 5141e1f3c62915cea9e7a8c80e067d917a2d0f62 Mon Sep 17 00:00:00 2001 From: codebender <167290009+codebender37@users.noreply.github.com> Date: Fri, 20 Dec 2024 14:18:33 +0700 Subject: [PATCH 4/4] fix: disable InMetagraphOnly() middleware on create-task route to monitor the format of diff IP addr first (#70) --- pkg/api/routes.go | 3 ++- pkg/api/utils.go | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 4020044..dce9a0e 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -23,7 +23,8 @@ func LoginRoutes(router *gin.Engine) { tasks := apiV1.Group("/tasks") { tasks.PUT("/submit-result/:task-id", WriteTaskRateLimiter(), WorkerAuthMiddleware(), SubmitTaskResultController) - tasks.POST("/create-tasks", InMetagraphOnly(), WriteTaskRateLimiter(), MinerAuthMiddleware(), CreateTasksController) + // TODO: re-enable InMetagraphOnly() in future + tasks.POST("/create-tasks", WriteTaskRateLimiter(), MinerAuthMiddleware(), CreateTasksController) tasks.GET("/task-result/:task-id", ReadTaskRateLimiter(), GetTaskResultsController) tasks.GET("/:task-id", ReadTaskRateLimiter(), GetTaskByIdController) tasks.GET("/next-task/:task-id", ReadTaskRateLimiter(), WorkerAuthMiddleware(), GetNextInProgressTaskController) diff --git a/pkg/api/utils.go b/pkg/api/utils.go index 1b3280e..df8e5d1 100644 --- a/pkg/api/utils.go +++ b/pkg/api/utils.go @@ -184,14 +184,19 @@ func CustomGinLogger(logger *zerolog.Logger) gin.HandlerFunc { logger := log.With().Logger().Output(consoleWriter) + // Log main request info logger.Info(). - Int("status_code", param.StatusCode). - Str("latency", param.Latency.String()). - Str("ip", param.ClientIP). Str("method", param.Method). Str("path", param.Path). - Int("resp_size", param.BodySize). + Int("status_code", param.StatusCode). + Str("latency", param.Latency.String()). Int("req_size", len(body)). + Int("resp_size", param.BodySize). + Str("ip", param.ClientIP). + Msg("") + + // Log headers separately + logger.Info(). Interface("headers", c.Request.Header). Msg("") }