diff --git a/cometmock/abci_client/client.go b/cometmock/abci_client/client.go index 1a3c067..4d6ce43 100644 --- a/cometmock/abci_client/client.go +++ b/cometmock/abci_client/client.go @@ -28,6 +28,9 @@ var GlobalClient *AbciClient // store a mutex that allows only running one block at a time var blockMutex = sync.Mutex{} +// store a mutex that disallows accessing state information while it is being updated +var stateUpdateMutex = sync.Mutex{} + var verbose = false // AbciClient facilitates calls to the ABCI interface of multiple nodes. @@ -720,8 +723,6 @@ func (a *AbciClient) RunBlockWithTimeAndProposer( return nil, nil, nil, nil, nil, err } - a.LastBlock = block - commitSigs := []types.CommitSig{} for index, val := range a.CurState.Validators.Validators { @@ -817,25 +818,14 @@ func (a *AbciClient) RunBlockWithTimeAndProposer( deliverTxResponses = append(deliverTxResponses, resDeliverTx) } - // insert entries into the storage - err = a.Storage.InsertBlock(newHeight, block) - if err != nil { - return nil, nil, nil, nil, nil, err - } - - err = a.Storage.InsertCommit(newHeight, a.LastCommit) - if err != nil { - return nil, nil, nil, nil, nil, err - } + // lock the state update mutex while the stores are updated to avoid + // inconsistencies between stores + a.Storage.LockBeforeStateUpdate() + a.LastBlock = block // copy state so that the historical state is not mutated state := a.CurState.Copy() - err = a.Storage.InsertState(newHeight, &state) - if err != nil { - return nil, nil, nil, nil, nil, err - } - // build components of the state update, then call the update function abciResponses := cmtstate.ABCIResponses{ DeliverTxs: deliverTxResponses, @@ -843,7 +833,8 @@ func (a *AbciClient) RunBlockWithTimeAndProposer( BeginBlock: resBeginBlock, } - err = a.Storage.InsertResponses(newHeight, &abciResponses) + // insert entries into the storage + err = a.Storage.UpdateStores(newHeight, block, a.LastCommit, &state, &abciResponses) if err != nil { return nil, nil, nil, nil, nil, err } @@ -853,6 +844,8 @@ func (a *AbciClient) RunBlockWithTimeAndProposer( if err != nil { return nil, nil, nil, nil, nil, err } + // unlock the state mutex, since we are done updating state + a.Storage.UnlockAfterStateUpdate() resCommit, err := a.SendCommit() if err != nil { diff --git a/cometmock/storage/storage.go b/cometmock/storage/storage.go index 2151d22..e3dfb9f 100644 --- a/cometmock/storage/storage.go +++ b/cometmock/storage/storage.go @@ -12,51 +12,51 @@ import ( // Storage is an interface for storing blocks, commits and states by height. // All methods are thread-safe. type Storage interface { - // InsertBlock inserts a block at a given height. - // If there is already a block at that height, it should be overwritten. - InsertBlock(height int64, block *types.Block) error // GetBlock returns the block at a given height. GetBlock(height int64) (*types.Block, error) - // InsertCommit inserts a commit at a given height. - // If there is already a commit at that height, it should be overwritten. - InsertCommit(height int64, commit *types.Commit) error // GetCommit returns the commit at a given height. GetCommit(height int64) (*types.Commit, error) - // InsertState inserts a state at a given height. This is the state after - // applying the block at that height. - // If there is already a state at that height, it should be overwritten. - InsertState(height int64, state *cometstate.State) error // GetState returns the state at a given height. This is the state after // applying the block at that height. GetState(height int64) (*cometstate.State, error) - // InsertResponses inserts the ABCI responses from a given height. - // If there are already responses at that height, they should be overwritten. - InsertResponses(height int64, responses *protostate.ABCIResponses) error // GetResponses returns the ABCI responses from a given height. GetResponses(height int64) (*protostate.ABCIResponses, error) + + // LockBeforeStateUpdate locks the storage for state update. + LockBeforeStateUpdate() + + // UnlockAfterStateUpdate unlocks the storage for state update. + UnlockAfterStateUpdate() + + // UpdateStores updates the storage with the given block, commit, state and responses. + // It is assumed that the block, commit, state and responses are all from the same height. + // If they are not, the storage will be in an inconsistent state. + // If the storage is already updated with the given height, the storage will overwrite the existing data. + // This method is *not* thread-safe. + // Before calling this, the caller should call LockForStateUpdate(). + // After calling this, the caller should call UnlockForStateUpdate(). + UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error } // MapStorage is a simple in-memory implementation of Storage. type MapStorage struct { - blocks map[int64]*types.Block - blocksMutex sync.RWMutex - commits map[int64]*types.Commit - commitMutex sync.RWMutex - states map[int64]*cometstate.State - statesMutex sync.RWMutex - responses map[int64]*protostate.ABCIResponses - responsesMutex sync.RWMutex + // a mutex that gets locked while the state is being updated, + // so that a) updates do not interleave and b) reads do not happen while + // the state is being updated, i.e. two stores might give bogus data. + stateUpdateMutex sync.RWMutex + blocks map[int64]*types.Block + commits map[int64]*types.Commit + states map[int64]*cometstate.State + responses map[int64]*protostate.ABCIResponses } // ensure MapStorage implements Storage var _ Storage = (*MapStorage)(nil) -func (m *MapStorage) InsertBlock(height int64, block *types.Block) error { - m.blocksMutex.Lock() - defer m.blocksMutex.Unlock() +func (m *MapStorage) insertBlock(height int64, block *types.Block) error { if m.blocks == nil { m.blocks = make(map[int64]*types.Block) } @@ -65,9 +65,8 @@ func (m *MapStorage) InsertBlock(height int64, block *types.Block) error { } func (m *MapStorage) GetBlock(height int64) (*types.Block, error) { - m.blocksMutex.RLock() - defer m.blocksMutex.RUnlock() - + m.stateUpdateMutex.RLock() + defer m.stateUpdateMutex.RUnlock() if m.blocks == nil { m.blocks = make(map[int64]*types.Block) } @@ -77,10 +76,7 @@ func (m *MapStorage) GetBlock(height int64) (*types.Block, error) { return nil, fmt.Errorf("block for height %v not found", height) } -func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error { - m.commitMutex.Lock() - defer m.commitMutex.Unlock() - +func (m *MapStorage) insertCommit(height int64, commit *types.Commit) error { if m.commits == nil { m.commits = make(map[int64]*types.Commit) } @@ -90,9 +86,8 @@ func (m *MapStorage) InsertCommit(height int64, commit *types.Commit) error { } func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) { - m.commitMutex.RLock() - defer m.commitMutex.RUnlock() - + m.stateUpdateMutex.RLock() + defer m.stateUpdateMutex.RUnlock() if m.commits == nil { m.commits = make(map[int64]*types.Commit) } @@ -103,10 +98,7 @@ func (m *MapStorage) GetCommit(height int64) (*types.Commit, error) { return nil, fmt.Errorf("commit for height %v not found", height) } -func (m *MapStorage) InsertState(height int64, state *cometstate.State) error { - m.statesMutex.Lock() - defer m.statesMutex.Unlock() - +func (m *MapStorage) insertState(height int64, state *cometstate.State) error { if m.states == nil { m.states = make(map[int64]*cometstate.State) } @@ -116,9 +108,8 @@ func (m *MapStorage) InsertState(height int64, state *cometstate.State) error { } func (m *MapStorage) GetState(height int64) (*cometstate.State, error) { - m.statesMutex.RLock() - defer m.statesMutex.RUnlock() - + m.stateUpdateMutex.RLock() + defer m.stateUpdateMutex.RUnlock() if m.states == nil { m.states = make(map[int64]*cometstate.State) } @@ -129,10 +120,7 @@ func (m *MapStorage) GetState(height int64) (*cometstate.State, error) { return nil, fmt.Errorf("state for height %v not found", height) } -func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIResponses) error { - m.responsesMutex.Lock() - defer m.responsesMutex.Unlock() - +func (m *MapStorage) insertResponses(height int64, responses *protostate.ABCIResponses) error { if m.responses == nil { m.responses = make(map[int64]*protostate.ABCIResponses) } @@ -142,9 +130,8 @@ func (m *MapStorage) InsertResponses(height int64, responses *protostate.ABCIRes } func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, error) { - m.responsesMutex.RLock() - defer m.responsesMutex.RUnlock() - + m.stateUpdateMutex.RLock() + defer m.stateUpdateMutex.RUnlock() if m.responses == nil { m.responses = make(map[int64]*protostate.ABCIResponses) } @@ -154,3 +141,19 @@ func (m *MapStorage) GetResponses(height int64) (*protostate.ABCIResponses, erro } return nil, fmt.Errorf("responses for height %v not found", height) } + +func (m *MapStorage) LockBeforeStateUpdate() { + m.stateUpdateMutex.Lock() +} + +func (m *MapStorage) UnlockAfterStateUpdate() { + m.stateUpdateMutex.Unlock() +} + +func (m *MapStorage) UpdateStores(height int64, block *types.Block, commit *types.Commit, state *cometstate.State, responses *protostate.ABCIResponses) error { + m.insertBlock(height, block) + m.insertCommit(height, commit) + m.insertState(height, state) + m.insertResponses(height, responses) + return nil +}