Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dot/rpc: implement chain_subscribeFinalizedHeads RPC method #1140

Merged
merged 11 commits into from
Oct 29, 2020
2 changes: 2 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type BlockAPI interface {
GetFinalizedHash(uint64, uint64) (common.Hash, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
UnregisterFinalizedChannel(id byte)
}

// NetworkAPI interface for network state methods
Expand Down
64 changes: 64 additions & 0 deletions dot/rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (c *WSConn) handleComm() {
}
c.startListener(scl)
case "chain_subscribeFinalizedHeads":
bfl, err3 := c.initBlockFinalizedListener(reqid)
if err3 != nil {
logger.Warn("failed to create block finalized", "error", err)
continue
}
c.startListener(bfl)
}
continue
}
Expand Down Expand Up @@ -368,3 +374,61 @@ func (l *BlockListener) Listen() {

}
}

// BlockFinalizedListener to handle listening for finalized blocks
type BlockFinalizedListener struct {
channel chan *types.Header
wsconn *WSConn
chanID byte
subID int
}

func (c *WSConn) initBlockFinalizedListener(reqID float64) (int, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
wsconn: c,
}

if c.blockAPI == nil {
err := c.safeSendError(reqID, nil, "error BlockAPI not set")
if err != nil {
logger.Warn("error sending error message", "error", err)
}
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.blockAPI.RegisterFinalizedChannel(bfl.channel)
if err != nil {
return 0, err
}
bfl.chanID = chanID
c.qtyListeners++
bfl.subID = c.qtyListeners
c.subscriptions[bfl.subID] = bfl
c.blockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
err = c.safeSend(initRes)
if err != nil {
return 0, err
}
return bfl.subID, nil
}

// Listen implementation of Listen interface to listen for channel changes
func (l *BlockFinalizedListener) Listen() {
for header := range l.channel {
if header == nil {
continue
}
head := modules.HeaderToJSON(*header)
headM := make(map[string]interface{})
headM["result"] = head
headM["subscription"] = l.subID
res := newSubcriptionBaseResponseJSON()
res.Method = "chain_finalizedHead"
res.Params = headM
err := l.wsconn.safeSend(res)
if err != nil {
logger.Error("error sending websocket message", "error", err)
}
}
}
4 changes: 4 additions & 0 deletions dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}

type MockStorageAPI struct{}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/docker/docker v1.13.1
github.com/ethereum/go-ethereum v1.9.6
github.com/go-interpreter/wagon v0.6.1-0.20200227184901-6803234760a6
github.com/golangci/golangci-lint v1.23.1 // indirect
github.com/gorilla/mux v1.7.4
github.com/gorilla/rpc v1.2.0
github.com/gorilla/websocket v1.4.2
Expand Down
Loading