diff --git a/dot/rpc/subscription/listeners.go b/dot/rpc/subscription/listeners.go index 01a4ad896d..ddc18a3c98 100644 --- a/dot/rpc/subscription/listeners.go +++ b/dot/rpc/subscription/listeners.go @@ -35,6 +35,7 @@ const ( authorExtrinsicUpdatesMethod = "author_extrinsicUpdate" chainFinalizedHeadMethod = "chain_finalizedHead" chainNewHeadMethod = "chain_newHead" + chainAllHeadMethod = "chain_allHead" stateStorageMethod = "state_storage" ) @@ -213,6 +214,90 @@ func (l *BlockFinalizedListener) Stop() error { return cancelWithTimeout(l.cancel, l.done, l.cancelTimeout) } +// AllBlocksListener is a listener that is aware of new and newly finalised blocks``` +type AllBlocksListener struct { + finalizedChan chan *types.FinalisationInfo + importedChan chan *types.Block + + wsconn *WSConn + finalizedChanID byte + importedChanID byte + subID uint32 + done chan struct{} + cancel chan struct{} + cancelTimeout time.Duration +} + +func newAllBlockListener(conn *WSConn) *AllBlocksListener { + return &AllBlocksListener{ + cancel: make(chan struct{}, 1), + done: make(chan struct{}, 1), + cancelTimeout: defaultCancelTimeout, + wsconn: conn, + finalizedChan: make(chan *types.FinalisationInfo, DEFAULT_BUFFER_SIZE), + importedChan: make(chan *types.Block, DEFAULT_BUFFER_SIZE), + } +} + +// Listen start a goroutine to listen imported and finalised blocks +func (l *AllBlocksListener) Listen() { + go func() { + defer func() { + l.wsconn.BlockAPI.UnregisterImportedChannel(l.importedChanID) + l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalizedChanID) + + close(l.importedChan) + close(l.finalizedChan) + close(l.done) + }() + + for { + select { + case <-l.cancel: + return + case fin, ok := <-l.finalizedChan: + if !ok { + return + } + + if fin == nil || fin.Header == nil { + continue + } + + finHead, err := modules.HeaderToJSON(*fin.Header) + if err != nil { + logger.Error("failed to convert finalised block header to JSON", "error", err) + continue + } + + l.wsconn.safeSend(newSubscriptionResponse(chainAllHeadMethod, l.subID, finHead)) + + case imp, ok := <-l.importedChan: + if !ok { + return + } + + if imp == nil || imp.Header == nil { + continue + } + + impHead, err := modules.HeaderToJSON(*imp.Header) + if err != nil { + logger.Error("failed to convert imported block header to JSON", "error", err) + continue + } + + l.wsconn.safeSend(newSubscriptionResponse(chainAllHeadMethod, l.subID, impHead)) + } + } + }() +} + +// Stop will unregister the imported chanells and stop the goroutine +func (l *AllBlocksListener) Stop() error { + return cancelWithTimeout(l.cancel, l.done, l.cancelTimeout) +} + // ExtrinsicSubmitListener to handle listening for extrinsic events type ExtrinsicSubmitListener struct { wsconn *WSConn diff --git a/dot/rpc/subscription/subscription.go b/dot/rpc/subscription/subscription.go index cbf7648c78..870faf5410 100644 --- a/dot/rpc/subscription/subscription.go +++ b/dot/rpc/subscription/subscription.go @@ -11,6 +11,7 @@ const ( chainSubscribeNewHeads string = "chain_subscribeNewHeads" chainSubscribeNewHead string = "chain_subscribeNewHead" chainSubscribeFinalizedHeads string = "chain_subscribeFinalizedHeads" + chainSubscribeAllHeads string = "chain_subscribeAllHeads" stateSubscribeStorage string = "state_subscribeStorage" stateSubscribeRuntimeVersion string = "state_subscribeRuntimeVersion" grandpaSubscribeJustifications string = "grandpa_subscribeJustifications" @@ -35,6 +36,8 @@ func (c *WSConn) getSetupListener(method string) setupListener { return c.initStorageChangeListener case chainSubscribeFinalizedHeads: return c.initBlockFinalizedListener + case chainSubscribeAllHeads: + return c.initAllBlocksListerner case stateSubscribeRuntimeVersion: return c.initRuntimeVersionListener case grandpaSubscribeJustifications: diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index f3ede55f39..e9dac52573 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -278,6 +278,36 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Liste return bfl, nil } +func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener, error) { + listener := newAllBlockListener(c) + + if c.BlockAPI == nil { + c.safeSendError(reqID, nil, "error BlockAPI not set") + return nil, fmt.Errorf("error BlockAPI not set") + } + + var err error + listener.importedChanID, err = c.BlockAPI.RegisterImportedChannel(listener.importedChan) + if err != nil { + c.safeSendError(reqID, nil, "could not register imported channel") + return nil, fmt.Errorf("could not register imported channel") + } + + listener.finalizedChanID, err = c.BlockAPI.RegisterFinalizedChannel(listener.finalizedChan) + if err != nil { + c.safeSendError(reqID, nil, "could not register finalised channel") + return nil, fmt.Errorf("could not register finalised channel") + } + + c.mu.Lock() + listener.subID = atomic.AddUint32(&c.qtyListeners, 1) + c.Subscriptions[listener.subID] = listener + c.mu.Unlock() + + c.safeSend(NewSubscriptionResponseJSON(listener.subID, reqID)) + return listener, nil +} + func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener, error) { pA := params.([]interface{}) extBytes, err := common.HexToBytes(pA[0].(string)) diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index 74d3c88796..cac0e9db0a 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -1,11 +1,13 @@ package subscription import ( + "errors" "fmt" "math/big" "testing" "time" + "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks" modulesmocks "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks" "github.com/ChainSafe/gossamer/dot/rpc/modules" @@ -270,3 +272,114 @@ func TestWSConn_HandleComm(t *testing.T) { err = listener.Stop() require.NoError(t, err) } + +func TestSubscribeAllHeads(t *testing.T) { + wsconn, c, cancel := setupWSConn(t) + wsconn.Subscriptions = make(map[uint32]Listener) + defer cancel() + + go wsconn.HandleComm() + time.Sleep(time.Second * 2) + + _, err := wsconn.initAllBlocksListerner(1, nil) + require.EqualError(t, err, "error BlockAPI not set") + _, msg, err := c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":null,"message":"error BlockAPI not set"},"id":1}`+"\n"), msg) + + mockBlockAPI := new(mocks.BlockAPI) + mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")). + Return(uint8(0), errors.New("some mocked error")).Once() + + wsconn.BlockAPI = mockBlockAPI + _, err = wsconn.initAllBlocksListerner(1, nil) + require.Error(t, err, "could not register imported channel") + + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":null,"message":"could not register imported channel"},"id":1}`+"\n"), msg) + + mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")). + Return(uint8(10), nil).Once() + mockBlockAPI.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")). + Return(uint8(0), errors.New("failed")).Once() + + _, err = wsconn.initAllBlocksListerner(1, nil) + require.Error(t, err, "could not register finalised channel") + c.ReadMessage() + + importedChanID := uint8(10) + finalizedChanID := uint8(11) + + var fCh chan<- *types.FinalisationInfo + var iCh chan<- *types.Block + + mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")). + Run(func(args mock.Arguments) { + ch := args.Get(0).(chan<- *types.Block) + iCh = ch + }).Return(importedChanID, nil).Once() + + mockBlockAPI.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")). + Run(func(args mock.Arguments) { + ch := args.Get(0).(chan<- *types.FinalisationInfo) + fCh = ch + }). + Return(finalizedChanID, nil).Once() + + l, err := wsconn.initAllBlocksListerner(1, nil) + require.NoError(t, err) + require.NotNil(t, l) + require.IsType(t, &AllBlocksListener{}, l) + require.Len(t, wsconn.Subscriptions, 1) + + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(`{"jsonrpc":"2.0","result":1,"id":1}`+"\n"), msg) + + l.Listen() + time.Sleep(time.Millisecond * 500) + + expected := fmt.Sprintf( + `{"jsonrpc":"2.0","method":"chain_allHead","params":{"result":{"parentHash":"%s","number":"0x00","stateRoot":"%s","extrinsicsRoot":"%s","digest":{"logs":["0x064241424504ff"]}},"subscription":1}}`, + common.EmptyHash, + common.EmptyHash, + common.EmptyHash, + ) + + fCh <- &types.FinalisationInfo{ + Header: &types.Header{ + ParentHash: common.EmptyHash, + Number: big.NewInt(0), + StateRoot: common.EmptyHash, + ExtrinsicsRoot: common.EmptyHash, + Digest: types.NewDigest(types.NewBABEPreRuntimeDigest([]byte{0xff})), + }, + } + + time.Sleep(time.Millisecond * 500) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, expected+"\n", string(msg)) + + iCh <- &types.Block{ + Header: &types.Header{ + ParentHash: common.EmptyHash, + Number: big.NewInt(0), + StateRoot: common.EmptyHash, + ExtrinsicsRoot: common.EmptyHash, + Digest: types.NewDigest(types.NewBABEPreRuntimeDigest([]byte{0xff})), + }, + } + time.Sleep(time.Millisecond * 500) + _, msg, err = c.ReadMessage() + require.NoError(t, err) + require.Equal(t, []byte(expected+"\n"), msg) + + mockBlockAPI.On("UnregisterImportedChannel", importedChanID) + mockBlockAPI.On("UnregisterFinalisedChannel", finalizedChanID) + + require.NoError(t, l.Stop()) + mockBlockAPI.AssertCalled(t, "UnregisterImportedChannel", importedChanID) + mockBlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", finalizedChanID) +}