Skip to content

Commit

Permalink
feat: Added methods to handle new transactions (#36)
Browse files Browse the repository at this point in the history
* fix: Fix a parameter type casting

* feat: Add a `newTransactions` event subscription

* feat: Implement an RPC transaction filter

* fix: Send events by event type

* fix: Fix comments

* feat: Add transaction filter options

* fix: Remove reflect types

* fix: Change transaction event handling

* fix: Change exception conditions when getting tx

* fix: Fix a lint error
  • Loading branch information
jinoosss authored May 8, 2024
1 parent ee4570d commit a38eb3b
Show file tree
Hide file tree
Showing 18 changed files with 537 additions and 181 deletions.
215 changes: 199 additions & 16 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]*indexerTypes.NewBlock, 0)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
blockEvent, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)

capturedEvents = append(capturedEvents, blockEvent)
if e.GetType() == indexerTypes.NewBlockEvent {
_, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
}
},
}

Expand Down Expand Up @@ -201,13 +202,20 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {
require.Len(t, capturedEvents, len(blocks)-1)

for index, event := range capturedEvents {
if event.GetType() != indexerTypes.NewBlockEvent {
continue
}

eventData, ok := event.(*indexerTypes.NewBlock)
require.True(t, ok)

// Make sure the block is valid
assert.Equal(t, blocks[index+1], event.Block)
assert.Equal(t, blocks[index+1], eventData.Block)

// Make sure the transaction results are valid
require.Len(t, event.Results, txCount)
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range event.Results {
for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
Expand All @@ -229,14 +237,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]*indexerTypes.NewBlock, 0)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
blockEvent, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)

capturedEvents = append(capturedEvents, blockEvent)
if e.GetType() == indexerTypes.NewBlockEvent {
_, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
}
},
}

Expand Down Expand Up @@ -378,12 +387,186 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) {

for index, event := range capturedEvents {
// Make sure the block is valid
assert.Equal(t, blocks[index+1], event.Block)
eventData := event.(*indexerTypes.NewBlock)
assert.Equal(t, blocks[index+1], eventData.Block)

// Make sure the transaction results are valid
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
}
}
})
}

