From 99c399e230849d3e27015e4532b28faef53f48b0 Mon Sep 17 00:00:00 2001 From: Narangde <0xhanslee@gmail.com> Date: Wed, 18 Sep 2024 19:05:13 +0900 Subject: [PATCH] perf(evmstaking): improve processing queued msg --- client/x/evmstaking/keeper/abci.go | 15 ++---------- client/x/evmstaking/keeper/deposit_test.go | 7 +----- client/x/evmstaking/keeper/keeper_test.go | 7 +----- client/x/evmstaking/keeper/msg_queue.go | 24 +++++++++++-------- client/x/evmstaking/keeper/redelegate_test.go | 7 +----- client/x/evmstaking/keeper/validator_test.go | 7 +----- client/x/evmstaking/keeper/withdraw_test.go | 7 +----- 7 files changed, 21 insertions(+), 53 deletions(-) diff --git a/client/x/evmstaking/keeper/abci.go b/client/x/evmstaking/keeper/abci.go index 5e10d5a8..85287289 100644 --- a/client/x/evmstaking/keeper/abci.go +++ b/client/x/evmstaking/keeper/abci.go @@ -86,23 +86,12 @@ func (k *Keeper) EndBlock(ctx context.Context) (abci.ValidatorUpdates, error) { return nil, errors.Wrap(err, "check if the next epoch starts") } - //nolint:nestif // readability if isNextEpoch { - // get queued messages - queuedMsgs, err := k.DequeueAllMsgs(ctx) - if err != nil { + // process all queued messages + if err := k.ProcessAllMsgs(ctx); err != nil { return nil, errors.Wrap(err, "get current epoch queued messages") } - // execute queued message - for _, msg := range queuedMsgs { - if err := k.ProcessMsg(ctx, msg); err != nil { - // TODO(Narangde): do we need to handle failed message? - log.Error(ctx, "Error occurred while processing queued message", err) - continue - } - } - // if it is epoch based, increase epoch number if err := k.IncCurEpochNumber(ctx); err != nil { return nil, errors.Wrap(err, "increase current epoch number") diff --git a/client/x/evmstaking/keeper/deposit_test.go b/client/x/evmstaking/keeper/deposit_test.go index 12bf28bc..435f20ee 100644 --- a/client/x/evmstaking/keeper/deposit_test.go +++ b/client/x/evmstaking/keeper/deposit_test.go @@ -210,12 +210,7 @@ func (s *TestSuite) TestProcessDeposit() { var err error err = keeper.HandleDepositEvent(cachedCtx, tc.deposit) if !keeper.MessageQueue.IsEmpty(cachedCtx) { - var queuedMsgs []*types.QueuedMessage - queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx) - - for _, msg := range queuedMsgs { - err = keeper.ProcessMsg(cachedCtx, msg) - } + err = keeper.ProcessAllMsgs(cachedCtx) } if tc.expectedErr != "" { require.ErrorContains(err, tc.expectedErr) diff --git a/client/x/evmstaking/keeper/keeper_test.go b/client/x/evmstaking/keeper/keeper_test.go index fe6c9aaf..96d82818 100644 --- a/client/x/evmstaking/keeper/keeper_test.go +++ b/client/x/evmstaking/keeper/keeper_test.go @@ -642,12 +642,7 @@ func (s *TestSuite) TestProcessStakingEvents() { } err = evmstakingKeeper.ProcessStakingEvents(cachedCtx, 1, evmLogs) if !evmstakingKeeper.MessageQueue.IsEmpty(cachedCtx) { - var queuedMsgs []*types.QueuedMessage - queuedMsgs, err = evmstakingKeeper.DequeueAllMsgs(cachedCtx) - - for _, msg := range queuedMsgs { - err = evmstakingKeeper.ProcessMsg(cachedCtx, msg) - } + err = evmstakingKeeper.ProcessAllMsgs(cachedCtx) } if tc.expectedError != "" { require.Error(err) diff --git a/client/x/evmstaking/keeper/msg_queue.go b/client/x/evmstaking/keeper/msg_queue.go index 49b18bc7..cbbf9219 100644 --- a/client/x/evmstaking/keeper/msg_queue.go +++ b/client/x/evmstaking/keeper/msg_queue.go @@ -9,6 +9,7 @@ import ( "github.com/piplabs/story/client/x/evmstaking/types" "github.com/piplabs/story/lib/errors" + "github.com/piplabs/story/lib/log" ) // EnqueueMsg enqueues a message to the queue of the current epoch. @@ -16,27 +17,30 @@ func (k Keeper) EnqueueMsg(ctx context.Context, msg types.QueuedMessage) error { return k.MessageQueue.Enqueue(ctx, msg) } -// DequeueAllMsgs returns the set of messages queued in a given epoch. -func (k Keeper) DequeueAllMsgs(ctx context.Context) ([]*types.QueuedMessage, error) { +// ProcessAllMsgs returns the set of messages queued in a given epoch. +func (k Keeper) ProcessAllMsgs(ctx context.Context) error { iterator, err := k.MessageQueue.Iterate(ctx) if err != nil { - return nil, err + return errors.Wrap(err, "message queue iterator") } - var queuedMsgs []*types.QueuedMessage for ; iterator.Valid(); iterator.Next() { - queuedMsg, err := k.MessageQueue.Dequeue(ctx) + qMsg, err := iterator.Value() if err != nil { - return nil, err + return errors.Wrap(err, "get value of message queue") + } + + if err := k.processMsg(ctx, &qMsg); err != nil { + log.Warn(ctx, "Failed to process queued message", err, "tx_id", string(qMsg.TxId)) + return errors.Wrap(err, "process queued message") } - queuedMsgs = append(queuedMsgs, &queuedMsg) } - return queuedMsgs, nil + return nil } -// ProcessMsg processes queues message depending on the type of message. -func (k Keeper) ProcessMsg(ctx context.Context, msg *types.QueuedMessage) error { +// processMsg processes queues message depending on the type of message. +func (k Keeper) processMsg(ctx context.Context, msg *types.QueuedMessage) error { var ( unwrappedMsgWithType sdk.Msg err error diff --git a/client/x/evmstaking/keeper/redelegate_test.go b/client/x/evmstaking/keeper/redelegate_test.go index 5f40d6ae..aecd82f6 100644 --- a/client/x/evmstaking/keeper/redelegate_test.go +++ b/client/x/evmstaking/keeper/redelegate_test.go @@ -185,12 +185,7 @@ func (s *TestSuite) TestRedelegation() { var err error err = keeper.HandleRedelegateEvent(cachedCtx, &input) if !keeper.MessageQueue.IsEmpty(cachedCtx) { - var queuedMsgs []*types.QueuedMessage - queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx) - - for _, msg := range queuedMsgs { - err = keeper.ProcessMsg(cachedCtx, msg) - } + err = keeper.ProcessAllMsgs(cachedCtx) } if tc.expectedError != "" { require.ErrorContains(err, tc.expectedError) diff --git a/client/x/evmstaking/keeper/validator_test.go b/client/x/evmstaking/keeper/validator_test.go index 52424367..450d764c 100644 --- a/client/x/evmstaking/keeper/validator_test.go +++ b/client/x/evmstaking/keeper/validator_test.go @@ -223,12 +223,7 @@ func (s *TestSuite) TestProcessCreateValidator() { Raw: gethtypes.Log{}, }) if !keeper.MessageQueue.IsEmpty(cachedCtx) { - var queuedMsgs []*types.QueuedMessage - queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx) - - for _, msg := range queuedMsgs { - err = keeper.ProcessMsg(cachedCtx, msg) - } + err = keeper.ProcessAllMsgs(cachedCtx) } if tc.expectedError != "" { require.Error(err, tc.expectedError) diff --git a/client/x/evmstaking/keeper/withdraw_test.go b/client/x/evmstaking/keeper/withdraw_test.go index 4ad11901..153146aa 100644 --- a/client/x/evmstaking/keeper/withdraw_test.go +++ b/client/x/evmstaking/keeper/withdraw_test.go @@ -399,12 +399,7 @@ func (s *TestSuite) TestProcessWithdraw() { err = keeper.HandleWithdrawEvent(cachedCtx, tc.withdraw) if !keeper.MessageQueue.IsEmpty(cachedCtx) { - var queuedMsgs []*types.QueuedMessage - queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx) - - for _, msg := range queuedMsgs { - err = keeper.ProcessMsg(cachedCtx, msg) - } + err = keeper.ProcessAllMsgs(cachedCtx) } if tc.expectedErr != "" { require.ErrorContains(err, tc.expectedErr)