Skip to content

Various minor fixes #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
-- > ^. ((event :: Event ButtonPush)
-- > ~> ( (On ~@ enter Off))
-- > .| (Off ~@ (set_ (+1) >> enter On))
-- > ) |> (reply currentState))
-- > ) |> (reply currentState)
--
-- Our client code will need to use the @call@ function from the Client module,
-- although it /is/ possible to interact synchronously with an FSM process (e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ callTimeout pid msg ti = bracket (monitor pid) unmonitor $ \mRef -> do
Just m -> do mR <- unwrapMessage m
case mR of
Just r -> return $ Just r
_ -> die $ ExitOther $ baseErr ++ ".Client:InvalidResponseType"
Nothing -> die $ ExitOther $ baseErr ++ ".Client:InvalidResponseType"

-- | Make a synchronous /call/ to the FSM process at "ProcessId". If a
-- "Step" exists that upon receiving an event of type @m@ will eventually
Expand All @@ -98,4 +98,4 @@ call pid msg = bracket (monitor pid) unmonitor $ \mRef -> do
mR <- unwrapMessage msg'
case mR of
Just r -> return r
_ -> die $ ExitOther $ baseErr ++ ".Client:InvalidResponseType"
Nothing -> die $ ExitOther $ baseErr ++ ".Client:InvalidResponseType"
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,19 @@ walkPFSM st acc
handleRpcRawInputs :: forall s d . (Show s) => State s d
-> (P.Message, SendPort P.Message)
-> Action (State s d)
handleRpcRawInputs st@State{..} (msg, port) =
handleRpcRawInputs st (msg, port) =
handleInput msg $ st { stReply = (sendChan port), stTrans = Q.empty, stInput = Just msg }

handleAllRawInputs :: forall s d. (Show s) => State s d
-> P.Message
-> Action (State s d)
handleAllRawInputs st@State{..} msg =
handleAllRawInputs st msg =
handleInput msg $ st { stReply = noOp, stTrans = Q.empty, stInput = Just msg }

handleExitReason :: forall s d. (Show s) => State s d
-> P.Message
-> Process (Maybe (ProcessAction (State s d)))
handleExitReason st@State{..} msg =
handleExitReason st msg =
let st' = st { stReply = noOp, stTrans = Q.empty, stInput = Just msg }
in tryHandleInput st' msg

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ import Data.Sequence
)
import qualified Data.Sequence as Q (null)
import Data.Typeable (Typeable, typeOf)
import Data.Tuple (swap, uncurry)
import Data.Tuple (swap)
import GHC.Generics

