Skip to content

Commit

Permalink
refactor/test(ingest): more common ingester abstractions for pushing …
Browse files Browse the repository at this point in the history
…pool data (#8475)

* refactor/test: more common ingester abstractions

* lint

* remove sqs pool tracker

* renames

* lint

(cherry picked from commit dcef8c7)

# Conflicts:
#	ingest/sqs/service/sqs_streaming_service_test.go
  • Loading branch information
p0mvn authored and mergify[bot] committed Jul 4, 2024
1 parent 81365bb commit cd2c309
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 28 deletions.
5 changes: 3 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cosmos/ibc-go/v7/modules/apps/transfer"
ibc "github.com/cosmos/ibc-go/v7/modules/core"

"github.com/osmosis-labs/osmosis/v25/ingest/common/pooltracker"
"github.com/osmosis-labs/osmosis/v25/ingest/common/writelistener"
"github.com/osmosis-labs/osmosis/v25/ingest/indexer"
indexerdomain "github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
Expand Down Expand Up @@ -300,7 +301,7 @@ func NewOsmosisApp(

// Initialize the SQS ingester if it is enabled.
if sqsConfig.IsEnabled {
sqsKeepers := commondomain.PoolExtracterKeepers{
sqsKeepers := commondomain.PoolExtractorKeepers{
GammKeeper: app.GAMMKeeper,
CosmWasmPoolKeeper: app.CosmwasmPoolKeeper,
WasmKeeper: app.WasmKeeper,
Expand All @@ -318,7 +319,7 @@ func NewOsmosisApp(

// Create pool tracker that tracks pool updates
// made by the write listenetrs.
poolTracker := sqsservice.NewPoolTracker()
poolTracker := pooltracker.NewMemory()

// Create write listeners for the SQS service.
writeListeners := getSQSServiceWriteListeners(app, appCodec, poolTracker, app.WasmKeeper)
Expand Down
46 changes: 46 additions & 0 deletions ingest/common/domain/block_process_strategy_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package commondomain

// BlockProcessStrategyManager is an interface for managing the strategy of pushing the blocks.
// Either all block data or only the block update are the possible options
// It is initialized with the strategy of pushing all data.
// If it observes an error, it will switch to pushing all data.
// If it ingested initial data and observed no error, it will switch to pushing only changed data.
type BlockProcessStrategyManager interface {
// ShouldPushAllData returns true if all data should be pushed.
ShouldPushAllData() bool

// MarkInitialDataIngested marks the initial data as ingested.
// After calling this function, ShouldPushAllData should return false.
MarkInitialDataIngested()

// MarkErrorObserved marks that an error has been observed.
MarkErrorObserved()
}

type blockProcessStrategyManager struct {
shouldPushAllData bool
}

var _ BlockProcessStrategyManager = &blockProcessStrategyManager{}

// NewBlockProcessStrategyManager creates a new push strategy manager.
func NewBlockProcessStrategyManager() BlockProcessStrategyManager {
return &blockProcessStrategyManager{
shouldPushAllData: true,
}
}

// ShouldPushAllData returns true if all data should be pushed.
func (c *blockProcessStrategyManager) ShouldPushAllData() bool {
return c.shouldPushAllData
}

// MarkInitialDataIngested marks the initial data as ingested.
func (c *blockProcessStrategyManager) MarkInitialDataIngested() {
c.shouldPushAllData = false
}

// MarkErrorObserved marks that an error has been observed.
func (c *blockProcessStrategyManager) MarkErrorObserved() {
c.shouldPushAllData = true
}
50 changes: 50 additions & 0 deletions ingest/common/domain/block_process_strategy_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package commondomain_test

import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/osmosis-labs/osmosis/v25/app/apptesting"
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
)

type CommonDomainTestSuite struct {
apptesting.ConcentratedKeeperTestHelper
}

func TestCommonDomainTestSuite(t *testing.T) {
suite.Run(t, new(CommonDomainTestSuite))
}

// Validates the invariant that the block process strategy manager.
// When initialized, should push all data.
// If the block process strategy manager has observed an error, it should push all data.
// If the block process strategy manager has not observed an error after pushing all data, it should push only changed data.
func (s *CommonDomainTestSuite) TestBlockProcessStrategyManager() {

blockStrategyManager := commondomain.NewBlockProcessStrategyManager()

// ShouldPushAllData should return true when initialized
s.Require().True(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// ShouldPushAllData should return false after MarkInitialDataIngested
s.Require().False(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkErrorObserved()

// ShouldPushAllData should return true after MarkErrorObserved
s.Require().True(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// ShouldPushAllData should return false after MarkInitialDataIngested again
s.Require().False(blockStrategyManager.ShouldPushAllData())

blockStrategyManager.MarkInitialDataIngested()

// Unchanged after MarkInitialDataIngested twice
s.Require().False(blockStrategyManager.ShouldPushAllData())
}
2 changes: 1 addition & 1 deletion ingest/common/domain/keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// Chain keepers required for extracting pool data.
type PoolExtracterKeepers struct {
type PoolExtractorKeepers struct {
GammKeeper PoolKeeper
CosmWasmPoolKeeper CosmWasmPoolKeeper
WasmKeeper WasmKeeper
Expand Down
40 changes: 40 additions & 0 deletions ingest/common/domain/mocks/pools_extracter_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mocks

import (
"github.com/cosmos/cosmos-sdk/types"

commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
)

type PoolsExtractorMock struct {
// AllBlockDataError is the error to return when ProcessAllBlockData is called.
AllBlockDataError error
// ChangedBlockDataError is the error to return when ProcessChangedBlockData is called.
ChangedBlockDataError error
// IsProcessAllBlockDataCalled is a flag indicating if ProcessAllBlockData was called.
IsProcessAllBlockDataCalled bool
// IsProcessAllChangedDataCalled is a flag indicating if ProcessChangedBlockData was called.
IsProcessAllChangedDataCalled bool
// If this is non-empty, ProcessAllBlockData(...) will panic with this message.
ProcessAllBlockDataPanicMsg string
// Block pools to return
BlockPools commondomain.BlockPools
}

var _ commondomain.PoolExtractor = &PoolsExtractorMock{}

// ExtractAll implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractAll(ctx types.Context) (commondomain.BlockPools, error) {
if p.ProcessAllBlockDataPanicMsg != "" {
panic(p.ProcessAllBlockDataPanicMsg)
}

p.IsProcessAllBlockDataCalled = true
return p.BlockPools, p.AllBlockDataError
}

// ExtractChanged implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractChanged(ctx types.Context) (commondomain.BlockPools, error) {
p.IsProcessAllChangedDataCalled = true
return p.BlockPools, p.ChangedBlockDataError
}
15 changes: 15 additions & 0 deletions ingest/common/domain/pools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package commondomain

import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

// PoolExtractor defines the interface for extracting pools.
type PoolExtractor interface {
// ExtractAll extracts all the pools available within the height associated
// with the context.
ExtractAll(ctx sdk.Context) (BlockPools, error)
// ExtractChanged extracts the pools that were changed in the block height associated
// with the context.
ExtractChanged(ctx sdk.Context) (BlockPools, error)
}
13 changes: 13 additions & 0 deletions ingest/common/domain/process_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package commondomain

import sdk "github.com/cosmos/cosmos-sdk/types"

// BlockProcessor is an interface for processing a block.
type BlockProcessor interface {
// ProcessBlock processes a block.
// It returns an error if the block processing fails.
ProcessBlock(ctx sdk.Context) error

// IsFullBlockProcessor returns true if the block processor is a full block processor.
IsFullBlockProcessor() bool
}
82 changes: 82 additions & 0 deletions ingest/common/poolextractor/pool_exractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package poolextractor_test

import (
"testing"

"github.com/stretchr/testify/suite"

"github.com/osmosis-labs/osmosis/v25/app/apptesting"
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/common/poolextractor"
"github.com/osmosis-labs/osmosis/v25/ingest/common/pooltracker"
)

type PoolExtractorTestSuite struct {
apptesting.ConcentratedKeeperTestHelper
}

func TestPoolExtractorTestSuite(t *testing.T) {
suite.Run(t, new(PoolExtractorTestSuite))
}

// TestExtractor tests that the appropriate pools are extracted
// when calling ExtractAll and ExtractChanged methods of the extractor.
func (s *PoolExtractorTestSuite) TestExtractor() {

s.Setup()

// Initialized chain pools
chainPools := s.PrepareAllSupportedPools()

// Get all chain pools from state for asserting later
allChainPools, err := s.App.PoolManagerKeeper.AllPools(s.Ctx)
s.Require().NoError(err)

// Initialize a position on the concentrated pool
concentratedPoolWithPosition := s.PrepareConcentratedPoolWithCoinsAndFullRangePosition(apptesting.ETH, apptesting.USDC)

keepers := commondomain.PoolExtractorKeepers{
GammKeeper: s.App.GAMMKeeper,
CosmWasmPoolKeeper: s.App.CosmwasmPoolKeeper,
WasmKeeper: s.App.WasmKeeper,
ConcentratedKeeper: s.App.ConcentratedLiquidityKeeper,
PoolManagerKeeper: s.App.PoolManagerKeeper,
BankKeeper: s.App.BankKeeper,
}

poolTracker := pooltracker.NewMemory()

// Track only the concentrated pool
concentratedPool, err := s.App.ConcentratedLiquidityKeeper.GetPool(s.Ctx, chainPools.ConcentratedPoolID)
s.Require().NoError(err)
poolTracker.TrackConcentrated(concentratedPool)

// Track tick change for a concentraed pool.
poolTracker.TrackConcentratedPoolIDTickChange(concentratedPoolWithPosition.GetId())

// Initialize the extractor
extractor := poolextractor.New(keepers, poolTracker)

// System under test #1
blockPools, err := extractor.ExtractAll(s.Ctx)
s.Require().NoError(err)

// Validate all pools are extracted
allPools := blockPools.GetAll()
// + 1 for an extra concentrated pool.
s.Require().Equal(len(allChainPools)+1, len(allPools))

// System under test #2
// Extract the pools again but now only changed
blockPools, err = extractor.ExtractChanged(s.Ctx)
s.Require().NoError(err)

// Validate only the changed pools are extracted
changedPools := blockPools.GetAll()
s.Require().Equal(2, len(changedPools))

// Validate that the tick change is detected
s.Require().Len(blockPools.ConcentratedPoolIDTickChange, 2)
_, ok := blockPools.ConcentratedPoolIDTickChange[concentratedPoolWithPosition.GetId()]
s.Require().True(ok)
}
101 changes: 101 additions & 0 deletions ingest/common/poolextractor/pool_extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package poolextractor

import (
sdk "github.com/cosmos/cosmos-sdk/types"

commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain"
)

// poolExtractor is an abstraction that extracts pools from the chain.
type poolExtractor struct {
keepers commondomain.PoolExtractorKeepers
poolTracker domain.BlockPoolUpdateTracker
}

// New creates a new pool extractor.
func New(keepers commondomain.PoolExtractorKeepers, poolTracker domain.BlockPoolUpdateTracker) commondomain.PoolExtractor {
return &poolExtractor{
keepers: keepers,
poolTracker: poolTracker,
}
}

// ExtractAll implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, error) {
// Concentrated pools

concentratedPools, err := p.keepers.ConcentratedKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// CFMM pools

cfmmPools, err := p.keepers.GammKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// CosmWasm pools

cosmWasmPools, err := p.keepers.CosmWasmPoolKeeper.GetPoolsWithWasmKeeper(ctx)
if err != nil {
return commondomain.BlockPools{}, err
}

// Generate the initial cwPool address to pool mapping
for _, pool := range cosmWasmPools {
p.poolTracker.TrackCosmWasmPoolsAddressToPoolMap(pool)
}

blockPools := commondomain.BlockPools{
ConcentratedPools: concentratedPools,
CosmWasmPools: cosmWasmPools,
CFMMPools: cfmmPools,
}

return blockPools, nil
}

// ExtractChanged implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractChanged(ctx sdk.Context) (commondomain.BlockPools, error) {
// If not cold start, we only process the pools that were changed this block.
concentratedPools := p.poolTracker.GetConcentratedPools()
concentratedPoolIDTickChange := p.poolTracker.GetConcentratedPoolIDTickChange()
cfmmPools := p.poolTracker.GetCFMMPools()
cosmWasmPools := p.poolTracker.GetCosmWasmPools()

changedBlockPools := commondomain.BlockPools{
ConcentratedPools: concentratedPools,
ConcentratedPoolIDTickChange: concentratedPoolIDTickChange,
CosmWasmPools: cosmWasmPools,
CFMMPools: cfmmPools,
}

poolIDsTracked := make(map[uint64]struct{}, len(changedBlockPools.ConcentratedPools))

// Copy over the pools that were changed in the block
for _, pool := range changedBlockPools.ConcentratedPools {
changedBlockPools.ConcentratedPoolIDTickChange[pool.GetId()] = struct{}{}

poolIDsTracked[pool.GetId()] = struct{}{}
}

// Update concentrated pools
for poolID := range concentratedPoolIDTickChange {
// Skip if the pool if it is already tracked
if _, ok := poolIDsTracked[poolID]; ok {
continue
}

pool, err := p.keepers.ConcentratedKeeper.GetConcentratedPoolById(ctx, poolID)
if err != nil {
return commondomain.BlockPools{}, err
}

changedBlockPools.ConcentratedPools = append(changedBlockPools.ConcentratedPools, pool)
}

return changedBlockPools, nil
}
Loading

0 comments on commit cd2c309

Please sign in to comment.