diff --git a/CHANGELOG.md b/CHANGELOG.md index d7fb32e..cf1cda1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.7 - Oct 2024 + +* Updated to `typed-protocols-0.3`. + ## 0.6.0 - Sep 2024 * Remove potentially leaky continuation passing of `EKGForwarder`. diff --git a/cabal.project b/cabal.project index 1765b39..9b7c8fc 100644 --- a/cabal.project +++ b/cabal.project @@ -1,7 +1,7 @@ -- Custom repository for cardano haskell packages, see -- ouroboros-network/CONTRIBUTING for more repository cardano-haskell-packages - url: https://input-output-hk.github.io/cardano-haskell-packages + url: https://chap.intersectmbo.org/ secure: True root-keys: 3e0cce471cf09815f930210f7827266fd09045445d65923e6d0238a6cd15126f @@ -16,8 +16,8 @@ repository cardano-haskell-packages -- Bump this if you need newer packages from Hackage index-state: - , hackage.haskell.org 2024-09-05T18:39:40Z - , cardano-haskell-packages 2024-09-10T12:51:27Z + , hackage.haskell.org 2024-10-24T18:39:40Z + , cardano-haskell-packages 2024-10-24T07:10:59Z packages: ./. diff --git a/ekg-forward.cabal b/ekg-forward.cabal index 8e3b980..049a75b 100644 --- a/ekg-forward.cabal +++ b/ekg-forward.cabal @@ -1,6 +1,6 @@ cabal-version: 2.4 name: ekg-forward -version: 0.6 +version: 0.7 synopsis: See README for more info description: See README for more info homepage: https://github.com/input-output-hk/ekg-forward @@ -65,12 +65,13 @@ library , io-classes >= 1.4.1 , network , ouroboros-network-api - , ouroboros-network-framework >= 0.8 && < 0.14 + , ouroboros-network-framework ^>= 0.14 + , singletons ^>= 3.0 , serialise , stm , text , time - , typed-protocols ^>= 0.1.1 + , typed-protocols ^>= 0.3 , typed-protocols-cborg , unordered-containers diff --git a/src/System/Metrics/Network/Forwarder.hs b/src/System/Metrics/Network/Forwarder.hs index 279c5cb..54c2435 100644 --- a/src/System/Metrics/Network/Forwarder.hs +++ b/src/System/Metrics/Network/Forwarder.hs @@ -1,6 +1,9 @@ +{-# LANGUAGE BlockArguments #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE PackageImports #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} module System.Metrics.Network.Forwarder ( connectToAcceptor @@ -16,29 +19,30 @@ import qualified Codec.Serialise as CBOR import "contra-tracer" Control.Tracer (nullTracer) import qualified Data.ByteString.Lazy as LBS import qualified Data.Text as T -import Data.Void (Void) +import Data.Void (Void, absurd) import qualified Network.Socket as Socket +import Network.TypedProtocol.Codec +import Control.Exception (throwIO) import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext) import Ouroboros.Network.Driver.Simple (runPeer) import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits) import Ouroboros.Network.IOManager (withIOManager) -import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..), - MiniProtocolLimits (..), MiniProtocolNum (..), - MuxMode (..), OuroborosApplication (..), - RunMiniProtocol (..), +import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..), MiniProtocolLimits (..), MiniProtocolNum (..), + MuxMode (..), OuroborosApplication (..), RunMiniProtocol (..), miniProtocolLimits, miniProtocolNum, miniProtocolRun) -import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake, +import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec, noTimeLimitsHandshake, timeLimitsHandshake) import Ouroboros.Network.Protocol.Handshake.Type (Handshake) import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..), UnversionedProtocolData (..), unversionedHandshakeCodec, unversionedProtocolDataCodec) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion, simpleSingletonVersions) + +import Ouroboros.Network.Protocol.Handshake.Version (Versions, acceptableVersion, queryVersion, simpleSingletonVersions) import Ouroboros.Network.Snocket (MakeBearer, Snocket, localAddressFromPath, localSnocket, socketSnocket, makeLocalBearer, makeSocketBearer) -import Ouroboros.Network.Socket (HandshakeCallbacks (..), connectToNode, nullNetworkConnectTracers) +import Ouroboros.Network.Socket (NetworkConnectTracers(..), HandshakeCallbacks (..), ConnectToArgs (..), connectToNode, nullNetworkConnectTracers) import qualified System.Metrics as EKG import System.Metrics.Configuration (ForwarderConfiguration (..), HowToConnect (..)) @@ -50,7 +54,7 @@ connectToAcceptor :: ForwarderConfiguration -> EKG.Store -> IO () -connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \iocp -> do +connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager \iocp -> do let app = forwarderApp config ekgStore case acceptorEndpoint of LocalPipe localPipe -> do @@ -64,7 +68,8 @@ connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \ doConnectToAcceptor snocket makeSocketBearer mempty address timeLimitsHandshake app doConnectToAcceptor - :: Snocket IO fd addr + :: forall fd addr. () + => Snocket IO fd addr -> MakeBearer IO fd -> (fd -> IO ()) -- ^ configure socket -> addr @@ -75,21 +80,47 @@ doConnectToAcceptor LBS.ByteString IO () Void -> IO () doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app = - connectToNode - snocket - makeBearer - configureSocket - unversionedHandshakeCodec - timeLimits - unversionedProtocolDataCodec - nullNetworkConnectTracers - (HandshakeCallbacks acceptableVersion queryVersion) - (simpleSingletonVersions + + let + connectToArgs :: ConnectToArgs fd addr UnversionedProtocol UnversionedProtocolData + connectToArgs = ConnectToArgs + { ctaHandshakeCodec = unversionedHandshakeCodec + :: Codec (Handshake UnversionedProtocol Term) CBOR.DeserialiseFailure IO LBS.ByteString + , ctaHandshakeTimeLimits = timeLimits + :: ProtocolTimeLimits (Handshake UnversionedProtocol Term) + , ctaVersionDataCodec = unversionedProtocolDataCodec + :: VersionDataCodec Term UnversionedProtocol UnversionedProtocolData + , ctaConnectTracers = nullNetworkConnectTracers + :: NetworkConnectTracers addr UnversionedProtocol + , ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion + :: HandshakeCallbacks UnversionedProtocolData + } + + versions :: Versions UnversionedProtocol UnversionedProtocolData + (OuroborosApplication 'InitiatorMode (MinimalInitiatorContext addr) (ResponderContext addr) LBS.ByteString IO () Void) + versions = simpleSingletonVersions UnversionedProtocol UnversionedProtocolData - app) - Nothing - address + app + + localAddress :: Maybe addr + remoteAddress :: addr + (localAddress, remoteAddress) = (Nothing, address) + + in do + res <- connectToNode + snocket + makeBearer + connectToArgs + configureSocket + versions + localAddress + remoteAddress + + case res of + Left err -> throwIO err + Right (Left ()) -> pure () + Right (Right void) -> absurd void forwarderApp :: ForwarderConfiguration @@ -109,7 +140,7 @@ forwardEKGMetrics -> EKG.Store -> RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void forwardEKGMetrics config ekgStore = - InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel -> + InitiatorProtocolOnly $ MiniProtocolCb \_ctx channel -> runPeer (forwarderTracer config) (Forwarder.codecEKGForward CBOR.encode CBOR.decode @@ -122,7 +153,7 @@ forwardEKGMetricsResp -> EKG.Store -> RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void () forwardEKGMetricsResp config ekgStore = - ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel -> + ResponderProtocolOnly $ MiniProtocolCb \_ctx channel -> runPeer (forwarderTracer config) (Forwarder.codecEKGForward CBOR.encode CBOR.decode @@ -133,7 +164,7 @@ forwardEKGMetricsResp config ekgStore = forwardEKGMetricsDummy :: RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void forwardEKGMetricsDummy = - InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel -> + InitiatorProtocolOnly $ MiniProtocolCb \_ctx channel -> runPeer nullTracer (Forwarder.codecEKGForward CBOR.encode CBOR.decode @@ -144,7 +175,7 @@ forwardEKGMetricsDummy = forwardEKGMetricsRespDummy :: RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void () forwardEKGMetricsRespDummy = - ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel -> + ResponderProtocolOnly $ MiniProtocolCb \_ctx channel -> runPeer nullTracer (Forwarder.codecEKGForward CBOR.encode CBOR.decode diff --git a/src/System/Metrics/Protocol/Acceptor.hs b/src/System/Metrics/Protocol/Acceptor.hs index b7b2d0e..dca4173 100644 --- a/src/System/Metrics/Protocol/Acceptor.hs +++ b/src/System/Metrics/Protocol/Acceptor.hs @@ -14,9 +14,7 @@ module System.Metrics.Protocol.Acceptor ( , ekgAcceptorPeer ) where -import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), - PeerRole (..)) - +import Network.TypedProtocol.Peer.Client import System.Metrics.Protocol.Type -- | Please note that the acceptor is a server from the __networking__ @@ -38,14 +36,14 @@ data EKGAcceptor req resp m a where ekgAcceptorPeer :: Monad m => EKGAcceptor req resp m a - -> Peer (EKGForward req resp) 'AsClient 'StIdle m a + -> Client (EKGForward req resp) 'NonPipelined 'StIdle m a ekgAcceptorPeer = \case SendMsgReq req next -> -- Send our message (request for the new metrics from the forwarder). - Yield (ClientAgency TokIdle) (MsgReq req) $ + Yield (MsgReq req) $ -- We're now into the 'StBusy' state, and now we'll wait for a reply -- from the forwarder. - Await (ServerAgency TokBusy) $ \(MsgResp resp) -> + Await $ \(MsgResp resp) -> Effect $ ekgAcceptorPeer <$> next resp @@ -55,4 +53,4 @@ ekgAcceptorPeer = \case -- 'done', with a return value. Effect $ do r <- getResult - return $ Yield (ClientAgency TokIdle) MsgDone (Done TokDone r) + return $ Yield MsgDone (Done r) diff --git a/src/System/Metrics/Protocol/Codec.hs b/src/System/Metrics/Protocol/Codec.hs index d22dd42..b6c9608 100644 --- a/src/System/Metrics/Protocol/Codec.hs +++ b/src/System/Metrics/Protocol/Codec.hs @@ -1,8 +1,10 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} module System.Metrics.Protocol.Codec ( codecEKGForward @@ -14,12 +16,13 @@ import Codec.CBOR.Read (DeserialiseFailure) import Control.Monad.Class.MonadST (MonadST) import qualified Data.ByteString.Lazy as LBS import Text.Printf (printf) -import Network.TypedProtocol.Codec.CBOR (Codec, PeerHasAgency (..), - PeerRole (..), SomeMessage (..), - mkCodecCborLazyBS) +import Network.TypedProtocol.Core +import Network.TypedProtocol.Codec (Codec, SomeMessage (..)) +import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS) import System.Metrics.Protocol.Type + codecEKGForward :: forall req resp m. (MonadST m) @@ -34,47 +37,46 @@ codecEKGForward encodeReq decodeReq mkCodecCborLazyBS encode decode where -- Encode messages. - encode :: forall (pr :: PeerRole) - (st :: EKGForward req resp) + encode :: forall (st :: EKGForward req resp) (st' :: EKGForward req resp). - PeerHasAgency pr st - -> Message (EKGForward req resp) st st' + Message (EKGForward req resp) st st' -> CBOR.Encoding - encode (ClientAgency TokIdle) (MsgReq req) = + encode (MsgReq req) = CBOR.encodeListLen 2 <> CBOR.encodeWord 0 <> encodeReq req - encode (ClientAgency TokIdle) MsgDone = + encode MsgDone = CBOR.encodeListLen 1 <> CBOR.encodeWord 1 - encode (ServerAgency TokBusy) (MsgResp resp) = + encode (MsgResp resp) = CBOR.encodeListLen 2 <> CBOR.encodeWord 1 <> encodeResp resp -- Decode messages - decode :: forall (pr :: PeerRole) - (st :: EKGForward req resp) s. - PeerHasAgency pr st + decode :: forall (st :: EKGForward req resp) s. + ActiveState st + => StateToken st -> CBOR.Decoder s (SomeMessage st) decode stok = do len <- CBOR.decodeListLen key <- CBOR.decodeWord case (key, len, stok) of - (0, 2, ClientAgency TokIdle) -> + (0, 2, SingIdle) -> SomeMessage . MsgReq <$> decodeReq - (1, 1, ClientAgency TokIdle) -> + (1, 1, SingIdle) -> return $ SomeMessage MsgDone - (1, 2, ServerAgency TokBusy) -> + (1, 2, SingBusy) -> SomeMessage . MsgResp <$> decodeResp -- Failures per protocol state - (_, _, ClientAgency TokIdle) -> + (_, _, SingIdle) -> fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len) - (_, _, ServerAgency TokBusy) -> + (_, _, SingBusy) -> fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len) + (_, _, SingDone) -> notActiveState stok diff --git a/src/System/Metrics/Protocol/Forwarder.hs b/src/System/Metrics/Protocol/Forwarder.hs index 85e2983..67e2ae6 100644 --- a/src/System/Metrics/Protocol/Forwarder.hs +++ b/src/System/Metrics/Protocol/Forwarder.hs @@ -1,15 +1,16 @@ +{-# LANGUAGE BlockArguments #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} module System.Metrics.Protocol.Forwarder ( EKGForwarder (..) , ekgForwarderPeer ) where -import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..), - PeerRole (..)) +import Network.TypedProtocol.Peer.Server import System.Metrics.Protocol.Type @@ -21,33 +22,32 @@ import System.Metrics.Protocol.Type -- data EKGForwarder req resp m a = EKGForwarder { -- | The acceptor sent us a request for new metrics. - recvMsgReq :: req -> m resp + recvMsgReq :: req -> m resp -- | The acceptor terminated. Here we have a pure return value, but we -- could have done another action in 'm' if we wanted to. - , recvMsgDone :: m a + , recvMsgDone :: m a } -- | Interpret a particular action sequence into the server side of the -- 'EKGForward' protocol. -- ekgForwarderPeer - :: Monad m + :: forall m req resp a. () + => Monad m => EKGForwarder req resp m a - -> Peer (EKGForward req resp) 'AsServer 'StIdle m a + -> Server (EKGForward req resp) 'NonPipelined 'StIdle m a ekgForwarderPeer EKGForwarder{..} = go where + go :: Server (EKGForward req resp) 'NonPipelined 'StIdle m a go = -- In the 'StIdle' state the forwarder is awaiting a request message -- from the acceptor. - Await (ClientAgency TokIdle) $ \case - -- The acceptor sent us a request for new metrics, so now we're - -- in the 'StBusy' state which means it's the forwarder's turn to send - -- a reply. - MsgReq req -> Effect $ do + Await \case + MsgReq req -> Effect do resp <- recvMsgReq req - return $ Yield (ServerAgency TokBusy) (MsgResp resp) go + return $ Yield (MsgResp resp) go -- The acceptor sent the done transition, so we're in the 'StDone' state -- so all we can do is stop using 'done', with a return value. - MsgDone -> Effect $ Done TokDone <$> recvMsgDone + MsgDone -> Effect $ Done <$> recvMsgDone diff --git a/src/System/Metrics/Protocol/Type.hs b/src/System/Metrics/Protocol/Type.hs index 5cc26b0..6cdf461 100644 --- a/src/System/Metrics/Protocol/Type.hs +++ b/src/System/Metrics/Protocol/Type.hs @@ -1,9 +1,12 @@ {-# LANGUAGE DataKinds #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE EmptyCase #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneKindSignatures #-} {-# LANGUAGE TypeFamilies #-} -- | The type of the EKG forwarding/accepting protocol. @@ -14,13 +17,11 @@ module System.Metrics.Protocol.Type ( EKGForward (..) , Message (..) - , ClientHasAgency (..) - , ServerHasAgency (..) - , NobodyHasAgency (..) + , SingEKGForward (..) ) where -import Data.Proxy (Proxy(..)) -import Network.TypedProtocol.Core (Protocol (..)) +import Data.Singletons +import Network.TypedProtocol.Core import Ouroboros.Network.Util.ShowProxy (ShowProxy(..)) -- | A kind to identify our protocol, and the types of the states in the state @@ -69,6 +70,24 @@ instance (ShowProxy req, ShowProxy resp) , ")" ] +-- | Singleton type of EKGForward. Same as: +-- +-- @ +-- type SingEKGForward :: EKGForward req resp -> Type +-- type SingEKGForward = TypeRep +-- @ +data SingEKGForward (st :: EKGForward req resp) where + SingIdle :: SingEKGForward 'StIdle + SingBusy :: SingEKGForward 'StBusy + SingDone :: SingEKGForward 'StDone + +type instance Sing = SingEKGForward + +deriving instance Show (SingEKGForward st) +instance StateTokenI 'StIdle where stateToken = SingIdle +instance StateTokenI 'StBusy where stateToken = SingBusy +instance StateTokenI 'StDone where stateToken = SingDone + instance Protocol (EKGForward req resp) where -- | The messages in the EKG forwarding/accepting protocol. @@ -92,31 +111,14 @@ instance Protocol (EKGForward req resp) where -- a reply to the acceptor (list of new metrics). -- -- So we assume that, from __interaction__ point of view: - -- 1. ClientHasAgency (from 'Network.TypedProtocol.Core') corresponds to acceptor's agency. - -- 3. ServerHasAgency (from 'Network.TypedProtocol.Core') corresponds to forwarder's agency. + -- 1. ClientAgency (from 'Network.TypedProtocol.Core') corresponds to acceptor's agency. + -- 3. ServerAgency (from 'Network.TypedProtocol.Core') corresponds to forwarder's agency. -- - data ClientHasAgency st where - TokIdle :: ClientHasAgency 'StIdle - - data ServerHasAgency st where - TokBusy :: ServerHasAgency 'StBusy - - data NobodyHasAgency st where - TokDone :: NobodyHasAgency 'StDone - - -- | Impossible cases. - exclusionLemma_ClientAndServerHaveAgency TokIdle tok = case tok of {} - exclusionLemma_NobodyAndClientHaveAgency TokDone tok = case tok of {} - exclusionLemma_NobodyAndServerHaveAgency TokDone tok = case tok of {} - -instance (Show req, Show resp) - => Show (Message (EKGForward req resp) from to) where - show MsgReq{} = "MsgReq" - show MsgResp{} = "MsgResp" - show MsgDone{} = "MsgDone" + type StateAgency 'StIdle = 'ClientAgency + type StateAgency 'StBusy = 'ServerAgency + type StateAgency 'StDone = 'NobodyAgency -instance Show (ClientHasAgency (st :: EKGForward req resp)) where - show TokIdle = "TokIdle" + type StateToken = SingEKGForward -instance Show (ServerHasAgency (st :: EKGForward req resp)) where - show TokBusy = "TokBusy" +deriving instance (Show req, Show resp) + => Show (Message (EKGForward req resp) from to)