Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to typed-protocols 0.3.0.0 #35

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.7 - Oct 2024

* Updated to `typed-protocols-0.3`.

## 0.6.0 - Sep 2024

* Remove potentially leaky continuation passing of `EKGForwarder`.
Expand Down
6 changes: 3 additions & 3 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Custom repository for cardano haskell packages, see
-- ouroboros-network/CONTRIBUTING for more
repository cardano-haskell-packages
url: https://input-output-hk.github.io/cardano-haskell-packages
url: https://chap.intersectmbo.org/
secure: True
root-keys:
3e0cce471cf09815f930210f7827266fd09045445d65923e6d0238a6cd15126f
Expand All @@ -16,8 +16,8 @@ repository cardano-haskell-packages
-- Bump this if you need newer packages from Hackage

index-state:
, hackage.haskell.org 2024-09-05T18:39:40Z
, cardano-haskell-packages 2024-09-10T12:51:27Z
, hackage.haskell.org 2024-10-24T18:39:40Z
, cardano-haskell-packages 2024-10-24T07:10:59Z

packages: ./.

Expand Down
7 changes: 4 additions & 3 deletions ekg-forward.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 2.4
name: ekg-forward
version: 0.6
version: 0.7
synopsis: See README for more info
description: See README for more info
homepage: https://github.com/input-output-hk/ekg-forward
Expand Down Expand Up @@ -65,12 +65,13 @@ library
, io-classes >= 1.4.1
mgmeier marked this conversation as resolved.
Show resolved Hide resolved
, network
, ouroboros-network-api
, ouroboros-network-framework >= 0.8 && < 0.14
, ouroboros-network-framework ^>= 0.14
, singletons ^>= 3.0
, serialise
, stm
, text
, time
, typed-protocols ^>= 0.1.1
, typed-protocols ^>= 0.3
, typed-protocols-cborg
, unordered-containers

Expand Down
85 changes: 58 additions & 27 deletions src/System/Metrics/Network/Forwarder.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module System.Metrics.Network.Forwarder
( connectToAcceptor
Expand All @@ -16,29 +19,30 @@ import qualified Codec.Serialise as CBOR
import "contra-tracer" Control.Tracer (nullTracer)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Text as T
import Data.Void (Void)
import Data.Void (Void, absurd)
import qualified Network.Socket as Socket
import Network.TypedProtocol.Codec
import Control.Exception (throwIO)
import Ouroboros.Network.Context (MinimalInitiatorContext, ResponderContext)
import Ouroboros.Network.Driver.Simple (runPeer)
import Ouroboros.Network.Driver.Limits (ProtocolTimeLimits)
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..),
MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..),
RunMiniProtocol (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..), MiniProtocolLimits (..), MiniProtocolNum (..),
MuxMode (..), OuroborosApplication (..), RunMiniProtocol (..),
miniProtocolLimits, miniProtocolNum, miniProtocolRun)
import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake,
import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec, noTimeLimitsHandshake,
timeLimitsHandshake)
import Ouroboros.Network.Protocol.Handshake.Type (Handshake)
import Ouroboros.Network.Protocol.Handshake.Unversioned (UnversionedProtocol (..),
UnversionedProtocolData (..),
unversionedHandshakeCodec,
unversionedProtocolDataCodec)
import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion, simpleSingletonVersions)

import Ouroboros.Network.Protocol.Handshake.Version (Versions, acceptableVersion, queryVersion, simpleSingletonVersions)
import Ouroboros.Network.Snocket (MakeBearer, Snocket,
localAddressFromPath, localSnocket, socketSnocket,
makeLocalBearer, makeSocketBearer)
import Ouroboros.Network.Socket (HandshakeCallbacks (..), connectToNode, nullNetworkConnectTracers)
import Ouroboros.Network.Socket (NetworkConnectTracers(..), HandshakeCallbacks (..), ConnectToArgs (..), connectToNode, nullNetworkConnectTracers)
import qualified System.Metrics as EKG

import System.Metrics.Configuration (ForwarderConfiguration (..), HowToConnect (..))
Expand All @@ -50,7 +54,7 @@ connectToAcceptor
:: ForwarderConfiguration
-> EKG.Store
-> IO ()
connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \iocp -> do
connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager \iocp -> do
let app = forwarderApp config ekgStore
case acceptorEndpoint of
LocalPipe localPipe -> do
Expand All @@ -64,7 +68,8 @@ connectToAcceptor config@ForwarderConfiguration{..} ekgStore = withIOManager $ \
doConnectToAcceptor snocket makeSocketBearer mempty address timeLimitsHandshake app

