Skip to content

Commit

Permalink
feat(dot/rpc) implement chain_subscribeAllHeads RPC (ChainSafe#1740)
Browse files Browse the repository at this point in the history
* feat: implement chain subscribe all heads

* chore: add unit tests to subscribe all heads

* chore: fix lint

* chore: create constructor

* chore: improve AllBlocksListener export comment

Co-authored-by: noot <36753753+noot@users.noreply.github.com>

Co-authored-by: noot <36753753+noot@users.noreply.github.com>
  • Loading branch information
2 people authored and timwu20 committed Dec 6, 2021
1 parent 5f22d01 commit 4ad319c
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 0 deletions.
85 changes: 85 additions & 0 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
authorExtrinsicUpdatesMethod = "author_extrinsicUpdate"
chainFinalizedHeadMethod = "chain_finalizedHead"
chainNewHeadMethod = "chain_newHead"
chainAllHeadMethod = "chain_allHead"
stateStorageMethod = "state_storage"
)

Expand Down Expand Up @@ -213,6 +214,90 @@ func (l *BlockFinalizedListener) Stop() error {
return cancelWithTimeout(l.cancel, l.done, l.cancelTimeout)
}

// AllBlocksListener is a listener that is aware of new and newly finalised blocks```
type AllBlocksListener struct {
finalizedChan chan *types.FinalisationInfo
importedChan chan *types.Block

wsconn *WSConn
finalizedChanID byte
importedChanID byte
subID uint32
done chan struct{}
cancel chan struct{}
cancelTimeout time.Duration
}

func newAllBlockListener(conn *WSConn) *AllBlocksListener {
return &AllBlocksListener{
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
cancelTimeout: defaultCancelTimeout,
wsconn: conn,
finalizedChan: make(chan *types.FinalisationInfo, DEFAULT_BUFFER_SIZE),
importedChan: make(chan *types.Block, DEFAULT_BUFFER_SIZE),
}
}

// Listen start a goroutine to listen imported and finalised blocks
func (l *AllBlocksListener) Listen() {
go func() {
defer func() {
l.wsconn.BlockAPI.UnregisterImportedChannel(l.importedChanID)
l.wsconn.BlockAPI.UnregisterFinalisedChannel(l.finalizedChanID)

close(l.importedChan)
close(l.finalizedChan)
close(l.done)
}()

for {
select {
case <-l.cancel:
return
case fin, ok := <-l.finalizedChan:
if !ok {
return
}

if fin == nil || fin.Header == nil {
continue
}

finHead, err := modules.HeaderToJSON(*fin.Header)
if err != nil {
logger.Error("failed to convert finalised block header to JSON", "error", err)
continue
}

l.wsconn.safeSend(newSubscriptionResponse(chainAllHeadMethod, l.subID, finHead))

case imp, ok := <-l.importedChan:
if !ok {
return
}

if imp == nil || imp.Header == nil {
continue
}

impHead, err := modules.HeaderToJSON(*imp.Header)
if err != nil {
logger.Error("failed to convert imported block header to JSON", "error", err)
continue
}

l.wsconn.safeSend(newSubscriptionResponse(chainAllHeadMethod, l.subID, impHead))
}
}
}()
}

// Stop will unregister the imported chanells and stop the goroutine
func (l *AllBlocksListener) Stop() error {
return cancelWithTimeout(l.cancel, l.done, l.cancelTimeout)
}

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn *WSConn
Expand Down
3 changes: 3 additions & 0 deletions dot/rpc/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
chainSubscribeNewHeads string = "chain_subscribeNewHeads"
chainSubscribeNewHead string = "chain_subscribeNewHead"
chainSubscribeFinalizedHeads string = "chain_subscribeFinalizedHeads"
chainSubscribeAllHeads string = "chain_subscribeAllHeads"
stateSubscribeStorage string = "state_subscribeStorage"
stateSubscribeRuntimeVersion string = "state_subscribeRuntimeVersion"
grandpaSubscribeJustifications string = "grandpa_subscribeJustifications"
Expand All @@ -35,6 +36,8 @@ func (c *WSConn) getSetupListener(method string) setupListener {
return c.initStorageChangeListener
case chainSubscribeFinalizedHeads:
return c.initBlockFinalizedListener
case chainSubscribeAllHeads:
return c.initAllBlocksListerner
case stateSubscribeRuntimeVersion:
return c.initRuntimeVersionListener
case grandpaSubscribeJustifications:
Expand Down
30 changes: 30 additions & 0 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,36 @@ func (c *WSConn) initBlockFinalizedListener(reqID float64, _ interface{}) (Liste
return bfl, nil
}

func (c *WSConn) initAllBlocksListerner(reqID float64, _ interface{}) (Listener, error) {
listener := newAllBlockListener(c)

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
return nil, fmt.Errorf("error BlockAPI not set")
}

var err error
listener.importedChanID, err = c.BlockAPI.RegisterImportedChannel(listener.importedChan)
if err != nil {
c.safeSendError(reqID, nil, "could not register imported channel")
return nil, fmt.Errorf("could not register imported channel")
}

listener.finalizedChanID, err = c.BlockAPI.RegisterFinalizedChannel(listener.finalizedChan)
if err != nil {
c.safeSendError(reqID, nil, "could not register finalised channel")
return nil, fmt.Errorf("could not register finalised channel")
}

c.mu.Lock()
listener.subID = atomic.AddUint32(&c.qtyListeners, 1)
c.Subscriptions[listener.subID] = listener
c.mu.Unlock()

c.safeSend(NewSubscriptionResponseJSON(listener.subID, reqID))
return listener, nil
}

