From 98a40e11dddd529dcb41119e266a90be2e193d79 Mon Sep 17 00:00:00 2001 From: kourin Date: Mon, 3 Oct 2022 18:50:14 +0800 Subject: [PATCH] EVM-42 Remove WebSocket filter with closed connection (#763) * EVM-42 Add error handling to remove closed ws connection in JSON-RPC * Fix lint error --- jsonrpc/dispatcher_test.go | 10 +-- jsonrpc/filter_manager.go | 3 +- jsonrpc/filter_manager_test.go | 134 +++++++++++++++++++++++++++++---- 3 files changed, 126 insertions(+), 21 deletions(-) diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index de58e7fc64..f73edb2ff1 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -64,9 +64,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { store := newMockStore() dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 20, 1000) - mockConnection := &mockWsConn{ - msgCh: make(chan []byte, 1), - } + mockConnection, msgCh := newMockWsConnWithMsgCh() req := []byte(`{ "method": "eth_subscribe", @@ -87,7 +85,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { }) select { - case <-mockConnection.msgCh: + case <-msgCh: case <-time.After(2 * time.Second): t.Fatal("\"newHeads\" event not received in 2 seconds") } @@ -98,9 +96,7 @@ func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) { store := newMockStore() dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 20, 1000) - mockConnection := &mockWsConn{ - msgCh: make(chan []byte, 1), - } + mockConnection, _ := newMockWsConnWithMsgCh() cases := []struct { msg []byte diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index dbed1a22a7..2635a05c0c 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "sync" "time" @@ -718,7 +719,7 @@ func (f *FilterManager) flushWsFilters() error { if flushErr := filter.sendUpdates(); flushErr != nil { // mark as closed if the connection is closed - if errors.Is(flushErr, websocket.ErrCloseSent) { + if errors.Is(flushErr, websocket.ErrCloseSent) || errors.Is(flushErr, net.ErrClosed) { closedFilterIDs = append(closedFilterIDs, id) f.logger.Warn(fmt.Sprintf("Subscription %s has been closed", id)) diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 2b6a05579b..19354d8946 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -1,7 +1,10 @@ package jsonrpc import ( + "context" + "errors" "math/big" + "net" "strconv" "testing" "time" @@ -326,9 +329,7 @@ func TestRemoveFilterByWebsocket(t *testing.T) { store := newMockStore() - mock := &mockWsConn{ - msgCh: make(chan []byte, 1), - } + mock, _ := newMockWsConnWithMsgCh() m := NewFilterManager(hclog.NewNullLogger(), store, 1000) defer m.Close() @@ -343,15 +344,100 @@ func TestRemoveFilterByWebsocket(t *testing.T) { assert.False(t, m.Exists(id)) } -func TestFilterWebsocket(t *testing.T) { +func Test_flushWsFilters(t *testing.T) { t.Parallel() store := newMockStore() - mock := &mockWsConn{ - msgCh: make(chan []byte, 1), + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) + + t.Cleanup(func() { + m.Close() + }) + + go m.Run() + + runTest := func(t *testing.T, flushErr error, shouldExist bool) { + t.Helper() + + var ( + filterID string + ) + + mock := &mockWsConn{ + SetFilterIDFn: func(s string) { + filterID = s + }, + GetFilterIDFn: func() string { + return filterID + }, + WriteMessageFn: func(i int, b []byte) error { + return flushErr + }, + } + + id := m.NewBlockFilter(mock) + + // emit event + store.emitEvent(&mockEvent{ + NewChain: []*mockHeader{ + { + header: &types.Header{ + Hash: types.StringToHash("1"), + }, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + t.Errorf("timeout for filter existence check, expected=%t, actual=%t", shouldExist, m.Exists(id)) + + return + default: + if shouldExist == m.Exists(id) { + return + } + } + } } + t.Run("should remove if sendUpdates returns websocket.ErrCloseSent", func(t *testing.T) { + t.Parallel() + + runTest(t, websocket.ErrCloseSent, false) + }) + + t.Run("should remove if sendUpdates returns net.ErrClosed", func(t *testing.T) { + t.Parallel() + + runTest(t, net.ErrClosed, false) + }) + + t.Run("should keep if sendUpdates returns unknown error", func(t *testing.T) { + t.Parallel() + + runTest(t, errors.New("hoge"), true) + }) + + t.Run("should keep if sendUpdates doesn't return error", func(t *testing.T) { + t.Parallel() + + runTest(t, nil, true) + }) +} + +func TestFilterWebsocket(t *testing.T) { + t.Parallel() + + store := newMockStore() + + mock, msgCh := newMockWsConnWithMsgCh() + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) defer m.Close() @@ -375,29 +461,51 @@ func TestFilterWebsocket(t *testing.T) { }) select { - case <-mock.msgCh: + case <-msgCh: case <-time.After(2 * time.Second): t.Fatal("bad") } } type mockWsConn struct { - msgCh chan []byte - filterID string + SetFilterIDFn func(string) + GetFilterIDFn func() string + WriteMessageFn func(int, []byte) error } func (m *mockWsConn) SetFilterID(filterID string) { - m.filterID = filterID + m.SetFilterIDFn(filterID) } func (m *mockWsConn) GetFilterID() string { - return m.filterID + return m.GetFilterIDFn() } func (m *mockWsConn) WriteMessage(messageType int, b []byte) error { - m.msgCh <- b + return m.WriteMessageFn(messageType, b) +} + +func newMockWsConnWithMsgCh() (*mockWsConn, <-chan []byte) { + var ( + filterID string + msgCh = make(chan []byte, 1) + ) + + mock := &mockWsConn{ + SetFilterIDFn: func(s string) { + filterID = s + }, + GetFilterIDFn: func() string { + return filterID + }, + WriteMessageFn: func(i int, b []byte) error { + msgCh <- b + + return nil + }, + } - return nil + return mock, msgCh } func TestHeadStream(t *testing.T) {