Skip to content

Commit

Permalink
Track the last time the ChainDB thread was starved
Browse files Browse the repository at this point in the history
  • Loading branch information
Niols authored and amesgen committed Aug 7, 2024
1 parent 6926278 commit 73187ba
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ traceChainDBEventTestBlockWith tracer = \case
AddedReprocessLoEBlocksToQueue ->
trace $ "Requested ChainSel run"
_ -> pure ()
ChainDB.TraceChainSelStarvationEvent (ChainDB.ChainSelStarvationStarted time) ->
trace $ "ChainSel starvation started at " ++ prettyTime time
ChainDB.TraceChainSelStarvationEvent (ChainDB.ChainSelStarvationEnded time pt) ->
trace $ "ChainSel starvation ended at " ++ prettyTime time ++ " thanks to " ++ terseRealPoint pt
_ -> pure ()
where
trace = traceUnitWith tracer "ChainDB"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(BlockFetchConsensusInterface (..), FetchMode (..),
(BlockFetchConsensusInterface (..),
ChainSelStarvation (..), FetchMode (..),
FromConsensus (..), WhetherReceivingTentativeBlocks (..))
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers,
requiresBootstrapPeers)
Expand All @@ -56,6 +57,7 @@ data ChainDbView m blk = ChainDbView {
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
, getChainSelStarvation :: STM m ChainSelStarvation
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
Expand All @@ -64,6 +66,7 @@ defaultChainDbView chainDB = ChainDbView {
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
}

-- | How to get the wall-clock time of a slot. Note that this is a very
Expand Down Expand Up @@ -349,5 +352,7 @@ mkBlockFetchConsensusInterface
headerForgeUTCTime = slotForgeTime . headerRealPoint . unFromConsensus
blockForgeUTCTime = slotForgeTime . blockRealPoint . unFromConsensus

readChainSelStarvation = getChainSelStarvation chainDB

demoteCSJDynamo :: peer -> m ()
demoteCSJDynamo = void . atomically . Jumping.rotateDynamo csHandlesCol
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (ChainUpdate, MaxSlotNo,
Serialised (..))
import qualified Ouroboros.Network.Block as Network
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))
import Ouroboros.Network.Mock.Chain (Chain (..))
import qualified Ouroboros.Network.Mock.Chain as Chain
import System.FS.API.Types (FsError)
Expand Down Expand Up @@ -334,6 +336,10 @@ data ChainDB m blk = ChainDB {
-- invalid block is detected. These blocks are likely to be valid.
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))

-- | Whether ChainSel is currently starved, or when was last time it
-- stopped being starved.
, getChainSelStarvation :: STM m ChainSelStarvation

, closeDB :: m ()

