diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 431305c92e..db32f2d033 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -137,11 +137,11 @@ func (d *Dispatcher) handleSubscribe(req Request, conn wsConn) (string, Error) { if subscribeMethod == "newHeads" { filterID = d.filterManager.NewBlockFilter(conn) } else if subscribeMethod == "logs" { - logFilter, err := decodeLogFilterFromInterface(params[1]) + logQuery, err := decodeLogQueryFromInterface(params[1]) if err != nil { return "", NewInternalError(err.Error()) } - filterID = d.filterManager.NewLogFilter(logFilter, conn) + filterID = d.filterManager.NewLogFilter(logQuery, conn) } else { return "", NewSubscriptionNotFoundError(subscribeMethod) } @@ -178,8 +178,7 @@ func (d *Dispatcher) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) { if req.Method == "eth_subscribe" { filterID, err := d.handleSubscribe(req, conn) if err != nil { - //nolint - NewRPCResponse(req.ID, "2.0", nil, err).Bytes() + return NewRPCResponse(req.ID, "2.0", nil, err).Bytes() } resp, err := formatFilterResponse(req.ID, filterID) @@ -209,6 +208,7 @@ func (d *Dispatcher) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) { return []byte(resp), nil } + // its a normal query that we handle with the dispatcher resp, err := d.handleReq(req) if err != nil { diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index 5d179df1cf..0172ecdc4d 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -187,7 +187,7 @@ func (m *mockService) BlockPtr(a string, f *BlockNumber) (interface{}, error) { return nil, nil } -func (m *mockService) Filter(f LogFilter) (interface{}, error) { +func (m *mockService) Filter(f LogQuery) (interface{}, error) { m.msgCh <- f return nil, nil @@ -249,9 +249,10 @@ func TestDispatcherFuncDecode(t *testing.T) { { "filter", `[{"fromBlock": "pending", "toBlock": "earliest"}]`, - LogFilter{fromBlock: PendingBlockNumber, toBlock: EarliestBlockNumber}, + LogQuery{fromBlock: PendingBlockNumber, toBlock: EarliestBlockNumber}, }, } + for _, c := range cases { res := handleReq(c.typ, c.msg) if !reflect.DeepEqual(res, c.res) { diff --git a/jsonrpc/eth_blockchain_test.go b/jsonrpc/eth_blockchain_test.go index a32f585227..1db17d939c 100644 --- a/jsonrpc/eth_blockchain_test.go +++ b/jsonrpc/eth_blockchain_test.go @@ -111,36 +111,36 @@ func TestEth_Block_GetLogs(t *testing.T) { testTable := []struct { name string - filterOptions *LogFilter + query *LogQuery shouldFail bool expectedLength int }{ {"Found matching logs, fromBlock < toBlock", - &LogFilter{ + &LogQuery{ fromBlock: 1, toBlock: 3, Topics: topics, }, false, 3}, {"Found matching logs, fromBlock == toBlock", - &LogFilter{ + &LogQuery{ fromBlock: 2, toBlock: 2, Topics: topics, }, false, 1}, {"Found matching logs, BlockHash present", - &LogFilter{ + &LogQuery{ BlockHash: &blockHash, Topics: topics, }, false, 1}, - {"No logs found", &LogFilter{ + {"No logs found", &LogQuery{ fromBlock: 4, toBlock: 5, Topics: topics, }, false, 0}, - {"Invalid block range", &LogFilter{ + {"Invalid block range", &LogQuery{ fromBlock: 10, toBlock: 5, Topics: topics, @@ -176,7 +176,7 @@ func TestEth_Block_GetLogs(t *testing.T) { for _, testCase := range testTable { t.Run(testCase.name, func(t *testing.T) { - foundLogs, logError := eth.GetLogs(testCase.filterOptions) + foundLogs, logError := eth.GetLogs(testCase.query) if logError != nil && !testCase.shouldFail { // If there is an error and test isn't expected to fail diff --git a/jsonrpc/eth_endpoint.go b/jsonrpc/eth_endpoint.go index 75de7953f2..4d74df443d 100644 --- a/jsonrpc/eth_endpoint.go +++ b/jsonrpc/eth_endpoint.go @@ -665,7 +665,7 @@ func (e *Eth) EstimateGas(arg *txnArgs, rawNum *BlockNumber) (interface{}, error } // GetLogs returns an array of logs matching the filter options -func (e *Eth) GetLogs(filterOptions *LogFilter) (interface{}, error) { +func (e *Eth) GetLogs(query *LogQuery) (interface{}, error) { result := make([]*Log, 0) parseReceipts := func(block *types.Block) error { receipts, err := e.store.GetReceiptsByHash(block.Header.Hash) @@ -675,7 +675,7 @@ func (e *Eth) GetLogs(filterOptions *LogFilter) (interface{}, error) { for indx, receipt := range receipts { for logIndx, log := range receipt.Logs { - if filterOptions.Match(log) { + if query.Match(log) { result = append(result, &Log{ Address: log.Address, Topics: log.Topics, @@ -693,8 +693,8 @@ func (e *Eth) GetLogs(filterOptions *LogFilter) (interface{}, error) { return nil } - if filterOptions.BlockHash != nil { - block, ok := e.store.GetBlockByHash(*filterOptions.BlockHash, true) + if query.BlockHash != nil { + block, ok := e.store.GetBlockByHash(*query.BlockHash, true) if !ok { return nil, fmt.Errorf("not found") } @@ -729,8 +729,8 @@ func (e *Eth) GetLogs(filterOptions *LogFilter) (interface{}, error) { return uint64(num) } - from := resolveNum(filterOptions.fromBlock) - to := resolveNum(filterOptions.toBlock) + from := resolveNum(query.fromBlock) + to := resolveNum(query.toBlock) if to < from { return nil, fmt.Errorf("incorrect range") @@ -858,7 +858,7 @@ func (e *Eth) GetCode(address types.Address, filter BlockNumberOrHash) (interfac } // NewFilter creates a filter object, based on filter options, to notify when the state changes (logs). -func (e *Eth) NewFilter(filter *LogFilter) (interface{}, error) { +func (e *Eth) NewFilter(filter *LogQuery) (interface{}, error) { return e.filterManager.NewLogFilter(filter, nil), nil } diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 07eca5c0f6..fda97a77ce 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -3,6 +3,7 @@ package jsonrpc import ( "container/heap" "encoding/json" + "errors" "fmt" "strings" "sync" @@ -15,57 +16,69 @@ import ( "github.com/hashicorp/go-hclog" ) -type Filter struct { - id string +var ( + ErrFilterDoesNotExists = errors.New("filter does not exists") + ErrWSFilterDoesNotSupportGetChanges = errors.New("web socket Filter doesn't support to return a batch of the changes") +) - // block filter - block *headElem +// defaultTimeout is the timeout to remove the filters that don't have a web socket stream +var defaultTimeout = 1 * time.Minute - // log cache - logs []*Log +const ( + // The index in heap which is indicating the element is not in the heap + NoIndexInHeap = -1 +) - // log filter - logFilter *LogFilter +// filter is an interface that BlockFilter and LogFilter implement +type filter interface { + // isWS returns the flag indicating the filter has web socket stream + isWS() bool - // index of the filter in the timer array - index int + // getFilterBase returns filterBase that has common fields + getFilterBase() *filterBase - // next time to timeout - timestamp time.Time + // getUpdates returns stored data in string + getUpdates() (string, error) - // websocket connection - ws wsConn + // sendUpdates write stored data to web socket stream + sendUpdates() error } -func (f *Filter) getFilterUpdates() (string, error) { - if f.isBlockFilter() { - // block filter - headers, newHead := f.block.getUpdates() - f.block = newHead +// filterBase is a struct for common fields between BlockFilter and LogFilter +type filterBase struct { + // UUID, a key of filter for client + id string - updates := []string{} - for _, header := range headers { - updates = append(updates, header.Hash.String()) - } + // index in the timeouts heap, -1 for non-existing index + heapIndex int - return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil - } - // log filter - res, err := json.Marshal(f.logs) - if err != nil { - return "", err - } + // timestamp to be expired + expiredAt time.Time - f.logs = []*Log{} + // websocket connection + ws wsConn +} - return string(res), nil +// newFilterBase initializes filterBase with unique ID +func newFilterBase(ws wsConn) filterBase { + return filterBase{ + id: uuid.New().String(), + ws: ws, + heapIndex: NoIndexInHeap, + } } -func (f *Filter) isWS() bool { +// getFilterBase returns its own reference so that child struct can return base +func (f *filterBase) getFilterBase() *filterBase { + return f +} + +// isWS returns the flag indicating this filter has websocket connection +func (f *filterBase) isWS() bool { return f.ws != nil } -var ethSubscriptionTemplate = `{ +const ethSubscriptionTemplate = `{ "jsonrpc": "2.0", "method": "eth_subscription", "params": { @@ -74,7 +87,8 @@ var ethSubscriptionTemplate = `{ } }` -func (f *Filter) sendMessage(msg string) error { +// writeMessageToWs sends given message to websocket stream +func (f *filterBase) writeMessageToWs(msg string) error { res := fmt.Sprintf(ethSubscriptionTemplate, f.id, msg) if err := f.ws.WriteMessage(websocket.TextMessage, []byte(res)); err != nil { return err @@ -83,48 +97,110 @@ func (f *Filter) sendMessage(msg string) error { return nil } -func (f *Filter) flush() error { - if f.isBlockFilter() { - // send each block independently - updates, newHead := f.block.getUpdates() - f.block = newHead +// blockFilter is a filter to store the updates of block +type blockFilter struct { + filterBase + sync.Mutex + block *headElem +} - for _, block := range updates { - raw, err := json.Marshal(block) - if err != nil { - return err - } +// takeBlockUpdates advances blocks from head to latest and returns header array +func (f *blockFilter) takeBlockUpdates() []*types.Header { + updates, newHead := f.block.getUpdates() - if err := f.sendMessage(string(raw)); err != nil { - return err - } + f.Lock() + f.block = newHead + f.Unlock() + + return updates +} + +// getUpdates returns updates of blocks in string +func (f *blockFilter) getUpdates() (string, error) { + headers := f.takeBlockUpdates() + + updates := []string{} + for _, header := range headers { + updates = append(updates, header.Hash.String()) + } + + return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil +} + +// sendUpdates writes the updates of blocks to web socket stream +func (f *blockFilter) sendUpdates() error { + updates := f.takeBlockUpdates() + + for _, block := range updates { + raw, err := json.Marshal(block) + if err != nil { + return err } - } else { - // log filter - for _, log := range f.logs { - res, err := json.Marshal(log) - if err != nil { - return err - } - if err := f.sendMessage(string(res)); err != nil { - return err - } + + if err := f.writeMessageToWs(string(raw)); err != nil { + return err } - f.logs = []*Log{} } return nil } -func (f *Filter) isLogFilter() bool { - return f.logFilter != nil +// logFilter is a filter to store logs that meet the conditions in query +type logFilter struct { + filterBase + sync.Mutex + query *LogQuery + logs []*Log } -func (f *Filter) isBlockFilter() bool { - return f.block != nil +// appendLog appends new log to logs +func (f *logFilter) appendLog(log *Log) { + f.Lock() + defer f.Unlock() + + f.logs = append(f.logs, log) } -var defaultTimeout = 1 * time.Minute +// takeLogUpdates returns all saved logs in filter and set new log slice +func (f *logFilter) takeLogUpdates() []*Log { + f.Lock() + defer f.Unlock() + + logs := f.logs + f.logs = []*Log{} // create brand new slice so that prevent new logs from being added to current logs + + return logs +} + +// getUpdates returns stored logs in string +func (f *logFilter) getUpdates() (string, error) { + logs := f.takeLogUpdates() + + res, err := json.Marshal(logs) + if err != nil { + return "", err + } + + return string(res), nil +} + +// sendUpdates writes stored logs to web socket stream +func (f *logFilter) sendUpdates() error { + logs := f.takeLogUpdates() + + for _, log := range logs { + res, err := json.Marshal(log) + if err != nil { + return err + } + + if err := f.writeMessageToWs(string(res)); err != nil { + return err + } + } + + return nil +} // filterManagerStore provides methods required by FilterManager type filterManagerStore interface { @@ -138,34 +214,35 @@ type filterManagerStore interface { GetReceiptsByHash(hash types.Hash) ([]*types.Receipt, error) } +// FilterManager manages all running filters type FilterManager struct { logger hclog.Logger - store filterManagerStore - closeCh chan struct{} + timeout time.Duration + store filterManagerStore subscription blockchain.Subscription + blockStream *blockStream - filters map[string]*Filter - lock sync.Mutex + lock sync.RWMutex + filters map[string]filter + timeouts timeHeapImpl updateCh chan struct{} - timer timeHeapImpl - timeout time.Duration - - blockStream *blockStream + closeCh chan struct{} } func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterManager { m := &FilterManager{ logger: logger.Named("filter"), + timeout: defaultTimeout, store: store, - closeCh: make(chan struct{}), - filters: map[string]*Filter{}, - updateCh: make(chan struct{}), - timer: timeHeapImpl{}, blockStream: &blockStream{}, - timeout: defaultTimeout, + lock: sync.RWMutex{}, + filters: make(map[string]filter), + timeouts: timeHeapImpl{}, + updateCh: make(chan struct{}), + closeCh: make(chan struct{}), } // start blockstream with the current header @@ -178,6 +255,7 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterMana return m } +// Run starts worker process to handle events func (f *FilterManager) Run() { // watch for new events in the blockchain watchCh := make(chan *blockchain.Event) @@ -196,11 +274,11 @@ func (f *FilterManager) Run() { for { // check for the next filter to be removed - filter := f.nextTimeoutFilter() + filterBase := f.nextTimeoutFilter() - // uninstall filters only - if filter != nil && !filter.isWS() { - timeoutCh = time.After(time.Until(filter.timestamp)) + // set timer to remove filter + if filterBase != nil { + timeoutCh = time.After(time.Until(filterBase.expiredAt)) } select { @@ -212,12 +290,12 @@ func (f *FilterManager) Run() { case <-timeoutCh: // timeout for filter - if !f.Uninstall(filter.id) { - f.logger.Error("failed to uninstall filter", "id", filter.id) + if !f.Uninstall(filterBase.id) { + f.logger.Error("failed to uninstall filter", "id", filterBase.id) } case <-f.updateCh: - // there is a new filter, reset the loop to start the timeout timer + // filters change, reset the loop to start the timeout timer case <-f.closeCh: // stop the filter manager @@ -226,113 +304,58 @@ func (f *FilterManager) Run() { } } -func (f *FilterManager) nextTimeoutFilter() *Filter { - f.lock.Lock() - if len(f.filters) == 0 { - f.lock.Unlock() - - return nil - } - - // pop the first item - item := f.timer[0] - f.lock.Unlock() - - return item +// Close closed closeCh so that terminate worker +func (f *FilterManager) Close() { + close(f.closeCh) } -func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { - f.lock.Lock() - defer f.lock.Unlock() - - // first include all the new headers in the blockstream for the block filters - for _, header := range evnt.NewChain { - f.blockStream.push(header) - } - - processBlock := func(h *types.Header, removed bool) error { - // get the logs from the transaction - receipts, err := f.store.GetReceiptsByHash(h.Hash) - if err != nil { - return err - } - - for indx, receipt := range receipts { - // check the logs with the filters - for _, log := range receipt.Logs { - for _, f := range f.filters { - if f.isLogFilter() { - if f.logFilter.Match(log) { - nn := &Log{ - Address: log.Address, - Topics: log.Topics, - Data: argBytes(log.Data), - BlockNumber: argUint64(h.Number), - BlockHash: h.Hash, - TxHash: receipt.TxHash, - TxIndex: argUint64(indx), - Removed: removed, - } - f.logs = append(f.logs, nn) - } - } - } - } - } - - return nil +// NewBlockFilter adds new BlockFilter +func (f *FilterManager) NewBlockFilter(ws wsConn) string { + filter := &blockFilter{ + filterBase: newFilterBase(ws), + block: f.blockStream.Head(), } - // process old chain - for _, i := range evnt.OldChain { - if processErr := processBlock(i, true); processErr != nil { - f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) - } - } - // process new chain - for _, i := range evnt.NewChain { - if processErr := processBlock(i, false); processErr != nil { - f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) - } - } + return f.addFilter(filter) +} - // flush all the websocket values - for _, filter := range f.filters { - if filter.isWS() { - if flushErr := filter.flush(); flushErr != nil { - f.logger.Error(fmt.Sprintf("Unable to process flush, %v", flushErr)) - } - } +// NewLogFilter adds new LogFilter +func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { + filter := &logFilter{ + filterBase: newFilterBase(ws), + query: logQuery, } - return nil + return f.addFilter(filter) } +// Exists checks the filter with given ID exists func (f *FilterManager) Exists(id string) bool { - f.lock.Lock() + f.lock.RLock() + defer f.lock.RUnlock() + _, ok := f.filters[id] - f.lock.Unlock() return ok } -var errFilterDoesNotExists = fmt.Errorf("filter does not exists") - +// GetFilterChanges returns the updates of the filter with given ID in string func (f *FilterManager) GetFilterChanges(id string) (string, error) { - f.lock.Lock() - defer f.lock.Unlock() + f.lock.RLock() + defer f.lock.RUnlock() + + filter, ok := f.filters[id] - item, ok := f.filters[id] if !ok { - return "", errFilterDoesNotExists + return "", ErrFilterDoesNotExists } - if item.isWS() { - // we cannot get updates from a ws filter with getFilterChanges - return "", errFilterDoesNotExists + // we cannot get updates from a ws filter with getFilterChanges + if filter.isWS() { + return "", ErrWSFilterDoesNotSupportGetChanges } - res, err := item.getFilterUpdates() + res, err := filter.getUpdates() if err != nil { return "", err } @@ -340,83 +363,245 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { return res, nil } +// Uninstall removes the filter with given ID from list func (f *FilterManager) Uninstall(id string) bool { f.lock.Lock() + defer f.lock.Unlock() + + return f.removeFilterByID(id) +} - item, ok := f.filters[id] +// removeFilterByID removes the filter with given ID, unsafe against race condition +func (f *FilterManager) removeFilterByID(id string) bool { + filter, ok := f.filters[id] if !ok { return false } delete(f.filters, id) - heap.Remove(&f.timer, item.index) - f.lock.Unlock() + if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { + f.emitSignalToUpdateCh() + } return true } -func (f *FilterManager) NewBlockFilter(ws wsConn) string { - return f.addFilter(nil, ws) +// addFilter is an internal method to add given filter to list and heap +func (f *FilterManager) addFilter(filter filter) string { + f.lock.Lock() + defer f.lock.Unlock() + + base := filter.getFilterBase() + + f.filters[base.id] = filter + + // Set timeout and add to heap if filter doesn't have web socket connection + if !filter.isWS() { + base.expiredAt = time.Now().Add(f.timeout) + f.timeouts.addFilter(base) + f.emitSignalToUpdateCh() + } + + return base.id } -func (f *FilterManager) NewLogFilter(logFilter *LogFilter, ws wsConn) string { - return f.addFilter(logFilter, ws) +func (f *FilterManager) emitSignalToUpdateCh() { + select { + // notify worker of new filter with timeout + case f.updateCh <- struct{}{}: + default: + } } -func (f *FilterManager) addFilter(logFilter *LogFilter, ws wsConn) string { - f.lock.Lock() +// nextTimeoutFilter returns the filter that will be expired next +// nextTimeoutFilter returns the only filter with timeout +func (f *FilterManager) nextTimeoutFilter() *filterBase { + f.lock.RLock() + defer f.lock.RUnlock() + + if len(f.timeouts) == 0 { + return nil + } + + // peek the first item + base := f.timeouts[0] + + return base +} - filter := &Filter{ - id: uuid.New().String(), - ws: ws, +// dispatchEvent is a event handler for new block event +func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { + // store new event in each filters + if err := f.processEvent(evnt); err != nil { + return err } - if logFilter == nil { - // block filter - // take the reference from the stream - filter.block = f.blockStream.Head() - } else { - // log filter - filter.logFilter = logFilter + // send data to web socket stream + if err := f.flushWsFilters(); err != nil { + return err } - f.filters[filter.id] = filter - filter.timestamp = time.Now().Add(f.timeout) - heap.Push(&f.timer, filter) + return nil +} - f.lock.Unlock() +// processEvent makes each filter append the new data that interests them +func (f *FilterManager) processEvent(evnt *blockchain.Event) error { + f.lock.RLock() + defer f.lock.RUnlock() - select { - case f.updateCh <- struct{}{}: - default: + // first include all the new headers in the blockstream for BlockFilter + for _, header := range evnt.NewChain { + f.blockStream.push(header) } - return filter.id + // process old chain to include old logs marked removed for LogFilter + for _, header := range evnt.OldChain { + if processErr := f.appendLogsToFilters(header, true); processErr != nil { + f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) + } + } + + // process new chain to include new logs for LogFilter + for _, header := range evnt.NewChain { + if processErr := f.appendLogsToFilters(header, false); processErr != nil { + f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) + } + } + + return nil } -func (f *FilterManager) Close() { - close(f.closeCh) +// appendLogsToFilters makes each LogFilters append logs in the header +func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) error { + receipts, err := f.store.GetReceiptsByHash(header.Hash) + if err != nil { + return err + } + + // Get logFilters from filters + logFilters := f.getLogFilters() + if len(logFilters) == 0 { + return nil + } + + for indx, receipt := range receipts { + // check the logs with the filters + for _, log := range receipt.Logs { + nn := &Log{ + Address: log.Address, + Topics: log.Topics, + Data: argBytes(log.Data), + BlockNumber: argUint64(header.Number), + BlockHash: header.Hash, + TxHash: receipt.TxHash, + TxIndex: argUint64(indx), + Removed: removed, + } + + for _, f := range logFilters { + if f.query.Match(log) { + f.appendLog(nn) + } + } + } + } + + return nil } -type timeHeapImpl []*Filter +// flushWsFilters make each filters with web socket connection write the updates to web socket stream +// flushWsFilters also removes the filters if flushWsFilters notices the connection is closed +func (f *FilterManager) flushWsFilters() error { + closedFilterIDs := make([]string, 0) + + f.lock.RLock() + + for id, filter := range f.filters { + if !filter.isWS() { + continue + } + + if flushErr := filter.sendUpdates(); flushErr != nil { + // mark as closed if the connection is closed + if errors.Is(flushErr, websocket.ErrCloseSent) { + closedFilterIDs = append(closedFilterIDs, id) + + f.logger.Warn(fmt.Sprintf("Subscription %s has been closed", id)) + + continue + } + + f.logger.Error(fmt.Sprintf("Unable to process flush, %v", flushErr)) + } + } + + f.lock.RUnlock() + + // remove filters with closed web socket connections from FilterManager + if len(closedFilterIDs) > 0 { + f.lock.Lock() + + for _, id := range closedFilterIDs { + f.removeFilterByID(id) + } + + f.lock.Unlock() + f.emitSignalToUpdateCh() + f.logger.Info(fmt.Sprintf("Removed %d filters due to closed connections", len(closedFilterIDs))) + } + + return nil +} + +// getLogFilters returns logFilters +func (f *FilterManager) getLogFilters() []*logFilter { + f.lock.RLock() + defer f.lock.RUnlock() + + logFilters := []*logFilter{} + + for _, f := range f.filters { + if logFilter, ok := f.(*logFilter); ok { + logFilters = append(logFilters, logFilter) + } + } + + return logFilters +} + +type timeHeapImpl []*filterBase + +func (t *timeHeapImpl) addFilter(filter *filterBase) { + heap.Push(t, filter) +} + +func (t *timeHeapImpl) removeFilter(filter *filterBase) bool { + if filter.heapIndex == NoIndexInHeap { + return false + } + + heap.Remove(t, filter.heapIndex) + + return true +} func (t timeHeapImpl) Len() int { return len(t) } func (t timeHeapImpl) Less(i, j int) bool { - return t[i].timestamp.Before(t[j].timestamp) + return t[i].expiredAt.Before(t[j].expiredAt) } func (t timeHeapImpl) Swap(i, j int) { t[i], t[j] = t[j], t[i] - t[i].index = i - t[j].index = j + t[i].heapIndex = i + t[j].heapIndex = j } func (t *timeHeapImpl) Push(x interface{}) { n := len(*t) - item := x.(*Filter) //nolint: forcetypeassert - item.index = n + item := x.(*filterBase) //nolint: forcetypeassert + item.heapIndex = n *t = append(*t, item) } @@ -425,7 +610,7 @@ func (t *timeHeapImpl) Pop() interface{} { n := len(old) item := old[n-1] old[n-1] = nil - item.index = -1 + item.heapIndex = -1 *t = old[0 : n-1] return item @@ -440,14 +625,15 @@ type blockStream struct { func (b *blockStream) Head() *headElem { b.lock.Lock() - head := b.head - b.lock.Unlock() + defer b.lock.Unlock() - return head + return b.head } func (b *blockStream) push(header *types.Header) { b.lock.Lock() + defer b.lock.Unlock() + newHead := &headElem{ header: header.Copy(), } @@ -457,8 +643,6 @@ func (b *blockStream) push(header *types.Header) { } b.head = newHead - - b.lock.Unlock() } type headElem struct { diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 2b7371c7fb..b495ec1801 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -4,7 +4,9 @@ import ( "testing" "time" + "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/types" + "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) @@ -15,7 +17,7 @@ func TestFilterLog(t *testing.T) { m := NewFilterManager(hclog.NewNullLogger(), store) go m.Run() - id := m.addFilter(&LogFilter{ + id := m.NewLogFilter(&LogQuery{ Topics: [][]types.Hash{ {hash1}, }, @@ -74,7 +76,7 @@ func TestFilterBlock(t *testing.T) { go m.Run() // add block filter - id := m.addFilter(nil, nil) + id := m.NewBlockFilter(nil) // emit two events store.emitEvent(&mockEvent{ @@ -137,7 +139,7 @@ func TestFilterTimeout(t *testing.T) { go m.Run() // add block filter - id := m.addFilter(nil, nil) + id := m.NewBlockFilter(nil) assert.True(t, m.Exists(id)) time.Sleep(3 * time.Second) @@ -158,7 +160,7 @@ func TestFilterWebsocket(t *testing.T) { // we cannot call get filter changes for a websocket filter _, err := m.GetFilterChanges(id) - assert.Equal(t, err, errFilterDoesNotExists) + assert.Equal(t, err, ErrFilterDoesNotExists) // emit two events store.emitEvent(&mockEvent{ @@ -209,3 +211,37 @@ func TestHeadStream(t *testing.T) { updates, _ = next.getUpdates() assert.Len(t, updates, 0) } + +type MockClosedWSConnection struct{} + +func (m *MockClosedWSConnection) WriteMessage(_messageType int, _data []byte) error { + return websocket.ErrCloseSent +} + +func TestClosedFilterDeletion(t *testing.T) { + store := newMockStore() + + m := NewFilterManager(hclog.NewNullLogger(), store) + + go m.Run() + + // add block filter + id := m.NewBlockFilter(&MockClosedWSConnection{}) + + assert.True(t, m.Exists(id)) + + // event is sent to the filter but writing to connection should fail + err := m.dispatchEvent(&blockchain.Event{ + NewChain: []*types.Header{ + { + Hash: types.StringToHash("1"), + }, + }, + }) + + // should not return error when the error is websocket.ErrCloseSen because filter is removed instead + assert.NoError(t, err) + + // false because filter was removed automatically + assert.False(t, m.Exists(id)) +} diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 729135ff0d..765bc423c2 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -7,8 +7,8 @@ import ( "github.com/0xPolygon/polygon-edge/types" ) -// LogFilter is a filter for logs -type LogFilter struct { +// LogQuery is a query to filter logs +type LogQuery struct { BlockHash *types.Hash fromBlock BlockNumber @@ -19,9 +19,9 @@ type LogFilter struct { } // addTopicSet adds specific topics to the log filter topics -func (l *LogFilter) addTopicSet(set ...string) error { - if l.Topics == nil { - l.Topics = [][]types.Hash{} +func (q *LogQuery) addTopicSet(set ...string) error { + if q.Topics == nil { + q.Topics = [][]types.Hash{} } res := []types.Hash{} @@ -35,15 +35,15 @@ func (l *LogFilter) addTopicSet(set ...string) error { res = append(res, item) } - l.Topics = append(l.Topics, res) + q.Topics = append(q.Topics, res) return nil } // addAddress Adds the address to the log filter -func (l *LogFilter) addAddress(raw string) error { - if l.Addresses == nil { - l.Addresses = []types.Address{} +func (q *LogQuery) addAddress(raw string) error { + if q.Addresses == nil { + q.Addresses = []types.Address{} } addr := types.Address{} @@ -52,28 +52,28 @@ func (l *LogFilter) addAddress(raw string) error { return err } - l.Addresses = append(l.Addresses, addr) + q.Addresses = append(q.Addresses, addr) return nil } -func decodeLogFilterFromInterface(i interface{}) (*LogFilter, error) { +func decodeLogQueryFromInterface(i interface{}) (*LogQuery, error) { // once the log filter is decoded as map[string]interface we cannot use unmarshal json raw, err := json.Marshal(i) if err != nil { return nil, err } - filter := &LogFilter{} - if err := json.Unmarshal(raw, &filter); err != nil { + query := &LogQuery{} + if err := json.Unmarshal(raw, &query); err != nil { return nil, err } - return filter, nil + return query, nil } // UnmarshalJSON decodes a json object -func (l *LogFilter) UnmarshalJSON(data []byte) error { +func (q *LogQuery) UnmarshalJSON(data []byte) error { var obj struct { BlockHash *types.Hash `json:"blockHash"` FromBlock string `json:"fromBlock"` @@ -88,20 +88,20 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { return err } - l.BlockHash = obj.BlockHash + q.BlockHash = obj.BlockHash if obj.FromBlock == "" { - l.fromBlock = LatestBlockNumber + q.fromBlock = LatestBlockNumber } else { - if l.fromBlock, err = stringToBlockNumber(obj.FromBlock); err != nil { + if q.fromBlock, err = stringToBlockNumber(obj.FromBlock); err != nil { return err } } if obj.ToBlock == "" { - l.toBlock = LatestBlockNumber + q.toBlock = LatestBlockNumber } else { - if l.toBlock, err = stringToBlockNumber(obj.ToBlock); err != nil { + if q.toBlock, err = stringToBlockNumber(obj.ToBlock); err != nil { return err } } @@ -111,7 +111,7 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { switch raw := obj.Address.(type) { case string: // "" - if err := l.addAddress(raw); err != nil { + if err := q.addAddress(raw); err != nil { return err } @@ -119,7 +119,7 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { // ["", ""] for _, addr := range raw { if item, ok := addr.(string); ok { - if err := l.addAddress(item); err != nil { + if err := q.addAddress(item); err != nil { return err } } else { @@ -138,7 +138,7 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { switch raw := item.(type) { case string: // "" - if err := l.addTopicSet(raw); err != nil { + if err := q.addTopicSet(raw); err != nil { return err } @@ -154,13 +154,13 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { } } - if err := l.addTopicSet(res...); err != nil { + if err := q.addTopicSet(res...); err != nil { return err } case nil: // null - if err := l.addTopicSet(); err != nil { + if err := q.addTopicSet(); err != nil { return err } @@ -175,12 +175,12 @@ func (l *LogFilter) UnmarshalJSON(data []byte) error { } // Match returns whether the receipt includes topics for this filter -func (l *LogFilter) Match(log *types.Log) bool { +func (q *LogQuery) Match(log *types.Log) bool { // check addresses - if len(l.Addresses) > 0 { + if len(q.Addresses) > 0 { match := false - for _, addr := range l.Addresses { + for _, addr := range q.Addresses { if addr == log.Address { match = true } @@ -191,11 +191,11 @@ func (l *LogFilter) Match(log *types.Log) bool { } } // check topics - if len(l.Topics) > len(log.Topics) { + if len(q.Topics) > len(log.Topics) { return false } - for i, sub := range l.Topics { + for i, sub := range q.Topics { match := len(sub) == 0 for _, topic := range sub { diff --git a/jsonrpc/query_test.go b/jsonrpc/query_test.go index 1cd79adde5..a108ff651a 100644 --- a/jsonrpc/query_test.go +++ b/jsonrpc/query_test.go @@ -20,11 +20,11 @@ var ( func TestFilterDecode(t *testing.T) { cases := []struct { str string - res *LogFilter + res *LogQuery }{ { `{}`, - &LogFilter{ + &LogQuery{ fromBlock: LatestBlockNumber, toBlock: LatestBlockNumber, }, @@ -39,7 +39,7 @@ func TestFilterDecode(t *testing.T) { `{ "address": "` + addr1.String() + `" }`, - &LogFilter{ + &LogQuery{ fromBlock: LatestBlockNumber, toBlock: LatestBlockNumber, Addresses: []types.Address{ @@ -54,7 +54,7 @@ func TestFilterDecode(t *testing.T) { "` + addr2.String() + `" ] }`, - &LogFilter{ + &LogQuery{ fromBlock: LatestBlockNumber, toBlock: LatestBlockNumber, Addresses: []types.Address{ @@ -78,7 +78,7 @@ func TestFilterDecode(t *testing.T) { "` + hash1.String() + `" ] }`, - &LogFilter{ + &LogQuery{ fromBlock: LatestBlockNumber, toBlock: LatestBlockNumber, Topics: [][]types.Hash{ @@ -104,7 +104,7 @@ func TestFilterDecode(t *testing.T) { "fromBlock": "pending", "toBlock": "earliest" }`, - &LogFilter{ + &LogQuery{ fromBlock: PendingBlockNumber, toBlock: EarliestBlockNumber, }, @@ -113,7 +113,7 @@ func TestFilterDecode(t *testing.T) { `{ "blockHash": "` + hash1.String() + `" }`, - &LogFilter{ + &LogQuery{ BlockHash: &hash1, fromBlock: LatestBlockNumber, toBlock: LatestBlockNumber, @@ -122,7 +122,7 @@ func TestFilterDecode(t *testing.T) { } for indx, c := range cases { - res := &LogFilter{} + res := &LogQuery{} err := res.UnmarshalJSON([]byte(c.str)) if err != nil && c.res != nil { @@ -143,13 +143,13 @@ func TestFilterDecode(t *testing.T) { func TestFilterMatch(t *testing.T) { cases := []struct { - filter LogFilter + filter LogQuery log *types.Log match bool }{ { // correct, exact match - LogFilter{ + LogQuery{ Topics: [][]types.Hash{ { hash1, @@ -165,7 +165,7 @@ func TestFilterMatch(t *testing.T) { }, { // bad, the filter has two hashes - LogFilter{ + LogQuery{ Topics: [][]types.Hash{ { hash1, @@ -184,7 +184,7 @@ func TestFilterMatch(t *testing.T) { }, { // correct, wildcard in one hash - LogFilter{ + LogQuery{ Topics: [][]types.Hash{ {}, { @@ -202,7 +202,7 @@ func TestFilterMatch(t *testing.T) { }, { // correct, more topics than in filter - LogFilter{ + LogQuery{ Topics: [][]types.Hash{ { hash1, diff --git a/licenses/bsd_licenses.json b/licenses/bsd_licenses.json index 6b41d60760..f6b7109df8 100644 --- a/licenses/bsd_licenses.json +++ b/licenses/bsd_licenses.json @@ -1,30 +1,31 @@ [ { - "name": "flynn/noise", - "version": "v1.0.0", + "name": "marten-seemann/qtls-go1-16", + "version": "v0.1.4", "type": "BSD-3-Clause", - "path": "vendor/github.com/flynn/noise/LICENSE" + "path": "vendor/github.com/marten-seemann/qtls-go1-16/LICENSE" }, { - "name": "x/sync/errgroup", + "name": "spaolacci/murmur3", + "version": "v1.1.0", "type": "BSD-3-Clause", - "path": "vendor/golang.org/x/sync/LICENSE" + "path": "vendor/github.com/spaolacci/murmur3/LICENSE" }, { - "name": "klauspost/compress/internal/snapref", + "name": "google/gopacket/routing", "type": "BSD-3-Clause", - "path": "vendor/github.com/klauspost/compress/internal/snapref/LICENSE" + "path": "vendor/github.com/google/gopacket/LICENSE" }, { - "name": "marten-seemann/tcp", - "version": "v0.0.0-20210406111302-dfbc87cc63fd", - "type": "BSD-2-Clause", - "path": "vendor/github.com/marten-seemann/tcp/LICENSE" + "name": "pmezard/go-difflib/difflib", + "type": "BSD-3-Clause", + "path": "vendor/github.com/pmezard/go-difflib/LICENSE" }, { - "name": "syndtr/goleveldb/leveldb", - "type": "BSD-2-Clause", - "path": "vendor/github.com/syndtr/goleveldb/LICENSE" + "name": "multiformats/go-base32", + "version": "v0.0.4", + "type": "BSD-3-Clause", + "path": "vendor/github.com/multiformats/go-base32/LICENSE" }, { "name": "pbnjay/memory", @@ -32,6 +33,11 @@ "type": "BSD-3-Clause", "path": "vendor/github.com/pbnjay/memory/LICENSE" }, + { + "name": "square/go-jose.v2/json", + "type": "BSD-3-Clause", + "path": "vendor/gopkg.in/square/go-jose.v2/json/LICENSE" + }, { "name": "x/crypto", "version": "v0.0.0-20220214200702-86341886e292", @@ -45,62 +51,64 @@ "path": "vendor/golang.org/x/text/LICENSE" }, { - "name": "umbracle/go-eth-bn256", - "version": "v0.0.0-20190607160430-b36caf4e0f6b", + "name": "google/uuid", + "version": "v1.3.0", "type": "BSD-3-Clause", - "path": "vendor/github.com/umbracle/go-eth-bn256/LICENSE" + "path": "vendor/github.com/google/uuid/LICENSE" }, { - "name": "protobuf", - "version": "v1.27.1", + "name": "golang/snappy", + "version": "v0.0.4", "type": "BSD-3-Clause", - "path": "vendor/google.golang.org/protobuf/LICENSE" + "path": "vendor/github.com/golang/snappy/LICENSE" }, { - "name": "gogo/protobuf/proto", + "name": "libp2p/go-netroute", + "version": "v0.2.0", "type": "BSD-3-Clause", - "path": "vendor/github.com/gogo/protobuf/LICENSE" + "path": "vendor/github.com/libp2p/go-netroute/LICENSE" }, { - "name": "spf13/pflag", - "version": "v1.0.5", - "type": "BSD-3-Clause", - "path": "vendor/github.com/spf13/pflag/LICENSE" + "name": "gorilla/websocket", + "version": "v1.5.0", + "type": "BSD-2-Clause", + "path": "vendor/github.com/gorilla/websocket/LICENSE" }, { - "name": "multiformats/go-base32", - "version": "v0.0.4", + "name": "flynn/noise", + "version": "v1.0.0", "type": "BSD-3-Clause", - "path": "vendor/github.com/multiformats/go-base32/LICENSE" + "path": "vendor/github.com/flynn/noise/LICENSE" }, { - "name": "marten-seemann/qtls-go1-17", - "version": "v0.1.0", - "type": "BSD-3-Clause", - "path": "vendor/github.com/marten-seemann/qtls-go1-17/LICENSE" + "name": "marten-seemann/tcp", + "version": "v0.0.0-20210406111302-dfbc87cc63fd", + "type": "BSD-2-Clause", + "path": "vendor/github.com/marten-seemann/tcp/LICENSE" }, { - "name": "x/sys", - "version": "v0.0.0-20220227234510-4e6760a101f9", - "type": "BSD-3-Clause", - "path": "vendor/golang.org/x/sys/LICENSE" + "name": "huin/goupnp", + "version": "v1.0.2", + "type": "BSD-2-Clause", + "path": "vendor/github.com/huin/goupnp/LICENSE" }, { - "name": "golang/snappy", - "version": "v0.0.4", + "name": "pierrec/lz4", + "version": "v2.5.2", "type": "BSD-3-Clause", - "path": "vendor/github.com/golang/snappy/LICENSE" + "path": "vendor/github.com/pierrec/lz4/LICENSE" }, { - "name": "prometheus/common/internal/bitbucket.org/ww/goautoneg", + "name": "x/sys", + "version": "v0.0.0-20220227234510-4e6760a101f9", "type": "BSD-3-Clause", - "path": "vendor/github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg/README.txt" + "path": "vendor/golang.org/x/sys/LICENSE" }, { - "name": "libp2p/go-netroute", - "version": "v0.2.0", + "name": "spf13/pflag", + "version": "v1.0.5", "type": "BSD-3-Clause", - "path": "vendor/github.com/libp2p/go-netroute/LICENSE" + "path": "vendor/github.com/spf13/pflag/LICENSE" }, { "name": "x/time/rate", @@ -108,39 +116,33 @@ "path": "vendor/golang.org/x/time/LICENSE" }, { - "name": "mikioh/tcpopt", - "version": "v0.0.0-20190314235656-172688c1accc", - "type": "BSD-2-Clause", - "path": "vendor/github.com/mikioh/tcpopt/LICENSE" + "name": "umbracle/go-eth-bn256", + "version": "v0.0.0-20190607160430-b36caf4e0f6b", + "type": "BSD-3-Clause", + "path": "vendor/github.com/umbracle/go-eth-bn256/LICENSE" }, { - "name": "pmezard/go-difflib/difflib", + "name": "miekg/dns", + "version": "v1.1.45", "type": "BSD-3-Clause", - "path": "vendor/github.com/pmezard/go-difflib/LICENSE" + "path": "vendor/github.com/miekg/dns/LICENSE" }, { - "name": "x/net", - "version": "v0.0.0-20220127200216-cd36cc0744dd", + "name": "protobuf", + "version": "v1.28.0", "type": "BSD-3-Clause", - "path": "vendor/golang.org/x/net/LICENSE" + "path": "vendor/google.golang.org/protobuf/LICENSE" }, { - "name": "spaolacci/murmur3", - "version": "v1.1.0", + "name": "klauspost/compress/internal/snapref", "type": "BSD-3-Clause", - "path": "vendor/github.com/spaolacci/murmur3/LICENSE" + "path": "vendor/github.com/klauspost/compress/internal/snapref/LICENSE" }, { - "name": "gorilla/websocket", - "version": "v1.5.0", + "name": "mikioh/tcpopt", + "version": "v0.0.0-20190314235656-172688c1accc", "type": "BSD-2-Clause", - "path": "vendor/github.com/gorilla/websocket/LICENSE" - }, - { - "name": "miekg/dns", - "version": "v1.1.45", - "type": "BSD-3-Clause", - "path": "vendor/github.com/miekg/dns/LICENSE" + "path": "vendor/github.com/mikioh/tcpopt/LICENSE" }, { "name": "aws/aws-sdk-go/internal/sync/singleflight", @@ -148,44 +150,42 @@ "path": "vendor/github.com/aws/aws-sdk-go/internal/sync/singleflight/LICENSE" }, { - "name": "mikioh/tcpinfo", - "version": "v0.0.0-20190314235526-30a79bb1804b", - "type": "BSD-2-Clause", - "path": "vendor/github.com/mikioh/tcpinfo/LICENSE" + "name": "prometheus/common/internal/bitbucket.org/ww/goautoneg", + "type": "BSD-3-Clause", + "path": "vendor/github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg/README.txt" }, { - "name": "square/go-jose.v2/json", + "name": "x/net", + "version": "v0.0.0-20220127200216-cd36cc0744dd", "type": "BSD-3-Clause", - "path": "vendor/gopkg.in/square/go-jose.v2/json/LICENSE" + "path": "vendor/golang.org/x/net/LICENSE" }, { - "name": "huin/goupnp", - "version": "v1.0.2", + "name": "mikioh/tcpinfo", + "version": "v0.0.0-20190314235526-30a79bb1804b", "type": "BSD-2-Clause", - "path": "vendor/github.com/huin/goupnp/LICENSE" + "path": "vendor/github.com/mikioh/tcpinfo/LICENSE" }, { - "name": "pierrec/lz4", - "version": "v2.5.2", + "name": "golang/protobuf", + "version": "v1.5.2", "type": "BSD-3-Clause", - "path": "vendor/github.com/pierrec/lz4/LICENSE" + "path": "vendor/github.com/golang/protobuf/LICENSE" }, { - "name": "google/uuid", - "version": "v1.3.0", + "name": "x/sync/errgroup", "type": "BSD-3-Clause", - "path": "vendor/github.com/google/uuid/LICENSE" + "path": "vendor/golang.org/x/sync/LICENSE" }, { - "name": "golang/protobuf", - "version": "v1.5.2", - "type": "BSD-3-Clause", - "path": "vendor/github.com/golang/protobuf/LICENSE" + "name": "syndtr/goleveldb/leveldb", + "type": "BSD-2-Clause", + "path": "vendor/github.com/syndtr/goleveldb/LICENSE" }, { - "name": "google/gopacket/routing", + "name": "gogo/protobuf/proto", "type": "BSD-3-Clause", - "path": "vendor/github.com/google/gopacket/LICENSE" + "path": "vendor/github.com/gogo/protobuf/LICENSE" }, { "name": "libp2p/go-netroute",