Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dot/state] implement state_subscribeStorage #1290

Merged
merged 27 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
22934b8
added system accounts to gensis configs
edwardmack Dec 10, 2020
88c7ca1
Merge branch 'development' into ed/init_account_balances
edwardmack Dec 10, 2020
c36950e
Merge branch 'development' into ed/init_account_balances
edwardmack Dec 13, 2020
9404640
add to full account object to raw genesis
edwardmack Dec 13, 2020
2c43c02
fix tests
edwardmack Dec 14, 2020
4fa6310
Merge branch 'development' into ed/init_account_balances
edwardmack Dec 15, 2020
5b32bb1
enable rpc module by defualt
edwardmack Dec 15, 2020
0dc28a5
update gssmr genesis to use palletBalances key
edwardmack Dec 15, 2020
941a4e3
update genesis-raw
edwardmack Dec 15, 2020
31d6db8
use subscribe for db listener
edwardmack Dec 15, 2020
6b5c6c6
testing db storage subscribe
edwardmack Dec 16, 2020
bf67ee3
Merge branch 'development' into ed/db_subscribe
edwardmack Dec 22, 2020
f78719e
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 4, 2021
a431c3e
lint
edwardmack Jan 4, 2021
560f923
add test for db subscribe
edwardmack Jan 4, 2021
8248dae
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 5, 2021
38d311d
add Example db.Subscribe code
edwardmack Jan 5, 2021
c856d32
update go.mod to use updated chaindb
edwardmack Jan 7, 2021
104e5c3
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 7, 2021
8f937e1
implement subscribe storage, move filters
edwardmack Jan 11, 2021
342848e
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 11, 2021
2d284c1
remove unused code
edwardmack Jan 11, 2021
b1171de
lint
edwardmack Jan 11, 2021
76b535b
remove redundant test
edwardmack Jan 11, 2021
ccf26cc
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 12, 2021
57def3f
modify tests to increase coverage
edwardmack Jan 12, 2021
5d3135d
Merge branch 'development' into ed/db_subscribe
edwardmack Jan 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type StorageAPI interface {
GetStorage(root *common.Hash, key []byte) ([]byte, error)
GetStorageByBlockHash(bhash common.Hash, key []byte) ([]byte, error)
Entries(root *common.Hash) (map[string][]byte, error)
RegisterStorageChangeChannel(ch chan<- *state.KeyValue) (byte, error)
RegisterStorageChangeChannel(sub state.StorageSubscription) (byte, error)
UnregisterStorageChangeChannel(id byte)
GetStateRootFromBlock(bhash *common.Hash) (*common.Hash, error)
}
Expand Down
39 changes: 24 additions & 15 deletions dot/rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *WSConn) handleComm() {
case "state_subscribeStorage":
scl, err2 := c.initStorageChangeListener(reqid, params)
if err2 != nil {
logger.Warn("failed to create state change listener", "error", err)
logger.Warn("failed to create state change listener", "error", err2)
continue
}
c.startListener(scl)
Expand Down Expand Up @@ -256,38 +256,45 @@ type Listener interface {

// StorageChangeListener for listening to state change channels
type StorageChangeListener struct {
channel chan *state.KeyValue
filter map[string]bool
channel chan *state.SubscriptionResult
wsconn *WSConn
chanID byte
subID int
}

func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (int, error) {
scl := &StorageChangeListener{
channel: make(chan *state.KeyValue),
filter: make(map[string]bool),
channel: make(chan *state.SubscriptionResult),
wsconn: c,
}
sub := &state.StorageSubscription{
Filter: make(map[string]bool),
Listener: scl.channel,
}

pA := params.([]interface{})
for _, param := range pA {
switch param.(type) {
switch p := param.(type) {
case []interface{}:
for _, p := range param.([]interface{}) {
scl.filter[p.(string)] = true
for _, pp := range param.([]interface{}) {
sub.Filter[pp.(string)] = true
}
case string:
sub.Filter[p] = true
default:
return 0, fmt.Errorf("unknow parameter type")
}
}

if c.storageAPI == nil {
err := c.safeSendError(reqID, nil, "error StorageAPI not set")
if err != nil {
logger.Warn("error sending error message", "error", err)
}
return 0, fmt.Errorf("error StorageAPI not set")
}
chanID, err := c.storageAPI.RegisterStorageChangeChannel(scl.channel)

chanID, err := c.storageAPI.RegisterStorageChangeChannel(*sub)
if err != nil {
return 0, err
}
Expand All @@ -313,14 +320,17 @@ func (l *StorageChangeListener) Listen() {
continue
}

//check if change key is in subscription filter
cKey := common.BytesToHex(change.Key)
if len(l.filter) > 0 && !l.filter[cKey] {
continue
result := make(map[string]interface{})
result["block"] = change.Hash.String()
changes := [][]string{}
for _, v := range change.Changes {
kv := []string{common.BytesToHex(v.Key), common.BytesToHex(v.Value)}
changes = append(changes, kv)
}
result["changes"] = changes

changeM := make(map[string]interface{})
changeM["result"] = []string{cKey, common.BytesToHex(change.Value)}
changeM["result"] = result
changeM["subscription"] = l.subID
res := newSubcriptionBaseResponseJSON()
res.Method = "state_storage"
Expand All @@ -329,7 +339,6 @@ func (l *StorageChangeListener) Listen() {
if err != nil {
logger.Error("error sending websocket message", "error", err)
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *MockStorageAPI) Entries(_ *common.Hash) (map[string][]byte, error) {
func (m *MockStorageAPI) GetStorageByBlockHash(_ common.Hash, key []byte) ([]byte, error) {
return nil, nil
}
func (m *MockStorageAPI) RegisterStorageChangeChannel(ch chan<- *state.KeyValue) (byte, error) {
func (m *MockStorageAPI) RegisterStorageChangeChannel(sub state.StorageSubscription) (byte, error) {
return 0, nil
}
func (m *MockStorageAPI) UnregisterStorageChangeChannel(id byte) {
Expand Down
100 changes: 50 additions & 50 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type StorageState struct {
lock sync.RWMutex

// change notifiers
changed map[byte]chan<- *KeyValue
changedLock sync.RWMutex
changedLock sync.RWMutex
subscriptions map[byte]*StorageSubscription
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
Expand All @@ -67,11 +67,11 @@ func NewStorageState(db chaindb.Database, blockState *BlockState, t *trie.Trie)
tries[t.MustHash()] = t

return &StorageState{
blockState: blockState,
tries: tries,
baseDB: db,
db: chaindb.NewTable(db, storagePrefix),
changed: make(map[byte]chan<- *KeyValue),
blockState: blockState,
tries: tries,
baseDB: db,
db: chaindb.NewTable(db, storagePrefix),
subscriptions: make(map[byte]*StorageSubscription),
}, nil
}

Expand Down Expand Up @@ -162,7 +162,49 @@ func (s *StorageState) StoreInDB(root common.Hash) error {
return errTrieDoesNotExist(root)
}

return StoreTrie(s.baseDB, s.tries[root])
err := StoreTrie(s.baseDB, s.tries[root])
if err != nil {
return err
}

// notify subscribers of database changes
for _, sub := range s.subscriptions {
subRes := &SubscriptionResult{
Hash: root,
}
if len(sub.Filter) == 0 {
// no filter, so send all changes
ent := s.tries[root].Entries()
for k, v := range ent {
if k != ":code" {
// todo, currently we're ignoring :code since this is a lot of data
kv := &KeyValue{
Key: common.MustHexToBytes(fmt.Sprintf("0x%x", k)),
Value: v,
}
subRes.Changes = append(subRes.Changes, *kv)
}
}

Comment on lines +177 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to send all the entries in the trie, not just the ones that are changed? am I mistaken?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I wasn't sure about this, however when I was testing with Substrate node, using key 0x26aa394eea5630e07c48ae0c9558cef70a98fdbe9ce6c55837576c60c7af3850 (System EventCount) I saw that it was responding with a value every block, even though the value remained the same (1). So I figured that I should replicate the same behavior.

} else {
// filter result to include only interested keys
for k := range sub.Filter {
value, err := s.tries[root].Get(common.MustHexToBytes(k))
if err != nil {
logger.Error("Error retrieving value from state tries")
continue
}
kv := &KeyValue{
Key: common.MustHexToBytes(k),
Value: value,
}
subRes.Changes = append(subRes.Changes, *kv)
}
}
s.notifyChanged(subRes)
}

return nil
}

// LoadFromDB loads an encoded trie from the DB where the key is `root`
Expand Down Expand Up @@ -353,48 +395,6 @@ func (s *StorageState) GetBalance(hash *common.Hash, key [32]byte) (uint64, erro
return binary.LittleEndian.Uint64(bal), nil
}

// setStorage set the storage value for a given key in the trie. only for testing
func (s *StorageState) setStorage(hash *common.Hash, key []byte, value []byte) error {
if hash == nil {
sr, err := s.blockState.BestBlockStateRoot()
if err != nil {
return err
}
hash = &sr
}

s.lock.Lock()
defer s.lock.Unlock()
kv := &KeyValue{
Key: key,
Value: value,
}

if s.tries[*hash] == nil {
return errTrieDoesNotExist(*hash)
}

err := s.tries[*hash].Put(key, value)
if err != nil {
return err
}
s.notifyChanged(kv) // TODO: what is this used for? needs to be updated to work with new StorageState/TrieState API
return nil
}

// setBalance sets the balance for an account with the given public key. only for testing
func (s *StorageState) setBalance(hash *common.Hash, key [32]byte, balance uint64) error {
skey, err := common.BalanceKey(key)
if err != nil {
return err
}

bb := make([]byte, 8)
binary.LittleEndian.PutUint64(bb, balance)

return s.setStorage(hash, skey, bb)
}

func (s *StorageState) pruneStorage(closeCh chan interface{}) {
for {
select {
Expand Down
38 changes: 26 additions & 12 deletions dot/state/storage_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package state

import (
"errors"

"github.com/ChainSafe/gossamer/lib/common"
)

// KeyValue struct to hold key value pairs
Expand All @@ -25,26 +27,38 @@ type KeyValue struct {
Value []byte
}

//SubscriptionResult holds results of storage changes
type SubscriptionResult struct {
Hash common.Hash
Changes []KeyValue
}

//StorageSubscription holds data for Subscription to Storage
type StorageSubscription struct {
Filter map[string]bool
Listener chan<- *SubscriptionResult
}

// RegisterStorageChangeChannel function to register storage change channels
func (s *StorageState) RegisterStorageChangeChannel(ch chan<- *KeyValue) (byte, error) {
func (s *StorageState) RegisterStorageChangeChannel(sub StorageSubscription) (byte, error) {
s.changedLock.RLock()

if len(s.changed) == 256 {
return 0, errors.New("channel limit reached")
if len(s.subscriptions) == 256 {
return 0, errors.New("storage subscriptions limit reached")
}

var id byte
for {
id = generateID()
if s.changed[id] == nil {
if s.subscriptions[id] == nil {
break
}
}

s.changedLock.RUnlock()

s.changedLock.Lock()
s.changed[id] = ch
s.subscriptions[id] = &sub
s.changedLock.Unlock()
return id, nil
}
Expand All @@ -55,22 +69,22 @@ func (s *StorageState) UnregisterStorageChangeChannel(id byte) {
s.changedLock.Lock()
defer s.changedLock.Unlock()

delete(s.changed, id)
delete(s.subscriptions, id)
}

func (s *StorageState) notifyChanged(change *KeyValue) {
func (s *StorageState) notifyChanged(change *SubscriptionResult) {
s.changedLock.RLock()
defer s.changedLock.RUnlock()

if len(s.changed) == 0 {
if len(s.subscriptions) == 0 {
return
}

logger.Trace("notifying changed storage chans...", "chans", s.changed)
logger.Trace("notifying changed storage chans...", "chans", s.subscriptions)

for _, ch := range s.changed {
go func(ch chan<- *KeyValue) {
for _, ch := range s.subscriptions {
go func(ch chan<- *SubscriptionResult) {
ch <- change
}(ch)
}(ch.Listener)
}
}
Loading