From ee1209789f917318ce76a23418d7457178aabc8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Thu, 4 Jul 2024 16:57:16 +0200 Subject: [PATCH 1/2] restore queued request --- api/strategies.go | 134 ++++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 58 deletions(-) diff --git a/api/strategies.go b/api/strategies.go index c802147..3182784 100644 --- a/api/strategies.go +++ b/api/strategies.go @@ -43,7 +43,11 @@ func (capi *census3API) initStrategiesHandlers() error { return err } if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders", "GET", - api.MethodAccessTypePublic, capi.listStrategyHolders); err != nil { + api.MethodAccessTypePublic, capi.launchStrategyHolders); err != nil { + return err + } + if err := capi.endpoint.RegisterMethod("/strategies/{strategyID}/holders/queue/{queueID}", "GET", + api.MethodAccessTypePublic, capi.enqueueStrategyHolders); err != nil { return err } if err := capi.endpoint.RegisterMethod("/strategies/estimation", "POST", @@ -136,7 +140,6 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont } // parse and encode strategies for _, strategy := range rows { - skipMalformed := false strategyResponse := &Strategy{ ID: strategy.ID, Alias: strategy.Alias, @@ -146,19 +149,11 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont } strategyTokens, err := qtx.StrategyTokens(internalCtx, strategy.ID) if err != nil { - log.Warnw("error getting strategy tokens", "strategyID", strategy.ID) - skipMalformed = true - continue + return ErrCantGetStrategies.WithErr(err) } for _, strategyToken := range strategyTokens { if strategyToken.TokenAlias == "" { - log.Warnw("no token alias", - "strategyID", strategy.ID, - "tokenID", strategyToken.TokenID, - "chainID", strategyToken.ChainID, - "externalID", strategyToken.ExternalID) - skipMalformed = true - break + return ErrCantGetStrategies.With("invalid token symbol") } strategyResponse.Tokens[strategyToken.TokenAlias] = &StrategyToken{ ID: common.BytesToAddress(strategyToken.TokenID).String(), @@ -168,10 +163,6 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont ExternalID: strategyToken.ExternalID, } } - if skipMalformed { - log.Warnw("skipping malformed strategy", "strategyID", strategy.ID) - continue - } strategiesResponse.Strategies = append(strategiesResponse.Strategies, strategyResponse) } res, err := json.Marshal(strategiesResponse) @@ -563,9 +554,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex return ctx.Send(res, api.HTTPstatusOK) } -// listStrategyHolders function handler returns the list of the holders of the -// strategy ID provided. It returns a 400 error if the provided -func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error { +func (capi *census3API) launchStrategyHolders(_ *api.APIdata, ctx *httprouter.HTTPContext) error { // get provided strategyID iStrategyID, err := strconv.Atoi(ctx.URLParam("strategyID")) if err != nil { @@ -573,19 +562,10 @@ func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HT } strategyID := uint64(iStrategyID) // get token information from the database - internalCtx, cancel := context.WithTimeout(ctx.Request.Context(), getStrategyHoldersTimeout) + checkCtx, cancel := context.WithTimeout(ctx.Request.Context(), checkStrategyHoldersTimeout) defer cancel() - tx, err := capi.db.RO.BeginTx(internalCtx, nil) - if err != nil { - return ErrCantGetStrategyHolders.WithErr(err) - } - defer func() { - if err := tx.Rollback(); err != nil { - log.Errorw(err, "error rolling back tokens transaction") - } - }() - qtx := capi.db.QueriesRO.WithTx(tx) - strategy, err := qtx.StrategyByID(internalCtx, strategyID) + + strategy, err := capi.db.QueriesRO.StrategyByID(checkCtx, strategyID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return ErrNotFoundStrategy.WithErr(err) @@ -595,40 +575,78 @@ func (capi *census3API) listStrategyHolders(msg *api.APIdata, ctx *httprouter.HT if strategy.Predicate == "" { return ErrInvalidStrategyPredicate.With("empty predicate") } - strategyTokens, err := qtx.StrategyTokens(internalCtx, strategyID) - if err != nil { - return ErrCantGetStrategyHolders.WithErr(err) - } - strategyTokensBySymbol := map[string]*StrategyToken{} - for _, token := range strategyTokens { - strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{ - ID: common.BytesToAddress(token.TokenID).String(), - ChainID: token.ChainID, - ExternalID: token.ExternalID, - MinBalance: token.MinBalance, + // import the strategy from IPFS in background generating a queueID + queueID := capi.queue.Enqueue() + go func() { + bgCtx, cancel := context.WithTimeout(context.Background(), getStrategyHoldersTimeout) + defer cancel() + strategyTokens, err := capi.db.QueriesRO.StrategyTokens(bgCtx, strategyID) + if err != nil { + if ok := capi.queue.Fail(queueID, ErrCantGetStrategyHolders.WithErr(err)); !ok { + log.Errorf("error updating list strategy holders queue %s", queueID) + } + return } - } - strategyHolders, _, _, err := capi.CalculateStrategyHolders( - internalCtx, strategy.Predicate, strategyTokensBySymbol, nil) + strategyTokensBySymbol := map[string]*StrategyToken{} + for _, token := range strategyTokens { + strategyTokensBySymbol[token.TokenAlias] = &StrategyToken{ + ID: common.BytesToAddress(token.TokenID).String(), + ChainID: token.ChainID, + ExternalID: token.ExternalID, + MinBalance: token.MinBalance, + } + } + strategyHolders, _, _, err := capi.CalculateStrategyHolders( + bgCtx, strategy.Predicate, strategyTokensBySymbol, nil) + if err != nil { + if ok := capi.queue.Fail(queueID, ErrEvalStrategyPredicate.WithErr(err)); !ok { + log.Errorf("error updating list strategy holders queue %s", queueID) + } + return + } + if len(strategyHolders) == 0 { + if ok := capi.queue.Fail(queueID, ErrNoStrategyHolders); !ok { + log.Errorf("error updating list strategy holders queue %s", queueID) + } + return + } + holders := make(map[string]string) + // parse and encode holders + for addr, balance := range strategyHolders { + holders[addr.String()] = balance.String() + } + if ok := capi.queue.Done(queueID, holders); !ok { + log.Errorf("error updating list strategy holders queue %s", queueID) + } + }() + // encode and send the queueID + res, err := json.Marshal(QueueResponse{QueueID: queueID}) if err != nil { - return ErrEvalStrategyPredicate.WithErr(err) + return ErrEncodeQueueItem.WithErr(err) } - if len(strategyHolders) == 0 { - return ErrNoStrategyHolders + return ctx.Send(res, api.HTTPstatusOK) +} + +func (capi *census3API) enqueueStrategyHolders(msg *api.APIdata, ctx *httprouter.HTTPContext) error { + // parse queueID from url + queueID := ctx.URLParam("queueID") + if queueID == "" { + return ErrMalformedStrategyQueueID } - // init response struct with the no pagination information and empty list - // of holders - holdersResponse := GetStrategyHoldersResponse{ - Holders: make(map[string]string), + // try to get and check if the strategy is in the queue + queueItem, exists := capi.queue.IsDone(queueID) + if !exists { + return ErrNotFoundStrategy.Withf("the ID %s does not exist in the queue", queueID) } - // parse and encode holders - for addr, balance := range strategyHolders { - holdersResponse.Holders[addr.String()] = balance.String() + // check if it is not finished or some error occurred + if queueItem.Done && queueItem.Error == nil { + // remove the item from the queue and the censusID from the data + capi.queue.Dequeue(queueID) } - // encode and send the response - res, err := json.Marshal(holdersResponse) + // encode item response and send it + res, err := json.Marshal(queueItem) if err != nil { - return ErrEncodeTokenHolders.WithErr(err) + return ErrEncodeQueueItem.WithErr(err) } return ctx.Send(res, api.HTTPstatusOK) } From 91f43dd8bde5acf4e0f1e12b918bc91554d3d793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Wed, 17 Jul 2024 13:15:08 +0200 Subject: [PATCH 2/2] fixing error interface decoding from json --- apiclient/strategies.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/apiclient/strategies.go b/apiclient/strategies.go index 150eb06..ffd33e8 100644 --- a/apiclient/strategies.go +++ b/apiclient/strategies.go @@ -9,7 +9,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/vocdoni/census3/api" - "github.com/vocdoni/census3/helpers/queue" "go.vocdoni.io/dvote/log" ) @@ -157,24 +156,31 @@ func (c *HTTPclient) HoldersByStrategyQueue(strategyID uint64, queueID string) ( fmt.Errorf("%d %s", res.StatusCode, http.StatusText(res.StatusCode))) } // decode the queue response - item := &queue.QueueItem{} - if err := json.NewDecoder(res.Body).Decode(item); err != nil { + item := map[string]interface{}{} + if err := json.NewDecoder(res.Body).Decode(&item); err != nil { return nil, false, fmt.Errorf("%w: %w", ErrDecodingResponse, err) } // check if the item is done and if there is an error - if !item.Done { + if done, ok := item["done"].(bool); !ok || !done { return nil, false, nil } - if item.Error != nil { - return nil, true, item.Error + if strErr, ok := item["error"].(string); ok && strErr != "" { + return nil, true, fmt.Errorf("error in queue item: %s", strErr) } // convert the data to a map of addresses and amounts - rawHolders := item.Data.(map[string]string) + rawHolders, ok := item["data"].(map[string]interface{}) + if !ok { + return nil, true, fmt.Errorf("error getting data from queue item") + } holders := make(map[common.Address]*big.Int, len(rawHolders)) for k, v := range rawHolders { + strBalance, ok := v.(string) + if !ok { + continue + } addr := common.HexToAddress(k) amount := new(big.Int) - if _, ok := amount.SetString(v, 10); !ok { + if _, ok := amount.SetString(strBalance, 10); !ok { return nil, true, fmt.Errorf("error converting amount to big.Int") } holders[addr] = amount