Skip to content

Commit

Permalink
wip: Payload pushing
Browse files Browse the repository at this point in the history
  • Loading branch information
edmundnoble committed Dec 12, 2024
1 parent ec46852 commit 2cd63c3
Show file tree
Hide file tree
Showing 13 changed files with 384 additions and 245 deletions.
1 change: 1 addition & 0 deletions chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ library
, pem >=0.2
, primitive >= 0.7.1.0
, random >= 1.2
, resource-pool >= 0.4
, rocksdb-haskell-kadena >= 1.1.0
, rosetta >= 1.0
, safe-exceptions >= 0.1
Expand Down
4 changes: 3 additions & 1 deletion src/Chainweb/BlockHeader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ module Chainweb.BlockHeader
(
-- * Newtype wrappers for function parameters
I.ParentHeader(..)
, I.parentHeader
, I._ParentHeader
, I.HasParentHeader(..)
, I.ParentCreationTime(..)
, I.UnminedPayload(..)

-- * Block Payload Hash
, I.BlockPayloadHash
Expand Down
23 changes: 20 additions & 3 deletions src/Chainweb/BlockHeader/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ module Chainweb.BlockHeader.Internal
(
-- * Newtype wrappers for function parameters
ParentHeader(..)
, parentHeader
, _ParentHeader
, HasParentHeader(..)
, ParentCreationTime(..)

-- * Block Payload Hash
Expand Down Expand Up @@ -129,6 +130,9 @@ module Chainweb.BlockHeader.Internal

-- * CAS Constraint
, BlockHeaderCas

-- * As-yet unmined payloads
, UnminedPayload(..)
) where

import Chainweb.BlockCreationTime
Expand Down Expand Up @@ -614,8 +618,21 @@ newtype ParentHeader = ParentHeader
deriving (Show, Eq, Ord, Generic)
deriving anyclass (NFData)

parentHeader :: Lens' ParentHeader BlockHeader
parentHeader = lens _parentHeader $ \_ hdr -> ParentHeader hdr
_ParentHeader :: Iso' ParentHeader BlockHeader
_ParentHeader = iso _parentHeader ParentHeader

class HasParentHeader c where
parentHeader :: Lens' c ParentHeader

instance HasParentHeader ParentHeader where
parentHeader = id

data UnminedPayload
= UnminedPayload
{ unminedParent :: !(Maybe ParentHeader)
, unminedPayload :: !PayloadWithOutputs
}
deriving Show

instance HasChainId ParentHeader where
_chainId = _chainId . _parentHeader
Expand Down
8 changes: 7 additions & 1 deletion src/Chainweb/Chainweb/ChainResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Chainweb.Chainweb.ChainResources
, chainResMempool
, chainResLogger
, chainResPact
, chainResLatestNewBlockVar
, withChainResources
) where

Expand Down Expand Up @@ -55,6 +56,8 @@ import Chainweb.WebPactExecutionService

import Chainweb.Storage.Table.RocksDB
import Chainweb.Counter
import Control.Concurrent.STM
import Chainweb.BlockHeader

-- -------------------------------------------------------------------------- --
-- Single Chain Resources
Expand All @@ -64,6 +67,7 @@ data ChainResources logger = ChainResources
, _chainResLogger :: !logger
, _chainResMempool :: !(MempoolBackend Pact4.UnparsedTransaction)
, _chainResPact :: PactExecutionService
, _chainResLatestNewBlockVar :: !(TMVar UnminedPayload)
}

makeLenses ''ChainResources
Expand Down Expand Up @@ -100,8 +104,9 @@ withChainResources
let mempoolCfg = mempoolCfg0 pexMv
Mempool.withInMemoryMempool (setComponent "mempool" logger) mempoolCfg v $ \mempool -> do
mpc <- MPCon.mkMempoolConsensus mempool cdb $ Just payloadDb
latestNewBlockVar <- newEmptyTMVarIO
withPactService v cid logger (Just txFailuresCounter) mpc cdb
payloadDb pactDbDir pactConfig $ \requestQ -> do
payloadDb pactDbDir latestNewBlockVar pactConfig $ \requestQ -> do
let pex = pes requestQ
putMVar pexMv pex

