Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dot/rpc: implement chain_subscribeFinalizedHeads RPC method #1140

Merged
merged 11 commits into from
Oct 29, 2020
2 changes: 2 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type BlockAPI interface {
GetFinalizedHash(uint64, uint64) (common.Hash, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
UnregisterFinalizedChannel(id byte)
}

// NetworkAPI interface for network state methods
Expand Down
9 changes: 5 additions & 4 deletions dot/rpc/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,19 @@ func (cm *ChainModule) GetHeader(r *http.Request, req *ChainHashRequest, res *Ch
// SubscribeFinalizedHeads handled by websocket handler, but this func should remain
// here so it's added to rpc_methods list
func (cm *ChainModule) SubscribeFinalizedHeads(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
return ErrSubscriptionTransport
}

// SubscribeNewHead handled by websocket handler, but this func should remain
// here so it's added to rpc_methods list
func (cm *ChainModule) SubscribeNewHead(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
return ErrSubscriptionTransport
}

// SubscribeNewHeads isn't implemented properly yet.
// SubscribeNewHeads handled by websocket handler, but this func should remain
// here so it's added to rpc_methods list
func (cm *ChainModule) SubscribeNewHeads(r *http.Request, req *EmptyRequest, res *ChainBlockHeaderResponse) error {
return nil
return ErrSubscriptionTransport
}

func (cm *ChainModule) hashLookup(req *ChainHashRequest) (common.Hash, error) {
Expand Down
21 changes: 21 additions & 0 deletions dot/rpc/modules/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.
package modules

import "errors"

// ErrSubscriptionTransport error sent when trying to access websocket subscriptions via http
var ErrSubscriptionTransport = errors.New("subscriptions are not available on this transport")
64 changes: 64 additions & 0 deletions dot/rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (c *WSConn) handleComm() {
}
c.startListener(scl)
case "chain_subscribeFinalizedHeads":
bfl, err3 := c.initBlockFinalizedListener(reqid)
if err3 != nil {
logger.Warn("failed to create block finalized", "error", err)
continue
}
c.startListener(bfl)
}
continue
}
Expand Down Expand Up @@ -368,3 +374,61 @@ func (l *BlockListener) Listen() {

}
}

// BlockFinalizedListener to handle listening for finalized blocks
type BlockFinalizedListener struct {
channel chan *types.Header
wsconn *WSConn
chanID byte
subID int
}

func (c *WSConn) initBlockFinalizedListener(reqID float64) (int, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
wsconn: c,
}

if c.blockAPI == nil {
err := c.safeSendError(reqID, nil, "error BlockAPI not set")
if err != nil {
logger.Warn("error sending error message", "error", err)
}
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.blockAPI.RegisterFinalizedChannel(bfl.channel)
if err != nil {
return 0, err
}
bfl.chanID = chanID
c.qtyListeners++
bfl.subID = c.qtyListeners
c.subscriptions[bfl.subID] = bfl
c.blockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
err = c.safeSend(initRes)
if err != nil {
return 0, err
}
return bfl.subID, nil
}

// Listen implementation of Listen interface to listen for channel changes
func (l *BlockFinalizedListener) Listen() {
for header := range l.channel {
if header == nil {
continue
}
head := modules.HeaderToJSON(*header)
headM := make(map[string]interface{})
headM["result"] = head
headM["subscription"] = l.subID
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params = headM
err := l.wsconn.safeSend(res)
if err != nil {
logger.Error("error sending websocket message", "error", err)
}
}
}
5 changes: 5 additions & 0 deletions dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var testCalls = []struct {
{[]byte{}, []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n")}, // empty request
{[]byte(`{"jsonrpc":"2.0","method":"chain_subscribeNewHeads","params":[],"id":3}`), []byte(`{"jsonrpc":"2.0","result":1,"id":3}` + "\n")},
{[]byte(`{"jsonrpc":"2.0","method":"state_subscribeStorage","params":[],"id":4}`), []byte(`{"jsonrpc":"2.0","result":2,"id":4}` + "\n")},
{[]byte(`{"jsonrpc":"2.0","method":"chain_subscribeFinalizedHeads","params":[],"id":5}`), []byte(`{"jsonrpc":"2.0","result":3,"id":5}` + "\n")},
}

func TestHTTPServer_ServeHTTP(t *testing.T) {
Expand Down Expand Up @@ -97,6 +98,10 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}

type MockStorageAPI struct{}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/docker/docker v1.13.1
github.com/ethereum/go-ethereum v1.9.6
github.com/go-interpreter/wagon v0.6.1-0.20200227184901-6803234760a6
github.com/golangci/golangci-lint v1.23.1 // indirect
github.com/gorilla/mux v1.7.4
github.com/gorilla/rpc v1.2.0
github.com/gorilla/websocket v1.4.2
Expand Down
Loading