diff --git a/src/demeter/lib/Demeter.hs b/src/demeter/lib/Demeter.hs index 21cc28d..53fe0df 100644 --- a/src/demeter/lib/Demeter.hs +++ b/src/demeter/lib/Demeter.hs @@ -1,13 +1,17 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MagicHash #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecursiveDo #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} +{-# LANGUAGE UnboxedTuples #-} -- | A resource pool manages the lifecycle and distribution of -- resources across multiple threads, where a "resource" is anything @@ -172,7 +176,8 @@ module Demeter ) where -import Control.Concurrent +import Control.Applicative ((<|>)) +import Control.Concurrent hiding (forkIO) import Control.Concurrent.STM import Control.Concurrent.STM.Fsifo import Control.Exception @@ -181,25 +186,44 @@ import Data.Coerce (coerce) import Data.Foldable (traverse_) import Data.Word (Word64) import GHC.Clock (getMonotonicTimeNSec) +import GHC.Conc (ThreadId (ThreadId)) +import GHC.Exts (fork#) +import GHC.IO (IO (IO), unsafeUnmask) -- | A resource pool data Pool a = Pool - { availableResources :: {-# UNPACK #-} !(TVar [PoolEntry a]), + { -- The resources that have been used and returned to the pool. If they remain here for `resourceExpiry`, then + -- they'll be released by the reaper thread. The only way a resource can end up here is if there are no waiters in + -- `awaitingResources` at the time it's returned to the pool. The resources are approximately in descending return + -- time order: the head of the list contains the resource that was returned most recently, with the greatest (or + -- very close to the greatest) `entryTime`. + availableResources :: {-# UNPACK #-} !(TVar [PoolEntry a]), + -- A queue of waiters. 
When we go to return a resource to the pool, we'll first try handing it to the first waiter + -- in the queue, if any. awaitingResources :: {-# UNPACK #-} !(Fsifo (TMVar (ResourceNotification a))), + -- The total number of resources we've either created, or have committed to creating. All resources we've committed + -- to creating will be in the form of `CreationTicket`s in the `availableResources` list, which allow takers to + -- (attempt to) create a resource without bumping this counter again. createdResourceCount :: {-# UNPACK #-} !(TVar Int), + -- The maximum number of resources that can be created at any given time. maxResourceCount :: {-# UNPACK #-} !Int, + -- The amount of time a resource can sit in `availableResources` before it is released. resourceExpiry :: {-# UNPACK #-} !Nanoseconds, acquireResource :: IO a, releaseResource :: a -> IO (), - reaperThread :: {-# UNPACK #-} !(Thread ()) + reaperThread :: {-# UNPACK #-} !(Thread (Maybe UnexpectedReaperException)) } +-- A "resource notification" is what one waiting on a resource will receive: either a resource, or permission to create +-- a new resource. data ResourceNotification a = ResourceAvailable {-# UNPACK #-} !(PoolEntry a) - | CreationTicket + | -- Permission to create a new resource + CreationTicket data PoolEntry a = PoolEntry + -- The monotonic time that this resource was returned to the pool. 
{-# UNPACK #-} !Nanoseconds a @@ -258,7 +282,9 @@ destroyPool Pool {reaperThread} = do let Thread tid doneVar = reaperThread uninterruptibleMask_ do killThread tid - atomically (readTMVar doneVar) + atomically (readTMVar doneVar) >>= \case + Nothing -> pure () + Just exception -> throwIO exception data Thread a = Thread {-# UNPACK #-} !ThreadId {-# UNPACK #-} !(TMVar a) @@ -266,9 +292,14 @@ data Thread a nanoSecondsToSleepTime :: Nanoseconds -> Microseconds nanoSecondsToSleepTime w = -- don't sleep less than 1 second - max 1000000 (ns2td w + 1) + max 1_000_000 (ns2td w + 1) -data UnexpectedReaperException +data WorkerDiedByAsyncException + = WorkerDiedByAsyncException SomeException -- invariant: this is an async exception + deriving stock (Show) + deriving anyclass (Exception) + +newtype UnexpectedReaperException = UnexpectedReaperException SomeException deriving stock (Show) @@ -293,36 +324,60 @@ forkReaper :: TVar [PoolEntry a] -> TVar Int -> (a -> IO ()) -> - IO (Thread ()) + IO (Thread (Maybe UnexpectedReaperException)) forkReaper expiry waiters resourceVar createdResourceCountVar destroyAction = do mtid <- myThreadId resVar <- newEmptyTMVarIO workerCountVar <- newTVarIO @Int 0 + -- If an async exception is raised in a worker, the worker (tries to) place the exception here + workerExceptionVar <- newEmptyTMVarIO tid <- mask_ $ forkIOWithUnmask \unmask -> do - let errHandler e = do - case fromException @AsyncException e of - Just ThreadKilled -> do - reap =<< readTVarIO resourceVar - waitForWorkers - _ -> do - throwTo mtid (UnexpectedReaperException e) - atomically (putTMVar resVar ()) + let errHandler e = uninterruptibleMask_ do + reap =<< readTVarIO resourceVar + -- FIXME we don't want to wait for workers, then have a new worker spawned after + waitForWorkers + let propagateException e2 = + unsafeUnmask (try (throwTo mtid e2)) >>= \case + Left e3 + | Just ThreadKilled <- fromException e3 -> atomically (putTMVar resVar (Just e2)) + | otherwise -> 
propagateException e2 + Right () -> atomically (putTMVar resVar Nothing) + case () of + () + | Just ThreadKilled <- fromException e -> + atomically do + maybeWorkerException <- tryTakeTMVar workerExceptionVar + putTMVar resVar $ + coerce @(Maybe SomeException) @(Maybe UnexpectedReaperException) maybeWorkerException + | Just (WorkerDiedByAsyncException e2) <- fromException e -> + propagateException (UnexpectedReaperException e2) + | otherwise -> propagateException (UnexpectedReaperException e) expirySleepTime = nanoSecondsToSleepTime expiry action sleepTime = do - nap sleepTime - readTVarIO resourceVar >>= \case - -- If the resource var is empty, then we add ourselves as - -- a waiter to the resourceVar and return to sleep. Once a - -- value is put to the resource var we restart our loop - -- with 'expiry' sleep time (as a freshly returned - -- resource will remain valid for at least 'expiry' time). - -- - -- This avoids waking up the reaper if there is no pool - -- activity or so much activity that resources are - -- returned straight to waiters. - [] -> atomically (guard . not . null =<< readTVar resourceVar) >> action (nanoSecondsToSleepTime expiry) - _ -> reapDead >>= action + signal <- registerDelay (coerce @Microseconds @Int sleepTime) + (join . unmask . atomically) do + let workerDiedHandler = do + exception <- readTMVar workerExceptionVar + pure (errHandler (toException (WorkerDiedByAsyncException exception))) + let naptimeOverHandler = do + readTVar signal >>= \case + False -> retry + True -> pure () + pure do + readTVarIO resourceVar >>= \case + -- If the resource var is empty, then we add ourselves as + -- a waiter to the resourceVar and return to sleep. Once a + -- value is put to the resource var we restart our loop + -- with 'expiry' sleep time (as a freshly returned + -- resource will remain valid for at least 'expiry' time). 
+ -- + -- This avoids waking up the reaper if there is no pool + -- activity or so much activity that resources are + -- returned straight to waiters. + [] -> atomically (guard . not . null =<< readTVar resourceVar) >> action expirySleepTime + _ -> reapDead >>= action + workerDiedHandler <|> naptimeOverHandler reapDead = do ct <- getCurrentTime let expiryAgo = ct - expiry @@ -331,11 +386,10 @@ forkReaper expiry waiters resourceVar createdResourceCountVar destroyAction = do writeTVar resourceVar living pure (living, dead) reap dead - pure $ case living of + pure case living of [] -> expirySleepTime - _ -> nanoSecondsToSleepTime ((entryTime $ last living) - expiryAgo) - reap xs = do - traverse_ (releaseWithWorker . entryValue) xs + _ -> nanoSecondsToSleepTime ((entryTime $ last living) - expiryAgo) -- last living expires first + reap = traverse_ (releaseWithWorker . entryValue) waitForWorkers = do atomically do readTVar workerCountVar >>= \case @@ -343,19 +397,28 @@ forkReaper expiry waiters resourceVar createdResourceCountVar destroyAction = do _ -> retry releaseWithWorker resource = do - atomically do - modifyTVar' workerCountVar (+ 1) + atomically (modifyTVar' workerCountVar (+ 1)) _tid <- forkIO do - unmask (destroyResource' destroyAction waiters resourceVar createdResourceCountVar resource) `catch` \(_ :: SomeException) -> pure () - atomically do - modifyTVar' workerCountVar (subtract 1) + destroyResource' destroyAction waiters resourceVar createdResourceCountVar resource + -- We know `e` is an async exception, because destroyResource' silences synchronous exceptions. We don't + -- want to ignore it, so we propagate it to the reaper, which will eventually propagate it to the thread + -- that created it. + `catch` \(e :: SomeException) -> void (atomically (tryPutTMVar workerExceptionVar e)) + atomically (modifyTVar' workerCountVar (subtract 1)) pure () - action (nanoSecondsToSleepTime expiry) `catch` (uninterruptibleMask_ . 
errHandler) + action expirySleepTime `catch` errHandler pure (Thread tid resVar) +-- Return a resource notification to the pool. +-- +-- If there is a waiter, hand it to them directly. +-- +-- Otherwise, +-- If the resource notification is a real resource, add it to the list of available resources. +-- Otherwise, it's a creation ticket; "return" it by decrementing the created resources count. returnResourceSTM :: Pool a -> ResourceNotification a -> STM () -returnResourceSTM Pool {awaitingResources, availableResources, createdResourceCount} c = - returnResourceSTM' awaitingResources availableResources createdResourceCount c +returnResourceSTM Pool {awaitingResources, availableResources, createdResourceCount} = + returnResourceSTM' awaitingResources availableResources createdResourceCount {-# INLINE returnResourceSTM #-} returnResourceSTM' :: @@ -364,11 +427,11 @@ returnResourceSTM' :: TVar Int -> ResourceNotification a -> STM () -returnResourceSTM' awaitingResources availableResources createdResourceCount c = do - pop awaitingResources >>= \case +returnResourceSTM' awaitingResources availableResources createdResourceCount c = + popFsifo awaitingResources >>= \case Nothing -> case c of ResourceAvailable v -> modifyTVar availableResources (v :) - CreationTicket -> modifyTVar' createdResourceCount (\x -> x - 1) + CreationTicket -> modifyTVar' createdResourceCount (subtract 1) Just w -> putTMVar w c {-# INLINE returnResourceSTM' #-} @@ -381,6 +444,15 @@ popAvailable Pool {availableResources} = do pure (Just x) {-# INLINE popAvailable #-} +-- Destroy a resource by calling its release action (ignoring exceptions), then returning a creation ticket to the pool. +-- +-- We ignore release action exceptions because there are only three circumstances under which we destroy resources: +-- +-- - A user callback threw an exception; we consider that exception more important. +-- - The reaper is releasing a resource after it's been idle for `resourceExpiry`. 
+-- - We are destroying the pool, so we don't care about release action exceptions. +-- +-- This action never throws a synchronous exception. destroyResource :: Pool a -> a -> IO () destroyResource Pool {releaseResource, awaitingResources, availableResources, createdResourceCount} c = do destroyResource' releaseResource awaitingResources availableResources createdResourceCount c @@ -394,7 +466,7 @@ destroyResource' :: a -> IO () destroyResource' release waiters rs crc r = do - release r `onException` atomically (returnResourceSTM' waiters rs crc CreationTicket) + release r `catchAllSynchronous` \_ -> pure () atomically (returnResourceSTM' waiters rs crc CreationTicket) {-# INLINE destroyResource' #-} @@ -430,31 +502,40 @@ takeResource :: Pool a -> IO a takeResource p@Pool {createdResourceCount, maxResourceCount, acquireResource, awaitingResources} = (join . atomically) do popAvailable p >>= \case - Just (PoolEntry _ x) -> pure (pure x) + Just entry -> pure (pure (entryValue entry)) Nothing -> do resourceCount <- readTVar createdResourceCount case resourceCount < maxResourceCount of True -> do writeTVar createdResourceCount $! resourceCount + 1 - pure (acquireResource `onException` atomically (returnResourceSTM p CreationTicket)) + pure acquire False -> do var <- newEmptyTMVar - removeSelf <- push awaitingResources var - let handleNotif = \case - ResourceAvailable x -> pure (entryValue x) - CreationTicket -> acquireResource `onException` atomically (returnResourceSTM p CreationTicket) - dequeue = atomically do - _ <- removeSelf - tryTakeTMVar var >>= \case - Nothing -> pure () - Just v -> returnResourceSTM p v - pure (handleNotif =<< (atomically (takeTMVar var) `onException` dequeue)) + removeSelf <- pushFsifo awaitingResources var + pure do + notification <- + atomically (takeTMVar var) `onException` do + -- We got hit with an async exception while waiting for a resource, so remove ourselves from the + -- waiters queue. 
If someone happened to give us a resource before we manage to do so, return it to + -- the pool. + atomically do + _ <- removeSelf + tryTakeTMVar var >>= \case + Nothing -> pure () + Just v -> returnResourceSTM p v + case notification of + ResourceAvailable x -> pure (entryValue x) + CreationTicket -> acquire + where + -- Attempt to acquire a resource. If that fails, return a creation ticket to the pool, which will allow the next + -- taker to try without bumping the `createdResourceCount`. + acquire = acquireResource `onException` atomically (returnResourceSTM p CreationTicket) {-# INLINE takeResource #-} returnResource :: Pool a -> a -> IO () -returnResource a b = do +returnResource pool resource = do ct <- getCurrentTime - atomically (returnResourceSTM a (ResourceAvailable (PoolEntry ct b))) + atomically (returnResourceSTM pool (ResourceAvailable (PoolEntry ct resource))) {-# INLINE returnResource #-} newtype Nanoseconds @@ -471,7 +552,7 @@ newtype Microseconds ms2ns :: Milliseconds -> Nanoseconds ms2ns (Milliseconds x) = - let ns = x * 1000000 + let ns = x * 1_000_000 in case ns < x of True -> Nanoseconds maxBound False -> Nanoseconds ns @@ -485,8 +566,19 @@ ns2td (Nanoseconds w) = case microseconds >= largestIntWord of largestInt = maxBound @Int largestIntWord = fromIntegral @Int @Word64 largestInt -nap :: Microseconds -> IO () -nap (Microseconds x) = threadDelay x - getCurrentTime :: IO Nanoseconds getCurrentTime = coerce getMonotonicTimeNSec + +-- Control.Concurrent.forkIO without the default exception handler +forkIO :: IO () -> IO ThreadId +forkIO (IO action) = + IO \s0 -> + case fork# action s0 of + (# s1, tid #) -> (# s1, ThreadId tid #) + +catchAllSynchronous :: IO a -> (SomeException -> IO a) -> IO a +catchAllSynchronous action handler = + action `catch` \e -> + case fromException e of + Just (SomeAsyncException _) -> throwIO e + _ -> handler e diff --git a/src/stm-fsifo/lib/Control/Concurrent/STM/Fsifo.hs 
b/src/stm-fsifo/lib/Control/Concurrent/STM/Fsifo.hs index 77c2e41..926910e 100644 --- a/src/stm-fsifo/lib/Control/Concurrent/STM/Fsifo.hs +++ b/src/stm-fsifo/lib/Control/Concurrent/STM/Fsifo.hs @@ -4,8 +4,8 @@ module Control.Concurrent.STM.Fsifo ( Fsifo, newFsifo, newFsifoIO, - push, - pop, + pushFsifo, + popFsifo, ) where @@ -37,7 +37,7 @@ data TDList a {-# UNPACK #-} !(TVar (TDList a)) | TNil --- | Create a @Fsifo@ +-- | Create a @Fsifo@. newFsifo :: STM (Fsifo a) newFsifo = do emptyVarL <- newTVar TNil @@ -45,7 +45,7 @@ newFsifo = do pure (Fsifo emptyVarL emptyVarR) {-# INLINEABLE newFsifo #-} --- | Create a @Fsifo@ in @IO@ +-- | Create a @Fsifo@ in @IO@. newFsifoIO :: IO (Fsifo a) newFsifoIO = do emptyVarL <- newTVarIO TNil @@ -91,7 +91,7 @@ removeSelf tv prevPP prevP nextP = do -- | Push an element onto a queue. -- --- @push@ returns an action that attempts to remove the element from +-- @pushFsifo@ returns an action that attempts to remove the element from -- the queue. -- -- The action returns: @@ -99,8 +99,8 @@ removeSelf tv prevPP prevP nextP = do -- * @True@ if the element was removed from the queue -- -- * @False@ if the element was discovered to be no longer in the queue -push :: Fsifo a -> a -> STM (STM Bool) -push (Fsifo _ tv) a = do +pushFsifo :: Fsifo a -> a -> STM (STM Bool) +pushFsifo (Fsifo _ tv) a = do fwdPointer <- readTVar tv backPointer <- newTVar fwdPointer emptyVar <- newTVar TNil @@ -108,14 +108,14 @@ push (Fsifo _ tv) a = do writeTVar fwdPointer cell writeTVar tv emptyVar pure (maybeRemoveSelf tv backPointer emptyVar) -{-# INLINEABLE push #-} +{-# INLINEABLE pushFsifo #-} -- | Pop an element from a queue. 
-pop :: Fsifo a -> STM (Maybe a) -pop (Fsifo hv tv) = do +popFsifo :: Fsifo a -> STM (Maybe a) +popFsifo (Fsifo hv tv) = do readTVar hv >>= \case TNil -> pure Nothing TCons bp a fp -> do removeSelf tv bp hv fp pure (Just a) -{-# INLINEABLE pop #-} +{-# INLINEABLE popFsifo #-} diff --git a/src/stm-fsifo/test/Main.hs b/src/stm-fsifo/test/Main.hs index 3405ad1..1032a2d 100644 --- a/src/stm-fsifo/test/Main.hs +++ b/src/stm-fsifo/test/Main.hs @@ -74,7 +74,7 @@ prop_queue = property do <$> liftIO ( atomically do q <- readTVar qvar - Fsifo.push q n + Fsifo.pushFsifo q n ) upd = Update \(QueueModel s pushCount im _) (Push i) out -> let !pushCount' = pushCount + 1 @@ -92,7 +92,7 @@ prop_queue = property do execute Pop = do liftIO $ atomically do q <- readTVar qvar - Fsifo.pop q + Fsifo.popFsifo q upd = Update \(QueueModel s pushCount im _) Pop _out -> let mk = fst <$> IM.lookupGE minBound s s' = maybe s (\k -> IM.delete k s) mk