Skip to content

Commit

Permalink
perf(evmstaking): improve processing queued msg
Browse files Browse the repository at this point in the history
  • Loading branch information
Narangde committed Sep 18, 2024
1 parent ea58321 commit 99c399e
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 53 deletions.
15 changes: 2 additions & 13 deletions client/x/evmstaking/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,12 @@ func (k *Keeper) EndBlock(ctx context.Context) (abci.ValidatorUpdates, error) {
return nil, errors.Wrap(err, "check if the next epoch starts")
}

//nolint:nestif // readability
if isNextEpoch {
// get queued messages
queuedMsgs, err := k.DequeueAllMsgs(ctx)
if err != nil {
// process all queued messages
if err := k.ProcessAllMsgs(ctx); err != nil {
return nil, errors.Wrap(err, "get current epoch queued messages")
}

// execute queued message
for _, msg := range queuedMsgs {
if err := k.ProcessMsg(ctx, msg); err != nil {
// TODO(Narangde): do we need to handle failed message?
log.Error(ctx, "Error occurred while processing queued message", err)
continue
}
}

// if it is epoch based, increase epoch number
if err := k.IncCurEpochNumber(ctx); err != nil {
return nil, errors.Wrap(err, "increase current epoch number")
Expand Down
7 changes: 1 addition & 6 deletions client/x/evmstaking/keeper/deposit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,7 @@ func (s *TestSuite) TestProcessDeposit() {
var err error
err = keeper.HandleDepositEvent(cachedCtx, tc.deposit)
if !keeper.MessageQueue.IsEmpty(cachedCtx) {
var queuedMsgs []*types.QueuedMessage
queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx)

for _, msg := range queuedMsgs {
err = keeper.ProcessMsg(cachedCtx, msg)
}
err = keeper.ProcessAllMsgs(cachedCtx)
}
if tc.expectedErr != "" {
require.ErrorContains(err, tc.expectedErr)
Expand Down
7 changes: 1 addition & 6 deletions client/x/evmstaking/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,12 +642,7 @@ func (s *TestSuite) TestProcessStakingEvents() {
}
err = evmstakingKeeper.ProcessStakingEvents(cachedCtx, 1, evmLogs)
if !evmstakingKeeper.MessageQueue.IsEmpty(cachedCtx) {
var queuedMsgs []*types.QueuedMessage
queuedMsgs, err = evmstakingKeeper.DequeueAllMsgs(cachedCtx)

for _, msg := range queuedMsgs {
err = evmstakingKeeper.ProcessMsg(cachedCtx, msg)
}
err = evmstakingKeeper.ProcessAllMsgs(cachedCtx)
}
if tc.expectedError != "" {
require.Error(err)
Expand Down
24 changes: 14 additions & 10 deletions client/x/evmstaking/keeper/msg_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,38 @@ import (

"github.com/piplabs/story/client/x/evmstaking/types"
"github.com/piplabs/story/lib/errors"
"github.com/piplabs/story/lib/log"
)

// EnqueueMsg enqueues a message to the queue of the current epoch.
func (k Keeper) EnqueueMsg(ctx context.Context, msg types.QueuedMessage) error {
return k.MessageQueue.Enqueue(ctx, msg)
}

// DequeueAllMsgs returns the set of messages queued in a given epoch.
func (k Keeper) DequeueAllMsgs(ctx context.Context) ([]*types.QueuedMessage, error) {
// ProcessAllMsgs returns the set of messages queued in a given epoch.
func (k Keeper) ProcessAllMsgs(ctx context.Context) error {
iterator, err := k.MessageQueue.Iterate(ctx)
if err != nil {
return nil, err
return errors.Wrap(err, "message queue iterator")
}

var queuedMsgs []*types.QueuedMessage
for ; iterator.Valid(); iterator.Next() {
queuedMsg, err := k.MessageQueue.Dequeue(ctx)
qMsg, err := iterator.Value()
if err != nil {
return nil, err
return errors.Wrap(err, "get value of message queue")
}

if err := k.processMsg(ctx, &qMsg); err != nil {
log.Warn(ctx, "Failed to process queued message", err, "tx_id", string(qMsg.TxId))
return errors.Wrap(err, "process queued message")
}
queuedMsgs = append(queuedMsgs, &queuedMsg)
}

return queuedMsgs, nil
return nil
}

// ProcessMsg processes queues message depending on the type of message.
func (k Keeper) ProcessMsg(ctx context.Context, msg *types.QueuedMessage) error {
// processMsg processes queues message depending on the type of message.
func (k Keeper) processMsg(ctx context.Context, msg *types.QueuedMessage) error {
var (
unwrappedMsgWithType sdk.Msg
err error
Expand Down
7 changes: 1 addition & 6 deletions client/x/evmstaking/keeper/redelegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,7 @@ func (s *TestSuite) TestRedelegation() {
var err error
err = keeper.HandleRedelegateEvent(cachedCtx, &input)
if !keeper.MessageQueue.IsEmpty(cachedCtx) {
var queuedMsgs []*types.QueuedMessage
queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx)

for _, msg := range queuedMsgs {
err = keeper.ProcessMsg(cachedCtx, msg)
}
err = keeper.ProcessAllMsgs(cachedCtx)
}
if tc.expectedError != "" {
require.ErrorContains(err, tc.expectedError)
Expand Down
7 changes: 1 addition & 6 deletions client/x/evmstaking/keeper/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,7 @@ func (s *TestSuite) TestProcessCreateValidator() {
Raw: gethtypes.Log{},
})
if !keeper.MessageQueue.IsEmpty(cachedCtx) {
var queuedMsgs []*types.QueuedMessage
queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx)

for _, msg := range queuedMsgs {
err = keeper.ProcessMsg(cachedCtx, msg)
}
err = keeper.ProcessAllMsgs(cachedCtx)
}
if tc.expectedError != "" {
require.Error(err, tc.expectedError)
Expand Down
7 changes: 1 addition & 6 deletions client/x/evmstaking/keeper/withdraw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,7 @@ func (s *TestSuite) TestProcessWithdraw() {

err = keeper.HandleWithdrawEvent(cachedCtx, tc.withdraw)
if !keeper.MessageQueue.IsEmpty(cachedCtx) {
var queuedMsgs []*types.QueuedMessage
queuedMsgs, err = keeper.DequeueAllMsgs(cachedCtx)

for _, msg := range queuedMsgs {
err = keeper.ProcessMsg(cachedCtx, msg)
}
err = keeper.ProcessAllMsgs(cachedCtx)
}
if tc.expectedErr != "" {
require.ErrorContains(err, tc.expectedErr)
Expand Down

0 comments on commit 99c399e

Please sign in to comment.