-
Notifications
You must be signed in to change notification settings - Fork 592
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor/test(ingest): more common ingester abstractions for pushing …
- Loading branch information
1 parent
81365bb
commit 094a3a9
Showing
19 changed files
with
390 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package commondomain | ||
|
||
// BlockProcessStrategyManager is an interface for managing the strategy of pushing the blocks. | ||
// Either all block data or only the block update are the possible options | ||
// It is initialized with the strategy of pushing all data. | ||
// If it observes an error, it will switch to pushing all data. | ||
// If it ingested initial data and observed no error, it will switch to pushing only changed data. | ||
type BlockProcessStrategyManager interface { | ||
// ShouldPushAllData returns true if all data should be pushed. | ||
ShouldPushAllData() bool | ||
|
||
// MarkInitialDataIngested marks the initial data as ingested. | ||
// After calling this function, ShouldPushAllData should return false. | ||
MarkInitialDataIngested() | ||
|
||
// MarkErrorObserved marks that an error has been observed. | ||
MarkErrorObserved() | ||
} | ||
|
||
type blockProcessStrategyManager struct { | ||
shouldPushAllData bool | ||
} | ||
|
||
var _ BlockProcessStrategyManager = &blockProcessStrategyManager{} | ||
|
||
// NewBlockProcessStrategyManager creates a new push strategy manager. | ||
func NewBlockProcessStrategyManager() BlockProcessStrategyManager { | ||
return &blockProcessStrategyManager{ | ||
shouldPushAllData: true, | ||
} | ||
} | ||
|
||
// ShouldPushAllData returns true if all data should be pushed. | ||
func (c *blockProcessStrategyManager) ShouldPushAllData() bool { | ||
return c.shouldPushAllData | ||
} | ||
|
||
// MarkInitialDataIngested marks the initial data as ingested. | ||
func (c *blockProcessStrategyManager) MarkInitialDataIngested() { | ||
c.shouldPushAllData = false | ||
} | ||
|
||
// MarkErrorObserved marks that an error has been observed. | ||
func (c *blockProcessStrategyManager) MarkErrorObserved() { | ||
c.shouldPushAllData = true | ||
} |
50 changes: 50 additions & 0 deletions
50
ingest/common/domain/block_process_strategy_manager_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package commondomain_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/suite" | ||
|
||
"github.com/osmosis-labs/osmosis/v25/app/apptesting" | ||
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain" | ||
) | ||
|
||
type CommonDomainTestSuite struct { | ||
apptesting.ConcentratedKeeperTestHelper | ||
} | ||
|
||
func TestCommonDomainTestSuite(t *testing.T) { | ||
suite.Run(t, new(CommonDomainTestSuite)) | ||
} | ||
|
||
// Validates the invariant that the block process strategy manager. | ||
// When initialized, should push all data. | ||
// If the block process strategy manager has observed an error, it should push all data. | ||
// If the block process strategy manager has not observed an error after pushing all data, it should push only changed data. | ||
func (s *CommonDomainTestSuite) TestBlockProcessStrategyManager() { | ||
|
||
blockStrategyManager := commondomain.NewBlockProcessStrategyManager() | ||
|
||
// ShouldPushAllData should return true when initialized | ||
s.Require().True(blockStrategyManager.ShouldPushAllData()) | ||
|
||
blockStrategyManager.MarkInitialDataIngested() | ||
|
||
// ShouldPushAllData should return false after MarkInitialDataIngested | ||
s.Require().False(blockStrategyManager.ShouldPushAllData()) | ||
|
||
blockStrategyManager.MarkErrorObserved() | ||
|
||
// ShouldPushAllData should return true after MarkErrorObserved | ||
s.Require().True(blockStrategyManager.ShouldPushAllData()) | ||
|
||
blockStrategyManager.MarkInitialDataIngested() | ||
|
||
// ShouldPushAllData should return false after MarkInitialDataIngested again | ||
s.Require().False(blockStrategyManager.ShouldPushAllData()) | ||
|
||
blockStrategyManager.MarkInitialDataIngested() | ||
|
||
// Unchanged after MarkInitialDataIngested twice | ||
s.Require().False(blockStrategyManager.ShouldPushAllData()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package mocks | ||
|
||
import ( | ||
"github.com/cosmos/cosmos-sdk/types" | ||
|
||
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain" | ||
) | ||
|
||
type PoolsExtractorMock struct { | ||
// AllBlockDataError is the error to return when ProcessAllBlockData is called. | ||
AllBlockDataError error | ||
// ChangedBlockDataError is the error to return when ProcessChangedBlockData is called. | ||
ChangedBlockDataError error | ||
// IsProcessAllBlockDataCalled is a flag indicating if ProcessAllBlockData was called. | ||
IsProcessAllBlockDataCalled bool | ||
// IsProcessAllChangedDataCalled is a flag indicating if ProcessChangedBlockData was called. | ||
IsProcessAllChangedDataCalled bool | ||
// If this is non-empty, ProcessAllBlockData(...) will panic with this message. | ||
ProcessAllBlockDataPanicMsg string | ||
// Block pools to return | ||
BlockPools commondomain.BlockPools | ||
} | ||
|
||
var _ commondomain.PoolExtractor = &PoolsExtractorMock{} | ||
|
||
// ExtractAll implements commondomain.PoolExtractor. | ||
func (p *PoolsExtractorMock) ExtractAll(ctx types.Context) (commondomain.BlockPools, error) { | ||
if p.ProcessAllBlockDataPanicMsg != "" { | ||
panic(p.ProcessAllBlockDataPanicMsg) | ||
} | ||
|
||
p.IsProcessAllBlockDataCalled = true | ||
return p.BlockPools, p.AllBlockDataError | ||
} | ||
|
||
// ExtractChanged implements commondomain.PoolExtractor. | ||
func (p *PoolsExtractorMock) ExtractChanged(ctx types.Context) (commondomain.BlockPools, error) { | ||
p.IsProcessAllChangedDataCalled = true | ||
return p.BlockPools, p.ChangedBlockDataError | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package commondomain | ||
|
||
import ( | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
) | ||
|
||
// PoolExtractor defines the interface for extracting pools. | ||
type PoolExtractor interface { | ||
// ExtractAll extracts all the pools available within the height associated | ||
// with the context. | ||
ExtractAll(ctx sdk.Context) (BlockPools, error) | ||
// ExtractChanged extracts the pools that were changed in the block height associated | ||
// with the context. | ||
ExtractChanged(ctx sdk.Context) (BlockPools, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package commondomain | ||
|
||
import sdk "github.com/cosmos/cosmos-sdk/types" | ||
|
||
// BlockProcessor is an interface for processing a block. | ||
type BlockProcessor interface { | ||
// ProcessBlock processes a block. | ||
// It returns an error if the block processing fails. | ||
ProcessBlock(ctx sdk.Context) error | ||
|
||
// IsFullBlockProcessor returns true if the block processor is a full block processor. | ||
IsFullBlockProcessor() bool | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package poolextractor_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/suite" | ||
|
||
"github.com/osmosis-labs/osmosis/v25/app/apptesting" | ||
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain" | ||
"github.com/osmosis-labs/osmosis/v25/ingest/common/poolextractor" | ||
"github.com/osmosis-labs/osmosis/v25/ingest/common/pooltracker" | ||
) | ||
|
||
type PoolExtractorTestSuite struct { | ||
apptesting.ConcentratedKeeperTestHelper | ||
} | ||
|
||
func TestPoolExtractorTestSuite(t *testing.T) { | ||
suite.Run(t, new(PoolExtractorTestSuite)) | ||
} | ||
|
||
// TestExtractor tests that the appropriate pools are extracted | ||
// when calling ExtractAll and ExtractChanged methods of the extractor. | ||
func (s *PoolExtractorTestSuite) TestExtractor() { | ||
|
||
s.Setup() | ||
|
||
// Initialized chain pools | ||
chainPools := s.PrepareAllSupportedPools() | ||
|
||
// Get all chain pools from state for asserting later | ||
allChainPools, err := s.App.PoolManagerKeeper.AllPools(s.Ctx) | ||
s.Require().NoError(err) | ||
|
||
// Initialize a position on the concentrated pool | ||
concentratedPoolWithPosition := s.PrepareConcentratedPoolWithCoinsAndFullRangePosition(apptesting.ETH, apptesting.USDC) | ||
|
||
keepers := commondomain.PoolExtractorKeepers{ | ||
GammKeeper: s.App.GAMMKeeper, | ||
CosmWasmPoolKeeper: s.App.CosmwasmPoolKeeper, | ||
WasmKeeper: s.App.WasmKeeper, | ||
ConcentratedKeeper: s.App.ConcentratedLiquidityKeeper, | ||
PoolManagerKeeper: s.App.PoolManagerKeeper, | ||
BankKeeper: s.App.BankKeeper, | ||
} | ||
|
||
poolTracker := pooltracker.NewMemory() | ||
|
||
// Track only the concentrated pool | ||
concentratedPool, err := s.App.ConcentratedLiquidityKeeper.GetPool(s.Ctx, chainPools.ConcentratedPoolID) | ||
s.Require().NoError(err) | ||
poolTracker.TrackConcentrated(concentratedPool) | ||
|
||
// Track tick change for a concentraed pool. | ||
poolTracker.TrackConcentratedPoolIDTickChange(concentratedPoolWithPosition.GetId()) | ||
|
||
// Initialize the extractor | ||
extractor := poolextractor.New(keepers, poolTracker) | ||
|
||
// System under test #1 | ||
blockPools, err := extractor.ExtractAll(s.Ctx) | ||
s.Require().NoError(err) | ||
|
||
// Validate all pools are extracted | ||
allPools := blockPools.GetAll() | ||
// + 1 for an extra concentrated pool. | ||
s.Require().Equal(len(allChainPools)+1, len(allPools)) | ||
|
||
// System under test #2 | ||
// Extract the pools again but now only changed | ||
blockPools, err = extractor.ExtractChanged(s.Ctx) | ||
s.Require().NoError(err) | ||
|
||
// Validate only the changed pools are extracted | ||
changedPools := blockPools.GetAll() | ||
s.Require().Equal(2, len(changedPools)) | ||
|
||
// Validate that the tick change is detected | ||
s.Require().Len(blockPools.ConcentratedPoolIDTickChange, 2) | ||
_, ok := blockPools.ConcentratedPoolIDTickChange[concentratedPoolWithPosition.GetId()] | ||
s.Require().True(ok) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package poolextractor | ||
|
||
import ( | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
|
||
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain" | ||
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain" | ||
) | ||
|
||
// poolExtractor is an abstraction that extracts pools from the chain. | ||
type poolExtractor struct { | ||
keepers commondomain.PoolExtractorKeepers | ||
poolTracker domain.BlockPoolUpdateTracker | ||
} | ||
|
||
// New creates a new pool extractor. | ||
func New(keepers commondomain.PoolExtractorKeepers, poolTracker domain.BlockPoolUpdateTracker) commondomain.PoolExtractor { | ||
return &poolExtractor{ | ||
keepers: keepers, | ||
poolTracker: poolTracker, | ||
} | ||
} | ||
|
||
// ExtractAll implements commondomain.PoolExtractor. | ||
func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, error) { | ||
// Concentrated pools | ||
|
||
concentratedPools, err := p.keepers.ConcentratedKeeper.GetPools(ctx) | ||
if err != nil { | ||
return commondomain.BlockPools{}, err | ||
} | ||
|
||
// CFMM pools | ||
|
||
cfmmPools, err := p.keepers.GammKeeper.GetPools(ctx) | ||
if err != nil { | ||
return commondomain.BlockPools{}, err | ||
} | ||
|
||
// CosmWasm pools | ||
|
||
cosmWasmPools, err := p.keepers.CosmWasmPoolKeeper.GetPoolsWithWasmKeeper(ctx) | ||
if err != nil { | ||
return commondomain.BlockPools{}, err | ||
} | ||
|
||
// Generate the initial cwPool address to pool mapping | ||
for _, pool := range cosmWasmPools { | ||
p.poolTracker.TrackCosmWasmPoolsAddressToPoolMap(pool) | ||
} | ||
|
||
blockPools := commondomain.BlockPools{ | ||
ConcentratedPools: concentratedPools, | ||
CosmWasmPools: cosmWasmPools, | ||
CFMMPools: cfmmPools, | ||
} | ||
|
||
return blockPools, nil | ||
} | ||
|
||
// ExtractChanged implements commondomain.PoolExtractor. | ||
func (p *poolExtractor) ExtractChanged(ctx sdk.Context) (commondomain.BlockPools, error) { | ||
// If not cold start, we only process the pools that were changed this block. | ||
concentratedPools := p.poolTracker.GetConcentratedPools() | ||
concentratedPoolIDTickChange := p.poolTracker.GetConcentratedPoolIDTickChange() | ||
cfmmPools := p.poolTracker.GetCFMMPools() | ||
cosmWasmPools := p.poolTracker.GetCosmWasmPools() | ||
|
||
changedBlockPools := commondomain.BlockPools{ | ||
ConcentratedPools: concentratedPools, | ||
ConcentratedPoolIDTickChange: concentratedPoolIDTickChange, | ||
CosmWasmPools: cosmWasmPools, | ||
CFMMPools: cfmmPools, | ||
} | ||
|
||
poolIDsTracked := make(map[uint64]struct{}, len(changedBlockPools.ConcentratedPools)) | ||
|
||
// Copy over the pools that were changed in the block | ||
for _, pool := range changedBlockPools.ConcentratedPools { | ||
changedBlockPools.ConcentratedPoolIDTickChange[pool.GetId()] = struct{}{} | ||
|
||
poolIDsTracked[pool.GetId()] = struct{}{} | ||
} | ||
|
||
// Update concentrated pools | ||
for poolID := range concentratedPoolIDTickChange { | ||
// Skip if the pool if it is already tracked | ||
if _, ok := poolIDsTracked[poolID]; ok { | ||
continue | ||
} | ||
|
||
pool, err := p.keepers.ConcentratedKeeper.GetConcentratedPoolById(ctx, poolID) | ||
if err != nil { | ||
return commondomain.BlockPools{}, err | ||
} | ||
|
||
changedBlockPools.ConcentratedPools = append(changedBlockPools.ConcentratedPools, pool) | ||
} | ||
|
||
return changedBlockPools, nil | ||
} |
Oops, something went wrong.