Skip to content

Commit

Permalink
Merge pull request #41 from informalsystems/ph/backport-racefix-v0.37.x
Browse files Browse the repository at this point in the history
Ph/backport racefix v0.37.x
  • Loading branch information
p-offtermatt authored Aug 21, 2023
2 parents a80dc16 + a7cd9ab commit 37ddbd7
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 66 deletions.
29 changes: 11 additions & 18 deletions cometmock/abci_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ var GlobalClient *AbciClient
// store a mutex that allows only running one block at a time
var blockMutex = sync.Mutex{}

// store a mutex that disallows accessing state information while it is being updated
var stateUpdateMutex = sync.Mutex{}

var verbose = false

// AbciClient facilitates calls to the ABCI interface of multiple nodes.
Expand Down Expand Up @@ -716,8 +719,6 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
return nil, nil, nil, nil, nil, err
}

a.LastBlock = block

commitSigs := []types.CommitSig{}

for index, val := range a.CurState.Validators.Validators {
Expand Down Expand Up @@ -813,33 +814,23 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
deliverTxResponses = append(deliverTxResponses, resDeliverTx)
}

// insert entries into the storage
err = a.Storage.InsertBlock(newHeight, block)
if err != nil {
return nil, nil, nil, nil, nil, err
}

err = a.Storage.InsertCommit(newHeight, a.LastCommit)
if err != nil {
return nil, nil, nil, nil, nil, err
}
// lock the state update mutex while the stores are updated to avoid
// inconsistencies between stores
a.Storage.LockBeforeStateUpdate()
a.LastBlock = block

// copy state so that the historical state is not mutated
state := a.CurState.Copy()

err = a.Storage.InsertState(newHeight, &state)
if err != nil {
return nil, nil, nil, nil, nil, err
}

// build components of the state update, then call the update function
abciResponses := cmtstate.ABCIResponses{
DeliverTxs: deliverTxResponses,
EndBlock: resEndBlock,
BeginBlock: resBeginBlock,
}

