Skip to content

Commit

Permalink
threads
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch committed Sep 9, 2024
1 parent 2b4b107 commit 0e287bf
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 46 deletions.
55 changes: 10 additions & 45 deletions x/streamer/keeper/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/osmosis-labs/osmosis/v15/osmoutils"

"github.com/dymensionxyz/dymension/v3/utils/pagination"
"github.com/dymensionxyz/dymension/v3/x/streamer/types"
Expand Down Expand Up @@ -53,48 +54,6 @@ func (k Keeper) DistributeToGauge(ctx sdk.Context, coins sdk.Coins, record types
return totalAllocated, nil
}

// UpdateStreamAtEpochStart updates the stream for a new epoch. Streams distribute rewards post factum.
// Meaning, first increase the filled epoch pointer, then distribute rewards for this epoch.
func (k Keeper) UpdateStreamAtEpochStart(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
// Check if stream has completed its distribution. This is a post factum check.
if stream.FilledEpochs >= stream.NumEpochsPaidOver {
err := k.moveActiveStreamToFinishedStream(ctx, stream)
if err != nil {
return types.Stream{}, fmt.Errorf("move active stream to finished stream: %w", err)
}
return stream, nil
}

// If the stream is not finalized, update it for the next distribution

remainCoins := stream.Coins.Sub(stream.DistributedCoins...)
remainEpochs := stream.NumEpochsPaidOver - stream.FilledEpochs
epochCoins := remainCoins.QuoInt(math.NewIntFromUint64(remainEpochs))

// If the stream uses a sponsorship plan, query it and update stream distr info. The distribution
// might be empty and this is a valid scenario. In that case, we'll just skip at without
// filling the epoch.
if stream.Sponsored {
distr, err := k.sk.GetDistribution(ctx)
if err != nil {
return types.Stream{}, fmt.Errorf("get sponsorship distribution: %w", err)
}
// Update stream distr info
stream.DistributeTo = types.DistrInfoFromDistribution(distr)
}

// Add coins to distribute during the next epoch
stream.EpochCoins = epochCoins

// Don't fill streams in which there's nothing to fill. Note that rewards are distributed post factum.
// I.e., first increase the filled epoch number, then distribute rewards during the epoch.
if !stream.DistributeTo.TotalWeight.IsZero() {
stream.FilledEpochs += 1
}

return stream, nil
}

type DistributeRewardsResult struct {
NewPointer types.EpochPointer
FilledStreams []types.Stream
Expand All @@ -117,12 +76,18 @@ func (k Keeper) DistributeRewards(

// Distribute to all the remaining gauges that are left after EndBlock
newPointer, iterations := IterateEpochPointer(pointer, streams, limit, func(v StreamGauge) pagination.Stop {
distributed, errX := k.DistributeToGauge(ctx, v.Stream.EpochCoins, v.Gauge, v.Stream.DistributeTo.TotalWeight)
if errX != nil {
var distributed sdk.Coins
err := osmoutils.ApplyFuncIfNoError(ctx, func(ctx sdk.Context) error {
var err error
distributed, err = k.DistributeToGauge(ctx, v.Stream.EpochCoins, v.Gauge, v.Stream.DistributeTo.TotalWeight)
return err
})
if err != nil {
// Ignore this gauge
k.Logger(ctx).
With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", errX.Error()).
With("streamID", v.Stream.Id, "gaugeID", v.Gauge.GaugeId, "error", err.Error()).
Error("Failed to distribute to gauge")
return pagination.Continue
}

totalDistributed = totalDistributed.Add(distributed...)
Expand Down
6 changes: 5 additions & 1 deletion x/streamer/keeper/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,12 @@ func (k Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string) (sdk.Coin

// Update streams with respect to a new epoch and save them
for _, s := range distrResult.FilledStreams {
updated, err := k.UpdateStreamAtEpochEnd(ctx, s)
if err != nil {
return sdk.Coins{}, fmt.Errorf("update stream '%d' at epoch start: %w", s.Id, err)
}
// Save the stream
err = k.SetStream(ctx, &s)
err = k.SetStream(ctx, &updated)
if err != nil {
return sdk.Coins{}, fmt.Errorf("set stream: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions x/streamer/keeper/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,56 @@ import (
"fmt"
"sort"

"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"

"github.com/dymensionxyz/dymension/v3/x/streamer/types"
)

// UpdateStreamAtEpochStart updates the stream for a new epoch: estimates coins that streamer will
// distribute during this epoch and updates a sponsored distribution if needed.
func (k Keeper) UpdateStreamAtEpochStart(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
remainCoins := stream.Coins.Sub(stream.DistributedCoins...)
remainEpochs := stream.NumEpochsPaidOver - stream.FilledEpochs
epochCoins := remainCoins.QuoInt(math.NewIntFromUint64(remainEpochs))

// If the stream uses a sponsorship plan, query it and update stream distr info. The distribution
// might be empty and this is a valid scenario. In that case, we'll just skip without filling the epoch.
if stream.Sponsored {
distr, err := k.sk.GetDistribution(ctx)
if err != nil {
return types.Stream{}, fmt.Errorf("get sponsorship distribution: %w", err)
}
// Update stream distr info
stream.DistributeTo = types.DistrInfoFromDistribution(distr)
}

// Add coins to distribute during the next epoch
stream.EpochCoins = epochCoins

return stream, nil
}

// UpdateStreamAtEpochEnd updates the stream at the end of the epoch: increases the filled epoch number
// and makes the stream finished if needed.
func (k Keeper) UpdateStreamAtEpochEnd(ctx sdk.Context, stream types.Stream) (types.Stream, error) {
// Don't fill streams in which there's nothing to fill. This might happen when using sponsored streams.
if !stream.DistributeTo.TotalWeight.IsZero() {
stream.FilledEpochs += 1
}

// Check if stream has completed its distribution. This is a post factum check.
if stream.FilledEpochs >= stream.NumEpochsPaidOver {
err := k.moveActiveStreamToFinishedStream(ctx, stream)
if err != nil {
return types.Stream{}, fmt.Errorf("move active stream to finished stream: %w", err)
}
}

return stream, nil
}

// GetStreamByID returns stream from stream ID.
func (k Keeper) GetStreamByID(ctx sdk.Context, streamID uint64) (*types.Stream, error) {
stream := types.Stream{}
Expand Down

0 comments on commit 0e287bf

Please sign in to comment.