doConnectToAcceptor
:: Snocket IO fd addr
:: forall fd addr. ()
=> Snocket IO fd addr
-> MakeBearer IO fd
-> (fd -> IO ()) -- ^ configure socket
-> addr
Expand All @@ -75,21 +80,47 @@ doConnectToAcceptor
LBS.ByteString IO () Void
-> IO ()
doConnectToAcceptor snocket makeBearer configureSocket address timeLimits app =
connectToNode
snocket
makeBearer
configureSocket
unversionedHandshakeCodec
timeLimits
unversionedProtocolDataCodec
nullNetworkConnectTracers
(HandshakeCallbacks acceptableVersion queryVersion)
(simpleSingletonVersions

let
connectToArgs :: ConnectToArgs fd addr UnversionedProtocol UnversionedProtocolData
connectToArgs = ConnectToArgs
{ ctaHandshakeCodec = unversionedHandshakeCodec
:: Codec (Handshake UnversionedProtocol Term) CBOR.DeserialiseFailure IO LBS.ByteString
, ctaHandshakeTimeLimits = timeLimits
:: ProtocolTimeLimits (Handshake UnversionedProtocol Term)
, ctaVersionDataCodec = unversionedProtocolDataCodec
:: VersionDataCodec Term UnversionedProtocol UnversionedProtocolData
, ctaConnectTracers = nullNetworkConnectTracers
:: NetworkConnectTracers addr UnversionedProtocol
, ctaHandshakeCallbacks = HandshakeCallbacks acceptableVersion queryVersion
:: HandshakeCallbacks UnversionedProtocolData
}

versions :: Versions UnversionedProtocol UnversionedProtocolData
(OuroborosApplication 'InitiatorMode (MinimalInitiatorContext addr) (ResponderContext addr) LBS.ByteString IO () Void)
versions = simpleSingletonVersions
UnversionedProtocol
UnversionedProtocolData
app)
Nothing
address
app

localAddress :: Maybe addr
remoteAddress :: addr
(localAddress, remoteAddress) = (Nothing, address)

in do
res <- connectToNode
snocket
makeBearer
connectToArgs
configureSocket
versions
localAddress
remoteAddress

case res of
Left err -> throwIO err
Right (Left ()) -> pure ()
Right (Right void) -> absurd void

forwarderApp
:: ForwarderConfiguration
Expand All @@ -109,7 +140,7 @@ forwardEKGMetrics
-> EKG.Store
-> RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
forwardEKGMetrics config ekgStore =
InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
InitiatorProtocolOnly $ MiniProtocolCb \_ctx channel ->
runPeer
(forwarderTracer config)
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
Expand All @@ -122,7 +153,7 @@ forwardEKGMetricsResp
-> EKG.Store
-> RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void ()
forwardEKGMetricsResp config ekgStore =
ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
ResponderProtocolOnly $ MiniProtocolCb \_ctx channel ->
runPeer
(forwarderTracer config)
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
Expand All @@ -133,7 +164,7 @@ forwardEKGMetricsResp config ekgStore =
forwardEKGMetricsDummy
:: RunMiniProtocol 'InitiatorMode initiatorCtx responderCtx LBS.ByteString IO () Void
forwardEKGMetricsDummy =
InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
InitiatorProtocolOnly $ MiniProtocolCb \_ctx channel ->
runPeer
nullTracer
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
Expand All @@ -144,7 +175,7 @@ forwardEKGMetricsDummy =
forwardEKGMetricsRespDummy
:: RunMiniProtocol 'ResponderMode initiatorCtx responderCtx LBS.ByteString IO Void ()
forwardEKGMetricsRespDummy =
ResponderProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
ResponderProtocolOnly $ MiniProtocolCb \_ctx channel ->
runPeer
nullTracer
(Forwarder.codecEKGForward CBOR.encode CBOR.decode
Expand Down
12 changes: 5 additions & 7 deletions src/System/Metrics/Protocol/Acceptor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ module System.Metrics.Protocol.Acceptor (
, ekgAcceptorPeer
) where

import Network.TypedProtocol.Core (Peer (..), PeerHasAgency (..),
PeerRole (..))

import Network.TypedProtocol.Peer.Client
import System.Metrics.Protocol.Type

-- | Please note that the acceptor is a server from the __networking__
Expand All @@ -38,14 +36,14 @@ data EKGAcceptor req resp m a where
ekgAcceptorPeer
:: Monad m
=> EKGAcceptor req resp m a
-> Peer (EKGForward req resp) 'AsClient 'StIdle m a
-> Client (EKGForward req resp) 'NonPipelined 'StIdle m a
ekgAcceptorPeer = \case
SendMsgReq req next ->
-- Send our message (request for the new metrics from the forwarder).
Yield (ClientAgency TokIdle) (MsgReq req) $
Yield (MsgReq req) $
-- We're now into the 'StBusy' state, and now we'll wait for a reply
-- from the forwarder.
Await (ServerAgency TokBusy) $ \(MsgResp resp) ->
Await $ \(MsgResp resp) ->
Effect $
ekgAcceptorPeer <$> next resp

Expand All @@ -55,4 +53,4 @@ ekgAcceptorPeer = \case
-- 'done', with a return value.
Effect $ do
r <- getResult
return $ Yield (ClientAgency TokIdle) MsgDone (Done TokDone r)
return $ Yield MsgDone (Done r)
38 changes: 20 additions & 18 deletions src/System/Metrics/Protocol/Codec.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

module System.Metrics.Protocol.Codec (
codecEKGForward
Expand All @@ -14,12 +16,13 @@ import Codec.CBOR.Read (DeserialiseFailure)
import Control.Monad.Class.MonadST (MonadST)
import qualified Data.ByteString.Lazy as LBS
import Text.Printf (printf)
import Network.TypedProtocol.Codec.CBOR (Codec, PeerHasAgency (..),
PeerRole (..), SomeMessage (..),
mkCodecCborLazyBS)
import Network.TypedProtocol.Core
import Network.TypedProtocol.Codec (Codec, SomeMessage (..))
import Network.TypedProtocol.Codec.CBOR (mkCodecCborLazyBS)

import System.Metrics.Protocol.Type


codecEKGForward
:: forall req resp m.
(MonadST m)
Expand All @@ -34,47 +37,46 @@ codecEKGForward encodeReq decodeReq
mkCodecCborLazyBS encode decode
where
-- Encode messages.
encode :: forall (pr :: PeerRole)
(st :: EKGForward req resp)
encode :: forall (st :: EKGForward req resp)
(st' :: EKGForward req resp).
PeerHasAgency pr st
-> Message (EKGForward req resp) st st'
Message (EKGForward req resp) st st'
-> CBOR.Encoding

encode (ClientAgency TokIdle) (MsgReq req) =
encode (MsgReq req) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 0
<> encodeReq req

encode (ClientAgency TokIdle) MsgDone =
encode MsgDone =
CBOR.encodeListLen 1
<> CBOR.encodeWord 1

encode (ServerAgency TokBusy) (MsgResp resp) =
encode (MsgResp resp) =
CBOR.encodeListLen 2
<> CBOR.encodeWord 1
<> encodeResp resp

-- Decode messages
decode :: forall (pr :: PeerRole)
(st :: EKGForward req resp) s.
PeerHasAgency pr st
decode :: forall (st :: EKGForward req resp) s.
ActiveState st
=> StateToken st
-> CBOR.Decoder s (SomeMessage st)
decode stok = do
len <- CBOR.decodeListLen
key <- CBOR.decodeWord
case (key, len, stok) of
(0, 2, ClientAgency TokIdle) ->
(0, 2, SingIdle) ->
SomeMessage . MsgReq <$> decodeReq

(1, 1, ClientAgency TokIdle) ->
(1, 1, SingIdle) ->
return $ SomeMessage MsgDone

(1, 2, ServerAgency TokBusy) ->
(1, 2, SingBusy) ->
SomeMessage . MsgResp <$> decodeResp

-- Failures per protocol state
(_, _, ClientAgency TokIdle) ->
(_, _, SingIdle) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, ServerAgency TokBusy) ->
(_, _, SingBusy) ->
fail (printf "codecEKGForward (%s) unexpected key (%d, %d)" (show stok) key len)
(_, _, SingDone) -> notActiveState stok
Loading
Loading