Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dot/rpc implement state_subscribeStorage RPC WebSocket method #983

Merged
merged 29 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8dcd114
stub functions for storage change
edwardmack Jul 6, 2020
1839679
create channels
edwardmack Jul 6, 2020
e17115c
added websocket subscription filter
edwardmack Jul 6, 2020
cb68030
implement rpc state_subscribeStorage
edwardmack Jul 6, 2020
a4a06d1
make lint
edwardmack Jul 6, 2020
4e92819
add tests
edwardmack Jul 6, 2020
4dd45e3
add tests
edwardmack Jul 6, 2020
452a7a1
update comments
edwardmack Jul 6, 2020
d822106
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 8, 2020
631f942
move channel listener logic to individual go routines
edwardmack Jul 10, 2020
2696608
update logging to use chainsafe logger
edwardmack Jul 10, 2020
b7ff1ce
refactor websocket connection handling storage
edwardmack Jul 11, 2020
5bdbe0e
refactor block listener
edwardmack Jul 11, 2020
9389b5b
handle closing listening channels and websocket connections
edwardmack Jul 11, 2020
884c523
lint issues
edwardmack Jul 11, 2020
0d62122
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 11, 2020
ce4837c
added notification on clear storage for storage change listener
edwardmack Jul 11, 2020
8d533ca
handle closing channes and subscriptions
edwardmack Jul 11, 2020
46db2f6
implement close functions for block listener
edwardmack Jul 11, 2020
0d19256
fix test
edwardmack Jul 11, 2020
005f4e6
move websocket_test to service_test to fix cyclic import
edwardmack Jul 13, 2020
cb3c502
fix lint issues
edwardmack Jul 13, 2020
f9d65f0
add tests for websocket
edwardmack Jul 13, 2020
85b6602
update test
edwardmack Jul 13, 2020
bd6e9d9
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 15, 2020
b0be8f1
code cleanup
edwardmack Jul 15, 2020
cf9cc48
add mock BlockAPI and mock StorageAPI to tests
edwardmack Jul 15, 2020
31f6b5a
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 15, 2020
bde2c68
Merge branch 'development' into ed/subscribe_storage
edwardmack Jul 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@ import (
"fmt"
"net/http"
"os"
"sync"

"github.com/ChainSafe/gossamer/dot/rpc/modules"
"github.com/ChainSafe/gossamer/dot/types"
log "github.com/ChainSafe/log15"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"
"github.com/gorilla/websocket"

log "github.com/ChainSafe/log15"
)

// HTTPServer gateway for RPC server
type HTTPServer struct {
logger log.Logger
rpcServer *rpc.Server // Actual RPC call handler
serverConfig *HTTPServerConfig
blockChan chan *types.Block
chanID byte // channel ID
wsConns []*WSConn
}

// HTTPServerConfig configures the HTTPServer
Expand All @@ -56,13 +54,18 @@ type HTTPServerConfig struct {
WSEnabled bool
WSPort uint32
Modules []string
WSSubscriptions map[uint32]*WebSocketSubscription
}

// WebSocketSubscription holds subscription details
type WebSocketSubscription struct {
WSConnection *websocket.Conn
SubscriptionType int
// WSConn struct to hold WebSocket Connection references
type WSConn struct {
wsconn *websocket.Conn
mu sync.Mutex
serverConfig *HTTPServerConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need the whole server config? or just the state interfaces?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No just BlockAPI and StorageAPI, I've updated.

logger log.Logger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice to make the logger global to the rpc package

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea.

blockSubChannels map[int]byte
storageSubChannels map[int]byte
qtyListeners int
subscriptions map[int]Listener
}

// NewHTTPServer creates a new http server and registers an associated rpc server
Expand All @@ -77,10 +80,6 @@ func NewHTTPServer(cfg *HTTPServerConfig) *HTTPServer {
serverConfig: cfg,
}

if cfg.WSSubscriptions == nil {
cfg.WSSubscriptions = make(map[uint32]*WebSocketSubscription)
}

server.RegisterModules(cfg.Modules)
return server
}
Expand Down Expand Up @@ -151,25 +150,30 @@ func (h *HTTPServer) Start() error {
}
}()

// init and start block received listener routine
if h.serverConfig.BlockAPI != nil {
var err error
h.blockChan = make(chan *types.Block)
h.chanID, err = h.serverConfig.BlockAPI.RegisterImportedChannel(h.blockChan)
if err != nil {
return err
}
go h.blockReceivedListener()
}

return nil
}

// Stop stops the server
func (h *HTTPServer) Stop() error {
if h.serverConfig.WSEnabled {
h.serverConfig.BlockAPI.UnregisterImportedChannel(h.chanID)
close(h.blockChan)
// close all channels and websocket connections
for _, conn := range h.wsConns {
for _, sub := range conn.subscriptions {
switch v := sub.(type) {
case *StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.chanID)
close(v.channel)
case *BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.chanID)
close(v.channel)
}
}

err := conn.wsconn.Close()
if err != nil {
h.logger.Error("error closing websocket connection", "error", err)
}
}
}
return nil
}
3 changes: 3 additions & 0 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package modules
import (
"math/big"

"github.com/ChainSafe/gossamer/dot/state"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto"
Expand All @@ -14,6 +15,8 @@ import (
type StorageAPI interface {
GetStorage(key []byte) ([]byte, error)
Entries() map[string][]byte
RegisterStorageChangeChannel(ch chan<- *state.KeyValue) (byte, error)
UnregisterStorageChangeChannel(id byte)
}

// BlockAPI is the interface for the block state
Expand Down
8 changes: 5 additions & 3 deletions dot/rpc/modules/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,11 @@ func (sm *StateModule) SubscribeRuntimeVersion(r *http.Request, req *StateStorag
return sm.GetRuntimeVersion(r, nil, res)
}

// SubscribeStorage isn't implemented properly yet.
func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) {
// TODO implement change storage trie so that block hash parameter works (See issue #834)
// SubscribeStorage Storage subscription. If storage keys are specified, it creates a message for each block which
// changes the specified storage keys. If none are specified, then it creates a message for every block.
// This endpoint communicates over the Websocket protocol, but this func should remain here so it's added to rpc_methods list
func (sm *StateModule) SubscribeStorage(r *http.Request, req *StateStorageQueryRangeRequest, res *StorageChangeSetResponse) error {
Copy link
Contributor

@noot noot Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering, is there an unsubscribe method corresponding to the subscribe methods? if so, it would be nice to return the subscription ID

return nil
}

func convertAPIs(in []*runtime.API_Item) []interface{} {
Expand Down
Loading