Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/test(ingest): more common ingester abstractions for pushing pool data #8475

Merged
merged 5 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cosmos/ibc-go/v8/modules/apps/transfer"
ibc "github.com/cosmos/ibc-go/v8/modules/core"

"github.com/osmosis-labs/osmosis/v25/ingest/common/pooltracker"
"github.com/osmosis-labs/osmosis/v25/ingest/common/writelistener"
"github.com/osmosis-labs/osmosis/v25/ingest/indexer"
indexerdomain "github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
Expand Down Expand Up @@ -312,7 +313,7 @@ func NewOsmosisApp(

// Initialize the SQS ingester if it is enabled.
if sqsConfig.IsEnabled {
sqsKeepers := commondomain.PoolExtracterKeepers{
sqsKeepers := commondomain.PoolExtractorKeepers{
GammKeeper: app.GAMMKeeper,
CosmWasmPoolKeeper: app.CosmwasmPoolKeeper,
WasmKeeper: app.WasmKeeper,
Expand All @@ -330,7 +331,7 @@ func NewOsmosisApp(

// Create pool tracker that tracks pool updates
// made by the write listenetrs.
poolTracker := sqsservice.NewPoolTracker()
poolTracker := pooltracker.NewMemory()

// Create write listeners for the SQS service.
writeListeners, storeKeyMap := getSQSServiceWriteListeners(app, appCodec, poolTracker, app.WasmKeeper)
Expand Down
46 changes: 46 additions & 0 deletions ingest/common/domain/block_process_strategy_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package commondomain

// BlockProcessStrategyManager is an interface for managing the strategy of pushing the blocks.
// Either all block data or only the block update are the possible options
// It is initialized with the strategy of pushing all data.
// If it observes an error, it will switch to pushing all data.
// If it ingested initial data and observed no error, it will switch to pushing only changed data.
type BlockProcessStrategyManager interface {
// ShouldPushAllData returns true if all data should be pushed.
ShouldPushAllData() bool

// MarkInitialDataIngested marks the initial data as ingested.
// After calling this function, ShouldPushAllData should return false.
MarkInitialDataIngested()

// MarkErrorObserved marks that an error has been observed.
MarkErrorObserved()
}

type blockProcessStrategyManager struct {
shouldPushAllData bool
}

var _ BlockProcessStrategyManager = &blockProcessStrategyManager{}

// NewBlockProcessStrategyManager creates a new push strategy manager.
func NewBlockProcessStrategyManager() BlockProcessStrategyManager {
return &blockProcessStrategyManager{
shouldPushAllData: true,
}
}

// ShouldPushAllData returns true if all data should be pushed.
func (c *blockProcessStrategyManager) ShouldPushAllData() bool {
return c.shouldPushAllData
}

// MarkInitialDataIngested marks the initial data as ingested.
func (c *blockProcessStrategyManager) MarkInitialDataIngested() {
c.shouldPushAllData = false
}

// MarkErrorObserved marks that an error has been observed.
func (c *blockProcessStrategyManager) MarkErrorObserved() {
c.shouldPushAllData = true
}
50 changes: 50 additions & 0 deletions ingest/common/domain/block_process_strategy_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package commondomain_test

import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/osmosis-labs/osmosis/v25/app/apptesting"
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
)

type CommonDomainTestSuite struct {
apptesting.ConcentratedKeeperTestHelper
}

func TestCommonDomainTestSuite(t *testing.T) {
suite.Run(t, new(CommonDomainTestSuite))
}

// Validates the invariant that the block process strategy manager.
// When initialized, should push all data.
// If the block process strategy manager has observed an error, it should push all data.
// If the block process strategy manager has not observed an error after pushing all data, it should push only changed data.
func (s *CommonDomainTestSuite) TestBlockProcessStrategyManager() {

blockStrategyManager := commondomain.NewBlockProcessStrategyManager()

// ShouldPushAllData should return true when initialized
s.Require().True(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// ShouldPushAllData should return false after MarkInitialDataIngested
s.Require().False(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkErrorObserved()

// ShouldPushAllData should return true after MarkErrorObserved
s.Require().True(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// ShouldPushAllData should return false after MarkInitialDataIngested again
s.Require().False(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// Unchanged after MarkInitialDataIngested twice
s.Require().False(blockStrategyManager.ShouldPushAllData())
}
2 changes: 1 addition & 1 deletion ingest/common/domain/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// Chain keepers required for extracting pool data.
type PoolExtracterKeepers struct {
type PoolExtractorKeepers struct {
GammKeeper PoolKeeper
CosmWasmPoolKeeper CosmWasmPoolKeeper
WasmKeeper WasmKeeper
Expand Down
40 changes: 40 additions & 0 deletions ingest/common/domain/mocks/pools_extracter_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mocks

import (
"github.com/cosmos/cosmos-sdk/types"

commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
)

type PoolsExtractorMock struct {
// AllBlockDataError is the error to return when ProcessAllBlockData is called.
AllBlockDataError error
// ChangedBlockDataError is the error to return when ProcessChangedBlockData is called.
ChangedBlockDataError error
// IsProcessAllBlockDataCalled is a flag indicating if ProcessAllBlockData was called.
IsProcessAllBlockDataCalled bool
// IsProcessAllChangedDataCalled is a flag indicating if ProcessChangedBlockData was called.
IsProcessAllChangedDataCalled bool
// If this is non-empty, ProcessAllBlockData(...) will panic with this message.
ProcessAllBlockDataPanicMsg string
// Block pools to return
BlockPools commondomain.BlockPools
}

var _ commondomain.PoolExtractor = &PoolsExtractorMock{}

// ExtractAll implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractAll(ctx types.Context) (commondomain.BlockPools, error) {
if p.ProcessAllBlockDataPanicMsg != "" {
panic(p.ProcessAllBlockDataPanicMsg)
}

p.IsProcessAllBlockDataCalled = true
return p.BlockPools, p.AllBlockDataError
}

// ExtractChanged implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractChanged(ctx types.Context) (commondomain.BlockPools, error) {
p.IsProcessAllChangedDataCalled = true
return p.BlockPools, p.ChangedBlockDataError
}
15 changes: 15 additions & 0 deletions ingest/common/domain/pools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package commondomain

import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

// PoolExtractor defines the interface for extracting pools.
type PoolExtractor interface {
// ExtractAll extracts all the pools available within the height associated
// with the context.
ExtractAll(ctx sdk.Context) (BlockPools, error)
// ExtractChanged extracts the pools that were changed in the block height associated
// with the context.
ExtractChanged(ctx sdk.Context) (BlockPools, error)
}
13 changes: 13 additions & 0 deletions ingest/common/domain/process_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package commondomain

import sdk "github.com/cosmos/cosmos-sdk/types"

// BlockProcessor is an interface for processing a block.
type BlockProcessor interface {
// ProcessBlock processes a block.
// It returns an error if the block processing fails.
ProcessBlock(ctx sdk.Context) error

// IsFullBlockProcessor returns true if the block processor is a full block processor.
IsFullBlockProcessor() bool
}
82 changes: 82 additions & 0 deletions ingest/common/poolextractor/pool_exractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package poolextractor_test

import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/osmosis-labs/osmosis/v25/app/apptesting"
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/common/poolextractor"
"github.com/osmosis-labs/osmosis/v25/ingest/common/pooltracker"
)

type PoolExtractorTestSuite struct {
apptesting.ConcentratedKeeperTestHelper
}

func TestPoolExtractorTestSuite(t *testing.T) {
suite.Run(t, new(PoolExtractorTestSuite))
}

// TestExtractor tests that the appropriate pools are extracted
// when calling ExtractAll and ExtractChanged methods of the extractor.
func (s *PoolExtractorTestSuite) TestExtractor() {

s.Setup()

// Initialized chain pools
chainPools := s.PrepareAllSupportedPools()

// Get all chain pools from state for asserting later
allChainPools, err := s.App.PoolManagerKeeper.AllPools(s.Ctx)
s.Require().NoError(err)

// Initialize a position on the concentrated pool
concentratedPoolWithPosition := s.PrepareConcentratedPoolWithCoinsAndFullRangePosition(apptesting.ETH, apptesting.USDC)

keepers := commondomain.PoolExtractorKeepers{
GammKeeper: s.App.GAMMKeeper,
CosmWasmPoolKeeper: s.App.CosmwasmPoolKeeper,
WasmKeeper: s.App.WasmKeeper,
ConcentratedKeeper: s.App.ConcentratedLiquidityKeeper,
PoolManagerKeeper: s.App.PoolManagerKeeper,
BankKeeper: s.App.BankKeeper,
}

poolTracker := pooltracker.NewMemory()

// Track only the concentrated pool
concentratedPool, err := s.App.ConcentratedLiquidityKeeper.GetPool(s.Ctx, chainPools.ConcentratedPoolID)
s.Require().NoError(err)
poolTracker.TrackConcentrated(concentratedPool)

// Track tick change for a concentraed pool.
poolTracker.TrackConcentratedPoolIDTickChange(concentratedPoolWithPosition.GetId())

// Initialize the extractor
extractor := poolextractor.New(keepers, poolTracker)

// System under test #1
blockPools, err := extractor.ExtractAll(s.Ctx)
s.Require().NoError(err)

// Validate all pools are extracted
allPools := blockPools.GetAll()
// + 1 for an extra concentrated pool.
s.Require().Equal(len(allChainPools)+1, len(allPools))

// System under test #2
// Extract the pools again but now only changed
blockPools, err = extractor.ExtractChanged(s.Ctx)
s.Require().NoError(err)

// Validate only the changed pools are extracted
changedPools := blockPools.GetAll()
s.Require().Equal(2, len(changedPools))

// Validate that the tick change is detected
s.Require().Len(blockPools.ConcentratedPoolIDTickChange, 2)
_, ok := blockPools.ConcentratedPoolIDTickChange[concentratedPoolWithPosition.GetId()]
s.Require().True(ok)
}
101 changes: 101 additions & 0 deletions ingest/common/poolextractor/pool_extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package poolextractor

import (
sdk "github.com/cosmos/cosmos-sdk/types"

commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain"
)

// poolExtractor is an abstraction that extracts pools from the chain.
type poolExtractor struct {
keepers commondomain.PoolExtractorKeepers
poolTracker domain.BlockPoolUpdateTracker
}

// New creates a new pool extractor.
func New(keepers commondomain.PoolExtractorKeepers, poolTracker domain.BlockPoolUpdateTracker) commondomain.PoolExtractor {
return &poolExtractor{
keepers: keepers,
poolTracker: poolTracker,
}
}

// ExtractAll implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, error) {
// Concentrated pools

concentratedPools, err := p.keepers.ConcentratedKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// CFMM pools

cfmmPools, err := p.keepers.GammKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// CosmWasm pools

cosmWasmPools, err := p.keepers.CosmWasmPoolKeeper.GetPoolsWithWasmKeeper(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// Generate the initial cwPool address to pool mapping
for _, pool := range cosmWasmPools {
p.poolTracker.TrackCosmWasmPoolsAddressToPoolMap(pool)
}

blockPools := commondomain.BlockPools{
ConcentratedPools: concentratedPools,
CosmWasmPools: cosmWasmPools,
CFMMPools: cfmmPools,
}

return blockPools, nil
}

// ExtractChanged implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractChanged(ctx sdk.Context) (commondomain.BlockPools, error) {
// If not cold start, we only process the pools that were changed this block.
concentratedPools := p.poolTracker.GetConcentratedPools()
concentratedPoolIDTickChange := p.poolTracker.GetConcentratedPoolIDTickChange()
cfmmPools := p.poolTracker.GetCFMMPools()
cosmWasmPools := p.poolTracker.GetCosmWasmPools()

changedBlockPools := commondomain.BlockPools{
ConcentratedPools: concentratedPools,
ConcentratedPoolIDTickChange: concentratedPoolIDTickChange,
CosmWasmPools: cosmWasmPools,
CFMMPools: cfmmPools,
}

poolIDsTracked := make(map[uint64]struct{}, len(changedBlockPools.ConcentratedPools))

// Copy over the pools that were changed in the block
for _, pool := range changedBlockPools.ConcentratedPools {
changedBlockPools.ConcentratedPoolIDTickChange[pool.GetId()] = struct{}{}

poolIDsTracked[pool.GetId()] = struct{}{}
}

// Update concentrated pools
for poolID := range concentratedPoolIDTickChange {
// Skip if the pool if it is already tracked
if _, ok := poolIDsTracked[poolID]; ok {
continue
}

pool, err := p.keepers.ConcentratedKeeper.GetConcentratedPoolById(ctx, poolID)
if err != nil {
return commondomain.BlockPools{}, err
}

changedBlockPools.ConcentratedPools = append(changedBlockPools.ConcentratedPools, pool)
}

return changedBlockPools, nil
}
Loading