From 4f80927ebca3aefa60c009f10e2fdc902245692f Mon Sep 17 00:00:00 2001 From: Daniel Brice Date: Tue, 18 Aug 2020 10:12:46 -0700 Subject: [PATCH 1/5] Add example for threadsafe logging. --- outputs/threadsafe-logging.txt | 37 +++++++++++++++++++++++++ threadsafe-logging.hs | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 outputs/threadsafe-logging.txt create mode 100644 threadsafe-logging.hs diff --git a/outputs/threadsafe-logging.txt b/outputs/threadsafe-logging.txt new file mode 100644 index 0000000..783dec3 --- /dev/null +++ b/outputs/threadsafe-logging.txt @@ -0,0 +1,37 @@ +λ> :main +Do Re Mi +Fa Sol +La Ti Do +Fa SolL +Dao TRie DMoi + +Fa Sol +La Ti Do +Do Re Mi +Do Re Mi +Fa Sol +FaD LoSa o RlTe +i MDio + +La Ti Do + +λ> import System.Environment (setEnv) + +λ> setEnv "MAX_QUEUE" "100" + +λ> :main +Fa Sol +La Ti Do +Do Re Mi +La Ti Do +Fa Sol +Do Re Mi +Fa Sol +Do Re Mi +La Ti Do +La Ti Do +Do Re Mi +Fa Sol +Fa Sol +Do Re Mi +La Ti Do diff --git a/threadsafe-logging.hs b/threadsafe-logging.hs new file mode 100644 index 0000000..84979ae --- /dev/null +++ b/threadsafe-logging.hs @@ -0,0 +1,50 @@ +import Control.Concurrent (forkIO, threadDelay) +import Control.Monad.STM (atomically) +import Data.Foldable (for_) +import System.Environment (lookupEnv) +import System.Random (randomRIO) +import Text.Read (readMaybe) + +import qualified Control.Concurrent.STM.TBQueue as TBQ + + +seconds n = 1000000 * n + + +threadsafeLog maxQueue = do + queue <- atomically (TBQ.newTBQueue maxQueue) + + let + logToQueue msg = atomically (TBQ.writeTBQueue queue msg) + + printFromQueue = do + threadDelay (seconds 1 `div` 2) + emptyQueue <- atomically (TBQ.isEmptyTBQueue queue) + if emptyQueue then + return () + else do + msg <- atomically (TBQ.readTBQueue queue) + putStrLn msg + printFromQueue + + return (logToQueue, printFromQueue) + + +sing phrase log = + for_ [1..5] $ \n -> do + i <- randomRIO (1, 10000) + threadDelay (seconds 1 `div` i + n) + log phrase + + +main = do + (log, print) <- do + maxQueue <- (readMaybe =<<) <$> lookupEnv "MAX_QUEUE" + case maxQueue of + Nothing -> return (putStrLn, threadDelay (seconds 3)) + Just n -> threadsafeLog n + + for_ [sing "Do Re Mi", sing "Fa Sol", sing "La Ti Do"] $ + \singPhrase -> forkIO (singPhrase log) + + print From d63441bb9e8fa5ff8d8531fb1904fe71d6719686 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 22 Feb 2022 15:39:36 -0700 Subject: [PATCH 2/5] rename threadsafe-logging to concurrent-logging --- ...adsafe-logging.hs => concurrent-logging.hs | 4 -- outputs/concurrent-logging.txt | 15 ++++++++ outputs/threadsafe-logging.txt | 37 ------------------- tools/outputs.nix | 1 + 4 files changed, 16 insertions(+), 41 deletions(-) rename threadsafe-logging.hs => concurrent-logging.hs (99%) create mode 100644 outputs/concurrent-logging.txt delete mode 100644 outputs/threadsafe-logging.txt diff --git a/threadsafe-logging.hs b/concurrent-logging.hs similarity index 99% rename from threadsafe-logging.hs rename to concurrent-logging.hs index 84979ae..a2aed20 100644 --- a/threadsafe-logging.hs +++ b/concurrent-logging.hs @@ -7,10 +7,8 @@ import Text.Read (readMaybe) import qualified Control.Concurrent.STM.TBQueue as TBQ - seconds n = 1000000 * n - threadsafeLog maxQueue = do queue <- atomically (TBQ.newTBQueue maxQueue) @@ -29,14 +27,12 @@ threadsafeLog maxQueue = do return (logToQueue, printFromQueue) - sing phrase log = for_ [1..5] $ \n -> do i <- randomRIO (1, 10000) threadDelay (seconds 1 `div` i + n) log phrase - main = do (log, print) <- do maxQueue <- (readMaybe =<<) <$> lookupEnv "MAX_QUEUE" diff --git a/outputs/concurrent-logging.txt b/outputs/concurrent-logging.txt new file mode 100644 index 0000000..8646a4a --- /dev/null +++ b/outputs/concurrent-logging.txt @@ -0,0 +1,15 @@ +LaFD aoT iSR oeDl o +M +i +FaL aS oTli + Do +LaF aT iS oDlo + +LaF aT iS oDlo + +FaL aS oTli + Do +Do Re Mi +Do Re Mi +Do Re Mi +Do Re Mi diff --git a/outputs/threadsafe-logging.txt b/outputs/threadsafe-logging.txt deleted file mode 100644 index 783dec3..0000000 --- a/outputs/threadsafe-logging.txt +++ /dev/null @@ -1,37 +0,0 @@ -λ> :main -Do Re Mi -Fa Sol -La Ti Do -Fa SolL -Dao TRie DMoi - -Fa Sol -La Ti Do -Do Re Mi -Do Re Mi -Fa Sol -FaD LoSa o RlTe -i MDio - -La Ti Do - -λ> import System.Environment (setEnv) - -λ> setEnv "MAX_QUEUE" "100" - -λ> :main -Fa Sol -La Ti Do -Do Re Mi -La Ti Do -Fa Sol -Do Re Mi -Fa Sol -Do Re Mi -La Ti Do -La Ti Do -Do Re Mi -Fa Sol -Fa Sol -Do Re Mi -La Ti Do diff --git a/tools/outputs.nix b/tools/outputs.nix index e5cc379..24877de 100644 --- a/tools/outputs.nix +++ b/tools/outputs.nix @@ -7,6 +7,7 @@ examples = [ { name = "bounded-queues"; file = ../bounded-queues.hs; sed = "s!^(finish:.*|start: (6|7|8|9|10))$!...!"; } { name = "branching"; file = ../branching.hs; sed = "s!^It's .* noon$!It's ... noon!"; } { name = "common-types"; file = ../common-types.hs; } + { name = "concurrent-logging"; file = ../concurrent-logging.hs; } { name = "crypto-hashing"; file = ../crypto-hashing.hs; } { name = "dynamic"; file = ../dynamic.hs; } { name = "enum-ranges"; file = ../enum-ranges.hs; } From b7c362deff23e3ed70d4dc56bdec251f9a578bd1 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 22 Feb 2022 15:40:32 -0700 Subject: [PATCH 3/5] thanks: mention concurrent logging --- docs/thanks.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/thanks.md b/docs/thanks.md index 00a202b..ba0aeb6 100644 --- a/docs/thanks.md +++ b/docs/thanks.md @@ -13,4 +13,4 @@ Thanks to the following people who have [contributed](https://typeclasses.github - [Florian Beeres](https://fbrs.io/) -- [Records with optics](https://github.com/typeclasses/haskell-phrasebook/pull/34) - [Yuras Shumovich](https://twitter.com/shumovichy) -- assistance with exception handling in `monitoring.hs` ([1](https://twitter.com/shumovichy/status/1207093768182288386), [2](https://twitter.com/shumovichy/status/1207637508412059648)) - [gutierrezje](https://github.com/gutierrezje) -- [Folding lists](https://github.com/typeclasses/haskell-phrasebook/pull/20) -- [Daniel Brice](https://github.com/friedbrice) -- [Logging](https://github.com/typeclasses/haskell-phrasebook/pull/39) +- [Daniel Brice](https://github.com/friedbrice) -- [Logging](https://github.com/typeclasses/haskell-phrasebook/pull/39) and [Concurrent logging](https://github.com/typeclasses/haskell-phrasebook/pull/40) From 6f9f40442b6c4aeb09b51c22c49e0d6e77089461 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 22 Feb 2022 16:48:00 -0700 Subject: [PATCH 4/5] concurrent-logging: make sure all log messages print Use concurrently_ and forConcurrently_ from the async package instead of forkIO. Use a TVar to tell the printing thread when to stop instead of just stopping when the queue is empty, because an empty queue doesn't necessarily mean nothing else will get logged. Use a continuation passing style to obtain the concurrent log action, because this is encouraged by the async package to ensure that all threads get canceled in the event of an exception. --- concurrent-logging.hs | 84 ++++++++++++++++++---------------- outputs/concurrent-logging.txt | 22 +++++---- 2 files changed, 58 insertions(+), 48 deletions(-) diff --git a/concurrent-logging.hs b/concurrent-logging.hs index a2aed20..7f5a24a 100644 --- a/concurrent-logging.hs +++ b/concurrent-logging.hs @@ -1,46 +1,52 @@ +import Control.Applicative (liftA2) +import Control.Monad (unless, replicateM_) import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.Async (concurrently_, forConcurrently_) import Control.Monad.STM (atomically) import Data.Foldable (for_) import System.Environment (lookupEnv) import System.Random (randomRIO) import Text.Read (readMaybe) -import qualified Control.Concurrent.STM.TBQueue as TBQ - -seconds n = 1000000 * n - -threadsafeLog maxQueue = do - queue <- atomically (TBQ.newTBQueue maxQueue) - - let - logToQueue msg = atomically (TBQ.writeTBQueue queue msg) - - printFromQueue = do - threadDelay (seconds 1 `div` 2) - emptyQueue <- atomically (TBQ.isEmptyTBQueue queue) - if emptyQueue then - return () - else do - msg <- atomically (TBQ.readTBQueue queue) - putStrLn msg - printFromQueue - - return (logToQueue, printFromQueue) - -sing phrase log = - for_ [1..5] $ \n -> do - i <- randomRIO (1, 10000) - threadDelay (seconds 1 `div` i + n) - log phrase - -main = do - (log, print) <- do - maxQueue <- (readMaybe =<<) <$> lookupEnv "MAX_QUEUE" - case maxQueue of - Nothing -> return (putStrLn, threadDelay (seconds 3)) - Just n -> threadsafeLog n - - for_ [sing "Do Re Mi", sing "Fa Sol", sing "La Ti Do"] $ - \singPhrase -> forkIO (singPhrase log) - - print +import qualified Control.Concurrent.STM.TQueue as TQ +import qualified Control.Concurrent.STM.TVar as TV + +randomDelay = + do + i <- randomRIO (1, 1000) + threadDelay i + +-- Here we have a concurrent program that is parameterized on how to write log messages. +choir log = + -- Run three concurrent threads, each singing a different tune. + forConcurrently_ ["Do Re Mi", "Fa Sol", "La Ti Do"] $ \tune -> + replicateM_ 3 $ -- Each member of the choir sings its tune 3 times. + do + randomDelay -- ... with some random delays between each repetition. + log tune + +-- Our demonstration program runs the "choir" two different ways: +main = + do + choir putStrLn -- The first way has a serious flaw, which we shall see in the output. + putStrLn "---" + withConcurrentLog choir -- The second way uses a queue to orchestrate the printing. + +withConcurrentLog go = do + queue <- TQ.newTQueueIO + stopVar <- TV.newTVarIO False + + let + logToQueue msg = atomically (TQ.writeTQueue queue msg) + + printFromQueue = do + randomDelay + stop <- atomically (liftA2 (&&) (TQ.isEmptyTQueue queue) (TV.readTVar stopVar)) + unless stop $ do + msg <- atomically (TQ.readTQueue queue) + putStrLn msg + printFromQueue + + let stop = atomically (TV.writeTVar stopVar True) + + concurrently_ printFromQueue (go logToQueue *> stop) diff --git a/outputs/concurrent-logging.txt b/outputs/concurrent-logging.txt index 8646a4a..3ec12de 100644 --- a/outputs/concurrent-logging.txt +++ b/outputs/concurrent-logging.txt @@ -1,15 +1,19 @@ +FaDL oaS oRTlei + MDio + +DoLF aaR eTS ioM liD + +o LaFD aoT iSR oeDl o M i -FaL aS oTli - Do -LaF aT iS oDlo - -LaF aT iS oDlo - -FaL aS oTli - Do -Do Re Mi +--- Do Re Mi +La Ti Do Do Re Mi +Fa Sol +Fa Sol Do Re Mi +La Ti Do +Fa Sol +La Ti Do From a14a2e807fe5b7a59b4a30426ed9229e5cf6dd77 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 22 Feb 2022 19:25:41 -0700 Subject: [PATCH 5/5] concurrent-logging: write to files but why is the unsynchronized output not jumbled up anymore? --- concurrent-logging.hs | 27 ++++++++++++++++++--------- outputs/concurrent-logging.txt | 22 ++++++++++++---------- tools/outputs.nix | 2 +- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/concurrent-logging.hs b/concurrent-logging.hs index 7f5a24a..5c50d40 100644 --- a/concurrent-logging.hs +++ b/concurrent-logging.hs @@ -7,6 +7,7 @@ import Data.Foldable (for_) import System.Environment (lookupEnv) import System.Random (randomRIO) import Text.Read (readMaybe) +import System.IO import qualified Control.Concurrent.STM.TQueue as TQ import qualified Control.Concurrent.STM.TVar as TV @@ -28,25 +29,33 @@ choir log = -- Our demonstration program runs the "choir" two different ways: main = do - choir putStrLn -- The first way has a serious flaw, which we shall see in the output. - putStrLn "---" - withConcurrentLog choir -- The second way uses a queue to orchestrate the printing. - -withConcurrentLog go = do + -- The first way has a serious flaw, which we shall see in the output. + withFile "log1.txt" WriteMode $ \h -> + do + hSetBuffering h NoBuffering + choir (hPutStrLn h) + + -- The second way uses a queue to orchestrate the printing. + withFile "log2.txt" WriteMode $ \h -> + do + hSetBuffering h NoBuffering + withConcurrentLog (hPutStrLn h) choir + +withConcurrentLog print go = do queue <- TQ.newTQueueIO stopVar <- TV.newTVarIO False let logToQueue msg = atomically (TQ.writeTQueue queue msg) - printFromQueue = do + loop = do randomDelay stop <- atomically (liftA2 (&&) (TQ.isEmptyTQueue queue) (TV.readTVar stopVar)) unless stop $ do msg <- atomically (TQ.readTQueue queue) - putStrLn msg - printFromQueue + print msg + loop let stop = atomically (TV.writeTVar stopVar True) - concurrently_ printFromQueue (go logToQueue *> stop) + concurrently_ loop (go logToQueue *> stop) diff --git a/outputs/concurrent-logging.txt b/outputs/concurrent-logging.txt index 3ec12de..9721eea 100644 --- a/outputs/concurrent-logging.txt +++ b/outputs/concurrent-logging.txt @@ -1,19 +1,21 @@ -FaDL oaS oRTlei - MDio - -DoLF aaR eTS ioM liD - -o -LaFD aoT iSR oeDl o -M -i ---- +--- log1.txt --- +Fa Sol +La Ti Do Do Re Mi La Ti Do Do Re Mi Fa Sol +La Ti Do Fa Sol Do Re Mi + +--- log2.txt --- +Do Re Mi +La Ti Do +Fa Sol La Ti Do Fa Sol +Do Re Mi La Ti Do +Do Re Mi +Fa Sol diff --git a/tools/outputs.nix b/tools/outputs.nix index 24877de..205eee8 100644 --- a/tools/outputs.nix +++ b/tools/outputs.nix @@ -7,7 +7,7 @@ examples = [ { name = "bounded-queues"; file = ../bounded-queues.hs; sed = "s!^(finish:.*|start: (6|7|8|9|10))$!...!"; } { name = "branching"; file = ../branching.hs; sed = "s!^It's .* noon$!It's ... noon!"; } { name = "common-types"; file = ../common-types.hs; } - { name = "concurrent-logging"; file = ../concurrent-logging.hs; } + { name = "concurrent-logging"; file = ../concurrent-logging.hs; after = ["echo '--- log1.txt ---' >> $out" "cat log1.txt >> $out" "echo '\n--- log2.txt ---' >> $out" "cat log2.txt >> $out"]; } { name = "crypto-hashing"; file = ../crypto-hashing.hs; } { name = "dynamic"; file = ../dynamic.hs; } { name = "enum-ranges"; file = ../enum-ranges.hs; }