Skip to content

Commit

Permalink
Add EpochSyncTable
Browse files Browse the repository at this point in the history
This table is mainly for sync time performance validation. Different
machines are likley to have different results in this table.

Closes: #621
  • Loading branch information
erikd committed May 31, 2021
1 parent 4b3022a commit 321a6eb
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 22 deletions.
1 change: 1 addition & 0 deletions cardano-db-sync/cardano-db-sync.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ library
Cardano.DbSync.Era.Byron.Insert
Cardano.DbSync.Era.Byron.Util

Cardano.DbSync.Era.Cardano.Insert
Cardano.DbSync.Era.Cardano.Util

Cardano.DbSync.Era.Shelley.Genesis
Expand Down
44 changes: 44 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Era/Cardano/Insert.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NoImplicitPrelude #-}
module Cardano.DbSync.Era.Cardano.Insert
( insertEpochSyncTime
) where

import qualified Cardano.Db as Db

import Cardano.Prelude hiding (STM, atomically)

import Cardano.Slotting.Slot (EpochNo (..))

import Cardano.Sync.Types (SyncState, renderSyncState)

import Control.Monad.Class.MonadSTM.Strict (MonadSTM, StrictTVar, STM, atomically, readTVar, writeTVar)
import Control.Monad.Trans.Control (MonadBaseControl)

import Data.Time.Clock (UTCTime)
import qualified Data.Time.Clock as Time

import Database.Persist.Sql (SqlBackend)


insertEpochSyncTime
:: (MonadBaseControl IO m, MonadIO m)
=> EpochNo -> SyncState -> StrictTVar IO (Maybe UTCTime) -> ReaderT SqlBackend m ()
insertEpochSyncTime epochNo syncState estvar = do
now <- liftIO $ Time.getCurrentTime
mlast <- liftIO . atomically $ swapTVar estvar (Just now)
let mdiff = maybe Nothing (Just . Time.diffUTCTime now) mlast
void . Db.insertEpochSyncTime $
Db.EpochSyncTime
{ Db.epochSyncTimeNo = unEpochNo epochNo
, Db.epochSyncTimeSeconds = realToFrac <$> mdiff
, Db.epochSyncTimeState = renderSyncState syncState
}


swapTVar :: MonadSTM m => StrictTVar m a -> a -> STM m a
swapTVar tvar !new = do
old <- readTVar tvar
writeTVar tvar new
pure old
4 changes: 3 additions & 1 deletion cardano-db-sync/src/Cardano/DbSync/Plugin/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Cardano.BM.Trace (Trace, logInfo)
import qualified Cardano.Db as DB

import Cardano.DbSync.Era.Byron.Insert (insertByronBlock)
import Cardano.DbSync.Era.Cardano.Insert (insertEpochSyncTime)
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, postEpochRewards,
postEpochStake)
Expand Down Expand Up @@ -115,7 +116,8 @@ handleLedgerEvents tracer lenv =
=> LedgerEvent -> ExceptT SyncNodeError (ReaderT SqlBackend m) ()
printer ev =
case ev of
LedgerNewEpoch en ->
LedgerNewEpoch en ss -> do
lift $ insertEpochSyncTime en ss (leEpochSyncTime lenv)
liftIO . logInfo tracer $ "Starting epoch " <> textShow (unEpochNo en)
LedgerRewards details rwds -> do
let progress = calcEpochProgress 4 details
Expand Down
65 changes: 46 additions & 19 deletions cardano-db/src/Cardano/Db/Insert.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Cardano.Db.Insert
, insertEpoch
, insertEpochParam
, insertEpochStake
, insertEpochSyncTime
, insertMaTxMint
, insertMaTxOut
, insertMeta
Expand Down Expand Up @@ -58,13 +59,13 @@ import Data.Proxy (Proxy (..))
import Data.Text (Text)
import qualified Data.Text as Text

import Database.Persist.Class (AtLeastOneUniqueKey, PersistEntityBackend, insert)
import Database.Persist.Class (AtLeastOneUniqueKey, PersistEntityBackend, insert, insertBy, replaceUnique)
import Database.Persist.Sql (OnlyOneUniqueKey, PersistRecordBackend, SqlBackend,
UniqueDef, entityDB, entityDef, entityUniques, rawSql, toPersistFields,
toPersistValue, uniqueDBName)
import qualified Database.Persist.Sql.Util as Util
import Database.Persist.Types (ConstraintNameDB (..), EntityNameDB (..), FieldNameDB (..),
PersistValue)
PersistValue, entityKey)
import Database.PostgreSQL.Simple (SqlError)

import Cardano.Db.Schema
Expand Down Expand Up @@ -110,6 +111,9 @@ insertEpochParam = insertUnchecked "EpochParam"
insertEpochStake :: (MonadBaseControl IO m, MonadIO m) => EpochStake -> ReaderT SqlBackend m EpochStakeId
insertEpochStake = insertUnchecked "EpochStake"