Expand All @@ -111,6 +116,7 @@ withChainResources
, _chainResLogger = logger
, _chainResMempool = mempool
, _chainResPact = pex
, _chainResLatestNewBlockVar = latestNewBlockVar
}
where
pes requestQ
Expand Down
49 changes: 10 additions & 39 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module Chainweb.Miner.Coordinator

import Control.Concurrent
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM (atomically, retry, TMVar, tryTakeTMVar)
import Control.Concurrent.STM.TVar
import Control.DeepSeq (NFData)
import Control.Lens
Expand Down Expand Up @@ -129,41 +129,16 @@ data MiningCoordination logger tbl = MiningCoordination
, _coord403s :: !(IORef Int)
, _coordConf :: !CoordinationConfig
, _coordUpdateStreamCount :: !(IORef Int)
, _coordPrimedWork :: !(TVar PrimedWork)
, _coordPrimedWork :: !(TMVar UnminedPayload)
}

-- | Precached payloads for Private Miners. This allows new work requests to be
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId WorkState))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

data WorkState
= WorkReady NewBlock
-- ^ We have work ready for the miner
| WorkAlreadyMined BlockHash
-- ^ A block with this parent has already been mined and submitted to the
-- cut pipeline - we don't want to mine it again.
| WorkStale
-- ^ No work has been produced yet with the latest parent block on this
-- chain.
deriving stock (Show)

isWorkReady :: WorkState -> Bool
isWorkReady = \case
WorkReady {} -> True
_ -> False

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
--
-- The key is hash of the current block's payload.
-- The key is hash of the current block's payload and of the parent block.
--
newtype MiningState = MiningState
{ _miningState :: M.Map BlockPayloadHash (T3 Miner PayloadWithOutputs (Time Micros)) }
{ _miningState :: M.Map (BlockPayloadHash, BlockHash) (T3 Miner PayloadWithOutputs (Time Micros)) }
deriving stock (Generic)
deriving newtype (Semigroup, Monoid)

Expand Down Expand Up @@ -197,7 +172,7 @@ newWork
-- ^ this is used to lookup parent headers that are not in the cut
-- itself.
-> PactExecutionService
-> TVar PrimedWork
-> HM.HashMap ChainId (TMVar UnminedPayload)
-> Cut
-> IO (Maybe (T2 WorkHeader PayloadWithOutputs))
newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
Expand All @@ -216,25 +191,21 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
-- to loop and select one of those chains. it is not a normal situation to
-- have no chains with primed work if there are more than a couple chains.
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any isWorkReady mpw)
return mpw
unmined <- maybe retry return (HM.lookup cid mpw)
tryTakeTMVar (HM.lookup cid mpw)
let mr = T2
<$> HM.lookup cid mpw
<$> mpw
<*> getCutExtension c cid

