From 5fbb00a8ee0e5698124cccf15cd03bcbbfaea00e Mon Sep 17 00:00:00 2001 From: jinoosss Date: Wed, 17 Apr 2024 10:46:30 +0900 Subject: [PATCH 01/10] fix: Fix a parameter type casting --- serve/handlers/tx/tx.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/serve/handlers/tx/tx.go b/serve/handlers/tx/tx.go index fed0bf8f..4338d6d4 100644 --- a/serve/handlers/tx/tx.go +++ b/serve/handlers/tx/tx.go @@ -2,6 +2,8 @@ package tx import ( "errors" + "fmt" + "strconv" "github.com/gnolang/gno/tm2/pkg/bft/types" @@ -31,18 +33,18 @@ func (h *Handler) GetTxHandler( } // Extract the params - blockNum, ok := params[0].(uint64) - if !ok { + blockNum, err := toUint64(params[0]) + if err != nil { return nil, spec.GenerateInvalidParamError(1) } - txIndex, ok := params[1].(uint32) - if !ok { - return nil, spec.GenerateInvalidParamError(1) + txIndex, err := toUint64(params[1]) + if err != nil { + return nil, spec.GenerateInvalidParamError(2) } // Run the handler - response, err := h.getTx(blockNum, txIndex) + response, err := h.getTx(blockNum, uint32(txIndex)) if err != nil { return nil, spec.GenerateResponseError(err) } @@ -123,3 +125,7 @@ func (h *Handler) getTxByHash(hash string) (*types.TxResult, error) { return tx, nil } + +func toUint64(data any) (uint64, error) { + return strconv.ParseUint(fmt.Sprintf("%v", data), 10, 64) +} From 3bde65350176aef65d4a0c265ea5172212b9c1d3 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Wed, 17 Apr 2024 17:09:36 +0900 Subject: [PATCH 02/10] feat: Add a `newTransactions` event subscription --- fetch/fetch.go | 7 + fetch/fetch_test.go | 240 +++++++++++++++++++++++++--- serve/filters/subscription/block.go | 9 +- serve/filters/subscription/tx.go | 40 +++++ serve/handlers/subs/subs.go | 61 +++++-- types/events.go | 17 +- 6 files changed, 339 insertions(+), 35 deletions(-) create mode 100644 serve/filters/subscription/tx.go diff --git a/fetch/fetch.go b/fetch/fetch.go index 16024f78..bfda87fc 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -219,6 +219,13 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { } f.events.SignalEvent(event) + + for _, txResult := range txResults { + event := &types.NewTransaction{ + TxResult: txResult, + } + f.events.SignalEvent(event) + } } f.logger.Info( diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 09e2bf72..8a60dbcf 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -74,14 +74,15 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { savedTxs = make([]*types.TxResult, 0, txCount*blockNum) savedBlocks = make([]*types.Block, 0, blockNum) - capturedEvents = make([]*indexerTypes.NewBlock, 0) + capturedEvents = make([]events.Event, 0) mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - blockEvent, ok := e.(*indexerTypes.NewBlock) - require.True(t, ok) - - capturedEvents = append(capturedEvents, blockEvent) + if e.GetType() == indexerTypes.NewBlockEvent { + _, ok := e.(*indexerTypes.NewBlock) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + } }, } @@ -201,16 +202,25 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { require.Len(t, capturedEvents, len(blocks)-1) for index, event := range capturedEvents { - // Make sure the block is valid - assert.Equal(t, blocks[index+1], event.Block) + switch event.GetType() { + case indexerTypes.NewBlockEvent: + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) - // Make sure the transaction results are valid - require.Len(t, event.Results, txCount) + // Make sure the block is valid + assert.Equal(t, blocks[index+1], eventData.Block) - for txIndex, tx := range event.Results { - assert.EqualValues(t, blocks[index+1].Height, tx.Height) - assert.EqualValues(t, txIndex, tx.Index) - assert.Equal(t, serializedTxs[txIndex], tx.Tx) + // Make sure the transaction results are valid + require.Len(t, eventData.Results, txCount) + + for txIndex, tx := range eventData.Results { + assert.EqualValues(t, blocks[index+1].Height, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) + } + case indexerTypes.NewTransactionsEvent: + _, ok := event.(*indexerTypes.NewTransaction) + require.True(t, ok) } } }) @@ -229,14 +239,19 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { savedTxs = make([]*types.TxResult, 0, txCount*blockNum) savedBlocks = make([]*types.Block, 0, blockNum) - capturedEvents = make([]*indexerTypes.NewBlock, 0) + capturedEvents = make([]events.Event, 0) mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - blockEvent, ok := e.(*indexerTypes.NewBlock) - require.True(t, ok) - - capturedEvents = append(capturedEvents, blockEvent) + switch e.GetType() { + case indexerTypes.NewBlockEvent: + _, ok := e.(*indexerTypes.NewBlock) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + case indexerTypes.NewTransactionsEvent: + _, ok := e.(*indexerTypes.NewTransaction) + require.True(t, ok) + } }, } @@ -378,12 +393,13 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { for index, event := range capturedEvents { // Make sure the block is valid - assert.Equal(t, blocks[index+1], event.Block) + eventData := event.(*indexerTypes.NewBlock) + assert.Equal(t, blocks[index+1], eventData.Block) // Make sure the transaction results are valid - require.Len(t, event.Results, txCount) + require.Len(t, eventData.Results, txCount) - for txIndex, tx := range event.Results { + for txIndex, tx := range eventData.Results { assert.EqualValues(t, blocks[index+1].Height, tx.Height) assert.EqualValues(t, txIndex, tx.Index) assert.Equal(t, serializedTxs[txIndex], tx.Tx) @@ -392,6 +408,188 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { }) } +func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) { + t.Parallel() + + t.Run("valid txs flow, sequential", func(t *testing.T) { + t.Parallel() + + var cancelFn context.CancelFunc + + var ( + blockNum = 1000 + txCount = 10 + txs = generateTransactions(t, txCount) + serializedTxs = serializeTxs(t, txs) + blocks = generateBlocks(t, blockNum+1, txs) + + savedTxs = make([]*types.TxResult, 0, txCount*blockNum) + savedBlocks = make([]*types.Block, 0, blockNum) + capturedEvents = make([]events.Event, 0) + + mockEvents = &mockEvents{ + signalEventFn: func(e events.Event) { + if e.GetType() == indexerTypes.NewTransactionsEvent { + _, ok := e.(*indexerTypes.NewTransaction) + require.True(t, ok) + capturedEvents = append(capturedEvents, e) + } + }, + } + + latestSaved = uint64(0) + + mockStorage = &mock.Storage{ + GetLatestSavedHeightFn: func() (uint64, error) { + if latestSaved == 0 { + return 0, storageErrors.ErrNotFound + } + + return latestSaved, nil + }, + GetWriteBatchFn: func() storage.Batch { + return &mock.WriteBatch{ + SetBlockFn: func(block *types.Block) error { + savedBlocks = append(savedBlocks, block) + + // Check if all blocks are saved + if block.Height == int64(blockNum) { + // At this point, we can cancel the process + cancelFn() + } + + latestSaved = uint64(block.Height) + + return nil + }, + SetTxFn: func(result *types.TxResult) error { + savedTxs = append(savedTxs, result) + + return nil + }, + } + }, + } + + mockClient = &mockClient{ + createBatchFn: func() clientTypes.Batch { + return &mockBatch{ + executeFn: func() ([]any, error) { + // Force an error + return nil, errors.New("something is flaky") + }, + countFn: func() int { + return 1 // to trigger execution + }, + } + }, + getLatestBlockNumberFn: func() (uint64, error) { + return uint64(blockNum), nil + }, + getBlockFn: func(num uint64) (*core_types.ResultBlock, error) { + // Sanity check + if num > uint64(blockNum) { + t.Fatalf("invalid block requested, %d", num) + } + + if len(blocks[num].Txs) != txCount { + t.Fatalf("invalid transactions, current size: %d", len(blocks[num].Txs)) + } + + return &core_types.ResultBlock{ + Block: blocks[num], + }, nil + }, + getBlockResultsFn: func(num uint64) (*core_types.ResultBlockResults, error) { + // Sanity check + if num > uint64(blockNum) { + t.Fatalf("invalid block requested, %d", num) + } + + return &core_types.ResultBlockResults{ + Height: int64(num), + Results: &state.ABCIResponses{ + DeliverTxs: make([]abci.ResponseDeliverTx, txCount), + }, + }, nil + }, + } + ) + + // Create the fetcher + f := New( + mockStorage, + mockClient, + mockEvents, + WithMaxSlots(10), + WithMaxChunkSize(50), + ) + + // Short interval to force spawning + f.queryInterval = 100 * time.Millisecond + + // Create the context + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + + // Run the fetch + require.NoError(t, f.FetchChainData(ctx)) + + // Verify the transactions are saved correctly + require.Len(t, savedTxs, blockNum*txCount) + + for blockIndex := 0; blockIndex < blockNum; blockIndex++ { + assert.Equal(t, blocks[blockIndex+1], savedBlocks[blockIndex]) + + for txIndex := 0; txIndex < txCount; txIndex++ { + // since this is a linearized array of transactions + // we can access each item with: blockNum * length + txIndx + // where blockNum is the y-axis, and txIndx is the x-axis + tx := savedTxs[blockIndex*txCount+txIndex] + + assert.EqualValues(t, blockIndex+1, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) + } + } + + // Make sure proper events were emitted + // Blocks each have as many transactions as txCount. + txEventCount := (len(blocks) - 1) * txCount + require.Len(t, capturedEvents, txEventCount) + + for index, event := range capturedEvents { + switch event.GetType() { + case indexerTypes.NewBlockEvent: + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) + + // Make sure the block is valid + assert.Equal(t, blocks[index+1], eventData.Block) + + // Make sure the transaction results are valid + require.Len(t, eventData.Results, txCount) + + for txIndex, tx := range eventData.Results { + assert.EqualValues(t, blocks[index+1].Height, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) + } + case indexerTypes.NewTransactionsEvent: + eventData, ok := event.(*indexerTypes.NewTransaction) + require.True(t, ok) + + blockIndex := index / txCount + txIndex := index % txCount + + // Make sure the tx is valid + assert.Equal(t, blocks[blockIndex+1].Txs[txIndex], eventData.TxResult.Tx) + assert.Equal(t, blocks[blockIndex+1].Height, eventData.TxResult.Height) + } + } + }) +} + func TestFetcher_FetchTransactions_Valid_EmptyBlocks(t *testing.T) { t.Parallel() diff --git a/serve/filters/subscription/block.go b/serve/filters/subscription/block.go index 7d8ec3fb..714064ec 100644 --- a/serve/filters/subscription/block.go +++ b/serve/filters/subscription/block.go @@ -1,6 +1,8 @@ package subscription import ( + "fmt" + "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/encode" @@ -23,7 +25,12 @@ func NewBlockSubscription(conn conns.WSConnection) *BlockSubscription { } } -func (b *BlockSubscription) WriteResponse(id string, block *types.Block) error { +func (b *BlockSubscription) WriteResponse(id string, data any) error { + block, ok := data.(*types.Block) + if !ok { + return fmt.Errorf("unable to cast block, %s", data) + } + encodedBlock, err := encode.PrepareValue(block.Header) if err != nil { return err diff --git a/serve/filters/subscription/tx.go b/serve/filters/subscription/tx.go new file mode 100644 index 00000000..0c2c2b79 --- /dev/null +++ b/serve/filters/subscription/tx.go @@ -0,0 +1,40 @@ +package subscription + +import ( + "fmt" + + "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/serve/conns" + "github.com/gnolang/tx-indexer/serve/encode" + "github.com/gnolang/tx-indexer/serve/spec" +) + +const ( + NewTransactionsEvent = "newTransactions" +) + +// TransactionSubscription is the new-heads type +// subscription +type TransactionSubscription struct { + *baseSubscription +} + +func NewTransactionSubscription(conn conns.WSConnection) *TransactionSubscription { + return &TransactionSubscription{ + baseSubscription: newBaseSubscription(conn), + } +} + +func (b *TransactionSubscription) WriteResponse(id string, data any) error { + tx, ok := data.(*types.TxResult) + if !ok { + return fmt.Errorf("unable to cast txResult, %s", data) + } + + encodedTx, err := encode.PrepareValue(tx) + if err != nil { + return err + } + + return b.conn.WriteData(spec.NewJSONSubscribeResponse(id, encodedTx)) +} diff --git a/serve/handlers/subs/subs.go b/serve/handlers/subs/subs.go index e2ba341d..f1f72ee6 100644 --- a/serve/handlers/subs/subs.go +++ b/serve/handlers/subs/subs.go @@ -2,8 +2,8 @@ package subs import ( "fmt" + "reflect" - "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/filters" "github.com/gnolang/tx-indexer/serve/filters/subscription" @@ -44,6 +44,23 @@ func (h *Handler) newBlockFilter() string { return h.filterManager.NewBlockFilter() } +// NewTransactionFilterHandler creates a transaction filter object +func (h *Handler) NewTransactionFilterHandler( + _ *metadata.Metadata, + params []any, +) (any, *spec.BaseJSONError) { + // Check the params + if len(params) != 0 { + return nil, spec.GenerateInvalidParamCountError() + } + + return h.newTxFilter(), nil +} + +func (h *Handler) newTxFilter() string { + return h.filterManager.NewTxFilter() +} + // UninstallFilterHandler uninstalls a filter with given id func (h *Handler) UninstallFilterHandler( _ *metadata.Metadata, @@ -110,6 +127,8 @@ func (h *Handler) subscribe(connID, eventType string) (string, error) { switch eventType { case subscription.NewHeadsEvent: return h.filterManager.NewBlockSubscription(conn), nil + case subscription.NewTransactionsEvent: + return h.filterManager.NewTransactionSubscription(conn), nil default: return "", fmt.Errorf("invalid event type: %s", eventType) } @@ -164,27 +183,45 @@ func (h *Handler) GetFilterChangesHandler(_ *metadata.Metadata, params []any) (a return nil, spec.GenerateResponseError(err) } - // Handle block filter changes - changes := h.getBlockChanges(f) + // Handle filter changes + changes, err := h.getFilterChanges(f) + if err != nil { + return nil, spec.GenerateResponseError(err) + } - // Encode the response - encodedResponses := make([]string, len(changes)) + results := make([]string, len(changes)) - for index, change := range changes { - encodedResponse, encodeErr := encode.PrepareValue(change) + for index, changed := range changes { + encodedResponse, encodeErr := encode.PrepareValue(changed) if encodeErr != nil { return nil, spec.GenerateResponseError(encodeErr) } - encodedResponses[index] = encodedResponse + results[index] = encodedResponse } - return encodedResponses, nil + return results, nil } -func (h *Handler) getBlockChanges(filter filters.Filter) []types.Header { +func (h *Handler) getFilterChanges(filter filters.Filter) ([]any, error) { // Get updates - blockHeaders, _ := filter.GetChanges().([]types.Header) + changes := filter.GetChanges() + value := reflect.ValueOf(changes) + + if value.Kind() == reflect.Ptr { + value = value.Elem() + } + + if value.Kind() != reflect.Slice { + return nil, fmt.Errorf("forEachValue: expected slice type, found %q", value.Kind().String()) + } + + results := make([]any, value.Len()) + + for i := 0; i < value.Len(); i++ { + val := value.Index(i).Interface() + results[i] = val + } - return blockHeaders + return results, nil } diff --git a/types/events.go b/types/events.go index ec731419..8ed13710 100644 --- a/types/events.go +++ b/types/events.go @@ -6,7 +6,10 @@ import ( ) // NewBlockEvent is the event for when new blocks appear -var NewBlockEvent events.Type = "newHeads" +var ( + NewBlockEvent events.Type = "newHeads" + NewTransactionsEvent events.Type = "newTransactions" +) type NewBlock struct { Block *types.Block @@ -20,3 +23,15 @@ func (n *NewBlock) GetType() events.Type { func (n *NewBlock) GetData() any { return n } + +type NewTransaction struct { + TxResult *types.TxResult +} + +func (n *NewTransaction) GetType() events.Type { + return NewTransactionsEvent +} + +func (n *NewTransaction) GetData() any { + return n +} From 04fd3dc1485c3e58bcb9c1d033087f3108902f01 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Wed, 17 Apr 2024 17:09:56 +0900 Subject: [PATCH 03/10] feat: Implement an RPC transaction filter --- serve/filters/filter/block.go | 8 +++- serve/filters/filter/block_test.go | 2 +- serve/filters/filter/tx.go | 54 ++++++++++++++++------ serve/filters/filter/tx_test.go | 8 ++-- serve/filters/manager.go | 74 ++++++++++++++++++++++-------- serve/filters/subscription.go | 44 +++++++++++++++++- serve/filters/types.go | 3 +- serve/jsonrpc.go | 5 ++ 8 files changed, 156 insertions(+), 42 deletions(-) diff --git a/serve/filters/filter/block.go b/serve/filters/filter/block.go index 6fa7fb0a..c3a2cb98 100644 --- a/serve/filters/filter/block.go +++ b/serve/filters/filter/block.go @@ -34,7 +34,13 @@ func (b *BlockFilter) GetChanges() any { return hashes } -func (b *BlockFilter) UpdateWithBlock(block *types.Block) { +func (b *BlockFilter) UpdateWith(data any) { + if block, ok := data.(*types.Block); ok { + b.updateWithBlock(block) + } +} + +func (b *BlockFilter) updateWithBlock(block *types.Block) { b.Lock() defer b.Unlock() diff --git a/serve/filters/filter/block_test.go b/serve/filters/filter/block_test.go index 24b56e66..f7c7f72a 100644 --- a/serve/filters/filter/block_test.go +++ b/serve/filters/filter/block_test.go @@ -40,7 +40,7 @@ func TestBlockFilter_GetChanges(t *testing.T) { for _, block := range blocks { block := block - f.UpdateWithBlock(block) + f.UpdateWith(block) } // Get changes diff --git a/serve/filters/filter/tx.go b/serve/filters/filter/tx.go index a740be98..58f7325f 100644 --- a/serve/filters/filter/tx.go +++ b/serve/filters/filter/tx.go @@ -15,18 +15,29 @@ type Options struct { type TxFilter struct { opts Options *baseFilter - txs []*types.TxResult + txs []types.TxResult } // NewTxFilter creates a new TxFilter object. func NewTxFilter(opts Options) *TxFilter { return &TxFilter{ baseFilter: newBaseFilter(TxFilterType), - txs: make([]*types.TxResult, 0), + txs: make([]types.TxResult, 0), opts: opts, } } +// GetChanges returns all new transactions from the last query +func (tf *TxFilter) GetChanges() any { + return tf.getTxChanges() +} + +func (tf *TxFilter) UpdateWith(data any) { + if tx, ok := data.(*types.TxResult); ok { + tf.updateWithTx(tx) + } +} + // GetHashes iterates over all transactions in the filter and returns their hashes. func (tf *TxFilter) GetHashes() [][]byte { tf.Lock() @@ -37,7 +48,7 @@ func (tf *TxFilter) GetHashes() [][]byte { for _, txr := range tf.txs { var hash []byte - if txr != nil && txr.Tx != nil { + if txr.Tx != nil { hash = txr.Tx.Hash() } @@ -47,27 +58,20 @@ func (tf *TxFilter) GetHashes() [][]byte { return hashes } -func (tf *TxFilter) UpdateWithTx(tx *types.TxResult) { - tf.Lock() - defer tf.Unlock() - - tf.txs = append(tf.txs, tx) -} - // Apply applies all added conditions to the transactions in the filter. // // It returns a slice of `TxResult` that satisfy all the conditions. If no conditions are set, // it returns all transactions in the filter. Also, if the filter value is invalid filter will not // be applied. -func (tf *TxFilter) Apply() []*types.TxResult { +func (tf *TxFilter) Apply() []types.TxResult { tf.Lock() defer tf.Unlock() return checkOpts(tf.txs, tf.opts) } -func checkOpts(txs []*types.TxResult, opts Options) []*types.TxResult { - filtered := make([]*types.TxResult, 0, len(txs)) +func checkOpts(txs []types.TxResult, opts Options) []types.TxResult { + filtered := make([]types.TxResult, 0, len(txs)) for _, tx := range txs { if checkFilterCondition(tx, opts) { @@ -78,7 +82,7 @@ func checkOpts(txs []*types.TxResult, opts Options) []*types.TxResult { return filtered } -func checkFilterCondition(tx *types.TxResult, opts Options) bool { +func checkFilterCondition(tx types.TxResult, opts Options) bool { if opts.GasLimit.Max != nil && tx.Response.GasUsed > *opts.GasLimit.Max { return false } @@ -105,3 +109,25 @@ func checkFilterCondition(tx *types.TxResult, opts Options) bool { return true } + +// getTxChanges returns all new transactions from the last query +func (tf *TxFilter) getTxChanges() []types.TxResult { + tf.Lock() + defer tf.Unlock() + + // Get newTxs + newTxs := make([]types.TxResult, len(tf.txs)) + copy(newTxs, tf.txs) + + // Empty headers + tf.txs = tf.txs[:0] + + return newTxs +} + +func (tf *TxFilter) updateWithTx(tx *types.TxResult) { + tf.Lock() + defer tf.Unlock() + + tf.txs = append(tf.txs, *tx) +} diff --git a/serve/filters/filter/tx_test.go b/serve/filters/filter/tx_test.go index 0dc02015..93b1d51c 100644 --- a/serve/filters/filter/tx_test.go +++ b/serve/filters/filter/tx_test.go @@ -27,7 +27,7 @@ func TestGetHashes(t *testing.T) { f := NewTxFilter(Options{}) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } hashes := f.GetHashes() @@ -228,7 +228,7 @@ func TestApplyFilters(t *testing.T) { f := NewTxFilter(tt.options) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } filtered := f.Apply() @@ -242,7 +242,7 @@ func TestApplyFilters(t *testing.T) { for i, tx := range filtered { assert.Equal( - t, tt.expected[i], tx, + t, *tt.expected[i], tx, fmt.Sprintf( "The filtered transaction should match the expected transaction: %v", tt.expected[i], @@ -302,7 +302,7 @@ func TestApplyFiltersWithLargeData(t *testing.T) { f := NewTxFilter(tt.options) for _, tx := range txs { - f.UpdateWithTx(tx) + f.UpdateWith(tx) } filtered := f.Apply() diff --git a/serve/filters/manager.go b/serve/filters/manager.go index d5897183..cb3ca3e5 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -46,8 +46,8 @@ func NewFilterManager( opt(filterManager) } - // Subscribe to new block events - go filterManager.subscribeToNewBlockEvent() + // Subscribe to new events + go filterManager.subscribeToEvents() // Start cleanup routine go filterManager.cleanupRoutine() @@ -62,6 +62,13 @@ func (f *Manager) NewBlockFilter() string { return f.filters.newFilter(blockFilter) } +// NewTransactionFilter creates a new Transaction filter, and returns the corresponding ID +func (f *Manager) NewTxFilter() string { + txFilter := filter.NewTxFilter(filter.Options{}) + + return f.filters.newFilter(txFilter) +} + // UninstallFilter removes a filter from the filter map using its ID. // Returns a flag indicating if the filter was removed func (f *Manager) UninstallFilter(id string) bool { @@ -73,6 +80,11 @@ func (f *Manager) NewBlockSubscription(conn conns.WSConnection) string { return f.newSubscription(filterSubscription.NewBlockSubscription(conn)) } +// NewTransactionSubscription creates a new transaction (new heads) subscription (over WS) +func (f *Manager) NewTransactionSubscription(conn conns.WSConnection) string { + return f.newSubscription(filterSubscription.NewTransactionSubscription(conn)) +} + // newSubscription adds new subscription to the subscription map func (f *Manager) newSubscription(subscription subscription) string { return f.subscriptions.addSubscription(subscription) @@ -84,31 +96,48 @@ func (f *Manager) UninstallSubscription(id string) bool { return f.subscriptions.deleteSubscription(id) } -// subscribeToNewBlockEvent subscribes to new block events -func (f *Manager) subscribeToNewBlockEvent() { - blockSub := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent}) - defer f.events.CancelSubscription(blockSub.ID) +// subscribeToEvents subscribes to new events +func (f *Manager) subscribeToEvents() { + subscription := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent, commonTypes.NewTransactionsEvent}) + defer f.events.CancelSubscription(subscription.ID) for { select { case <-f.ctx.Done(): return - case blockRaw, more := <-blockSub.SubCh: + case event, more := <-subscription.SubCh: if !more { return } - // The following code segments - // cannot be executed in parallel (go routines) - // because data sequencing should be persisted - // (info about block X comes before info on block X + 1) - newBlock := blockRaw.(*commonTypes.NewBlock) - - // Apply block to filters - f.updateFiltersWithBlock(newBlock.Block) - - // send events to all matching subscriptions - f.subscriptions.sendBlockEvent(newBlock.Block) + switch event.GetType() { + case commonTypes.NewBlockEvent: + // The following code segments + // cannot be executed in parallel (go routines) + // because data sequencing should be persisted + // (info about block X comes before info on block X + 1) + newBlock, ok := event.(*commonTypes.NewBlock) + if ok { + // Apply block to filters + f.updateFiltersWithBlock(newBlock.Block) + + // send events to all matching subscriptions + f.subscriptions.sendBlockEvent(newBlock.Block) + } + case commonTypes.NewTransactionsEvent: + if !more { + return + } + + newTransaction, ok := event.(*commonTypes.NewTransaction) + if ok { + // Apply block to filters + f.updateFiltersWithTxResult(newTransaction.TxResult) + + // send events to all matching subscriptions + f.subscriptions.sendTransactionEvent(newTransaction.TxResult) + } + } } } } @@ -116,7 +145,14 @@ func (f *Manager) subscribeToNewBlockEvent() { // updateFiltersWithBlock updates all filters with the incoming block func (f *Manager) updateFiltersWithBlock(block *types.Block) { f.filters.rangeItems(func(filter Filter) { - filter.UpdateWithBlock(block) + filter.UpdateWith(block) + }) +} + +// updateFiltersWithTxResult updates all filters with the incoming transactions +func (f *Manager) updateFiltersWithTxResult(txResult *types.TxResult) { + f.filters.rangeItems(func(filter Filter) { + filter.UpdateWith(txResult) }) } diff --git a/serve/filters/subscription.go b/serve/filters/subscription.go index e91d81cc..61d81f27 100644 --- a/serve/filters/subscription.go +++ b/serve/filters/subscription.go @@ -8,7 +8,7 @@ import ( ) type subscription interface { - WriteResponse(id string, block *types.Block) error + WriteResponse(id string, data any) error } // subscriptionMap keeps track of ongoing data subscriptions @@ -81,6 +81,48 @@ func (sm *subscriptionMap) sendBlockEvent(block *types.Block) { } } +// sendBlockEvent alerts all active subscriptions of a block event. +// In case there was an error during writing, the subscription is removed +func (sm *subscriptionMap) sendTransactionEvent(txResult *types.TxResult) { + sm.Lock() + defer sm.Unlock() + + var ( + invalidSends = make([]string, 0, len(sm.subscriptions)) + + invalidSendsMux sync.Mutex + wg sync.WaitGroup + ) + + markInvalid := func(id string) { + invalidSendsMux.Lock() + defer invalidSendsMux.Unlock() + + invalidSends = append(invalidSends, id) + } + + for id, sub := range sm.subscriptions { + sub := sub + + wg.Add(1) + + go func(id string) { + defer wg.Done() + + if err := sub.WriteResponse(id, txResult); err != nil { + markInvalid(id) + } + }(id) + } + + wg.Wait() + + // Prune out the invalid subscriptions + for _, invalidID := range invalidSends { + delete(sm.subscriptions, invalidID) + } +} + // deleteSubscription removes a subscription using the ID. // Returns a flag indicating if the subscription was indeed present and removedd func (sm *subscriptionMap) deleteSubscription(id string) bool { diff --git a/serve/filters/types.go b/serve/filters/types.go index f89dcd65..0c85dd6a 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -4,7 +4,6 @@ import ( "errors" "time" - "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/filters/filter" ) @@ -35,5 +34,5 @@ type Filter interface { GetChanges() any // UpdateWithBlock updates the specific filter type with a new block - UpdateWithBlock(block *types.Block) + UpdateWith(data any) } diff --git a/serve/jsonrpc.go b/serve/jsonrpc.go index 94d3a0a2..df6b3fe0 100644 --- a/serve/jsonrpc.go +++ b/serve/jsonrpc.go @@ -160,6 +160,11 @@ func (j *JSONRPC) RegisterSubEndpoints(db storage.Storage) { subsHandler.NewBlockFilterHandler, ) + j.RegisterHandler( + "newTransactionFilter", + subsHandler.NewTransactionFilterHandler, + ) + j.RegisterHandler( "getFilterChanges", subsHandler.GetFilterChangesHandler, From af691c6c4f37c3ac5e84cd02b4c7a719c8ef0076 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Wed, 17 Apr 2024 17:56:00 +0900 Subject: [PATCH 04/10] fix: Send events by event type --- serve/filters/manager.go | 8 ++--- serve/filters/subscription.go | 54 +++++------------------------ serve/filters/subscription/block.go | 5 +++ serve/filters/subscription/tx.go | 5 +++ serve/handlers/tx/tx.go | 2 +- 5 files changed, 23 insertions(+), 51 deletions(-) diff --git a/serve/filters/manager.go b/serve/filters/manager.go index cb3ca3e5..bac6850e 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -121,8 +121,8 @@ func (f *Manager) subscribeToEvents() { // Apply block to filters f.updateFiltersWithBlock(newBlock.Block) - // send events to all matching subscriptions - f.subscriptions.sendBlockEvent(newBlock.Block) + // send events to all `newHeads` subscriptions + f.subscriptions.sendEvent(commonTypes.NewBlockEvent, newBlock.Block) } case commonTypes.NewTransactionsEvent: if !more { @@ -134,8 +134,8 @@ func (f *Manager) subscribeToEvents() { // Apply block to filters f.updateFiltersWithTxResult(newTransaction.TxResult) - // send events to all matching subscriptions - f.subscriptions.sendTransactionEvent(newTransaction.TxResult) + // Send events to all `newTransactions` subscriptions + f.subscriptions.sendEvent(commonTypes.NewTransactionsEvent, newTransaction.TxResult) } } } diff --git a/serve/filters/subscription.go b/serve/filters/subscription.go index 61d81f27..e0c663ab 100644 --- a/serve/filters/subscription.go +++ b/serve/filters/subscription.go @@ -3,11 +3,12 @@ package filters import ( "sync" - "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/google/uuid" ) type subscription interface { + GetType() events.Type WriteResponse(id string, data any) error } @@ -39,9 +40,9 @@ func (sm *subscriptionMap) addSubscription(sub subscription) string { return id } -// sendBlockEvent alerts all active subscriptions of a block event. +// sendEvent alerts all active subscriptions of a event. // In case there was an error during writing, the subscription is removed -func (sm *subscriptionMap) sendBlockEvent(block *types.Block) { +func (sm *subscriptionMap) sendEvent(eventType events.Type, data any) { sm.Lock() defer sm.Unlock() @@ -61,55 +62,16 @@ func (sm *subscriptionMap) sendBlockEvent(block *types.Block) { for id, sub := range sm.subscriptions { sub := sub + if sub.GetType() != eventType { + continue + } wg.Add(1) go func(id string) { defer wg.Done() - if err := sub.WriteResponse(id, block); err != nil { - markInvalid(id) - } - }(id) - } - - wg.Wait() - - // Prune out the invalid subscriptions - for _, invalidID := range invalidSends { - delete(sm.subscriptions, invalidID) - } -} - -// sendBlockEvent alerts all active subscriptions of a block event. -// In case there was an error during writing, the subscription is removed -func (sm *subscriptionMap) sendTransactionEvent(txResult *types.TxResult) { - sm.Lock() - defer sm.Unlock() - - var ( - invalidSends = make([]string, 0, len(sm.subscriptions)) - - invalidSendsMux sync.Mutex - wg sync.WaitGroup - ) - - markInvalid := func(id string) { - invalidSendsMux.Lock() - defer invalidSendsMux.Unlock() - - invalidSends = append(invalidSends, id) - } - - for id, sub := range sm.subscriptions { - sub := sub - - wg.Add(1) - - go func(id string) { - defer wg.Done() - - if err := sub.WriteResponse(id, txResult); err != nil { + if err := sub.WriteResponse(id, data); err != nil { markInvalid(id) } }(id) diff --git a/serve/filters/subscription/block.go b/serve/filters/subscription/block.go index 714064ec..c92dad74 100644 --- a/serve/filters/subscription/block.go +++ b/serve/filters/subscription/block.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/spec" @@ -25,6 +26,10 @@ func NewBlockSubscription(conn conns.WSConnection) *BlockSubscription { } } +func (b *BlockSubscription) GetType() events.Type { + return NewHeadsEvent +} + func (b *BlockSubscription) WriteResponse(id string, data any) error { block, ok := data.(*types.Block) if !ok { diff --git a/serve/filters/subscription/tx.go b/serve/filters/subscription/tx.go index 0c2c2b79..6ec4a001 100644 --- a/serve/filters/subscription/tx.go +++ b/serve/filters/subscription/tx.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/gnolang/gno/tm2/pkg/bft/types" + "github.com/gnolang/tx-indexer/events" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/spec" @@ -25,6 +26,10 @@ func NewTransactionSubscription(conn conns.WSConnection) *TransactionSubscriptio } } +func (b *TransactionSubscription) GetType() events.Type { + return NewTransactionsEvent +} + func (b *TransactionSubscription) WriteResponse(id string, data any) error { tx, ok := data.(*types.TxResult) if !ok { diff --git a/serve/handlers/tx/tx.go b/serve/handlers/tx/tx.go index 4338d6d4..34b5acd4 100644 --- a/serve/handlers/tx/tx.go +++ b/serve/handlers/tx/tx.go @@ -28,7 +28,7 @@ func (h *Handler) GetTxHandler( params []any, ) (any, *spec.BaseJSONError) { // Check the params - if len(params) < 2 { + if len(params) != 2 { return nil, spec.GenerateInvalidParamCountError() } From 74735868f84f294d6a416c3390ed8a06ed69e7e5 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Wed, 17 Apr 2024 19:00:58 +0900 Subject: [PATCH 05/10] fix: Fix comments --- serve/filters/manager.go | 6 +++--- serve/filters/subscription/tx.go | 2 +- serve/filters/types.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/serve/filters/manager.go b/serve/filters/manager.go index bac6850e..bc8a542e 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -80,7 +80,7 @@ func (f *Manager) NewBlockSubscription(conn conns.WSConnection) string { return f.newSubscription(filterSubscription.NewBlockSubscription(conn)) } -// NewTransactionSubscription creates a new transaction (new heads) subscription (over WS) +// NewTransactionSubscription creates a new transaction (new transactions) subscription (over WS) func (f *Manager) NewTransactionSubscription(conn conns.WSConnection) string { return f.newSubscription(filterSubscription.NewTransactionSubscription(conn)) } @@ -121,7 +121,7 @@ func (f *Manager) subscribeToEvents() { // Apply block to filters f.updateFiltersWithBlock(newBlock.Block) - // send events to all `newHeads` subscriptions + // Send events to all `newHeads` subscriptions f.subscriptions.sendEvent(commonTypes.NewBlockEvent, newBlock.Block) } case commonTypes.NewTransactionsEvent: @@ -131,7 +131,7 @@ func (f *Manager) subscribeToEvents() { newTransaction, ok := event.(*commonTypes.NewTransaction) if ok { - // Apply block to filters + // Apply transaction to filters f.updateFiltersWithTxResult(newTransaction.TxResult) // Send events to all `newTransactions` subscriptions diff --git a/serve/filters/subscription/tx.go b/serve/filters/subscription/tx.go index 6ec4a001..8b3190f5 100644 --- a/serve/filters/subscription/tx.go +++ b/serve/filters/subscription/tx.go @@ -14,7 +14,7 @@ const ( NewTransactionsEvent = "newTransactions" ) -// TransactionSubscription is the new-heads type +// TransactionSubscription is the new-transactions type // subscription type TransactionSubscription struct { *baseSubscription diff --git a/serve/filters/types.go b/serve/filters/types.go index 0c85dd6a..88f01fac 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -33,6 +33,6 @@ type Filter interface { // GetChanges returns any filter changes (specific to the filter type) GetChanges() any - // UpdateWithBlock updates the specific filter type with a new block + // UpdateWith updates the specific filter type with a event's data UpdateWith(data any) } From 0a00823a099763b4592a0ff5a3cbc4007320d195 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Fri, 19 Apr 2024 18:58:47 +0900 Subject: [PATCH 06/10] feat: Add transaction filter options --- serve/filters/filter/tx.go | 81 ++++++++++++++++----------------- serve/filters/filter/tx_test.go | 76 ++++++++++++++++--------------- serve/filters/manager.go | 4 +- serve/handlers/subs/subs.go | 16 +++++-- serve/spec/spec.go | 19 +++++++- 5 files changed, 110 insertions(+), 86 deletions(-) diff --git a/serve/filters/filter/tx.go b/serve/filters/filter/tx.go index 58f7325f..40c6960f 100644 --- a/serve/filters/filter/tx.go +++ b/serve/filters/filter/tx.go @@ -1,25 +1,32 @@ package filter import ( + "math" + "github.com/gnolang/gno/tm2/pkg/bft/types" ) -type Options struct { - GasUsed struct{ Min, Max *int64 } - GasWanted struct{ Min, Max *int64 } - GasLimit struct{ Min, Max *int64 } +type RangeFilterOption struct { + Min *int64 `json:"min,omitempty"` + Max *int64 `json:"max,omitempty"` +} + +type TxFilterOption struct { + GasUsed *RangeFilterOption `json:"gasUsed,omitempty"` + GasWanted *RangeFilterOption `json:"gasWanted,omitempty"` + GasLimit *RangeFilterOption `json:"gasLimit,omitempty"` } // TxFilter holds a slice of transaction results. // It provides methods to manipulate and query the transactions. type TxFilter struct { - opts Options + opts TxFilterOption *baseFilter txs []types.TxResult } // NewTxFilter creates a new TxFilter object. -func NewTxFilter(opts Options) *TxFilter { +func NewTxFilter(opts TxFilterOption) *TxFilter { return &TxFilter{ baseFilter: newBaseFilter(TxFilterType), txs: make([]types.TxResult, 0), @@ -33,7 +40,12 @@ func (tf *TxFilter) GetChanges() any { } func (tf *TxFilter) UpdateWith(data any) { - if tx, ok := data.(*types.TxResult); ok { + tx, ok := data.(*types.TxResult) + if !ok { + return + } + + if tf.checkFilterOptions(tx) { tf.updateWithTx(tx) } } @@ -58,56 +70,41 @@ func (tf *TxFilter) GetHashes() [][]byte { return hashes } -// Apply applies all added conditions to the transactions in the filter. -// -// It returns a slice of `TxResult` that satisfy all the conditions. If no conditions are set, -// it returns all transactions in the filter. Also, if the filter value is invalid filter will not -// be applied. -func (tf *TxFilter) Apply() []types.TxResult { - tf.Lock() - defer tf.Unlock() - - return checkOpts(tf.txs, tf.opts) -} - -func checkOpts(txs []types.TxResult, opts Options) []types.TxResult { - filtered := make([]types.TxResult, 0, len(txs)) - - for _, tx := range txs { - if checkFilterCondition(tx, opts) { - filtered = append(filtered, tx) - } - } - - return filtered -} - -func checkFilterCondition(tx types.TxResult, opts Options) bool { - if opts.GasLimit.Max != nil && tx.Response.GasUsed > *opts.GasLimit.Max { +// `checkFilterOptions` checks the conditions of the options in the filter. +func (tf *TxFilter) checkFilterOptions(tx *types.TxResult) bool { + if !filteredByRangeFilterOption(tx.Response.GasUsed, tf.opts.GasUsed) { return false } - if opts.GasLimit.Min != nil && tx.Response.GasUsed < *opts.GasLimit.Min { + if !filteredByRangeFilterOption(tx.Response.GasWanted, tf.opts.GasWanted) { return false } - if opts.GasUsed.Max != nil && tx.Response.GasUsed > *opts.GasUsed.Max { + // GasLimit compares GasUsed. + if !filteredByRangeFilterOption(tx.Response.GasUsed, tf.opts.GasLimit) { return false } - if opts.GasUsed.Min != nil && tx.Response.GasUsed < *opts.GasUsed.Min { - return false + return true +} + +// `filteredByRangeFilterOption` checks if the number is in a range. +func filteredByRangeFilterOption(value int64, rangeFilterOption *RangeFilterOption) bool { + if rangeFilterOption == nil { + return true } - if opts.GasWanted.Max != nil && tx.Response.GasWanted > *opts.GasWanted.Max { - return false + min := int64(0) + if rangeFilterOption.Min != nil { + min = *rangeFilterOption.Min } - if opts.GasWanted.Min != nil && tx.Response.GasWanted < *opts.GasWanted.Min { - return false + max := int64(math.MaxInt64) + if rangeFilterOption.Max != nil { + max = *rangeFilterOption.Max } - return true + return value >= min && value <= max } // getTxChanges returns all new transactions from the last query diff --git a/serve/filters/filter/tx_test.go b/serve/filters/filter/tx_test.go index 93b1d51c..d34feac6 100644 --- a/serve/filters/filter/tx_test.go +++ b/serve/filters/filter/tx_test.go @@ -24,7 +24,7 @@ func TestGetHashes(t *testing.T) { {Tx: []byte(`c1dfd96eea8cc2b62785275bca38ac261256e278`)}, } - f := NewTxFilter(Options{}) + f := NewTxFilter(TxFilterOption{}) for _, tx := range txs { f.UpdateWith(tx) @@ -111,110 +111,110 @@ func TestApplyFilters(t *testing.T) { } tests := []struct { - options Options + options TxFilterOption name string expected []*types.TxResult }{ { name: "no filter", - options: Options{}, + options: TxFilterOption{}, expected: txs, }, { name: "min gas used is 0", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(0), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(0), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "invalid gas used", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "filter by gas wanted 1", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1100), int64Ptr(1200)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1100), Max: int64Ptr(1200)}, }, expected: []*types.TxResult{txs[1], txs[3], txs[4]}, }, { name: "gas wanted min, max is same value", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(1000)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[2]}, }, { name: "filter by gas used 2", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(900), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(900), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas used min, max is same value", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[4]}, }, { name: "filter by gas wanted is invalid", - options: Options{ - GasWanted: struct{ Min, Max *int64 }{int64Ptr(1200), int64Ptr(1100)}, + options: TxFilterOption{ + GasWanted: &RangeFilterOption{Min: int64Ptr(1200), Max: int64Ptr(1100)}, }, expected: []*types.TxResult{}, }, { name: "gas used filter is invalid", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "gas limit min value is nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{nil, int64Ptr(1000)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: nil, Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas limit max value is nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(1100), nil}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(1100), Max: nil}, }, expected: []*types.TxResult{txs[1], txs[2]}, }, { name: "gas limit range is valid", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(900), int64Ptr(1000)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(900), Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, { name: "gas limit both min and max are nil", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{nil, nil}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: nil, Max: nil}, }, expected: txs, }, { name: "gas limit min is larger than max", - options: Options{ - GasLimit: struct{ Min, Max *int64 }{int64Ptr(1000), int64Ptr(900)}, + options: TxFilterOption{ + GasLimit: &RangeFilterOption{Min: int64Ptr(1000), Max: int64Ptr(900)}, }, expected: []*types.TxResult{}, }, { name: "gas used min is nil", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{nil, int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: nil, Max: int64Ptr(1000)}, }, expected: []*types.TxResult{txs[0], txs[3], txs[4]}, }, @@ -231,7 +231,8 @@ func TestApplyFilters(t *testing.T) { f.UpdateWith(tx) } - filtered := f.Apply() + changes := f.GetChanges() + filtered := changes.([]types.TxResult) require.Len( t, filtered, len(tt.expected), fmt.Sprintf( @@ -276,19 +277,19 @@ func TestApplyFiltersWithLargeData(t *testing.T) { } tests := []struct { - options Options + options TxFilterOption name string expected int }{ { name: "no filter", - options: Options{}, + options: TxFilterOption{}, expected: txCount, }, { name: "filter by gas used", - options: Options{ - GasUsed: struct{ Min, Max *int64 }{int64Ptr(950), int64Ptr(1000)}, + options: TxFilterOption{ + GasUsed: &RangeFilterOption{Min: int64Ptr(950), Max: int64Ptr(1000)}, }, expected: txCount / 2, }, @@ -305,7 +306,8 @@ func TestApplyFiltersWithLargeData(t *testing.T) { f.UpdateWith(tx) } - filtered := f.Apply() + changes := f.GetChanges() + filtered := changes.([]types.TxResult) require.Len( t, filtered, tt.expected, fmt.Sprintf( diff --git a/serve/filters/manager.go b/serve/filters/manager.go index bc8a542e..475f32fe 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -63,8 +63,8 @@ func (f *Manager) NewBlockFilter() string { } // NewTransactionFilter creates a new Transaction filter, and returns the corresponding ID -func (f *Manager) NewTxFilter() string { - txFilter := filter.NewTxFilter(filter.Options{}) +func (f *Manager) NewTxFilter(options filter.TxFilterOption) string { + txFilter := filter.NewTxFilter(options) return f.filters.newFilter(txFilter) } diff --git a/serve/handlers/subs/subs.go b/serve/handlers/subs/subs.go index f1f72ee6..e36e49f5 100644 --- a/serve/handlers/subs/subs.go +++ b/serve/handlers/subs/subs.go @@ -6,6 +6,7 @@ import ( "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/filters" + "github.com/gnolang/tx-indexer/serve/filters/filter" "github.com/gnolang/tx-indexer/serve/filters/subscription" "github.com/gnolang/tx-indexer/serve/metadata" "github.com/gnolang/tx-indexer/serve/spec" @@ -50,15 +51,22 @@ func (h *Handler) NewTransactionFilterHandler( params []any, ) (any, *spec.BaseJSONError) { // Check the params - if len(params) != 0 { + if len(params) < 1 { return nil, spec.GenerateInvalidParamCountError() } - return h.newTxFilter(), nil + var options filter.TxFilterOption + + err := spec.ParseObjectParameter(params[0], &options) + if err != nil { + return nil, spec.GenerateInvalidParamError(1) + } + + return h.newTxFilter(options), nil } -func (h *Handler) newTxFilter() string { - return h.filterManager.NewTxFilter() +func (h *Handler) newTxFilter(options filter.TxFilterOption) string { + return h.filterManager.NewTxFilter(options) } // UninstallFilterHandler uninstalls a filter with given id diff --git a/serve/spec/spec.go b/serve/spec/spec.go index 2503d485..4af3eea0 100644 --- a/serve/spec/spec.go +++ b/serve/spec/spec.go @@ -1,6 +1,9 @@ package spec -import "fmt" +import ( + "encoding/json" + "fmt" +) const ( JSONRPCVersion = "2.0" @@ -151,3 +154,17 @@ func GenerateInvalidParamCountError() *BaseJSONError { InvalidParamsErrorCode, ) } + +func ParseObjectParameter[T any](param any, data *T) error { + marshaled, err := json.Marshal(param) + if err != nil { + return err + } + + err = json.Unmarshal(marshaled, data) + if err != nil { + return err + } + + return nil +} From 002aa94ab342a16e0bd6319b7382466b1c8f016a Mon Sep 17 00:00:00 2001 From: jinoosss Date: Mon, 22 Apr 2024 13:44:14 +0900 Subject: [PATCH 07/10] fix: Remove reflect types --- serve/filters/filter/base.go | 2 +- serve/filters/filter/block.go | 25 ++++++++++++++++--------- serve/filters/filter/block_test.go | 5 +---- serve/filters/filter/tx.go | 10 ++++++---- serve/filters/filter/tx_test.go | 10 ++++------ serve/filters/types.go | 2 +- serve/handlers/subs/subs.go | 29 +---------------------------- 7 files changed, 30 insertions(+), 53 deletions(-) diff --git a/serve/filters/filter/base.go b/serve/filters/filter/base.go index b98cb27d..37a9d98d 100644 --- a/serve/filters/filter/base.go +++ b/serve/filters/filter/base.go @@ -39,6 +39,6 @@ func (b *baseFilter) UpdateLastUsed() { b.lastUsed = time.Now() } -func (b *baseFilter) GetChanges() any { +func (b *baseFilter) GetChanges() []any { return nil } diff --git a/serve/filters/filter/block.go b/serve/filters/filter/block.go index c3a2cb98..4fccd8f7 100644 --- a/serve/filters/filter/block.go +++ b/serve/filters/filter/block.go @@ -20,13 +20,26 @@ func NewBlockFilter() *BlockFilter { } // GetChanges returns all new block headers from the last query -func (b *BlockFilter) GetChanges() any { +func (b *BlockFilter) GetChanges() []any { + return b.getBlockChanges() +} + +func (b *BlockFilter) UpdateWith(data any) { + if block, ok := data.(*types.Block); ok { + b.updateWithBlock(block) + } +} + +// getBlockChanges returns all new block headers from the last query +func (b *BlockFilter) getBlockChanges() []any { b.Lock() defer b.Unlock() // Get hashes - hashes := make([]types.Header, len(b.blockHeaders)) - copy(hashes, b.blockHeaders) + hashes := make([]any, len(b.blockHeaders)) + for index, blockHeader := range b.blockHeaders { + hashes[index] = blockHeader + } // Empty headers b.blockHeaders = b.blockHeaders[:0] @@ -34,12 +47,6 @@ func (b *BlockFilter) GetChanges() any { return hashes } -func (b *BlockFilter) UpdateWith(data any) { - if block, ok := data.(*types.Block); ok { - b.updateWithBlock(block) - } -} - func (b *BlockFilter) updateWithBlock(block *types.Block) { b.Lock() defer b.Unlock() diff --git a/serve/filters/filter/block_test.go b/serve/filters/filter/block_test.go index f7c7f72a..0e4cc1d3 100644 --- a/serve/filters/filter/block_test.go +++ b/serve/filters/filter/block_test.go @@ -44,10 +44,7 @@ func TestBlockFilter_GetChanges(t *testing.T) { } // Get changes - changesRaw := f.GetChanges() - - changes, ok := changesRaw.([]types.Header) - require.True(t, ok) + changes := f.GetChanges() // Make sure the headers match require.Len(t, changes, len(blocks)) diff --git a/serve/filters/filter/tx.go b/serve/filters/filter/tx.go index 40c6960f..806354d5 100644 --- a/serve/filters/filter/tx.go +++ b/serve/filters/filter/tx.go @@ -35,7 +35,7 @@ func NewTxFilter(opts TxFilterOption) *TxFilter { } // GetChanges returns all new transactions from the last query -func (tf *TxFilter) GetChanges() any { +func (tf *TxFilter) GetChanges() []any { return tf.getTxChanges() } @@ -108,13 +108,15 @@ func filteredByRangeFilterOption(value int64, rangeFilterOption *RangeFilterOpti } // getTxChanges returns all new transactions from the last query -func (tf *TxFilter) getTxChanges() []types.TxResult { +func (tf *TxFilter) getTxChanges() []any { tf.Lock() defer tf.Unlock() // Get newTxs - newTxs := make([]types.TxResult, len(tf.txs)) - copy(newTxs, tf.txs) + newTxs := make([]any, len(tf.txs)) + for index, tx := range tf.txs { + newTxs[index] = tx + } // Empty headers tf.txs = tf.txs[:0] diff --git a/serve/filters/filter/tx_test.go b/serve/filters/filter/tx_test.go index d34feac6..168eb806 100644 --- a/serve/filters/filter/tx_test.go +++ b/serve/filters/filter/tx_test.go @@ -232,16 +232,15 @@ func TestApplyFilters(t *testing.T) { } changes := f.GetChanges() - filtered := changes.([]types.TxResult) require.Len( - t, filtered, len(tt.expected), + t, changes, len(tt.expected), fmt.Sprintf( "There should be one transaction after applying filters: %v", len(tt.expected), ), ) - for i, tx := range filtered { + for i, tx := range changes { assert.Equal( t, *tt.expected[i], tx, fmt.Sprintf( @@ -307,12 +306,11 @@ func TestApplyFiltersWithLargeData(t *testing.T) { } changes := f.GetChanges() - filtered := changes.([]types.TxResult) require.Len( - t, filtered, tt.expected, + t, changes, tt.expected, fmt.Sprintf( "There should be %d transactions after applying filters. got %d", - tt.expected, len(filtered), + tt.expected, len(changes), ), ) }) diff --git a/serve/filters/types.go b/serve/filters/types.go index 88f01fac..6bdca007 100644 --- a/serve/filters/types.go +++ b/serve/filters/types.go @@ -31,7 +31,7 @@ type Filter interface { UpdateLastUsed() // GetChanges returns any filter changes (specific to the filter type) - GetChanges() any + GetChanges() []any // UpdateWith updates the specific filter type with a event's data UpdateWith(data any) diff --git a/serve/handlers/subs/subs.go b/serve/handlers/subs/subs.go index e36e49f5..3f5acbe0 100644 --- a/serve/handlers/subs/subs.go +++ b/serve/handlers/subs/subs.go @@ -2,7 +2,6 @@ package subs import ( "fmt" - "reflect" "github.com/gnolang/tx-indexer/serve/encode" "github.com/gnolang/tx-indexer/serve/filters" @@ -192,10 +191,7 @@ func (h *Handler) GetFilterChangesHandler(_ *metadata.Metadata, params []any) (a } // Handle filter changes - changes, err := h.getFilterChanges(f) - if err != nil { - return nil, spec.GenerateResponseError(err) - } + changes := f.GetChanges() results := make([]string, len(changes)) @@ -210,26 +206,3 @@ func (h *Handler) GetFilterChangesHandler(_ *metadata.Metadata, params []any) (a return results, nil } - -func (h *Handler) getFilterChanges(filter filters.Filter) ([]any, error) { - // Get updates - changes := filter.GetChanges() - value := reflect.ValueOf(changes) - - if value.Kind() == reflect.Ptr { - value = value.Elem() - } - - if value.Kind() != reflect.Slice { - return nil, fmt.Errorf("forEachValue: expected slice type, found %q", value.Kind().String()) - } - - results := make([]any, value.Len()) - - for i := 0; i < value.Len(); i++ { - val := value.Index(i).Interface() - results[i] = val - } - - return results, nil -} From 7dda6f7ba11db407dca89b1351ab8de2fbb91798 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Mon, 22 Apr 2024 13:45:54 +0900 Subject: [PATCH 08/10] fix: Change transaction event handling --- fetch/fetch.go | 7 ---- fetch/fetch_test.go | 77 ++++++++++++++--------------------- serve/filters/manager.go | 24 ++++------- serve/filters/manager_test.go | 12 +++--- types/events.go | 15 +------ 5 files changed, 47 insertions(+), 88 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index bfda87fc..16024f78 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -219,13 +219,6 @@ func (f *Fetcher) FetchChainData(ctx context.Context) error { } f.events.SignalEvent(event) - - for _, txResult := range txResults { - event := &types.NewTransaction{ - TxResult: txResult, - } - f.events.SignalEvent(event) - } } f.logger.Info( diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 8a60dbcf..0bfff137 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -202,25 +202,23 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { require.Len(t, capturedEvents, len(blocks)-1) for index, event := range capturedEvents { - switch event.GetType() { - case indexerTypes.NewBlockEvent: - eventData, ok := event.(*indexerTypes.NewBlock) - require.True(t, ok) + if event.GetType() != indexerTypes.NewBlockEvent { + continue + } - // Make sure the block is valid - assert.Equal(t, blocks[index+1], eventData.Block) + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) - // Make sure the transaction results are valid - require.Len(t, eventData.Results, txCount) + // Make sure the block is valid + assert.Equal(t, blocks[index+1], eventData.Block) - for txIndex, tx := range eventData.Results { - assert.EqualValues(t, blocks[index+1].Height, tx.Height) - assert.EqualValues(t, txIndex, tx.Index) - assert.Equal(t, serializedTxs[txIndex], tx.Tx) - } - case indexerTypes.NewTransactionsEvent: - _, ok := event.(*indexerTypes.NewTransaction) - require.True(t, ok) + // Make sure the transaction results are valid + require.Len(t, eventData.Results, txCount) + + for txIndex, tx := range eventData.Results { + assert.EqualValues(t, blocks[index+1].Height, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) } } }) @@ -243,14 +241,10 @@ func TestFetcher_FetchTransactions_Valid_FullBlocks(t *testing.T) { mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - switch e.GetType() { - case indexerTypes.NewBlockEvent: + if e.GetType() == indexerTypes.NewBlockEvent { _, ok := e.(*indexerTypes.NewBlock) require.True(t, ok) capturedEvents = append(capturedEvents, e) - case indexerTypes.NewTransactionsEvent: - _, ok := e.(*indexerTypes.NewTransaction) - require.True(t, ok) } }, } @@ -429,8 +423,8 @@ func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) { mockEvents = &mockEvents{ signalEventFn: func(e events.Event) { - if e.GetType() == indexerTypes.NewTransactionsEvent { - _, ok := e.(*indexerTypes.NewTransaction) + if e.GetType() == indexerTypes.NewBlockEvent { + _, ok := e.(*indexerTypes.NewBlock) require.True(t, ok) capturedEvents = append(capturedEvents, e) } @@ -555,36 +549,27 @@ func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) { // Make sure proper events were emitted // Blocks each have as many transactions as txCount. - txEventCount := (len(blocks) - 1) * txCount + txEventCount := (len(blocks) - 1) require.Len(t, capturedEvents, txEventCount) for index, event := range capturedEvents { - switch event.GetType() { - case indexerTypes.NewBlockEvent: - eventData, ok := event.(*indexerTypes.NewBlock) - require.True(t, ok) - - // Make sure the block is valid - assert.Equal(t, blocks[index+1], eventData.Block) + if event.GetType() != indexerTypes.NewBlockEvent { + continue + } - // Make sure the transaction results are valid - require.Len(t, eventData.Results, txCount) + eventData, ok := event.(*indexerTypes.NewBlock) + require.True(t, ok) - for txIndex, tx := range eventData.Results { - assert.EqualValues(t, blocks[index+1].Height, tx.Height) - assert.EqualValues(t, txIndex, tx.Index) - assert.Equal(t, serializedTxs[txIndex], tx.Tx) - } - case indexerTypes.NewTransactionsEvent: - eventData, ok := event.(*indexerTypes.NewTransaction) - require.True(t, ok) + // Make sure the block is valid + assert.Equal(t, blocks[index+1], eventData.Block) - blockIndex := index / txCount - txIndex := index % txCount + // Make sure the transaction results are valid + require.Len(t, eventData.Results, txCount) - // Make sure the tx is valid - assert.Equal(t, blocks[blockIndex+1].Txs[txIndex], eventData.TxResult.Tx) - assert.Equal(t, blocks[blockIndex+1].Height, eventData.TxResult.Height) + for txIndex, tx := range eventData.Results { + assert.EqualValues(t, blocks[index+1].Height, tx.Height) + assert.EqualValues(t, txIndex, tx.Index) + assert.Equal(t, serializedTxs[txIndex], tx.Tx) } } }) diff --git a/serve/filters/manager.go b/serve/filters/manager.go index 475f32fe..7335aa77 100644 --- a/serve/filters/manager.go +++ b/serve/filters/manager.go @@ -98,7 +98,7 @@ func (f *Manager) UninstallSubscription(id string) bool { // subscribeToEvents subscribes to new events func (f *Manager) subscribeToEvents() { - subscription := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent, commonTypes.NewTransactionsEvent}) + subscription := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent}) defer f.events.CancelSubscription(subscription.ID) for { @@ -110,8 +110,7 @@ func (f *Manager) subscribeToEvents() { return } - switch event.GetType() { - case commonTypes.NewBlockEvent: + if event.GetType() == commonTypes.NewBlockEvent { // The following code segments // cannot be executed in parallel (go routines) // because data sequencing should be persisted @@ -122,20 +121,15 @@ func (f *Manager) subscribeToEvents() { f.updateFiltersWithBlock(newBlock.Block) // Send events to all `newHeads` subscriptions - f.subscriptions.sendEvent(commonTypes.NewBlockEvent, newBlock.Block) - } - case commonTypes.NewTransactionsEvent: - if !more { - return - } + f.subscriptions.sendEvent(filterSubscription.NewHeadsEvent, newBlock.Block) - newTransaction, ok := event.(*commonTypes.NewTransaction) - if ok { - // Apply transaction to filters - f.updateFiltersWithTxResult(newTransaction.TxResult) + for _, txResult := range newBlock.Results { + // Apply transaction to filters + f.updateFiltersWithTxResult(txResult) - // Send events to all `newTransactions` subscriptions - f.subscriptions.sendEvent(commonTypes.NewTransactionsEvent, newTransaction.TxResult) + // Send events to all `newHeads` subscriptions + f.subscriptions.sendEvent(filterSubscription.NewTransactionsEvent, txResult) + } } } } diff --git a/serve/filters/manager_test.go b/serve/filters/manager_test.go index 7e8314d6..1b39b52f 100644 --- a/serve/filters/manager_test.go +++ b/serve/filters/manager_test.go @@ -132,16 +132,16 @@ func Test_NewBlockEvents(t *testing.T) { require.Nil(t, err) // Get changes - blockHeadersRaw := blockFilter.GetChanges() + changes := blockFilter.GetChanges() - blockHeaders, ok := blockHeadersRaw.([]tm2Types.Header) - require.True(t, ok) - - if len(blockHeaders) == 0 { + if len(changes) == 0 { continue } - capturedHeaders = blockHeaders + capturedHeaders = make([]tm2Types.Header, len(changes)) + for index, header := range changes { + capturedHeaders[index] = header.(tm2Types.Header) + } return } diff --git a/types/events.go b/types/events.go index 8ed13710..e06f9109 100644 --- a/types/events.go +++ b/types/events.go @@ -7,8 +7,7 @@ import ( // NewBlockEvent is the event for when new blocks appear var ( - NewBlockEvent events.Type = "newHeads" - NewTransactionsEvent events.Type = "newTransactions" + NewBlockEvent events.Type = "newHeads" ) type NewBlock struct { @@ -23,15 +22,3 @@ func (n *NewBlock) GetType() events.Type { func (n *NewBlock) GetData() any { return n } - -type NewTransaction struct { - TxResult *types.TxResult -} - -func (n *NewTransaction) GetType() events.Type { - return NewTransactionsEvent -} - -func (n *NewTransaction) GetData() any { - return n -} From f82a2dfb3bb87998d62664c89270b79e9647c063 Mon Sep 17 00:00:00 2001 From: jinoosss Date: Thu, 25 Apr 2024 10:57:44 +0900 Subject: [PATCH 09/10] fix: Change exception conditions when getting tx --- serve/handlers/tx/tx.go | 2 +- serve/handlers/tx/tx_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/serve/handlers/tx/tx.go b/serve/handlers/tx/tx.go index 34b5acd4..4338d6d4 100644 --- a/serve/handlers/tx/tx.go +++ b/serve/handlers/tx/tx.go @@ -28,7 +28,7 @@ func (h *Handler) GetTxHandler( params []any, ) (any, *spec.BaseJSONError) { // Check the params - if len(params) != 2 { + if len(params) < 2 { return nil, spec.GenerateInvalidParamCountError() } diff --git a/serve/handlers/tx/tx_test.go b/serve/handlers/tx/tx_test.go index be723028..56bb26d7 100644 --- a/serve/handlers/tx/tx_test.go +++ b/serve/handlers/tx/tx_test.go @@ -23,7 +23,7 @@ func TestGetTx_InvalidParams(t *testing.T) { }{ { "invalid param length", - []any{1, 2, 3}, + []any{1}, }, { "invalid param type", From 7b291710fada80231663029f688d081fd87d8aea Mon Sep 17 00:00:00 2001 From: jinoosss Date: Tue, 30 Apr 2024 16:02:20 +0900 Subject: [PATCH 10/10] fix: Fix a lint error --- fetch/fetch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 460e3f46..b910aba4 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -468,7 +468,7 @@ func TestFetcher_FetchTransactions_Valid_FullTransactions(t *testing.T) { mockClient = &mockClient{ createBatchFn: func() clientTypes.Batch { return &mockBatch{ - executeFn: func() ([]any, error) { + executeFn: func(_ context.Context) ([]any, error) { // Force an error return nil, errors.New("something is flaky") },