Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streamer): added streamer pagination #1100

Merged
merged 15 commits into from
Sep 10, 2024
1 change: 1 addition & 0 deletions app/keepers/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func (a *AppKeepers) InitKeepers(
)

a.StreamerKeeper = *streamermodulekeeper.NewKeeper(
appCodec,
a.keys[streamermoduletypes.StoreKey],
a.GetSubspace(streamermoduletypes.ModuleName),
a.BankKeeper,
Expand Down
2 changes: 1 addition & 1 deletion app/keepers/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,12 @@ var InitGenesis = []string{
rollappmoduletypes.ModuleName,
sequencertypes.ModuleName,
sponsorshiptypes.ModuleName,
streamermoduletypes.ModuleName,
denommetadatamoduletypes.ModuleName, // must after `x/bank` to trigger hooks
delayedacktypes.ModuleName,
eibcmoduletypes.ModuleName,
dymnstypes.ModuleName,
epochstypes.ModuleName,
streamermoduletypes.ModuleName, // must be after x/epochs to fill epoch pointers
lockuptypes.ModuleName,
gammtypes.ModuleName,
poolmanagertypes.ModuleName,
Expand Down
17 changes: 17 additions & 0 deletions app/upgrades/v4/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v4
import (
"github.com/cometbft/cometbft/crypto"
"github.com/cosmos/cosmos-sdk/baseapp"
epochskeeper "github.com/osmosis-labs/osmosis/v15/x/epochs/keeper"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
Expand Down Expand Up @@ -33,6 +34,8 @@ import (
rollapptypes "github.com/dymensionxyz/dymension/v3/x/rollapp/types"
sequencerkeeper "github.com/dymensionxyz/dymension/v3/x/sequencer/keeper"
sequencertypes "github.com/dymensionxyz/dymension/v3/x/sequencer/types"
streamerkeeper "github.com/dymensionxyz/dymension/v3/x/streamer/keeper"
streamertypes "github.com/dymensionxyz/dymension/v3/x/streamer/types"
)

// CreateUpgradeHandler creates an SDK upgrade handler for v4
Expand All @@ -56,6 +59,9 @@ func CreateUpgradeHandler(

migrateSequencers(ctx, keepers.SequencerKeeper)
migrateRollappLightClients(ctx, keepers.RollappKeeper, keepers.LightClientKeeper, keepers.IBCKeeper.ChannelKeeper)
if err := migrateStreamer(ctx, keepers.StreamerKeeper, keepers.EpochsKeeper); err != nil {
return nil, err
}

// TODO: create rollapp gauges for each existing rollapp (https://github.com/dymensionxyz/dymension/issues/1005)

Expand Down Expand Up @@ -173,6 +179,17 @@ func migrateRollappLightClients(ctx sdk.Context, rollappkeeper *rollappkeeper.Ke
}
}

// migrateStreamer creates epoch pointers for all epoch infos.
func migrateStreamer(ctx sdk.Context, sk streamerkeeper.Keeper, ek *epochskeeper.Keeper) error {
for _, epoch := range ek.AllEpochInfos(ctx) {
err := sk.SaveEpochPointer(ctx, streamertypes.NewEpochPointer(epoch.Identifier, epoch.Duration))
if err != nil {
return err
}
}
return nil
}

func ConvertOldRollappToNew(oldRollapp rollapptypes.Rollapp) rollapptypes.Rollapp {
return rollapptypes.Rollapp{
RollappId: oldRollapp.RollappId,
Expand Down
18 changes: 18 additions & 0 deletions app/upgrades/v4/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/dymensionxyz/dymension/v3/testutil/sample"
rollapptypes "github.com/dymensionxyz/dymension/v3/x/rollapp/types"
sequencertypes "github.com/dymensionxyz/dymension/v3/x/sequencer/types"
streamertypes "github.com/dymensionxyz/dymension/v3/x/streamer/types"
)

// UpgradeTestSuite defines the structure for the upgrade test suite
Expand Down Expand Up @@ -120,6 +121,8 @@ func (s *UpgradeTestSuite) TestUpgrade() {
return
}

s.validateStreamerMigration()

// TODO: check for rollapp gauges creation

return
Expand Down Expand Up @@ -228,6 +231,21 @@ func (s *UpgradeTestSuite) validateSequencersMigration(numSeq int) error {
return nil
}

func (s *UpgradeTestSuite) validateStreamerMigration() {
epochInfos := s.App.EpochsKeeper.AllEpochInfos(s.Ctx)

pointers, err := s.App.StreamerKeeper.GetAllEpochPointers(s.Ctx)
s.Require().NoError(err)

var expected []streamertypes.EpochPointer
for _, info := range epochInfos {
expected = append(expected, streamertypes.NewEpochPointer(info.Identifier, info.Duration))
}

// Equal also checks the order of pointers
s.Require().Equal(expected, pointers)
}

func (s *UpgradeTestSuite) seedAndStoreRollapps(numRollapps int) {
for _, rollapp := range s.seedRollapps(numRollapps) {
s.App.RollappKeeper.SetRollapp(s.Ctx, rollapp)
Expand Down
31 changes: 31 additions & 0 deletions proto/dymensionxyz/dymension/streamer/events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";
package dymensionxyz.dymension.streamer;

import "gogoproto/gogo.proto";
import "cosmos/base/v1beta1/coin.proto";
import "dymensionxyz/dymension/streamer/streamer.proto";

option go_package = "github.com/dymensionxyz/dymension/v3/x/streamer/types";

message EventEndBlock {
uint64 iterations = 1;
uint64 max_iterations = 2;
// Distributed is the total amount of coins that have been distributed
repeated cosmos.base.v1beta1.Coin distributed = 3 [
(gogoproto.nullable) = false,
(gogoproto.castrepeated) = "github.com/cosmos/cosmos-sdk/types.Coins"
];
}

message EventEpochEnd {
uint64 iterations = 1;
// Distributed is the total amount of coins that have been distributed
repeated cosmos.base.v1beta1.Coin distributed = 2 [
(gogoproto.nullable) = false,
(gogoproto.castrepeated) = "github.com/cosmos/cosmos-sdk/types.Coins"
];
}

message EventEpochStart {
uint64 active_streams_num = 1;
}
3 changes: 3 additions & 0 deletions proto/dymensionxyz/dymension/streamer/genesis.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";
import "dymensionxyz/dymension/streamer/params.proto";
import "dymensionxyz/dymension/streamer/stream.proto";
import "dymensionxyz/dymension/streamer/streamer.proto";

option go_package = "github.com/dymensionxyz/dymension/v3/x/streamer/types";

Expand All @@ -19,4 +20,6 @@ message GenesisState {
// last_stream_id is what the stream number will increment from when creating
// the next stream after genesis
uint64 last_stream_id = 3;
// EpochPointers are pointers to the last rewarded gauges
repeated EpochPointer epoch_pointers = 4 [ (gogoproto.nullable) = false ];
}
3 changes: 3 additions & 0 deletions proto/dymensionxyz/dymension/streamer/params.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ option go_package = "github.com/dymensionxyz/dymension/v3/x/streamer/types";

// Params holds parameters for the streamer module
message Params {
// MaxIterationPerBlock defines the maximum number of gauges that could be processed in a single block.
// This param is used during the pagination process.
uint64 max_iterations_per_block = 1;
}
6 changes: 6 additions & 0 deletions proto/dymensionxyz/dymension/streamer/stream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ message Stream {

// Sponsored indicates if the stream is based on the sponsorship distribution.
bool sponsored = 9;

// EpochCoins are coins that need to be distributed in this epoch.
repeated cosmos.base.v1beta1.Coin epoch_coins = 10 [
(gogoproto.nullable) = false,
(gogoproto.castrepeated) = "github.com/cosmos/cosmos-sdk/types.Coins"
];
}
25 changes: 25 additions & 0 deletions proto/dymensionxyz/dymension/streamer/streamer.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";
package dymensionxyz.dymension.streamer;

import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";

option go_package = "github.com/dymensionxyz/dymension/v3/x/streamer/types";

// EpochPointer is a special object used for the streamer pagination. It helps iterate over
// streams with the specified epoch identifier within one epoch. Additionally, holds coins
// that must be distributed in this epoch.
message EpochPointer {
// StreamID is the ID of a stream.
uint64 stream_id = 1;
// GaugeID is the ID of a gauge.
uint64 gauge_id = 2;
// EpochIdentifier is a unique reference to this particular timer.
string epoch_identifier = 3;
// EpochDuration is the time in between epoch ticks. It is stored in order to have
// an ability to sort the EpochPointer slice.
google.protobuf.Duration epoch_duration = 4 [
(gogoproto.nullable) = false,
(gogoproto.stdduration) = true
];
}
1 change: 1 addition & 0 deletions testutil/keeper/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func StreamerKeeper(t testing.TB) (*keeper.Keeper, sdk.Context) {
"StreamerParams",
)
k := keeper.NewKeeper(
cdc,
storeKey,
paramsSubspace,
nil,
Expand Down
29 changes: 29 additions & 0 deletions utils/pagination/paginate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pagination

type Iterator[T any] interface {
Next()
Value() T
Valid() bool
}

type Stop bool

const (
Break Stop = true
Continue Stop = false
)

// Paginate is a function that paginates over an iterator. The callback is executed for each iteration and if it
// returns true, the pagination stops. The function returns the amount of iterations before stopping.
func Paginate[T any](iter Iterator[T], perPage uint64, cb func(T) Stop) uint64 {
iterations := uint64(0)
for ; iterations < perPage && iter.Valid(); iter.Next() {
iterations++

stop := cb(iter.Value())
if stop {
break
}
}
return iterations
}
83 changes: 83 additions & 0 deletions utils/pagination/paginate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package pagination_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymension/v3/utils/pagination"
)

type testIterator struct {
data []int
index int
}

func newTestIterator(data []int) *testIterator {
return &testIterator{data: data, index: 0}
}

func (t *testIterator) Next() {
t.index++
}

func (t *testIterator) Value() int {
return t.data[t.index]
}

func (t *testIterator) Valid() bool {
return t.index < len(t.data)
}

func TestPaginate(t *testing.T) {
testCases := []struct {
name string
iterator pagination.Iterator[int]
perPage uint64
stopValue int
expected uint64
}{
{
name: "Empty iterator",
iterator: newTestIterator([]int{}),
perPage: 5,
stopValue: -1,
expected: 0,
},
{
name: "Non-Empty iterator less than perPage",
iterator: newTestIterator([]int{1, 2, 3}),
perPage: 10,
stopValue: -1,
expected: 3,
},
{
name: "Non-empty iterator greater than perPage",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 5,
stopValue: -1,
expected: 5,
},
{
name: "Zero perPage",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 0,
stopValue: 6,
expected: 0,
},
{
name: "Non-Empty iterator with stop condition",
iterator: newTestIterator([]int{1, 2, 3, 4, 5, 6, 7}),
perPage: 10,
stopValue: 3,
expected: 3,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := pagination.Paginate(tc.iterator, tc.perPage, func(i int) pagination.Stop { return i == tc.stopValue })
require.Equal(t, tc.expected, result)
})
}
}
64 changes: 64 additions & 0 deletions x/streamer/keeper/abci.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package keeper

import (
"fmt"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dymensionxyz/sdk-utils/utils/uevent"

"github.com/dymensionxyz/dymension/v3/x/streamer/types"
)

func (k Keeper) EndBlock(ctx sdk.Context) error {
streams := k.GetActiveStreams(ctx)
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed

epochPointers, err := k.GetAllEpochPointers(ctx)
if err != nil {
return fmt.Errorf("get all epoch pointers: %w", err)
}

// Sort epoch pointers to distribute to shorter epochs first
types.SortEpochPointers(epochPointers)

maxIterations := k.GetParams(ctx).MaxIterationsPerBlock
totalIterations := uint64(0)
totalDistributed := sdk.NewCoins()

for _, p := range epochPointers {
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
remainIterations := maxIterations - totalIterations

if remainIterations <= 0 {
break // no more iterations available for this block
}

result := k.DistributeRewards(ctx, p, remainIterations, streams)

totalIterations += result.Iterations
totalDistributed = totalDistributed.Add(result.DistributedCoins...)
streams = result.FilledStreams

err = k.SaveEpochPointer(ctx, result.NewPointer)
if err != nil {
return fmt.Errorf("save epoch pointer: %w", err)
}
}

// Save stream updates
for _, stream := range streams {
err = k.SetStream(ctx, &stream)
if err != nil {
return fmt.Errorf("set stream: %w", err)
}
}

err = uevent.EmitTypedEvent(ctx, &types.EventEndBlock{
Iterations: totalIterations,
MaxIterations: maxIterations,
Distributed: totalDistributed,
})
if err != nil {
return fmt.Errorf("emit typed event: %w", err)
}

return nil
}
Loading
Loading