diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 36c81c47..b910aba4 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -74,14 +74,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { savedTxs = make([]*types.TxResult, 0, txCount*blockNum) savedBlocks = make([]*types.Block, 0, blockNum) - capturedEvents = make([]*indexerTypes.NewBlock, 0) + capturedEvents = make([]events.Event, 0) mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - blockEvent, ok := e.(*indexerTypes.NewBlock) - require.True(t, ok) - - capturedEvents = append(capturedEvents, blockEvent) + if e.GetType() == indexerTypes.NewBlockEvent { + _, ok := e.(*indexerTypes.NewBlock) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + } }, } @@ -201,13 +202,20 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { require.Len(t, capturedEvents, len(blocks)-1) for index, event := range capturedEvents { + if event.GetType() != indexerTypes.NewBlockEvent { + continue + } + + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) + // Make sure the block is valid - assert.Equal(t, blocks[index+1], event.Block) + assert.Equal(t, blocks[index+1], eventData.Block) // Make sure the transaction results are valid - require.Len(t, event.Results, txCount) + require.Len(t, eventData.Results, txCount) - for txIndex, tx := range event.Results { + for txIndex, tx := range eventData.Results { assert.EqualValues(t, blocks[index+1].Height, tx.Height) assert.EqualValues(t, txIndex, tx.Index) assert.Equal(t, serializedTxs[txIndex], tx.Tx) @@ -229,14 +237,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { savedTxs = make([]*types.TxResult, 0, txCount*blockNum) savedBlocks = make([]*types.Block, 0, blockNum) - capturedEvents = make([]*indexerTypes.NewBlock, 0) + capturedEvents = make([]events.Event, 0) mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - blockEvent, ok := e.(*indexerTypes.NewBlock) - require.True(t, ok) - - capturedEvents = append(capturedEvents, blockEvent) + if e.GetType() == indexerTypes.NewBlockEvent { + _, ok := e.(*indexerTypes.NewBlock) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + } }, } @@ -378,12 +387,186 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { for index, event := range capturedEvents { // Make sure the block is valid - assert.Equal(t, blocks[index+1], event.Block) + eventData := event.(*indexerTypes.NewBlock) + assert.Equal(t, blocks[index+1], eventData.Block) + + // Make sure the transaction results are valid + require.Len(t, eventData.Results, txCount) + + for txIndex, tx := range eventData.Results { + assert.EqualValues(t, blocks[index+1].Height, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) + } + } + }) +} + +func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) { + t.Parallel() + + t.Run("valid txs flow, sequential", func(t *testing.T) { + t.Parallel() + + var cancelFn context.CancelFunc + + var ( + blockNum = 1000 + txCount = 10 + txs = generateTransactions(t, txCount) + serializedTxs = serializeTxs(t, txs) + blocks = generateBlocks(t, blockNum+1, txs) + + savedTxs = make([]*types.TxResult, 0, txCount*blockNum) + savedBlocks = make([]*types.Block, 0, blockNum) + capturedEvents = make([]events.Event, 0) + + mockEvents = &mockEvents{ + signalEventFn: func(e events.Event) { + if e.GetType() == indexerTypes.NewBlockEvent { + _, ok := e.(*indexerTypes.NewBlock) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + } + }, + } + + latestSaved = uint64(0) + + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (uint64, error) { + if latestSaved == 0 { + return 0, storageErrors.ErrNotFound + } + + return latestSaved, nil + }, + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } + + latestSaved = uint64(block.Height) + + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) + + return nil + }, + } + }, + } + + mockClient = &mockClient{ + createBatchFn: func() clientTypes.Batch { + return &mockBatch{ + executeFn: func(_ context.Context) ([]any, error) { + // Force an error + return nil, errors.New("something is flaky") + }, + countFn: func() int { + return 1 // to trigger execution + }, + } + }, + getLatestBlockNumberFn: func() (uint64, error) { + return uint64(blockNum), nil + }, + getBlockFn: func(num uint64) (*core_types.ResultBlock, error) { + // Sanity check + if num > uint64(blockNum) { + t.Fatalf("invalid block requested, %d", num) + } + + if len(blocks[num].Txs) != txCount { + t.Fatalf("invalid transactions, current size: %d", len(blocks[num].Txs)) + } + + return &core_types.ResultBlock{ + Block: blocks[num], + }, nil + }, + getBlockResultsFn: func(num uint64) (*core_types.ResultBlockResults, error) { + // Sanity check + if num > uint64(blockNum) { + t.Fatalf("invalid block requested, %d", num) + } + + return &core_types.ResultBlockResults{ + Height: int64(num), + Results: &state.ABCIResponses{ + DeliverTxs: make([]abci.ResponseDeliverTx, txCount), + }, + }, nil + }, + } + ) + + // Create the fetcher + f := New( + mockStorage, + mockClient, + mockEvents, + WithMaxSlots(10), + WithMaxChunkSize(50), + ) + + // Short interval to force spawning + f.queryInterval = 100 * time.Millisecond + + // Create the context + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + // Run the fetch + require.NoError(t, f.FetchChainData(ctx)) + + // Verify the transactions are saved correctly + require.Len(t, savedTxs, blockNum*txCount) + + for blockIndex := 0; blockIndex < blockNum; blockIndex++ { + assert.Equal(t, blocks[blockIndex+1], savedBlocks[blockIndex]) + + for txIndex := 0; txIndex < txCount; txIndex++ { + // since this is a linearized array of transactions + // we can access each item with: blockNum * length + txIndx + // where blockNum is the y-axis, and txIndx is the x-axis + tx := savedTxs[blockIndex*txCount+txIndex] + + assert.EqualValues(t, blockIndex+1, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) + } + } + + // Make sure proper events were emitted + // Blocks each have as many transactions as txCount. + txEventCount := (len(blocks) - 1) + require.Len(t, capturedEvents, txEventCount) + + for index, event := range capturedEvents { + if event.GetType() != indexerTypes.NewBlockEvent { + continue + } + + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) + + // Make sure the block is valid + assert.Equal(t, blocks[index+1], eventData.Block) // Make sure the transaction results are valid - require.Len(t, event.Results, txCount) + require.Len(t, eventData.Results, txCount) - for txIndex, tx := range event.Results { + for txIndex, tx := range eventData.Results { assert.EqualValues(t, blocks[index+1].Height, tx.Height) assert.EqualValues(t, txIndex, tx.Index) assert.Equal(t, serializedTxs[txIndex], tx.Tx) diff --git a/serve/filters/filter/base.go b/serve/filters/filter/base.go index b98cb27d..37a9d98d 100644 --- a/serve/filters/filter/base.go +++ b/serve/filters/filter/base.go @@ -39,6 +39,6 @@ func (b *baseFilter) UpdateLastUsed() { b.lastUsed = time.Now() } -func (b *baseFilter) GetChanges() any { +func (b *baseFilter) GetChanges() []any { return nil } diff --git a/serve/filters/filter/block.go b/serve/filters/filter/block.go index 6fa7fb0a..4fccd8f7 100644 --- a/serve/filters/filter/block.go +++ b/serve/filters/filter/block.go @@ -20,13 +20,26 @@ func NewBlockFilter() *BlockFilter { } // GetChanges returns all new block headers from the last query -func (b *BlockFilter) GetChanges() any { +func (b *BlockFilter) GetChanges() []any { + return b.getBlockChanges() +} + +func (b *BlockFilter) UpdateWith(data any) { + if block, ok := data.(*types.Block); ok { + b.updateWithBlock(block) + } +} + +// getBlockChanges returns all new block headers from the last query +func (b *BlockFilter) getBlockChanges() []any { b.Lock() defer b.Unlock() // Get hashes - hashes := make([]types.Header, len(b.blockHeaders)) - copy(hashes, b.blockHeaders) + hashes := make([]any, len(b.blockHeaders)) + for index, blockHeader := range b.blockHeaders { + hashes[index] = blockHeader + } // Empty headers b.blockHeaders = b.blockHeaders[:0] @@ -34,7 +47,7 @@ func (b *BlockFilter) GetChanges() any { return hashes } -func (b *BlockFilter) UpdateWithBlock(block *types.Block) { +func (b *BlockFilter) updateWithBlock(block *types.Block) { b.Lock() defer b.Unlock() diff --git a/serve/filters/filter/block_test.go b/serve/filters/filter/block_test.go index 24b56e66..0e4cc1d3 100644 --- a/serve/filters/filter/block_test.go +++ b/serve/filters/filter/block_test.go @@ -40,14 +40,11 @@ func TestBlockFilter_GetChanges(t *testing.T) { for _, block := range blocks { block := block - f.UpdateWithBlock(block) + f.UpdateWith(block) } // Get changes - changesRaw := f.GetChanges() - - changes, ok := changesRaw.([]types.Header) - require.True(t, ok) + changes := f.GetChanges() // Make sure the headers match require.Len(t, changes, len(blocks)) diff --git a/serve/filters/filter/tx.go b/serve/filters/filter/tx.go index a740be98..806354d5 100644 --- a/serve/filters/filter/tx.go +++ b/serve/filters/filter/tx.go @@ -1,32 +1,55 @@ package filter import ( + "math" + "github.com/gnolang/gno/tm2/pkg/bft/types" ) -type Options struct { - GasUsed struct{ Min, Max *int64 } - GasWanted struct{ Min, Max *int64 } - GasLimit struct{ Min, Max *int64 } +type RangeFilterOption struct { + Min *int64 `json:"min,omitempty"` + Max *int64 `json:"max,omitempty"` +} + +type TxFilterOption struct { + GasUsed *RangeFilterOption `json:"gasUsed,omitempty"` + GasWanted *RangeFilterOption `json:"gasWanted,omitempty"` + GasLimit *RangeFilterOption `json:"gasLimit,omitempty"` } // TxFilter holds a slice of transaction results. // It provides methods to manipulate and query the transactions. type TxFilter struct { - opts Options + opts TxFilterOption *baseFilter - txs []*types.TxResult + txs []types.TxResult } // NewTxFilter creates a new TxFilter object. -func NewTxFilter(opts Options) *TxFilter { +func NewTxFilter(opts TxFilterOption) *TxFilter { return &TxFilter{ baseFilter: newBaseFilter(TxFilterType), - txs: make([]*types.TxResult, 0), + txs: make([]types.TxResult, 0), opts: opts, } } +// GetChanges returns all new transactions from the last query +func (tf *TxFilter) GetChanges() []any { + return tf.getTxChanges() +} + +func (tf *TxFilter) UpdateWith(data any) { + tx, ok := data.(*types.TxResult) + if !ok { + return + } + + if tf.checkFilterOptions(tx) { + tf.updateWithTx(tx) + } +} + // GetHashes iterates over all transactions in the filter and returns their hashes. func (tf *TxFilter) GetHashes() [][]byte { tf.Lock() @@ -37,7 +60,7 @@ func (tf *TxFilter) GetHashes() [][]byte { for _, txr := range tf.txs { var hash []byte - if txr != nil && txr.Tx != nil { + if txr.Tx != nil { hash = txr.Tx.Hash() } @@ -47,61 +70,63 @@ func (tf *TxFilter) GetHashes() [][]byte { return hashes } -func (tf *TxFilter) UpdateWithTx(tx *types.TxResult) { - tf.Lock() - defer tf.Unlock() - - tf.txs = append(tf.txs, tx) -} - -// Apply applies all added conditions to the transactions in the filter. -// -// It returns a slice of `TxResult` that satisfy all the conditions. If no conditions are set, -// it returns all transactions in the filter. Also, if the filter value is invalid filter will not -// be applied. -func (tf *TxFilter) Apply() []*types.TxResult { - tf.Lock() - defer tf.Unlock() - - return checkOpts(tf.txs, tf.opts) -} - -func checkOpts(txs []*types.TxResult, opts Options) []*types.TxResult { - filtered := make([]*types.TxResult, 0, len(txs)) - - for _, tx := range txs { - if checkFilterCondition(tx, opts) { - filtered = append(filtered, tx) - } +// `checkFilterOptions` checks the conditions of the options in the filter. +func (tf *TxFilter) checkFilterOptions(tx *types.TxResult) bool { + if !filteredByRangeFilterOption(tx.Response.GasUsed, tf.opts.GasUsed) { + return false } - return filtered -} - -func checkFilterCondition(tx *types.TxResult, opts Options) bool { - if opts.GasLimit.Max != nil && tx.Response.GasUsed > *opts.GasLimit.Max { + if !filteredByRangeFilterOption(tx.Response.GasWanted, tf.opts.GasWanted) { return false } - if opts.GasLimit.Min != nil && tx.Response.GasUsed < *opts.GasLimit.Min { + // GasLimit compares GasUsed. + if !filteredByRangeFilterOption(tx.Response.GasUsed, tf.opts.GasLimit) { return false } - if opts.GasUsed.Max != nil && tx.Response.GasUsed > *opts.GasUsed.Max { - return false + return true +} + +// `filteredByRangeFilterOption` checks if the number is in a range. +func filteredByRangeFilterOption(value int64, rangeFilterOption *RangeFilterOption) bool { + if rangeFilterOption == nil { + return true } - if opts.GasUsed.Min != nil && tx.Response.GasUsed < *opts.GasUsed.Min { - return false + min := int64(0) + if rangeFilterOption.Min != nil { + min = *rangeFilterOption.Min } - if opts.GasWanted.Max != nil && tx.Response.GasWanted > *opts.GasWanted.Max { - return false + max := int64(math.MaxInt64) + if rangeFilterOption.Max != nil { + max = *rangeFilterOption.Max } - if opts.GasWanted.Min != nil && tx.Response.GasWanted < *opts.GasWanted.Min { - return false + return value >= min && value <= max +} + +// getTxChanges returns all new transactions from the last query +func (tf *TxFilter) getTxChanges() []any { + tf.Lock() + defer tf.Unlock() + + // Get newTxs + newTxs := make([]any, len(tf.txs)) + for index, tx := range tf.txs { + newTxs[index] = tx } - return true + // Empty headers + tf.txs = tf.txs[:0] + + return newTxs +} + +func (tf *TxFilter) updateWithTx(tx *types.TxResult) { + tf.Lock() + defer tf.Unlock() + + tf.txs = append(tf.txs, *tx) } diff --git a/serve/filters/filter/tx_test.go b/serve/filters/filter/tx_test.go index 0dc02015..168eb806 100644 --- a/serve/filters/filter/tx_test.go +++ b/serve/filters/filter/tx_test.go @@ -24,10 +24,10 @@ func TestGetHashes(t *testing.T) { {Tx: []byte(`c1dfd96eea8cc2b62785275bca38ac261256e278`)}, } - f := NewTxFilter(Options{}) + f := NewTxFilter(TxFilterOption{}) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } hashes := f.GetHashes() @@ -111,110 +111,110 @@ func TestApplyFilters(t *testing.T) { } tests := []struct { - options Options + options TxFilterOption name string expected []*types.TxResult }{ { name: "no filter", - options: Options{}, + options: TxFilterOption{}, expected: txs, }, { name: "min gas used is 0", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(0), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(0), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "invalid gas used", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "filter by gas wanted 1", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1100), int64Ptr(1200)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1100), Max: int64Ptr(1200)}, }, expected: []*types.TxResult{txs[1], txs[3], txs[4]}, }, { name: "gas wanted min, max is same value", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(1000)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[2]}, }, { name: "filter by gas used 2", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(900), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(900), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas used min, max is same value", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[4]}, }, { name: "filter by gas wanted is invalid", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1200), int64Ptr(1100)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1200), Max: int64Ptr(1100)}, }, expected: []*types.TxResult{}, }, { name: "gas used filter is invalid", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "gas limit min value is nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{nil, int64Ptr(1000)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: nil, Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas limit max value is nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(1100), nil}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(1100), Max: nil}, }, expected: []*types.TxResult{txs[1], txs[2]}, }, { name: "gas limit range is valid", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(900), int64Ptr(1000)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(900), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas limit both min and max are nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{nil, nil}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: nil, Max: nil}, }, expected: txs, }, { name: "gas limit min is larger than max", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "gas used min is nil", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{nil, int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: nil, Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, @@ -228,21 +228,21 @@ func TestApplyFilters(t *testing.T) { f := NewTxFilter(tt.options) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } - filtered := f.Apply() + changes := f.GetChanges() require.Len( - t, filtered, len(tt.expected), + t, changes, len(tt.expected), fmt.Sprintf( "There should be one transaction after applying filters: %v", len(tt.expected), ), ) - for i, tx := range filtered { + for i, tx := range changes { assert.Equal( - t, tt.expected[i], tx, + t, *tt.expected[i], tx, fmt.Sprintf( "The filtered transaction should match the expected transaction: %v", tt.expected[i], @@ -276,19 +276,19 @@ func TestApplyFiltersWithLargeData(t *testing.T) { } tests := []struct { - options Options + options TxFilterOption name string expected int }{ { name: "no filter", - options: Options{}, + options: TxFilterOption{}, expected: txCount, }, { name: "filter by gas used", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(950), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(950), Max: int64Ptr(1000)}, }, expected: txCount / 2, }, @@ -302,15 +302,15 @@ func TestApplyFiltersWithLargeData(t *testing.T) { f := NewTxFilter(tt.options) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } - filtered := f.Apply() + changes := f.GetChanges() require.Len( - t, filtered, tt.expected, + t, changes, tt.expected, fmt.Sprintf( "There should be %d transactions after applying filters. got %d", - tt.expected, len(filtered), + tt.expected, len(changes), ), ) }) diff --git a/serve/filters/manager.go b/serve/filters/manager.go index d5897183..7335aa77 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -46,8 +46,8 @@ func NewFilterManager( opt(filterManager) } - // Subscribe to new block events - go filterManager.subscribeToNewBlockEvent() + // Subscribe to new events + go filterManager.subscribeToEvents() // Start cleanup routine go filterManager.cleanupRoutine() @@ -62,6 +62,13 @@ func (f *Manager) NewBlockFilter() string { return f.filters.newFilter(blockFilter) } +// NewTransactionFilter creates a new Transaction filter, and returns the corresponding ID +func (f *Manager) NewTxFilter(options filter.TxFilterOption) string { + txFilter := filter.NewTxFilter(options) + + return f.filters.newFilter(txFilter) +} + // UninstallFilter removes a filter from the filter map using its ID. // Returns a flag indicating if the filter was removed func (f *Manager) UninstallFilter(id string) bool { @@ -73,6 +80,11 @@ func (f *Manager) NewBlockSubscription(conn conns.WSConnection) string { return f.newSubscription(filterSubscription.NewBlockSubscription(conn)) } +// NewTransactionSubscription creates a new transaction (new transactions) subscription (over WS) +func (f *Manager) NewTransactionSubscription(conn conns.WSConnection) string { + return f.newSubscription(filterSubscription.NewTransactionSubscription(conn)) +} + // newSubscription adds new subscription to the subscription map func (f *Manager) newSubscription(subscription subscription) string { return f.subscriptions.addSubscription(subscription) @@ -84,31 +96,42 @@ func (f *Manager) UninstallSubscription(id string) bool { return f.subscriptions.deleteSubscription(id) } -// subscribeToNewBlockEvent subscribes to new block events -func (f *Manager) subscribeToNewBlockEvent() { - blockSub := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent}) - defer f.events.CancelSubscription(blockSub.ID) +// subscribeToEvents subscribes to new events +func (f *Manager) subscribeToEvents() { + subscription := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent}) + defer f.events.CancelSubscription(subscription.ID) for { select { case <-f.ctx.Done(): return - case blockRaw, more := <-blockSub.SubCh: + case event, more := <-subscription.SubCh: if !more { return } - // The following code segments - // cannot be executed in parallel (go routines) - // because data sequencing should be persisted - // (info about block X comes before info on block X + 1) - newBlock := blockRaw.(*commonTypes.NewBlock) - - // Apply block to filters - f.updateFiltersWithBlock(newBlock.Block) - - // send events to all matching subscriptions - f.subscriptions.sendBlockEvent(newBlock.Block) + if event.GetType() == commonTypes.NewBlockEvent { + // The following code segments + // cannot be executed in parallel (go routines) + // because data sequencing should be persisted + // (info about block X comes before info on block X + 1) + newBlock, ok := event.(*commonTypes.NewBlock) + if ok { + // Apply block to filters + f.updateFiltersWithBlock(newBlock.Block) + + // Send events to all `newHeads` subscriptions + f.subscriptions.sendEvent(filterSubscription.NewHeadsEvent, newBlock.Block) + + for _, txResult := range newBlock.Results { + // Apply transaction to filters + f.updateFiltersWithTxResult(txResult) + + // Send events to all `newHeads` subscriptions + f.subscriptions.sendEvent(filterSubscription.NewTransactionsEvent, txResult) + } + } + } } } } @@ -116,7 +139,14 @@ func (f *Manager) subscribeToNewBlockEvent() { // updateFiltersWithBlock updates all filters with the incoming block func (f *Manager) updateFiltersWithBlock(block *types.Block) { f.filters.rangeItems(func(filter Filter) { - filter.UpdateWithBlock(block) + filter.UpdateWith(block) + }) +} + +// updateFiltersWithTxResult updates all filters with the incoming transactions +func (f *Manager) updateFiltersWithTxResult(txResult *types.TxResult) { + f.filters.rangeItems(func(filter Filter) { + filter.UpdateWith(txResult) }) } diff --git a/serve/filters/manager_test.go b/serve/filters/manager_test.go index 7e8314d6..1b39b52f 100644 --- a/serve/filters/manager_test.go +++ b/serve/filters/manager_test.go @@ -132,16 +132,16 @@ func Test_NewBlockEvents(t *testing.T) { require.Nil(t, err) // Get changes - blockHeadersRaw := blockFilter.GetChanges() + changes := blockFilter.GetChanges() - blockHeaders, ok := blockHeadersRaw.([]tm2Types.Header) - require.True(t, ok) - - if len(blockHeaders) == 0 { + if len(changes) == 0 { continue } - capturedHeaders = blockHeaders + capturedHeaders = make([]tm2Types.Header, len(changes)) + for index, header := range changes { + capturedHeaders[index] = header.(tm2Types.Header) + } return } diff --git a/serve/filters/subscription.go b/serve/filters/subscription.go index e91d81cc..e0c663ab 100644 --- a/serve/filters/subscription.go +++ b/serve/filters/subscription.go @@ -3,12 +3,13 @@ package filters import ( "sync" - "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/google/uuid" ) type subscription interface { - WriteResponse(id string, block *types.Block) error + GetType() events.Type + WriteResponse(id string, data any) error } // subscriptionMap keeps track of ongoing data subscriptions @@ -39,9 +40,9 @@ func (sm *subscriptionMap) addSubscription(sub subscription) string { return id } -// sendBlockEvent alerts all active subscriptions of a block event. +// sendEvent alerts all active subscriptions of a event. // In case there was an error during writing, the subscription is removed -func (sm *subscriptionMap) sendBlockEvent(block *types.Block) { +func (sm *subscriptionMap) sendEvent(eventType events.Type, data any) { sm.Lock() defer sm.Unlock() @@ -61,13 +62,16 @@ func (sm *subscriptionMap) sendBlockEvent(block *types.Block) { for id, sub := range sm.subscriptions { sub := sub + if sub.GetType() != eventType { + continue + } wg.Add(1) go func(id string) { defer wg.Done() - if err := sub.WriteResponse(id, block); err != nil { + if err := sub.WriteResponse(id, data); err != nil { markInvalid(id) } }(id) diff --git a/serve/filters/subscription/block.go b/serve/filters/subscription/block.go index 7d8ec3fb..c92dad74 100644 --- a/serve/filters/subscription/block.go +++ b/serve/filters/subscription/block.go @@ -1,7 +1,10 @@ package subscription import ( + "fmt" + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/spec" @@ -23,7 +26,16 @@ func NewBlockSubscription(conn conns.WSConnection) *BlockSubscription { } } -func (b *BlockSubscription) WriteResponse(id string, block *types.Block) error { +func (b *BlockSubscription) GetType() events.Type { + return NewHeadsEvent +} + +func (b *BlockSubscription) WriteResponse(id string, data any) error { + block, ok := data.(*types.Block) + if !ok { + return fmt.Errorf("unable to cast block, %s", data) + } + encodedBlock, err := encode.PrepareValue(block.Header) if err != nil { return err diff --git a/serve/filters/subscription/tx.go b/serve/filters/subscription/tx.go new file mode 100644 index 00000000..8b3190f5 --- /dev/null +++ b/serve/filters/subscription/tx.go @@ -0,0 +1,45 @@ +package subscription + +import ( + "fmt" + + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" + "github.com/gnolang/tx-indexer/serve/conns" + "github.com/gnolang/tx-indexer/serve/encode" + "github.com/gnolang/tx-indexer/serve/spec" +) + +const ( + NewTransactionsEvent = "newTransactions" +) + +// TransactionSubscription is the new-transactions type +// subscription +type TransactionSubscription struct { + *baseSubscription +} + +func NewTransactionSubscription(conn conns.WSConnection) *TransactionSubscription { + return &TransactionSubscription{ + baseSubscription: newBaseSubscription(conn), + } +} + +func (b *TransactionSubscription) GetType() events.Type { + return NewTransactionsEvent +} + +func (b *TransactionSubscription) WriteResponse(id string, data any) error { + tx, ok := data.(*types.TxResult) + if !ok { + return fmt.Errorf("unable to cast txResult, %s", data) + } + + encodedTx, err := encode.PrepareValue(tx) + if err != nil { + return err + } + + return b.conn.WriteData(spec.NewJSONSubscribeResponse(id, encodedTx)) +} diff --git a/serve/filters/types.go b/serve/filters/types.go index f89dcd65..6bdca007 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -4,7 +4,6 @@ import ( "errors" "time" - "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/filters/filter" ) @@ -32,8 +31,8 @@ type Filter interface { UpdateLastUsed() // GetChanges returns any filter changes (specific to the filter type) - GetChanges() any + GetChanges() []any - // UpdateWithBlock updates the specific filter type with a new block - UpdateWithBlock(block *types.Block) + // UpdateWith updates the specific filter type with a event's data + UpdateWith(data any) } diff --git a/serve/handlers/subs/subs.go b/serve/handlers/subs/subs.go index e2ba341d..3f5acbe0 100644 --- a/serve/handlers/subs/subs.go +++ b/serve/handlers/subs/subs.go @@ -3,9 +3,9 @@ package subs import ( "fmt" - "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/filters" + "github.com/gnolang/tx-indexer/serve/filters/filter" "github.com/gnolang/tx-indexer/serve/filters/subscription" "github.com/gnolang/tx-indexer/serve/metadata" "github.com/gnolang/tx-indexer/serve/spec" @@ -44,6 +44,30 @@ func (h *Handler) newBlockFilter() string { return h.filterManager.NewBlockFilter() } +// NewTransactionFilterHandler creates a transaction filter object +func (h *Handler) NewTransactionFilterHandler( + _ *metadata.Metadata, + params []any, +) (any, *spec.BaseJSONError) { + // Check the params + if len(params) < 1 { + return nil, spec.GenerateInvalidParamCountError() + } + + var options filter.TxFilterOption + + err := spec.ParseObjectParameter(params[0], &options) + if err != nil { + return nil, spec.GenerateInvalidParamError(1) + } + + return h.newTxFilter(options), nil +} + +func (h *Handler) newTxFilter(options filter.TxFilterOption) string { + return h.filterManager.NewTxFilter(options) +} + // UninstallFilterHandler uninstalls a filter with given id func (h *Handler) UninstallFilterHandler( _ *metadata.Metadata, @@ -110,6 +134,8 @@ func (h *Handler) subscribe(connID, eventType string) (string, error) { switch eventType { case subscription.NewHeadsEvent: return h.filterManager.NewBlockSubscription(conn), nil + case subscription.NewTransactionsEvent: + return h.filterManager.NewTransactionSubscription(conn), nil default: return "", fmt.Errorf("invalid event type: %s", eventType) } @@ -164,27 +190,19 @@ func (h *Handler) GetFilterChangesHandler(_ *metadata.Metadata, params []any) (a return nil, spec.GenerateResponseError(err) } - // Handle block filter changes - changes := h.getBlockChanges(f) + // Handle filter changes + changes := f.GetChanges() - // Encode the response - encodedResponses := make([]string, len(changes)) + results := make([]string, len(changes)) - for index, change := range changes { - encodedResponse, encodeErr := encode.PrepareValue(change) + for index, changed := range changes { + encodedResponse, encodeErr := encode.PrepareValue(changed) if encodeErr != nil { return nil, spec.GenerateResponseError(encodeErr) } - encodedResponses[index] = encodedResponse + results[index] = encodedResponse } - return encodedResponses, nil -} - -func (h *Handler) getBlockChanges(filter filters.Filter) []types.Header { - // Get updates - blockHeaders, _ := filter.GetChanges().([]types.Header) - - return blockHeaders + return results, nil } diff --git a/serve/handlers/tx/tx.go b/serve/handlers/tx/tx.go index fed0bf8f..4338d6d4 100644 --- a/serve/handlers/tx/tx.go +++ b/serve/handlers/tx/tx.go @@ -2,6 +2,8 @@ package tx import ( "errors" + "fmt" + "strconv" "github.com/gnolang/gno/tm2/pkg/bft/types" @@ -31,18 +33,18 @@ func (h *Handler) GetTxHandler( } // Extract the params - blockNum, ok := params[0].(uint64) - if !ok { + blockNum, err := toUint64(params[0]) + if err != nil { return nil, spec.GenerateInvalidParamError(1) } - txIndex, ok := params[1].(uint32) - if !ok { - return nil, spec.GenerateInvalidParamError(1) + txIndex, err := toUint64(params[1]) + if err != nil { + return nil, spec.GenerateInvalidParamError(2) } // Run the handler - response, err := h.getTx(blockNum, txIndex) + response, err := h.getTx(blockNum, uint32(txIndex)) if err != nil { return nil, spec.GenerateResponseError(err) } @@ -123,3 +125,7 @@ func (h *Handler) getTxByHash(hash string) (*types.TxResult, error) { return tx, nil } + +func toUint64(data any) (uint64, error) { + return strconv.ParseUint(fmt.Sprintf("%v", data), 10, 64) +} diff --git a/serve/handlers/tx/tx_test.go b/serve/handlers/tx/tx_test.go index be723028..56bb26d7 100644 --- a/serve/handlers/tx/tx_test.go +++ b/serve/handlers/tx/tx_test.go @@ -23,7 +23,7 @@ func TestGetTx_InvalidParams(t *testing.T) { }{ { "invalid param length", - []any{1, 2, 3}, + []any{1}, }, { "invalid param type", diff --git a/serve/jsonrpc.go b/serve/jsonrpc.go index 94d3a0a2..df6b3fe0 100644 --- a/serve/jsonrpc.go +++ b/serve/jsonrpc.go @@ -160,6 +160,11 @@ func (j *JSONRPC) RegisterSubEndpoints(db storage.Storage) { subsHandler.NewBlockFilterHandler, ) + j.RegisterHandler( + "newTransactionFilter", + subsHandler.NewTransactionFilterHandler, + ) + j.RegisterHandler( "getFilterChanges", subsHandler.GetFilterChangesHandler, diff --git a/serve/spec/spec.go b/serve/spec/spec.go index 2503d485..4af3eea0 100644 --- a/serve/spec/spec.go +++ b/serve/spec/spec.go @@ -1,6 +1,9 @@ package spec -import "fmt" +import ( + "encoding/json" + "fmt" +) const ( JSONRPCVersion = "2.0" @@ -151,3 +154,17 @@ func GenerateInvalidParamCountError() *BaseJSONError { InvalidParamsErrorCode, ) } + +func ParseObjectParameter[T any](param any, data *T) error { + marshaled, err := json.Marshal(param) + if err != nil { + return err + } + + err = json.Unmarshal(marshaled, data) + if err != nil { + return err + } + + return nil +} diff --git a/types/events.go b/types/events.go index ec731419..e06f9109 100644 --- a/types/events.go +++ b/types/events.go @@ -6,7 +6,9 @@ import ( ) // NewBlockEvent is the event for when new blocks appear -var NewBlockEvent events.Type = "newHeads" +var ( + NewBlockEvent events.Type = "newHeads" +) type NewBlock struct { Block *types.Block