case mr of
Just (T2 WorkStale _) -> do
logFun @T.Text Debug $ "newWork: chain " <> toText cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (WorkAlreadyMined _) _) -> do
Just (T2 Nothing _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has a payload that was already mined"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> toText cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (WorkReady newBlock) extension) -> do
Just (T2 newBlock extension) -> do
let (primedParentHash, primedParentHeight, _) = newBlockParent newBlock
if primedParentHash == view blockHash (_parentHeader (_cutExtensionParent extension))
then do
Expand Down
52 changes: 39 additions & 13 deletions src/Chainweb/Pact/Backend/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@

module Chainweb.Pact.Backend.Utils
( -- * General utils
open2
, chainDbFileName
chainDbFileName
-- * Shared Pact database interactions
, doLookupSuccessful
, commitBlockStateToDatabase
Expand Down Expand Up @@ -64,6 +63,7 @@ module Chainweb.Pact.Backend.Utils
-- * SQLite runners
, withSqliteDb
, startSqliteDb
, startReadSqliteDb
, stopSqliteDb
, withSQLiteConnection
, openSQLiteConnection
Expand Down Expand Up @@ -116,7 +116,7 @@ import Chainweb.Version
import Chainweb.Utils
import Chainweb.BlockHash
import Chainweb.BlockHeight
import Database.SQLite3.Direct hiding (open2)
import Database.SQLite3.Direct
import GHC.Stack (HasCallStack)
import qualified Data.ByteString.Short as SB
import qualified Data.Vector as V
Expand All @@ -127,6 +127,7 @@ import qualified Data.ByteString as BS
import qualified Pact.Types.Persistence as Pact4
import Chainweb.Pact.Backend.Types
import qualified Pact.Core.Persistence as Pact5
import Network.Wai.Middleware.OpenApi (HasReadOnly(readOnly))

-- -------------------------------------------------------------------------- --
-- SQ3.Utf8 Encodings
Expand Down Expand Up @@ -359,6 +360,18 @@ startSqliteDb cid logger dbDir doResetDb = do
resetDb = removeDirectoryRecursive dbDir
sqliteFile = dbDir </> chainDbFileName cid

startReadSqliteDb
:: Logger logger
=> ChainId
-> logger
-> FilePath
-> IO SQLiteEnv
startReadSqliteDb cid logger dbDir = do
logFunctionText logger Debug $ "(read-only) opening sqlitedb named " <> T.pack sqliteFile
openSQLiteConnection sqliteFile chainwebPragmas
where
sqliteFile = dbDir </> chainDbFileName cid

chainDbFileName :: ChainId -> FilePath
chainDbFileName cid = fold
[ "pact-v1-chain-"
Expand All @@ -374,7 +387,25 @@ withSQLiteConnection file ps =
bracket (openSQLiteConnection file ps) closeSQLiteConnection

openSQLiteConnection :: String -> [Pragma] -> IO SQLiteEnv
openSQLiteConnection file ps = open2 file >>= \case
openSQLiteConnection file ps = open_v2
(fromString file)
(collapseFlags [sqlite_open_readwrite , sqlite_open_create , sqlite_open_nomutex])
Nothing -- Nothing corresponds to the nullPtr
>>= \case
Left (err, msg) ->
internalError $
"withSQLiteConnection: Can't open db with "
<> asString (show err) <> ": " <> asString (show msg)
Right r -> do
runPragmas r ps
return r

openReadSQLiteConnection :: String -> [Pragma] -> IO SQLiteEnv
openReadSQLiteConnection file ps = open_v2
(fromString file)
(collapseFlags [sqlite_open_readonly , sqlite_open_create , sqlite_open_nomutex])
Nothing -- Nothing corresponds to the nullPtr
>>= \case
Left (err, msg) ->
internalError $
"withSQLiteConnection: Can't open db with "
Expand Down Expand Up @@ -403,21 +434,16 @@ withTempSQLiteConnection = withSQLiteConnection ""
withInMemSQLiteConnection :: [Pragma] -> (SQLiteEnv -> IO c) -> IO c
withInMemSQLiteConnection = withSQLiteConnection ":memory:"

open2 :: String -> IO (Either (SQ3.Error, SQ3.Utf8) SQ3.Database)
open2 file = open_v2
(fromString file)
(collapseFlags [sqlite_open_readwrite , sqlite_open_create , sqlite_open_fullmutex])
Nothing -- Nothing corresponds to the nullPtr

collapseFlags :: [SQLiteFlag] -> SQLiteFlag
collapseFlags xs =
if Prelude.null xs then error "collapseFlags: You must pass a non-empty list"
else Prelude.foldr1 (.|.) xs
else Prelude.foldl1 (.|.) xs

sqlite_open_readwrite, sqlite_open_create, sqlite_open_fullmutex :: SQLiteFlag
sqlite_open_readwrite, sqlite_open_create, sqlite_open_nomutex :: SQLiteFlag
sqlite_open_readwrite = 0x00000002
sqlite_open_readonly = 0x00000001
sqlite_open_create = 0x00000004
sqlite_open_fullmutex = 0x00010000
sqlite_open_nomutex = 0x00008000

markTableMutation :: Utf8 -> BlockHeight -> Database -> IO ()
markTableMutation tablename blockheight db = do
Expand Down
Loading

0 comments on commit 2cd63c3

Please sign in to comment.