insertEpochSyncTime :: (MonadBaseControl IO m, MonadIO m) => EpochSyncTime -> ReaderT SqlBackend m EpochSyncTimeId
insertEpochSyncTime = insertReplace "EpochSyncTime"

insertMaTxMint :: (MonadBaseControl IO m, MonadIO m) => MaTxMint -> ReaderT SqlBackend m MaTxMintId
insertMaTxMint = insertCheckUnique "insertMaTxMint"

Expand Down Expand Up @@ -199,23 +203,6 @@ data DbInsertException

instance Exception DbInsertException

-- Insert without checking uniqueness constraints. This should be safe for most tables
-- even tables with uniqueness constraints, especially block, tx and many others, where
-- uniqueness is enforced by the ledger.
insertUnchecked
:: ( AtLeastOneUniqueKey record
, MonadIO m
, MonadBaseControl IO m
, PersistEntityBackend record ~ SqlBackend
)
=> String -> record -> ReaderT SqlBackend m (Key record)
insertUnchecked vtype =
handle exceptHandler . insert
where
exceptHandler :: MonadIO m => SqlError -> ReaderT SqlBackend m a
exceptHandler e =
liftIO $ throwIO (DbInsertException vtype e)

-- Insert, getting PostgreSQL to check the uniqueness constaint, and if it is violated, rewrite
-- the first field with the same value to force PostgresSQL to return the row identifier.
insertCheckUnique
Expand Down Expand Up @@ -263,6 +250,46 @@ insertCheckUnique vtype record = do
exceptHandler e =
liftIO $ throwIO (DbInsertException vtype e)

insertReplace
:: forall m record.
( AtLeastOneUniqueKey record
, Eq (Unique record)
, MonadBaseControl IO m
, MonadIO m
, PersistRecordBackend record SqlBackend
)
=> String -> record -> ReaderT SqlBackend m (Key record)
insertReplace vtype record =
handle exceptHandler $ do
eres <- insertBy record
case eres of
Right rid -> pure rid
Left rec -> do
mres <- replaceUnique (entityKey rec) record
maybe (pure $ entityKey rec) (const . pure $ entityKey rec) mres
where
exceptHandler :: SqlError -> ReaderT SqlBackend m a
exceptHandler e =
liftIO $ throwIO (DbInsertException vtype e)

-- Insert without checking uniqueness constraints. This should be safe for most tables
-- even tables with uniqueness constraints, especially block, tx and many others, where
-- uniqueness is enforced by the ledger.
insertUnchecked
:: ( AtLeastOneUniqueKey record
, MonadIO m
, MonadBaseControl IO m
, PersistEntityBackend record ~ SqlBackend
)
=> String -> record -> ReaderT SqlBackend m (Key record)
insertUnchecked vtype =
handle exceptHandler . insert
where
exceptHandler :: MonadIO m => SqlError -> ReaderT SqlBackend m a
exceptHandler e =
liftIO $ throwIO (DbInsertException vtype e)


-- This is cargo culted from Persistent because it is not exported.
escapeFieldName :: FieldNameDB -> Text
escapeFieldName (FieldNameDB s) =
Expand Down
9 changes: 9 additions & 0 deletions cardano-db/src/Cardano/Db/Schema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import Data.Word (Word16, Word64)
-- Do not use explicit imports from this module as the imports can change
-- from version to version due to changes to the TH code in Persistent.
import Database.Persist.TH
import Database.Persist.Class (Unique)

-- In the schema definition we need to match Haskell types with with the
-- custom type defined in PostgreSQL (via 'DOMAIN' statements). For the
Expand Down Expand Up @@ -412,6 +413,12 @@ share
UniquePoolOfflineFetchError poolId fetchTime retryCount
deriving Show

EpochSyncTime
no Word64
seconds Double Maybe
state Text sqltype=syncstatetype
UniqueEpochSyncTime no

--------------------------------------------------------------------------
-- Tables below must be preserved when migrations occur!
--------------------------------------------------------------------------
Expand All @@ -433,3 +440,5 @@ share
deriving Show

|]

deriving instance Eq (Unique EpochSyncTime)
8 changes: 6 additions & 2 deletions cardano-sync/src/Cardano/Sync/LedgerState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.Strict.Maybe as Strict
import qualified Data.Text as Text
import Data.Time.Clock (UTCTime)

import Ouroboros.Consensus.Block (CodecConfig, WithOrigin (..), blockHash, blockIsEBB,
blockPrevHash, withOrigin)
Expand Down Expand Up @@ -133,10 +134,11 @@ data LedgerEnv = LedgerEnv
, leBulkOpQueue :: !(TBQueue IO BulkOperation)
, leOfflineWorkQueue :: !(TBQueue IO PoolFetchRetry)
, leOfflineResultQueue :: !(TBQueue IO FetchResult)
, leEpochSyncTime :: !(StrictTVar IO (Maybe UTCTime))
}

