From bccc6672cad4abe13241a979895bfd7637ed5ce5 Mon Sep 17 00:00:00 2001
From: Philip Offtermatt
Date: Mon, 21 Aug 2023 13:52:35 +0200
Subject: [PATCH 1/3] Add mutex around state updates
---
cometmock/abci_client/client.go | 29 ++++-----
cometmock/storage/storage.go | 101 +++++++++++++++++---------------
2 files changed, 64 insertions(+), 66 deletions(-)
diff --git a/cometmock/abci_client/client.go b/cometmock/abci_client/client.go
index 1a3c067..4d6ce43 100644
--- a/cometmock/abci_client/client.go
+++ b/cometmock/abci_client/client.go
@@ -28,6 +28,9 @@ var GlobalClient *AbciClient
// store a mutex that allows only running one block at a time
var blockMutex = sync.Mutex{}
+// store a mutex that disallows accessing state information while it is being updated
+var stateUpdateMutex = sync.Mutex{}
+
var verbose = false
// AbciClient facilitates calls to the ABCI interface of multiple nodes.
@@ -720,8 +723,6 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
return nil, nil, nil, nil, nil, err
}
- a.LastBlock = block
-
commitSigs := []types.CommitSig{}
for index, val := range a.CurState.Validators.Validators {
@@ -817,25 +818,14 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
deliverTxResponses = append(deliverTxResponses, resDeliverTx)
}
- // insert entries into the storage
- err = a.Storage.InsertBlock(newHeight, block)
- if err != nil {
- return nil, nil, nil, nil, nil, err
- }
-
- err = a.Storage.InsertCommit(newHeight, a.LastCommit)
- if err != nil {
- return nil, nil, nil, nil, nil, err
- }
+ // lock the state update mutex while the stores are updated to avoid
+ // inconsistencies between stores
+ a.Storage.LockBeforeStateUpdate()
+ a.LastBlock = block
// copy state so that the historical state is not mutated
state := a.CurState.Copy()
- err = a.Storage.InsertState(newHeight, &state)
- if err != nil {
- return nil, nil, nil, nil, nil, err
- }
-
// build components of the state update, then call the update function
abciResponses := cmtstate.ABCIResponses{
DeliverTxs: deliverTxResponses,
@@ -843,7 +833,8 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
BeginBlock: resBeginBlock,
}
- err = a.Storage.InsertResponses(newHeight, &abciResponses)
+ // insert entries into the storage
+ err = a.Storage.UpdateStores(newHeight, block, a.LastCommit, &state, &abciResponses)
if err != nil {
return nil, nil, nil, nil, nil, err
}
@@ -853,6 +844,8 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
if err != nil {
return nil, nil, nil, nil, nil, err
}
+ // unlock the state mutex, since we are done updating state
+ a.Storage.UnlockAfterStateUpdate()
resCommit, err := a.SendCommit()
if err != nil {
diff --git a/cometmock/storage/storage.go b/cometmock/storage/storage.go
index 2151d22..3e62c7b 100644
--- a/cometmock/storage/storage.go
+++ b/cometmock/storage/storage.go
@@ -12,51 +12,51 @@ import (
// Storage is an interface for storing blocks, commits and states by height.
// All methods are thread-safe.
type Storage interface {
- // InsertBlock inserts a block at a given height.
- // If there is already a block at that height, it should be overwritten.
- InsertBlock(height int64, block *types.Block) error
// GetBlock returns the block at a given height.
GetBlock(height int64) (*types.Block, error)
- // InsertCommit inserts a commit at a given height.
- // If there is already a commit at that height, it should be overwritten.
- InsertCommit(height int64, commit *types.Commit) error
// GetCommit returns the commit at a given height.
GetCommit(height int64) (*types.Commit, error)
- // InsertState inserts a state at a given height. This is the state after
- // applying the block at that height.
- // If there is already a state at that height, it should be overwritten.
- InsertState(height int64, state *cometstate.State) error
// GetState returns the state at a given height. This is the state after
// applying the block at that height.
GetState(height int64) (*cometstate.State, error)
- // InsertResponses inserts the ABCI responses from a given height.
- // If there are already responses at that height, they should be overwritten.
- InsertResponses(height int64, responses *protostate.ABCIResponses) error
// GetResponses returns the ABCI responses from a given height.
GetResponses(height int64) (*protostate.ABCIResponses, error)
+
+ // LockBeforeStateUpdate locks the storage for state update.
+ LockBeforeStateUpdate()
+
+ // UnlockAfterStateUpdate unlocks the storage for state update.
+ UnlockAfterStateUpdate()
+
+ // UpdateStores updates the storage with the given block, commit, state and responses.
+ // It is assumed that the block, commit, state and responses are all from the same height.
+ // If they are not, the storage will be in an inconsistent state.
+ // If the storage is already updated with the given height, the storage will overwrite the existing data.
+ // This method is *not* thread-safe.
+ // Before calling this, the caller should call LockForStateUpdate().
+ // After calling this, the caller should call UnlockForStateUpdate().
+ UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error
}
// MapStorage is a simple in-memory implementation of Storage.
type MapStorage struct {
- blocks map[int64]*types.Block
- blocksMutex sync.RWMutex
- commits map[int64]*types.Commit
- commitMutex sync.RWMutex
- states map[int64]*cometstate.State
- statesMutex sync.RWMutex
- responses map[int64]*protostate.ABCIResponses
- responsesMutex sync.RWMutex
+ // a mutex that gets locked while the state is being updated,
+ // so that a) updates do not interleave and b) reads do not happen while
+ // the state is being updated, i.e. two stores might give bogus data.
+ stateUpdateMutex sync.RWMutex
+ blocks map[int64]*types.Block
+ commits map[int64]*types.Commit
+ states map[int64]*cometstate.State
+ responses map[int64]*protostate.ABCIResponses
}
// ensure MapStorage implements Storage
var _ Storage = (*MapStorage)(nil)
-func (m *MapStorage) InsertBlock(height int64, block *types.Block) error {
- m.blocksMutex.Lock()
- defer m.blocksMutex.Unlock()
+func (m *MapStorage) insertBlock(height int64, block *types.Block) error {
if m.blocks == nil {
m.blocks = make(map[int64]*types.Block)
}
@@ -65,9 +65,8 @@ func (m *MapStorage) InsertBlock(height int64, block *types.Block) error {
}
func (m *MapStorage) GetBlock(height int64) (*types.Block, error) {
- m.blocksMutex.RLock()
- defer m.blocksMutex.RUnlock()
-
+ m.stateUpdateMutex.RLock()
+ defer m.stateUpdateMutex.RUnlock()
if m.blocks == nil {
m.blocks = make(map[int64]*types.Block)
}
@@ -77,10 +76,7 @@ func (m *MapStorage) GetBlock(height int64) (*types.Block, error) {
return nil, fmt.Errorf("block for height %v not found", height)
}
-func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error {
- m.commitMutex.Lock()
- defer m.commitMutex.Unlock()
-
+func (m *MapStorage) insertCommit(height int64, commit *types.Commit) error {
if m.commits == nil {
m.commits = make(map[int64]*types.Commit)
}
@@ -90,9 +86,8 @@ func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error {
}
func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) {
- m.commitMutex.RLock()
- defer m.commitMutex.RUnlock()
-
+ m.stateUpdateMutex.RLock()
+ defer m.stateUpdateMutex.RUnlock()
if m.commits == nil {
m.commits = make(map[int64]*types.Commit)
}
@@ -103,10 +98,7 @@ func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) {
return nil, fmt.Errorf("commit for height %v not found", height)
}
-func (m *MapStorage) InsertState(height int64, state *cometstate.State) error {
- m.statesMutex.Lock()
- defer m.statesMutex.Unlock()
-
+func (m *MapStorage) insertState(height int64, state *cometstate.State) error {
if m.states == nil {
m.states = make(map[int64]*cometstate.State)
}
@@ -116,9 +108,8 @@ func (m *MapStorage) InsertState(height int64, state *cometstate.State) error {
}
func (m *MapStorage) GetState(height int64) (*cometstate.State, error) {
- m.statesMutex.RLock()
- defer m.statesMutex.RUnlock()
-
+ m.stateUpdateMutex.RLock()
+ defer m.stateUpdateMutex.RUnlock()
if m.states == nil {
m.states = make(map[int64]*cometstate.State)
}
@@ -129,10 +120,7 @@ func (m *MapStorage) GetState(height int64) (*cometstate.State, error) {
return nil, fmt.Errorf("state for height %v not found", height)
}
-func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIResponses) error {
- m.responsesMutex.Lock()
- defer m.responsesMutex.Unlock()
-
+func (m *MapStorage) insertResponses(height int64, responses *protostate.ABCIResponses) error {
if m.responses == nil {
m.responses = make(map[int64]*protostate.ABCIResponses)
}
@@ -142,9 +130,8 @@ func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIRes
}
func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, error) {
- m.responsesMutex.RLock()
- defer m.responsesMutex.RUnlock()
-
+ m.stateUpdateMutex.RLock()
+ defer m.stateUpdateMutex.RUnlock()
if m.responses == nil {
m.responses = make(map[int64]*protostate.ABCIResponses)
}
@@ -154,3 +141,21 @@ func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, erro
}
return nil, fmt.Errorf("responses for height %v not found", height)
}
+
+func (m *MapStorage) LockBeforeStateUpdate() {
+ m.stateUpdateMutex.Lock()
+}
+
+func (m *MapStorage) UnlockAfterStateUpdate() {
+ m.stateUpdateMutex.Unlock()
+}
+
+func (m *MapStorage) UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error {
+ m.stateUpdateMutex.Lock()
+ defer m.stateUpdateMutex.Unlock()
+ m.insertBlock(height, block)
+ m.insertCommit(height, commit)
+ m.insertState(height, state)
+ m.insertResponses(height, responses)
+ return nil
+}
From 1dafbe83959015c46ceb8b2c4a2b4ec7f5ca0c9a Mon Sep 17 00:00:00 2001
From: Philip Offtermatt
Date: Mon, 21 Aug 2023 13:52:35 +0200
Subject: [PATCH 2/3] Add mutex around state updates
From 177d73e9d99b264c50c0fae474fb3e499cafde31 Mon Sep 17 00:00:00 2001
From: Philip Offtermatt
Date: Mon, 21 Aug 2023 14:11:39 +0200
Subject: [PATCH 3/3] Fix duplicate mutex lock
---
cometmock/storage/storage.go | 2 --
1 file changed, 2 deletions(-)
diff --git a/cometmock/storage/storage.go b/cometmock/storage/storage.go
index 3e62c7b..e3dfb9f 100644
--- a/cometmock/storage/storage.go
+++ b/cometmock/storage/storage.go
@@ -151,8 +151,6 @@ func (m *MapStorage) UnlockAfterStateUpdate() {
}
func (m *MapStorage) UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error {
- m.stateUpdateMutex.Lock()
- defer m.stateUpdateMutex.Unlock()
m.insertBlock(height, block)
m.insertCommit(height, commit)
m.insertState(height, state)