Skip to content

Commit 8b68456

Browse files
author
Tim Watson
committed
Async API for local and remote work - fixes #8
1 parent e60d384 commit 8b68456

File tree

5 files changed

+60
-44
lines changed

5 files changed

+60
-44
lines changed

src/Control/Distributed/Process/Platform/Async.hs

+15-5
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@
3030
module Control.Distributed.Process.Platform.Async
3131
( -- types/data
3232
AsyncRef
33-
, AsyncTask
33+
, AsyncTask(..)
3434
, AsyncResult(..)
35+
, asyncDo
3536
) where
3637
import Control.Distributed.Process
37-
38+
import Control.Distributed.Process.Serializable (SerializableDict)
3839
import Data.Binary
3940
import Data.DeriveTH
4041
import Data.Typeable (Typeable)
@@ -48,9 +49,15 @@ type AsyncRef = ProcessId
4849

4950
-- | A task to be performed asynchronously. This can either take the
5051
-- form of an action that runs over some type @a@ in the @Process@ monad,
51-
-- or a tuple that adds the node on which the asynchronous task should be
52-
-- spawned - in the @Process a@ case the task is spawned on the local node
53-
type AsyncTask a = Process a
52+
-- or a static 'SerializableDict' and @Closure (Process a)@ neccessary for the
53+
-- task to be spawned on a remote node.
54+
data AsyncTask a =
55+
AsyncTask { asyncTask :: Process a }
56+
| AsyncRemoteTask {
57+
asyncTaskDict :: Static (SerializableDict a)
58+
, asyncTaskNode :: NodeId
59+
, asyncTaskProc :: Closure (Process a)
60+
}
5461

5562
-- | Represents the result of an asynchronous action, which can be in one of
5663
-- several states at any given time.
@@ -65,3 +72,6 @@ $(derive makeBinary ''AsyncResult)
6572

6673
deriving instance Eq a => Eq (AsyncResult a)
6774
deriving instance Show a => Show (AsyncResult a)
75+
76+
asyncDo :: Process a -> AsyncTask a
77+
asyncDo = AsyncTask

src/Control/Distributed/Process/Platform/Async/AsyncChan.hs

+9-5
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ module Control.Distributed.Process.Platform.Async.AsyncChan
4747
, waitCheckTimeout
4848
) where
4949