data LedgerEvent
= LedgerNewEpoch !EpochNo
= LedgerNewEpoch !EpochNo !SyncState
| LedgerRewards !SlotDetails !Generic.Rewards
| LedgerStakeDist !Generic.StakeDist
deriving Eq
Expand Down Expand Up @@ -187,6 +189,7 @@ mkLedgerEnv protocolInfo dir network slot deleteFiles = do
boq <- newTBQueueIO 10800
owq <- newTBQueueIO 100
orq <- newTBQueueIO 100
est <- newTVarIO Nothing
pure LedgerEnv
{ leProtocolInfo = protocolInfo
, leDir = dir
Expand All @@ -197,6 +200,7 @@ mkLedgerEnv protocolInfo dir network slot deleteFiles = do
, leBulkOpQueue = boq
, leOfflineWorkQueue = owq
, leOfflineResultQueue = orq
, leEpochSyncTime = est
}
where
initLedgerEventState :: LedgerEventState
Expand Down Expand Up @@ -262,7 +266,7 @@ generateEvents env oldEventState details cls = do
writeTVar (leEventState env) newEventState
pure $ catMaybes
[ if lesEpochNo oldEventState < currentEpochNo
then Just (LedgerNewEpoch currentEpochNo) else Nothing
then Just (LedgerNewEpoch currentEpochNo (getSyncStatus details)) else Nothing
, LedgerRewards details <$> rewards
, LedgerStakeDist <$> stakeDist
]
Expand Down
20 changes: 20 additions & 0 deletions cardano-sync/src/Cardano/Sync/Types.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Cardano.Sync.Types
( BlockDetails (..)
, CardanoBlock
Expand All @@ -14,6 +15,9 @@ module Cardano.Sync.Types
, MetricSetters (..)
, PoolFetchRetry (..)
, Retry (..)

, readSyncState
, renderSyncState
) where

import Cardano.Prelude hiding (Meta)
Expand All @@ -25,6 +29,7 @@ import Cardano.Sync.Config.Types (CardanoBlock, CardanoProtocol)

import Cardano.Slotting.Slot (EpochNo (..), EpochSize (..), SlotNo (..))

import qualified Data.Text as Text
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (POSIXTime)

Expand Down Expand Up @@ -99,3 +104,18 @@ data Retry = Retry
, retryRetryTime :: !POSIXTime -- Time to retry
, retryCount :: !Word
} deriving (Eq, Show, Generic)

readSyncState :: Text -> SyncState
readSyncState str =
case str of
"lagging" -> SyncLagging
"following" -> SyncFollowing
-- This should never happen. On the Postgres side we defined an ENUM with
-- only the two values as above.
_other -> error $ "readSyncState: Unknown SyncState " ++ Text.unpack str

renderSyncState :: SyncState -> Text
renderSyncState ss =
case ss of
SyncFollowing -> "following"
SyncLagging -> "lagging"
25 changes: 25 additions & 0 deletions schema/migration-1-0006-20210531.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Hand written migration to create the custom types with 'DOMAIN' statements.

CREATE FUNCTION migrate() RETURNS void AS $$

DECLARE
next_version int;

BEGIN
SELECT stage_one + 1 INTO next_version FROM "schema_version";
IF next_version = 6 THEN

-- Would normally put this inside an "EXECUTE" statement, but that does not work for some
-- reason and this does.
CREATE TYPE syncstatetype AS ENUM ('lagging', 'following');

UPDATE "schema_version" SET stage_one = next_version;
RAISE NOTICE 'DB has been migrated to stage_one version %', next_version;
END IF;
END;

$$ LANGUAGE plpgsql;

SELECT migrate();

DROP FUNCTION migrate();
20 changes: 20 additions & 0 deletions schema/migration-2-0003-20210531.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Persistent generated migration.

CREATE FUNCTION migrate() RETURNS void AS $$
DECLARE
next_version int ;
BEGIN
SELECT stage_two + 1 INTO next_version FROM schema_version ;
IF next_version = 3 THEN
EXECUTE 'CREATe TABLE "epoch_sync_time"("id" SERIAL8 PRIMARY KEY UNIQUE,"no" INT8 NOT NULL,"seconds" DOUBLE PRECISION NULL,"state" syncstatetype NOT NULL)' ;
EXECUTE 'ALTER TABLE "epoch_sync_time" ADD CONSTRAINT "unique_epoch_sync_time" UNIQUE("no")' ;
-- Hand written SQL statements can be added here.
UPDATE schema_version SET stage_two = next_version ;
RAISE NOTICE 'DB has been migrated to stage_two version %', next_version ;
END IF ;
END ;
$$ LANGUAGE plpgsql ;

SELECT migrate() ;

DROP FUNCTION migrate() ;

0 comments on commit 321a6eb

Please sign in to comment.