-
Notifications
You must be signed in to change notification settings - Fork 0
/
Async.hs
93 lines (75 loc) · 2.68 KB
/
Async.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Main where
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad.Trans.Class
import Control.Monad.Trans.Cont
import Data.Foldable (traverse_)
import Data.Proxy
class Monad m => Async m where
spawn :: m () -> IO ()
suspend :: ((a -> IO ()) -> IO ()) -> m a
nowait :: IO a -> m a
newtype Future a = Future (MVar (FutureState a))
data FutureState a
= Pending [a -> IO ()]
| Fulfilled a
data FutureFulfilledException = FutureFulfilledException
deriving (Show)
instance Exception FutureFulfilledException
emptyFuture :: IO (Future a)
emptyFuture = Future <$> newMVar (Pending [])
fulfill :: Future a -> a -> IO ()
fulfill (Future fstateVar) result = do
fstate <- takeMVar fstateVar
case fstate of
Pending awaiters -> do
putMVar fstateVar (Fulfilled result)
traverse_ ($ result) awaiters
Fulfilled a -> do
putMVar fstateVar (Fulfilled a)
throwIO FutureFulfilledException
getFuture :: Async m => Future a -> m a
getFuture (Future fstateVar) = do
res <- nowait $ takeMVar fstateVar
case res of
Fulfilled r -> nowait (putMVar fstateVar (Fulfilled r)) >> pure r
Pending awaiters -> suspend $ putMVar fstateVar . Pending . (: awaiters)
newtype ThreadAsync a = ThreadAsync (IO a)
deriving (Functor, Applicative, Monad)
instance Async ThreadAsync where
spawn (ThreadAsync io) = forkIO (io >> pure ()) >> pure ()
suspend action = ThreadAsync do
box <- newEmptyMVar
action (putMVar box)
takeMVar box
nowait = ThreadAsync
newtype CpsAsync a = CpsAsync {unCpsAsync :: ContT () IO a}
deriving (Functor, Applicative, Monad)
instance Async CpsAsync where
spawn = evalContT . resetT . unCpsAsync
suspend action = CpsAsync . shiftT $ \k -> lift (action k) >> pure ()
nowait = CpsAsync . lift
asyncWork :: Async m => MVar () -> m ()
asyncWork finish = do
content <- readFileAsync "./test.txt"
nowait (putStrLn content)
content <- readFileAsync "./test.txt"
nowait (putStrLn content)
nowait (putMVar finish ())
readFileAsync :: Async m => FilePath -> m String
readFileAsync path = suspend ((pure () <*) . forkIO . (readFile path >>=)) -- forkIO here means any other runtime
main :: IO ()
main = do
finish <- newEmptyMVar
putStrLn "A"
spawn (asyncWork finish :: CpsAsync ())
putStrLn "B"
takeMVar finish
future <- emptyFuture
spawn (getFuture future >> nowait (putStrLn "heihei") :: ThreadAsync ())
spawn (getFuture future >> nowait (putStrLn "houhou") :: CpsAsync ())
fulfill future ()
threadDelay 1000