50-
import Control.Distributed.Process.Platform.Async
50+
import Control.Distributed.Process.Platform.Async hiding (asyncDo)
5151
import Control.Distributed.Process.Platform.Time
5252
import Control.Distributed.Process.Platform.Internal.Types
5353
import Control.Distributed.Process
@@ -67,7 +67,9 @@ type InternalChannel a = (SendPort (AsyncResult a), ReceivePort (AsyncResult a))
6767
-- Handles of this type cannot cross remote boundaries. Furthermore, handles
6868
-- of this type /must not/ be passed to functions in this module by processes
6969
-- other than the caller of 'async' - that is, this module provides asynchronous
70-
-- actions whose results are accessible *only* by the initiating process.
70+
-- actions whose results are accessible *only* by the initiating process. This
71+
-- limitation is imposed becuase of the use of type channels, for which the
72+
-- @ReceivePort@ component is effectively /thread local/.
7173
--
7274
-- See 'async'
7375
data AsyncChan a = AsyncChan {
@@ -109,16 +111,18 @@ asyncLinked :: (Serializable a) => AsyncTask a -> Process (AsyncChan a)
109111
asyncLinked = async
110112

111113
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (AsyncChan a)
112-
asyncDo shouldLink task = do
113-
(wpid, gpid, chan) <- spawnWorkers task shouldLink
114+
asyncDo shouldLink (AsyncRemoteTask d n c) =
115+
let proc = call d n c in asyncDo shouldLink AsyncTask { asyncTask = proc }
116+
asyncDo shouldLink (AsyncTask proc) = do
117+
(wpid, gpid, chan) <- spawnWorkers proc shouldLink
114118
return AsyncChan {
115119
worker = wpid
116120
, insulator = gpid
117121
, channel = chan
118122
}
119123

120124
spawnWorkers :: (Serializable a)
121-
=> AsyncTask a
125+
=> Process a
122126
-> Bool
123127
-> Process (AsyncRef, AsyncRef, InternalChannel a)
124128
spawnWorkers task shouldLink = do

src/Control/Distributed/Process/Platform/Async/AsyncSTM.hs

+8-7
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
-- and waiting for their results. It is a thin layer over the basic
1717
-- concurrency operations provided by "Control.Distributed.Process".
1818
--
19-
-- The difference between 'Control.Distributed.Platform.Async.Async' and
19+
-- The difference between 'Control.Distributed.Platform.Async.AsyncSTM' and
2020
-- 'Control.Distributed.Platform.Async.AsyncChan' is that handles of the
21-
-- former (i.e., returned by /this/ module) can be sent across a remote
22-
-- boundary, where the receiver can use the API calls to wait on the
23-
-- results of the computation at their end.
21+
-- former (i.e., returned by /this/ module) can be used by processes other
22+
-- than the caller of 'async', but are not 'Serializable'.
2423
--
2524
-- Like 'Control.Distributed.Platform.Async.AsyncChan', workers can be
2625
-- started on a local or remote node.
@@ -57,7 +56,7 @@ import Control.Applicative
5756
import Control.Concurrent.STM hiding (check)
5857
import Control.Distributed.Process
5958
import Control.Distributed.Process.Serializable
60-
import Control.Distributed.Process.Platform.Async
59+
import Control.Distributed.Process.Platform.Async hiding (asyncDo)
6160
import Control.Distributed.Process.Platform.Internal.Types
6261
( CancelWait(..)
6362
, Channel
@@ -113,15 +112,17 @@ asyncLinked :: (Serializable a) => AsyncTask a -> Process (AsyncSTM a)
113112
asyncLinked = asyncDo True
114113

115114
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (AsyncSTM a)
116-
asyncDo shouldLink task = do
115+
asyncDo shouldLink (AsyncRemoteTask d n c) =
116+
let proc = call d n c in asyncDo shouldLink AsyncTask { asyncTask = proc }
117+
asyncDo shouldLink (AsyncTask proc) = do
117118
root <- getSelfPid
118119
result <- liftIO $ newEmptyTMVarIO
119120

120121
-- listener/response proxy
121122
mPid <- spawnLocal $ do
122123
wPid <- spawnLocal $ do
123124
() <- expect
124-
r <- task
125+
r <- proc
125126
void $ liftIO $ atomically $ putTMVar result (AsyncDone r)
126127

127128
send root wPid -- let the parent process know the worker pid

tests/TestAsyncChan.hs

+13-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import Control.Distributed.Process.Node
1313
import Control.Distributed.Process.Serializable()
1414
import Control.Distributed.Process.Platform.Time
1515
import Control.Distributed.Process.Platform.Timer
16+
import Control.Distributed.Process.Platform.Async
1617
import Control.Distributed.Process.Platform.Async.AsyncChan
1718
import Data.Binary()
1819
import Data.Typeable()
@@ -26,7 +27,7 @@ import TestUtils
2627

2728
testAsyncPoll :: TestResult (AsyncResult ()) -> Process ()
2829
testAsyncPoll result = do
29-
hAsync <- async $ do "go" <- expect; say "running" >> return ()
30+
hAsync <- async $ asyncDo $ do "go" <- expect; say "running" >> return ()
3031
ar <- poll hAsync
3132
case ar of
3233
AsyncPending ->
@@ -35,7 +36,7 @@ testAsyncPoll result = do
3536

3637
testAsyncCancel :: TestResult (AsyncResult ()) -> Process ()
3738
testAsyncCancel result = do
38-
hAsync <- async $ runTestProcess $ say "running" >> return ()
39+
hAsync <- async $ asyncDo $ runTestProcess $ say "running" >> return ()
3940
sleep $ milliSeconds 100
4041

4142
p <- poll hAsync -- nasty kind of assertion: use assertEquals?
@@ -47,7 +48,7 @@ testAsyncCancelWait :: TestResult (Maybe (AsyncResult ())) -> Process ()
4748
testAsyncCancelWait result = do
4849
testPid <- getSelfPid
4950
p <- spawnLocal $ do
50-
hAsync <- async $ runTestProcess $ say "running" >> (sleep $ seconds 60)
51+
hAsync <- async $ asyncDo $ runTestProcess $ sleep $ seconds 60
5152
sleep $ milliSeconds 100
5253

5354
send testPid "running"
@@ -65,7 +66,7 @@ testAsyncWaitTimeout :: TestResult (Maybe (AsyncResult ())) -> Process ()
6566
testAsyncWaitTimeout result =
6667
let delay = seconds 1
6768
in do
68-
hAsync <- async $ sleep $ seconds 20
69+
hAsync <- async $ asyncDo $ sleep $ seconds 20
6970
waitTimeout delay hAsync >>= stash result
7071
cancelWait hAsync >> return ()
7172

@@ -74,7 +75,7 @@ testAsyncLinked result = do
7475
mv :: MVar (AsyncChan ()) <- liftIO $ newEmptyMVar
7576
pid <- spawnLocal $ do
7677
-- NB: async == asyncLinked for AsyncChan
77-
h <- async $ do
78+
h <- async $ asyncDo $ do
7879
"waiting" <- expect
7980
return ()
8081
stash mv h
@@ -96,9 +97,9 @@ testAsyncLinked result = do
9697

9798
testAsyncWaitAny :: TestResult [AsyncResult String] -> Process ()
9899
testAsyncWaitAny result = do
99-
p1 <- async $ expect >>= return
100-
p2 <- async $ expect >>= return
101-
p3 <- async $ expect >>= return
100+
p1 <- async $ asyncDo $ expect >>= return
101+
p2 <- async $ asyncDo $ expect >>= return
102+
p3 <- async $ asyncDo $ expect >>= return
102103
send (worker p3) "c"
103104
r1 <- waitAny [p1, p2, p3]
104105
send (worker p1) "a"
@@ -109,14 +110,14 @@ testAsyncWaitAny result = do
109110

110111
testAsyncWaitAnyTimeout :: TestResult (Maybe (AsyncResult String)) -> Process ()
111112
testAsyncWaitAnyTimeout result = do
112-
p1 <- asyncLinked $ expect >>= return
113-
p2 <- asyncLinked $ expect >>= return
114-
p3 <- asyncLinked $ expect >>= return
113+
p1 <- asyncLinked $ asyncDo $ expect >>= return
114+
p2 <- asyncLinked $ asyncDo $ expect >>= return
115+
p3 <- asyncLinked $ asyncDo $ expect >>= return
115116
waitAnyTimeout (seconds 1) [p1, p2, p3] >>= stash result
116117

117118
testAsyncCancelWith :: TestResult Bool -> Process ()
118119
testAsyncCancelWith result = do
119-
p1 <- async $ do { s :: String <- expect; return s }
120+
p1 <- async $ asyncDo $ do { s :: String <- expect; return s }
120121
cancelWith "foo" p1
121122
AsyncFailed (DiedException _) <- wait p1
122123
stash result True

tests/TestAsyncSTM.hs

+15-15
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import TestUtils
2626

2727
testAsyncPoll :: TestResult (AsyncResult ()) -> Process ()
2828
testAsyncPoll result = do
29-
hAsync <- async $ do "go" <- expect; say "running" >> return ()
29+
hAsync <- async $ asyncDo $ do "go" <- expect; say "running" >> return ()
3030
ar <- poll hAsync
3131
case ar of
3232
AsyncPending ->
@@ -35,7 +35,7 @@ testAsyncPoll result = do
3535

3636
testAsyncCancel :: TestResult (AsyncResult ()) -> Process ()
3737
testAsyncCancel result = do
38-
hAsync <- async $ runTestProcess $ say "running" >> return ()
38+
hAsync <- async $ asyncDo $ runTestProcess $ say "running" >> return ()
3939
sleep $ milliSeconds 100
4040

4141
p <- poll hAsync -- nasty kind of assertion: use assertEquals?
@@ -47,7 +47,7 @@ testAsyncCancelWait :: TestResult (Maybe (AsyncResult ())) -> Process ()
4747
testAsyncCancelWait result = do
4848
testPid <- getSelfPid
4949
p <- spawnLocal $ do
50-
hAsync <- async $ runTestProcess $ sleep $ seconds 60
50+
hAsync <- async $ asyncDo $ runTestProcess $ sleep $ seconds 60
5151
sleep $ milliSeconds 100
5252

5353
send testPid "running"
@@ -65,7 +65,7 @@ testAsyncWaitTimeout :: TestResult (Maybe (AsyncResult ())) -> Process ()
6565
testAsyncWaitTimeout result =
6666
let delay = seconds 1
6767
in do
68-
hAsync <- async $ sleep $ seconds 20
68+
hAsync <- async $ asyncDo $ sleep $ seconds 20
6969
waitTimeout delay hAsync >>= stash result
7070
cancelWait hAsync >> return ()
7171

@@ -74,7 +74,7 @@ testAsyncWaitTimeoutCompletes :: TestResult (Maybe (AsyncResult ()))
7474
testAsyncWaitTimeoutCompletes result =
7575
let delay = seconds 1
7676
in do
77-
hAsync <- async $ sleep $ seconds 20
77+
hAsync <- async $ asyncDo $ sleep $ seconds 20
7878
waitTimeout delay hAsync >>= stash result
7979
cancelWait hAsync >> return ()
8080

@@ -83,15 +83,15 @@ testAsyncWaitTimeoutSTM :: TestResult (Maybe (AsyncResult ())) -> Process ()
8383
testAsyncWaitTimeoutSTM result =
8484
let delay = seconds 1
8585
in do
86-
hAsync <- async $ sleep $ seconds 20
86+
hAsync <- async $ asyncDo $ sleep $ seconds 20
8787
waitTimeoutSTM delay hAsync >>= stash result
8888

8989
testAsyncWaitTimeoutCompletesSTM :: TestResult (Maybe (AsyncResult Int))
9090
-> Process ()
9191
testAsyncWaitTimeoutCompletesSTM result =
9292
let delay = seconds 1 in do
9393

94-
hAsync <- async $ do
94+
hAsync <- async $ asyncDo $ do
9595
i <- expect
9696
return i
9797

@@ -106,7 +106,7 @@ testAsyncLinked result = do
106106
mv :: MVar (AsyncSTM ()) <- liftIO $ newEmptyMVar
107107
pid <- spawnLocal $ do
108108
-- NB: async == asyncLinked for AsyncChan
109-
h <- asyncLinked $ do
109+
h <- asyncLinked $ asyncDo $ do
110110
"waiting" <- expect
111111
return ()
112112
stash mv h
@@ -134,9 +134,9 @@ testAsyncLinked result = do
134134

135135
testAsyncWaitAny :: TestResult [AsyncResult String] -> Process ()
136136
testAsyncWaitAny result = do
137-
p1 <- async $ expect >>= return
138-
p2 <- async $ expect >>= return
139-
p3 <- async $ expect >>= return
137+
p1 <- async $ asyncDo $ expect >>= return
138+
p2 <- async $ asyncDo $ expect >>= return
139+
p3 <- async $ asyncDo $ expect >>= return
140140
send (_asyncWorker p3) "c"
141141
r1 <- waitAny [p1, p2, p3]
142142

@@ -151,14 +151,14 @@ testAsyncWaitAny result = do
151151

152152
testAsyncWaitAnyTimeout :: TestResult (Maybe (AsyncResult String)) -> Process ()
153153
testAsyncWaitAnyTimeout result = do
154-
p1 <- asyncLinked $ expect >>= return
155-
p2 <- asyncLinked $ expect >>= return
156-
p3 <- asyncLinked $ expect >>= return
154+
p1 <- asyncLinked $ asyncDo $ expect >>= return
155+
p2 <- asyncLinked $ asyncDo $ expect >>= return
156+
p3 <- asyncLinked $ asyncDo $ expect >>= return
157157
waitAnyTimeout (seconds 1) [p1, p2, p3] >>= stash result
158158

159159
testAsyncCancelWith :: TestResult Bool -> Process ()
160160
testAsyncCancelWith result = do
161-
p1 <- async $ do { s :: String <- expect; return s }
161+
p1 <- async $ asyncDo $ do { s :: String <- expect; return s }
162162
cancelWith "foo" p1
163163
AsyncFailed (DiedException _) <- wait p1
164164
stash result True

0 commit comments

Comments
 (0)