-- | The internal state of an FSM process.
Expand Down Expand Up @@ -312,7 +312,6 @@ apply st msg step
setProcessState s'
-- (_, st') <- runFSM st (addTransition ev)
return $ enqueue st (Just ev)
| otherwise = error $ baseErr ++ ".Internal.Types.apply:InvalidStep"
where
mstash = return . uncurry enqueue . swap
stash (o, s) = return $ enqueue s (Just o)
Expand Down
4 changes: 1 addition & 3 deletions packages/distributed-process-fsm/tests/TestFSM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import Test.Tasty.HUnit (testCase, assertEqual, assertBool)
import Network.Transport.TCP
import qualified Network.Transport as NT

-- import Control.Distributed.Process.Serializable (Serializable)
-- import Control.Monad (void)
import Data.Binary (Binary)
import Data.Maybe (fromJust)
import Data.Typeable (Typeable)
Expand Down Expand Up @@ -157,7 +155,7 @@ republicationOfEvents = do
send pid "yo"
send pid On

res' <- receiveChanTimeout (asTimeout $ seconds 20) rp :: Process (Maybe ())
_ <- receiveChanTimeout (asTimeout $ seconds 20) rp :: Process (Maybe ())
liftIO $ assertEqual mempty (Just ()) res

kill pid "thankyou byebye"
Expand Down
25 changes: 24 additions & 1 deletion packages/network-transport-tcp/network-transport-tcp.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ Test-Suite TestQC
data-accessor,
data-accessor-transformers,
mtl,
transformers,
lockfree-queue
Else
Buildable: False
Expand All @@ -105,3 +104,27 @@ Test-Suite TestQC
DeriveDataTypeable
MultiParamTypeClasses
default-language: Haskell2010

executable chat-server
import: warnings
main-is: ChatServer.hs
hs-source-dirs: tests/chat
Default-Language: Haskell2010
build-depends: base >= 4.14 && < 5,
bytestring,
containers,
mtl,
network-transport,
network-transport-tcp

executable chat-client
import: warnings
main-is: ChatClient.hs
hs-source-dirs: tests/chat
Default-Language: Haskell2010
build-depends: base >= 4.14 && < 5,
bytestring,
containers,
network-transport,
network-transport-tcp

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Main (main) where

import System.Environment (getArgs)
import Network.Transport
import Network.Transport.TCP (createTransport)
import Network.Transport.TCP (createTransport, defaultTCPAddr, defaultTCPParameters)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, putMVar, newMVar, readMVar, modifyMVar_, modifyMVar)
import Control.Concurrent (forkIO)
import Control.Monad (forever, forM, unless, when)
Expand All @@ -11,12 +13,12 @@ import qualified Data.Map as Map (fromList, elems, insert, member, empty, size,

chatClient :: MVar () -> EndPoint -> EndPointAddress -> IO ()
chatClient done endpoint serverAddr = do
connect endpoint serverAddr ReliableOrdered
_ <- connect endpoint serverAddr ReliableOrdered defaultConnectHints
cOut <- getPeers >>= connectToPeers
cIn <- newMVar Map.empty

-- Listen for incoming messages
forkIO . forever $ do
_ <- forkIO . forever $ do
event <- receive endpoint
case event of
Received _ msg ->
Expand All @@ -26,7 +28,7 @@ chatClient done endpoint serverAddr = do
didAdd <- modifyMVar cOut $ \conns ->
if not (Map.member addr conns)
then do
Right conn <- connect endpoint addr ReliableOrdered
Right conn <- connect endpoint addr ReliableOrdered defaultConnectHints
return (Map.insert addr conn conns, True)
else
return (conns, False)
Expand All @@ -38,8 +40,7 @@ chatClient done endpoint serverAddr = do
close (conns Map.! addr)
return (Map.delete addr conns)
showNumPeers cOut


_ -> pure () -- DO nothing for unrecognised events

{-
chatState <- newMVar (Map.fromList peerConns)
Expand Down Expand Up @@ -67,7 +68,7 @@ chatClient done endpoint serverAddr = do
let go = do
msg <- BSC.getLine
unless (BS.null msg) $ do
readMVar cOut >>= \conns -> forM (Map.elems conns) $ \conn -> send conn [msg]
_ <- readMVar cOut >>= \conns -> forM (Map.elems conns) $ \conn -> send conn [msg]
go
go
putMVar done ()
Expand All @@ -83,7 +84,7 @@ chatClient done endpoint serverAddr = do
connectToPeers :: [EndPointAddress] -> IO (MVar (Map EndPointAddress Connection))
connectToPeers addrs = do
conns <- forM addrs $ \addr -> do
Right conn <- connect endpoint addr ReliableOrdered
Right conn <- connect endpoint addr ReliableOrdered defaultConnectHints
return (addr, conn)
newMVar (Map.fromList conns)

Expand All @@ -97,11 +98,11 @@ chatClient done endpoint serverAddr = do
main :: IO ()
main = do
host:port:server:_ <- getArgs
Right transport <- createTransport host port
Right transport <- createTransport (defaultTCPAddr host port) defaultTCPParameters
Right endpoint <- newEndPoint transport
clientDone <- newEmptyMVar

forkIO $ chatClient clientDone endpoint (EndPointAddress . BSC.pack $ server)
_ <- forkIO $ chatClient clientDone endpoint (EndPointAddress . BSC.pack $ server)

takeMVar clientDone

Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Main (main) where

import System.Environment (getArgs)
import Network.Transport
import Network.Transport.TCP (createTransport)
import Network.Transport.TCP (createTransport, defaultTCPAddr, defaultTCPParameters)
import Control.Monad.State (evalStateT, modify, get)
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
Expand All @@ -10,7 +12,7 @@ import qualified Data.ByteString.Char8 as BSC (pack)
main :: IO ()
main = do
host:port:_ <- getArgs
Right transport <- createTransport host port
Right transport <- createTransport (defaultTCPAddr host port) defaultTCPParameters
Right endpoint <- newEndPoint transport

putStrLn $ "Chat server ready at " ++ (show . endPointAddressToByteString . address $ endpoint)
Expand All @@ -20,9 +22,10 @@ main = do
case event of
ConnectionOpened cid _ addr -> do
get >>= \clients -> liftIO $ do
Right conn <- connect endpoint addr ReliableOrdered
send conn [BSC.pack . show . IntMap.elems $ clients]
Right conn <- connect endpoint addr ReliableOrdered defaultConnectHints
_ <- send conn [BSC.pack . show . IntMap.elems $ clients]
close conn
modify $ IntMap.insert cid (endPointAddressToByteString addr)
modify $ IntMap.insert (fromIntegral cid) (endPointAddressToByteString addr)
ConnectionClosed cid ->
modify $ IntMap.delete cid
modify $ IntMap.delete (fromIntegral cid)
_ -> liftIO . putStrLn $ "Other event received"
1 change: 0 additions & 1 deletion packages/network-transport/network-transport.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ Library
binary >= 0.8 && < 0.9,
bytestring >= 0.10 && < 0.13,
hashable >= 1.2.0.5 && < 1.6,
transformers >= 0.2 && < 0.7,
deepseq >= 1.0 && < 1.7
Exposed-Modules: Network.Transport
Network.Transport.Util
Expand Down
Loading