-- | Return 'True' when the database is open.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl (
, LgrDB.TraceReplayEvent
, SelectionChangedInfo (..)
, TraceAddBlockEvent (..)
, TraceChainSelStarvationEvent (..)
, TraceCopyToImmutableDBEvent (..)
, TraceEvent (..)
, TraceFollowerEvent (..)
Expand Down Expand Up @@ -69,6 +70,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry (WithTempRegistry,
import Ouroboros.Consensus.Util.STM (Fingerprint (..),
WithFingerprint (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

{-------------------------------------------------------------------------------
Initialization
Expand Down Expand Up @@ -177,6 +180,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
copyFuse <- newFuse "copy to immutable db"
chainSelFuse <- newFuse "chain selection"
chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs)
varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing

let env = CDB { cdbImmutableDB = immutableDB
, cdbVolatileDB = volatileDB
Expand All @@ -201,6 +205,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, cdbChainSelQueue = chainSelQueue
, cdbFutureBlocks = varFutureBlocks
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
, cdbChainSelStarvation = varChainSelStarvation
}
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
let chainDB = API.ChainDB
Expand All @@ -218,6 +223,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
, stream = Iterator.stream h
, newFollower = Follower.newFollower h
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
, getChainSelStarvation = getEnvSTM h Query.getChainSelStarvation
, closeDB = closeDB h
, isOpen = isOpen h
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
-- exception (or it errored), notify the blocked thread
withFuse fuse $
bracketOnError
(lift $ getChainSelMessage cdbChainSelQueue)
(lift $ getChainSelMessage starvationTracer cdbChainSelStarvation cdbChainSelQueue)
(\message -> lift $ atomically $ do
case message of
ChainSelReprocessLoEBlocks -> pure ()
Expand All @@ -541,3 +541,5 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint blockToAdd
chainSelSync cdb message)
where
starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query (
, getAnyBlockComponent
, getAnyKnownBlock
, getAnyKnownBlockComponent
, getChainSelStarvation
) where

import qualified Data.Map.Strict as Map
Expand All @@ -42,6 +43,8 @@ import Ouroboros.Consensus.Util.STM (WithFingerprint (..))
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (MaxSlotNo, maxSlotNoFromWithOrigin)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

-- | Return the last @k@ headers.
--
Expand Down Expand Up @@ -148,6 +151,12 @@ getIsInvalidBlock ::
getIsInvalidBlock CDB{..} =
fmap (fmap (fmap invalidBlockReason) . flip Map.lookup) <$> readTVar cdbInvalid

getChainSelStarvation ::
forall m blk. IOLike m
=> ChainDbEnv m blk
-> STM m ChainSelStarvation
getChainSelStarvation CDB {..} = readTVar cdbChainSelStarvation

getIsValid ::
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Trace types
, SelectionChangedInfo (..)
, TraceAddBlockEvent (..)
, TraceChainSelStarvationEvent (..)
, TraceCopyToImmutableDBEvent (..)
, TraceEvent (..)
, TraceFollowerEvent (..)
Expand All @@ -63,6 +64,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, TraceValidationEvent (..)
) where

import Cardano.Prelude (whenM)
import Control.Tracer
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
Expand Down Expand Up @@ -107,6 +109,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

-- | All the serialisation related constraints needed by the ChainDB.
class ( ImmutableDbSerialiseConstraints blk
Expand Down Expand Up @@ -275,6 +279,9 @@ data ChainDbEnv m blk = CDB
-- switch back to a chain containing it. The fragment is usually anchored at
-- a recent immutable tip; if it does not, it will conservatively be treated
-- as the empty fragment anchored in the current immutable tip.
, cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation)
-- ^ Information on the last starvation of ChainSel, whether ongoing or
-- ended recently.
} deriving (Generic)

-- | We include @blk@ in 'showTypeOf' because it helps resolving type families
Expand Down Expand Up @@ -509,9 +516,32 @@ addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
-- queue is empty.
getChainSelMessage :: IOLike m => ChainSelQueue m blk -> m (ChainSelMessage m blk)
getChainSelMessage (ChainSelQueue queue) = atomically $ readTBQueue queue
-- queue is empty; in that case, reports the starvation (and its end) to the
-- callback.
getChainSelMessage
:: (IOLike m, HasHeader blk)
=> Tracer m (TraceChainSelStarvationEvent blk)
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = do
-- NOTE: The test of emptiness and the blocking read are in different STM
-- transactions on purpose.
whenM (atomically $ isEmptyTBQueue queue) $ do
writeTVarIO starvationVar ChainSelStarvationOngoing
traceWith starvationTracer . ChainSelStarvationStarted =<< getMonotonicTime
message <- atomically $ readTBQueue queue
-- If there was a starvation ongoing, we need to report that it is done.
whenM ((== ChainSelStarvationOngoing) <$> readTVarIO starvationVar) $
case message of
ChainSelAddBlock BlockToAdd {blockToAdd} -> do
time <- getMonotonicTime
traceWith starvationTracer $ ChainSelStarvationEnded time $ blockRealPoint blockToAdd
writeTVarIO starvationVar $ ChainSelStarvationEndedAt time
ChainSelReprocessLoEBlocks -> pure ()
return message
where
writeTVarIO v x = atomically $ writeTVar v x

-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
--
Expand Down Expand Up @@ -545,6 +575,7 @@ data TraceEvent blk
| TraceImmutableDBEvent (ImmutableDB.TraceEvent blk)
| TraceVolatileDBEvent (VolatileDB.TraceEvent blk)
| TraceLastShutdownUnclean
| TraceChainSelStarvationEvent(TraceChainSelStarvationEvent blk)
deriving (Generic)


Expand Down Expand Up @@ -877,3 +908,8 @@ data TraceIteratorEvent blk
-- next block we're looking for.
| SwitchBackToVolatileDB
deriving (Generic, Eq, Show)

data TraceChainSelStarvationEvent blk
= ChainSelStarvationStarted Time
| ChainSelStarvationEnded Time (RealPoint blk)
deriving (Generic, Eq, Show)
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,8 @@ deriving instance SOP.Generic (ImmutableDB.TraceEvent blk)
deriving instance SOP.HasDatatypeInfo (ImmutableDB.TraceEvent blk)
deriving instance SOP.Generic (VolatileDB.TraceEvent blk)
deriving instance SOP.HasDatatypeInfo (VolatileDB.TraceEvent blk)
deriving instance SOP.Generic (TraceChainSelStarvationEvent blk)
deriving instance SOP.HasDatatypeInfo (TraceChainSelStarvationEvent blk)

data Tag =
TagGetIsValidJust
Expand Down Expand Up @@ -1632,7 +1634,7 @@ traceEventName = \case
TraceImmutableDBEvent ev -> "ImmutableDB." <> constrName ev
TraceVolatileDBEvent ev -> "VolatileDB." <> constrName ev
TraceLastShutdownUnclean -> "LastShutdownUnclean"
TraceChainSelStarvationEvent _ -> "TraceChainSelStarvationEvent"
TraceChainSelStarvationEvent ev -> "ChainSelStarvation." <> constrName ev

mkArgs :: IOLike m
=> TopLevelConfig Blk
Expand Down

0 comments on commit 73187ba

Please sign in to comment.