func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (Listener, error) {
pA := params.([]interface{})
extBytes, err := common.HexToBytes(pA[0].(string))
Expand Down
113 changes: 113 additions & 0 deletions dot/rpc/subscription/websocket_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package subscription

import (
"errors"
"fmt"
"math/big"
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/rpc/modules/mocks"
modulesmocks "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
Expand Down Expand Up @@ -270,3 +272,114 @@ func TestWSConn_HandleComm(t *testing.T) {
err = listener.Stop()
require.NoError(t, err)
}

func TestSubscribeAllHeads(t *testing.T) {
wsconn, c, cancel := setupWSConn(t)
wsconn.Subscriptions = make(map[uint32]Listener)
defer cancel()

go wsconn.HandleComm()
time.Sleep(time.Second * 2)

_, err := wsconn.initAllBlocksListerner(1, nil)
require.EqualError(t, err, "error BlockAPI not set")
_, msg, err := c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":null,"message":"error BlockAPI not set"},"id":1}`+"\n"), msg)

mockBlockAPI := new(mocks.BlockAPI)
mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")).
Return(uint8(0), errors.New("some mocked error")).Once()

wsconn.BlockAPI = mockBlockAPI
_, err = wsconn.initAllBlocksListerner(1, nil)
require.Error(t, err, "could not register imported channel")

_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","error":{"code":null,"message":"could not register imported channel"},"id":1}`+"\n"), msg)

mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")).
Return(uint8(10), nil).Once()
mockBlockAPI.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")).
Return(uint8(0), errors.New("failed")).Once()

_, err = wsconn.initAllBlocksListerner(1, nil)
require.Error(t, err, "could not register finalised channel")
c.ReadMessage()

importedChanID := uint8(10)
finalizedChanID := uint8(11)

var fCh chan<- *types.FinalisationInfo
var iCh chan<- *types.Block

mockBlockAPI.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")).
Run(func(args mock.Arguments) {
ch := args.Get(0).(chan<- *types.Block)
iCh = ch
}).Return(importedChanID, nil).Once()

mockBlockAPI.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")).
Run(func(args mock.Arguments) {
ch := args.Get(0).(chan<- *types.FinalisationInfo)
fCh = ch
}).
Return(finalizedChanID, nil).Once()

l, err := wsconn.initAllBlocksListerner(1, nil)
require.NoError(t, err)
require.NotNil(t, l)
require.IsType(t, &AllBlocksListener{}, l)
require.Len(t, wsconn.Subscriptions, 1)

_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(`{"jsonrpc":"2.0","result":1,"id":1}`+"\n"), msg)

l.Listen()
time.Sleep(time.Millisecond * 500)

expected := fmt.Sprintf(
`{"jsonrpc":"2.0","method":"chain_allHead","params":{"result":{"parentHash":"%s","number":"0x00","stateRoot":"%s","extrinsicsRoot":"%s","digest":{"logs":["0x064241424504ff"]}},"subscription":1}}`,
common.EmptyHash,
common.EmptyHash,
common.EmptyHash,
)

fCh <- &types.FinalisationInfo{
Header: &types.Header{
ParentHash: common.EmptyHash,
Number: big.NewInt(0),
StateRoot: common.EmptyHash,
ExtrinsicsRoot: common.EmptyHash,
Digest: types.NewDigest(types.NewBABEPreRuntimeDigest([]byte{0xff})),
},
}

time.Sleep(time.Millisecond * 500)
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, expected+"\n", string(msg))

iCh <- &types.Block{
Header: &types.Header{
ParentHash: common.EmptyHash,
Number: big.NewInt(0),
StateRoot: common.EmptyHash,
ExtrinsicsRoot: common.EmptyHash,
Digest: types.NewDigest(types.NewBABEPreRuntimeDigest([]byte{0xff})),
},
}
time.Sleep(time.Millisecond * 500)
_, msg, err = c.ReadMessage()
require.NoError(t, err)
require.Equal(t, []byte(expected+"\n"), msg)

mockBlockAPI.On("UnregisterImportedChannel", importedChanID)
mockBlockAPI.On("UnregisterFinalisedChannel", finalizedChanID)

require.NoError(t, l.Stop())
mockBlockAPI.AssertCalled(t, "UnregisterImportedChannel", importedChanID)
mockBlockAPI.AssertCalled(t, "UnregisterFinalisedChannel", finalizedChanID)
}

0 comments on commit 4ad319c

Please sign in to comment.