func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) {
t.Parallel()

t.Run("valid txs flow, sequential", func(t *testing.T) {
t.Parallel()

var cancelFn context.CancelFunc

var (
blockNum = 1000
txCount = 10
txs = generateTransactions(t, txCount)
serializedTxs = serializeTxs(t, txs)
blocks = generateBlocks(t, blockNum+1, txs)

savedTxs = make([]*types.TxResult, 0, txCount*blockNum)
savedBlocks = make([]*types.Block, 0, blockNum)
capturedEvents = make([]events.Event, 0)

mockEvents = &mockEvents{
signalEventFn: func(e events.Event) {
if e.GetType() == indexerTypes.NewBlockEvent {
_, ok := e.(*indexerTypes.NewBlock)
require.True(t, ok)
capturedEvents = append(capturedEvents, e)
}
},
}

latestSaved = uint64(0)

mockStorage = &mock.Storage{
GetLatestSavedHeightFn: func() (uint64, error) {
if latestSaved == 0 {
return 0, storageErrors.ErrNotFound
}

return latestSaved, nil
},
GetWriteBatchFn: func() storage.Batch {
return &mock.WriteBatch{
SetBlockFn: func(block *types.Block) error {
savedBlocks = append(savedBlocks, block)

// Check if all blocks are saved
if block.Height == int64(blockNum) {
// At this point, we can cancel the process
cancelFn()
}

latestSaved = uint64(block.Height)

return nil
},
SetTxFn: func(result *types.TxResult) error {
savedTxs = append(savedTxs, result)

return nil
},
}
},
}

mockClient = &mockClient{
createBatchFn: func() clientTypes.Batch {
return &mockBatch{
executeFn: func(_ context.Context) ([]any, error) {
// Force an error
return nil, errors.New("something is flaky")
},
countFn: func() int {
return 1 // to trigger execution
},
}
},
getLatestBlockNumberFn: func() (uint64, error) {
return uint64(blockNum), nil
},
getBlockFn: func(num uint64) (*core_types.ResultBlock, error) {
// Sanity check
if num > uint64(blockNum) {
t.Fatalf("invalid block requested, %d", num)
}

if len(blocks[num].Txs) != txCount {
t.Fatalf("invalid transactions, current size: %d", len(blocks[num].Txs))
}

return &core_types.ResultBlock{
Block: blocks[num],
}, nil
},
getBlockResultsFn: func(num uint64) (*core_types.ResultBlockResults, error) {
// Sanity check
if num > uint64(blockNum) {
t.Fatalf("invalid block requested, %d", num)
}

return &core_types.ResultBlockResults{
Height: int64(num),
Results: &state.ABCIResponses{
DeliverTxs: make([]abci.ResponseDeliverTx, txCount),
},
}, nil
},
}
)

// Create the fetcher
f := New(
mockStorage,
mockClient,
mockEvents,
WithMaxSlots(10),
WithMaxChunkSize(50),
)

// Short interval to force spawning
f.queryInterval = 100 * time.Millisecond

// Create the context
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

// Run the fetch
require.NoError(t, f.FetchChainData(ctx))

// Verify the transactions are saved correctly
require.Len(t, savedTxs, blockNum*txCount)

for blockIndex := 0; blockIndex < blockNum; blockIndex++ {
assert.Equal(t, blocks[blockIndex+1], savedBlocks[blockIndex])

for txIndex := 0; txIndex < txCount; txIndex++ {
// since this is a linearized array of transactions,
// we can access each item with: blockIndex * txCount + txIndex,
// where blockIndex is the row (block) and txIndex is the column (tx)
tx := savedTxs[blockIndex*txCount+txIndex]

assert.EqualValues(t, blockIndex+1, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
}
}

// Make sure the proper events were emitted
// (each block contains exactly txCount transactions)
txEventCount := (len(blocks) - 1)
require.Len(t, capturedEvents, txEventCount)

for index, event := range capturedEvents {
if event.GetType() != indexerTypes.NewBlockEvent {
continue
}

eventData, ok := event.(*indexerTypes.NewBlock)
require.True(t, ok)

// Make sure the block is valid
assert.Equal(t, blocks[index+1], eventData.Block)

// Make sure the transaction results are valid
require.Len(t, event.Results, txCount)
require.Len(t, eventData.Results, txCount)

for txIndex, tx := range event.Results {
for txIndex, tx := range eventData.Results {
assert.EqualValues(t, blocks[index+1].Height, tx.Height)
assert.EqualValues(t, txIndex, tx.Index)
assert.Equal(t, serializedTxs[txIndex], tx.Tx)
Expand Down
2 changes: 1 addition & 1 deletion serve/filters/filter/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ func (b *baseFilter) UpdateLastUsed() {
b.lastUsed = time.Now()
}

func (b *baseFilter) GetChanges() any {
func (b *baseFilter) GetChanges() []any {
return nil
}
21 changes: 17 additions & 4 deletions serve/filters/filter/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,34 @@ func NewBlockFilter() *BlockFilter {
}

// GetChanges returns all new block headers from the last query
func (b *BlockFilter) GetChanges() any {
func (b *BlockFilter) GetChanges() []any {
return b.getBlockChanges()
}

func (b *BlockFilter) UpdateWith(data any) {
if block, ok := data.(*types.Block); ok {
b.updateWithBlock(block)
}
}

// getBlockChanges returns all new block headers from the last query
func (b *BlockFilter) getBlockChanges() []any {
b.Lock()
defer b.Unlock()

// Get hashes
hashes := make([]types.Header, len(b.blockHeaders))
copy(hashes, b.blockHeaders)
hashes := make([]any, len(b.blockHeaders))
for index, blockHeader := range b.blockHeaders {
hashes[index] = blockHeader
}

// Empty headers
b.blockHeaders = b.blockHeaders[:0]

return hashes
}

func (b *BlockFilter) UpdateWithBlock(block *types.Block) {
func (b *BlockFilter) updateWithBlock(block *types.Block) {
b.Lock()
defer b.Unlock()

Expand Down
7 changes: 2 additions & 5 deletions serve/filters/filter/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ func TestBlockFilter_GetChanges(t *testing.T) {
for _, block := range blocks {
block := block

f.UpdateWithBlock(block)
f.UpdateWith(block)
}

// Get changes
changesRaw := f.GetChanges()

changes, ok := changesRaw.([]types.Header)
require.True(t, ok)
changes := f.GetChanges()

// Make sure the headers match
require.Len(t, changes, len(blocks))
Expand Down
Loading

0 comments on commit a38eb3b

Please sign in to comment.