err = a.Storage.InsertResponses(newHeight, &abciResponses)
// insert entries into the storage
err = a.Storage.UpdateStores(newHeight, block, a.LastCommit, &state, &abciResponses)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Expand All @@ -849,6 +840,8 @@ func (a *AbciClient) RunBlockWithTimeAndProposer(
if err != nil {
return nil, nil, nil, nil, nil, err
}
// unlock the state mutex, since we are done updating state
a.Storage.UnlockAfterStateUpdate()

resCommit, err := a.SendCommit()
if err != nil {
Expand Down
99 changes: 51 additions & 48 deletions cometmock/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,51 @@ import (
// Storage is an interface for storing blocks, commits and states by height.
// All methods are thread-safe.
type Storage interface {
// InsertBlock inserts a block at a given height.
// If there is already a block at that height, it should be overwritten.
InsertBlock(height int64, block *types.Block) error
// GetBlock returns the block at a given height.
GetBlock(height int64) (*types.Block, error)

// InsertCommit inserts a commit at a given height.
// If there is already a commit at that height, it should be overwritten.
InsertCommit(height int64, commit *types.Commit) error
// GetCommit returns the commit at a given height.
GetCommit(height int64) (*types.Commit, error)

// InsertState inserts a state at a given height. This is the state after
// applying the block at that height.
// If there is already a state at that height, it should be overwritten.
InsertState(height int64, state *cometstate.State) error
// GetState returns the state at a given height. This is the state after
// applying the block at that height.
GetState(height int64) (*cometstate.State, error)

// InsertResponses inserts the ABCI responses from a given height.
// If there are already responses at that height, they should be overwritten.
InsertResponses(height int64, responses *protostate.ABCIResponses) error
// GetResponses returns the ABCI responses from a given height.
GetResponses(height int64) (*protostate.ABCIResponses, error)

// LockBeforeStateUpdate locks the storage for state update.
LockBeforeStateUpdate()

// UnlockAfterStateUpdate unlocks the storage for state update.
UnlockAfterStateUpdate()

// UpdateStores updates the storage with the given block, commit, state and responses.
// It is assumed that the block, commit, state and responses are all from the same height.
// If they are not, the storage will be in an inconsistent state.
// If the storage is already updated with the given height, the storage will overwrite the existing data.
// This method is *not* thread-safe.
// Before calling this, the caller should call LockForStateUpdate().
// After calling this, the caller should call UnlockForStateUpdate().
UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error
}

// MapStorage is a simple in-memory implementation of Storage.
type MapStorage struct {
blocks map[int64]*types.Block
blocksMutex sync.RWMutex
commits map[int64]*types.Commit
commitMutex sync.RWMutex
states map[int64]*cometstate.State
statesMutex sync.RWMutex
responses map[int64]*protostate.ABCIResponses
responsesMutex sync.RWMutex
// a mutex that gets locked while the state is being updated,
// so that a) updates do not interleave and b) reads do not happen while
// the state is being updated, i.e. two stores might give bogus data.
stateUpdateMutex sync.RWMutex
blocks map[int64]*types.Block
commits map[int64]*types.Commit
states map[int64]*cometstate.State
responses map[int64]*protostate.ABCIResponses
}

// ensure MapStorage implements Storage
var _ Storage = (*MapStorage)(nil)

func (m *MapStorage) InsertBlock(height int64, block *types.Block) error {
m.blocksMutex.Lock()
defer m.blocksMutex.Unlock()
func (m *MapStorage) insertBlock(height int64, block *types.Block) error {
if m.blocks == nil {
m.blocks = make(map[int64]*types.Block)
}
Expand All @@ -65,9 +65,8 @@ func (m *MapStorage) InsertBlock(height int64, block *types.Block) error {
}

func (m *MapStorage) GetBlock(height int64) (*types.Block, error) {
m.blocksMutex.RLock()
defer m.blocksMutex.RUnlock()

m.stateUpdateMutex.RLock()
defer m.stateUpdateMutex.RUnlock()
if m.blocks == nil {
m.blocks = make(map[int64]*types.Block)
}
Expand All @@ -77,10 +76,7 @@ func (m *MapStorage) GetBlock(height int64) (*types.Block, error) {
return nil, fmt.Errorf("block for height %v not found", height)
}

func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error {
m.commitMutex.Lock()
defer m.commitMutex.Unlock()

func (m *MapStorage) insertCommit(height int64, commit *types.Commit) error {
if m.commits == nil {
m.commits = make(map[int64]*types.Commit)
}
Expand All @@ -90,9 +86,8 @@ func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error {
}

func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) {
m.commitMutex.RLock()
defer m.commitMutex.RUnlock()

m.stateUpdateMutex.RLock()
defer m.stateUpdateMutex.RUnlock()
if m.commits == nil {
m.commits = make(map[int64]*types.Commit)
}
Expand All @@ -103,10 +98,7 @@ func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) {
return nil, fmt.Errorf("commit for height %v not found", height)
}

func (m *MapStorage) InsertState(height int64, state *cometstate.State) error {
m.statesMutex.Lock()
defer m.statesMutex.Unlock()

func (m *MapStorage) insertState(height int64, state *cometstate.State) error {
if m.states == nil {
m.states = make(map[int64]*cometstate.State)
}
Expand All @@ -116,9 +108,8 @@ func (m *MapStorage) InsertState(height int64, state *cometstate.State) error {
}

func (m *MapStorage) GetState(height int64) (*cometstate.State, error) {
m.statesMutex.RLock()
defer m.statesMutex.RUnlock()

m.stateUpdateMutex.RLock()
defer m.stateUpdateMutex.RUnlock()
if m.states == nil {
m.states = make(map[int64]*cometstate.State)
}
Expand All @@ -129,10 +120,7 @@ func (m *MapStorage) GetState(height int64) (*cometstate.State, error) {
return nil, fmt.Errorf("state for height %v not found", height)
}

func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIResponses) error {
m.responsesMutex.Lock()
defer m.responsesMutex.Unlock()

func (m *MapStorage) insertResponses(height int64, responses *protostate.ABCIResponses) error {
if m.responses == nil {
m.responses = make(map[int64]*protostate.ABCIResponses)
}
Expand All @@ -142,9 +130,8 @@ func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIRes
}

func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, error) {
m.responsesMutex.RLock()
defer m.responsesMutex.RUnlock()

m.stateUpdateMutex.RLock()
defer m.stateUpdateMutex.RUnlock()
if m.responses == nil {
m.responses = make(map[int64]*protostate.ABCIResponses)
}
Expand All @@ -154,3 +141,19 @@ func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, erro
}
return nil, fmt.Errorf("responses for height %v not found", height)
}

func (m *MapStorage) LockBeforeStateUpdate() {
m.stateUpdateMutex.Lock()
}

func (m *MapStorage) UnlockAfterStateUpdate() {
m.stateUpdateMutex.Unlock()
}

func (m *MapStorage) UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error {
m.insertBlock(height, block)
m.insertCommit(height, commit)
m.insertState(height, state)
m.insertResponses(height, responses)
return nil
}

0 comments on commit 37ddbd7

Please sign in to comment.