Skip to content


Merge pull request #3 from awkward-squad/misc-refactoring
Browse files Browse the repository at this point in the history
misc refactoring
  • Loading branch information
tstat authored Mar 16, 2023
2 parents 47f6056 + 3548cea commit 0f7f2b6
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 76 deletions.
218 changes: 155 additions & 63 deletions src/demeter/lib/Demeter.hs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UnboxedTuples #-}

-- | A resource pool manages the lifecycle and distribution of
-- resources across multiple threads, where a "resource" is anything
Expand Down Expand Up @@ -172,7 +176,8 @@ module Demeter

import Control.Concurrent
import Control.Applicative ((<|>))
import Control.Concurrent hiding (forkIO)
import Control.Concurrent.STM
import Control.Concurrent.STM.Fsifo
import Control.Exception
Expand All @@ -181,25 +186,44 @@ import Data.Coerce (coerce)
import Data.Foldable (traverse_)
import Data.Word (Word64)
import GHC.Clock (getMonotonicTimeNSec)
import GHC.Conc (ThreadId (ThreadId))
import GHC.Exts (fork#)
import GHC.IO (IO (IO), unsafeUnmask)

-- | A resource pool
data Pool a = Pool
{ availableResources :: {-# UNPACK #-} !(TVar [PoolEntry a]),
{ -- The resources that have been used and returned to the pool. If they remain here for `resourceExpiry`, then
-- they'll be released by the reaper thread. The only way a resource can end up here is if there are no waiters in
-- `awaitingResources` at the time it's returned to the pool. The resources are approximately in descending return
-- time order: the head of the list contains the resource that was returned most recently, with the greatest (or
-- very close to the greatest) `entryTime`.
availableResources :: {-# UNPACK #-} !(TVar [PoolEntry a]),
-- A queue of waiters. When we go to return a resource to the pool, we'll first try handing it to the first waiter
-- in the queue, if any.
awaitingResources :: {-# UNPACK #-} !(Fsifo (TMVar (ResourceNotification a))),
-- The total number of resources we've either created, or have committed to creating. All resources we've committed
-- to creating will be in the form of `CreationTicket`s in the `availableResources` list, which allow takers to
-- (attempt to) create a resource without bumping this counter again.
createdResourceCount :: {-# UNPACK #-} !(TVar Int),
-- The maximum number of resources that can be created at any given time.
maxResourceCount :: {-# UNPACK #-} !Int,
-- The amount of time a resource can sit in `availableResources` before it is released.
resourceExpiry :: {-# UNPACK #-} !Nanoseconds,
acquireResource :: IO a,
releaseResource :: a -> IO (),
reaperThread :: {-# UNPACK #-} !(Thread ())
reaperThread :: {-# UNPACK #-} !(Thread (Maybe UnexpectedReaperException))

-- A "resource notification" is what one waiting on a resource will receive: either a resource, or permission to create
-- a new resource.
data ResourceNotification a
= ResourceAvailable {-# UNPACK #-} !(PoolEntry a)
| CreationTicket
| -- Permission to create a new resource

data PoolEntry a
= PoolEntry
-- The monotonic time that this resource was returned to the pool.
{-# UNPACK #-} !Nanoseconds

Expand Down Expand Up @@ -258,17 +282,24 @@ destroyPool Pool {reaperThread} = do
let Thread tid doneVar = reaperThread
uninterruptibleMask_ do
killThread tid
atomically (readTMVar doneVar)
atomically (readTMVar doneVar) >>= \case
Nothing -> pure ()
Just exception -> throwIO exception

data Thread a
= Thread {-# UNPACK #-} !ThreadId {-# UNPACK #-} !(TMVar a)

nanoSecondsToSleepTime :: Nanoseconds -> Microseconds
nanoSecondsToSleepTime w =
-- don't sleep less than 1 second
max 1000000 (ns2td w + 1)
max 1_000_000 (ns2td w + 1)

data UnexpectedReaperException
data WorkerDiedByAsyncException
= WorkerDiedByAsyncException SomeException -- invariant: this is an async exception
deriving stock (Show)
deriving anyclass (Exception)

newtype UnexpectedReaperException
= UnexpectedReaperException SomeException
deriving stock (Show)

Expand All @@ -293,36 +324,60 @@ forkReaper ::
TVar [PoolEntry a] ->
TVar Int ->
(a -> IO ()) ->
IO (Thread ())
IO (Thread (Maybe UnexpectedReaperException))
forkReaper expiry waiters resourceVar createdResourceCountVar destroyAction = do
mtid <- myThreadId
resVar <- newEmptyTMVarIO
workerCountVar <- newTVarIO @Int 0
-- If an async exception is raised in a worker, the worker (tries to) place the exception here
workerExceptionVar <- newEmptyTMVarIO

tid <- mask_ $ forkIOWithUnmask \unmask -> do
let errHandler e = do
case fromException @AsyncException e of
Just ThreadKilled -> do
reap =<< readTVarIO resourceVar
_ -> do
throwTo mtid (UnexpectedReaperException e)
atomically (putTMVar resVar ())
let errHandler e = uninterruptibleMask_ do
reap =<< readTVarIO resourceVar
-- FIXME we don't want to wait for workers, then have a new worker spawned after
let propagateException e2 =
unsafeUnmask (try (throwTo mtid e2)) >>= \case
Left e3
| Just ThreadKilled <- fromException e3 -> atomically (putTMVar resVar (Just e2))
| otherwise -> propagateException e2
Right () -> atomically (putTMVar resVar Nothing)
case () of
| Just ThreadKilled <- fromException e ->
atomically do
maybeWorkerException <- tryTakeTMVar workerExceptionVar
putTMVar resVar $
coerce @(Maybe SomeException) @(Maybe UnexpectedReaperException) maybeWorkerException
| Just (WorkerDiedByAsyncException e2) <- fromException e ->
propagateException (UnexpectedReaperException e2)
| otherwise -> propagateException (UnexpectedReaperException e)
expirySleepTime = nanoSecondsToSleepTime expiry
action sleepTime = do
nap sleepTime
readTVarIO resourceVar >>= \case
-- If the resource var is empty, then we add ourselves as
-- a waiter to the resourceVar and return to sleep. Once a
-- value is put to the resource var we restart our loop
-- with 'expiry' sleep time (as a freshly returned
-- resource will remain valid for at least 'expiry' time).
-- This avoids waking up the reaper if there is no pool
-- activity or so much activity that resources are
-- returned straight to waiters.
[] -> atomically (guard . not . null =<< readTVar resourceVar) >> action (nanoSecondsToSleepTime expiry)
_ -> reapDead >>= action
signal <- registerDelay (coerce @Microseconds @Int sleepTime)
(join . unmask . atomically) do
let workerDiedHandler = do
exception <- readTMVar workerExceptionVar
pure (errHandler (toException (WorkerDiedByAsyncException exception)))
let naptimeOverHandler = do
readTVar signal >>= \case
False -> retry
True -> pure ()
pure do
readTVarIO resourceVar >>= \case
-- If the resource var is empty, then we add ourselves as
-- a waiter to the resourceVar and return to sleep. Once a
-- value is put to the resource var we restart our loop
-- with 'expiry' sleep time (as a freshly returned
-- resource will remain valid for at least 'expiry' time).
-- This avoids waking up the reaper if there is no pool
-- activity or so much activity that resources are
-- returned straight to waiters.
[] -> atomically (guard . not . null =<< readTVar resourceVar) >> action expirySleepTime
_ -> reapDead >>= action
workerDiedHandler <|> naptimeOverHandler
reapDead = do
ct <- getCurrentTime
let expiryAgo = ct - expiry
Expand All @@ -331,31 +386,39 @@ forkReaper expiry waiters resourceVar createdResourceCountVar destroyAction = do
writeTVar resourceVar living
pure (living, dead)
reap dead
pure $ case living of
pure case living of
[] -> expirySleepTime
_ -> nanoSecondsToSleepTime ((entryTime $ last living) - expiryAgo)
reap xs = do
traverse_ (releaseWithWorker . entryValue) xs
_ -> nanoSecondsToSleepTime ((entryTime $ last living) - expiryAgo) -- last living expires first
reap = traverse_ (releaseWithWorker . entryValue)
waitForWorkers = do
atomically do
readTVar workerCountVar >>= \case
0 -> pure ()
_ -> retry

releaseWithWorker resource = do
atomically do
modifyTVar' workerCountVar (+ 1)
atomically (modifyTVar' workerCountVar (+ 1))
_tid <- forkIO do
unmask (destroyResource' destroyAction waiters resourceVar createdResourceCountVar resource) `catch` \(_ :: SomeException) -> pure ()
atomically do
modifyTVar' workerCountVar (subtract 1)
destroyResource' destroyAction waiters resourceVar createdResourceCountVar resource
-- We know `e` is an async exception, because destroyResource' silences synchronous exceptions. We don't
-- want to ignore it, so we propagate it to the reaper, which will eventually propagate it to the thread
-- that created it.
`catch` \(e :: SomeException) -> void (atomically (tryPutTMVar workerExceptionVar e))
atomically (modifyTVar' workerCountVar (subtract 1))
pure ()
action (nanoSecondsToSleepTime expiry) `catch` (uninterruptibleMask_ . errHandler)
action expirySleepTime `catch` errHandler
pure (Thread tid resVar)

-- Return a resource notification to the pool.
-- If there is a waiter, hand it to them directly.
-- Otherwise,
-- If the resource notification is a real resource, add it to the list of available resources.
-- Otherwise, it's a creation ticket; "return" it by decrementing the created resources count.
returnResourceSTM :: Pool a -> ResourceNotification a -> STM ()
returnResourceSTM Pool {awaitingResources, availableResources, createdResourceCount} c =
returnResourceSTM' awaitingResources availableResources createdResourceCount c
returnResourceSTM Pool {awaitingResources, availableResources, createdResourceCount} =
returnResourceSTM' awaitingResources availableResources createdResourceCount
{-# INLINE returnResourceSTM #-}

returnResourceSTM' ::
Expand All @@ -364,11 +427,11 @@ returnResourceSTM' ::
TVar Int ->
ResourceNotification a ->
STM ()
returnResourceSTM' awaitingResources availableResources createdResourceCount c = do
pop awaitingResources >>= \case
returnResourceSTM' awaitingResources availableResources createdResourceCount c =
popFsifo awaitingResources >>= \case
Nothing -> case c of
ResourceAvailable v -> modifyTVar availableResources (v :)
CreationTicket -> modifyTVar' createdResourceCount (\x -> x - 1)
CreationTicket -> modifyTVar' createdResourceCount (subtract 1)
Just w -> putTMVar w c
{-# INLINE returnResourceSTM' #-}

Expand All @@ -381,6 +444,15 @@ popAvailable Pool {availableResources} = do
pure (Just x)
{-# INLINE popAvailable #-}

-- Destroy a resource by calling its release action (ignoring exceptions), then returning a creation ticket to the pool.
-- We ignore release action exceptions because there are only two circumstances under which we destroy resources:
-- - A user callback threw an exception; we consider that exception more important.
-- - The reaper is releasing a resource after it's been idle for `resourceExpiry`.
-- - We are destroying the pool, so we don't care about release action exceptions.
-- This action never throws a synchronous exception.
destroyResource :: Pool a -> a -> IO ()
destroyResource Pool {releaseResource, awaitingResources, availableResources, createdResourceCount} c = do
destroyResource' releaseResource awaitingResources availableResources createdResourceCount c
Expand All @@ -394,7 +466,7 @@ destroyResource' ::
a ->
IO ()
destroyResource' release waiters rs crc r = do
release r `onException` atomically (returnResourceSTM' waiters rs crc CreationTicket)
release r `catchAllSynchronous` \_ -> pure ()
atomically (returnResourceSTM' waiters rs crc CreationTicket)
{-# INLINE destroyResource' #-}

Expand Down Expand Up @@ -430,31 +502,40 @@ takeResource :: Pool a -> IO a
takeResource p@Pool {createdResourceCount, maxResourceCount, acquireResource, awaitingResources} =
(join . atomically) do
popAvailable p >>= \case
Just (PoolEntry _ x) -> pure (pure x)
Just entry -> pure (pure (entryValue entry))
Nothing -> do
resourceCount <- readTVar createdResourceCount
case resourceCount < maxResourceCount of
True -> do
writeTVar createdResourceCount $! resourceCount + 1
pure (acquireResource `onException` atomically (returnResourceSTM p CreationTicket))
pure acquire
False -> do
var <- newEmptyTMVar
removeSelf <- push awaitingResources var
let handleNotif = \case
ResourceAvailable x -> pure (entryValue x)
CreationTicket -> acquireResource `onException` atomically (returnResourceSTM p CreationTicket)
dequeue = atomically do
_ <- removeSelf
tryTakeTMVar var >>= \case
Nothing -> pure ()
Just v -> returnResourceSTM p v
pure (handleNotif =<< (atomically (takeTMVar var) `onException` dequeue))
removeSelf <- pushFsifo awaitingResources var
pure do
notification <-
atomically (takeTMVar var) `onException` do
-- We got hit with an async exception while waiting for a resource, so remove ourselves from the
-- waiters queue. If someone happened to give us a resource before we manage to do so, return it to
-- the pool.
atomically do
_ <- removeSelf
tryTakeTMVar var >>= \case
Nothing -> pure ()
Just v -> returnResourceSTM p v
case notification of
ResourceAvailable x -> pure (entryValue x)
CreationTicket -> acquire
-- Attempt to acquire a resource. If that fails, return a creation ticket to the pool, which will allow the next
-- taker to try without bumping the `createdResourceCount`.
acquire = acquireResource `onException` atomically (returnResourceSTM p CreationTicket)
{-# INLINE takeResource #-}

returnResource :: Pool a -> a -> IO ()
returnResource a b = do
returnResource pool resource = do
ct <- getCurrentTime
atomically (returnResourceSTM a (ResourceAvailable (PoolEntry ct b)))
atomically (returnResourceSTM pool (ResourceAvailable (PoolEntry ct resource)))
{-# INLINE returnResource #-}

newtype Nanoseconds
Expand All @@ -471,7 +552,7 @@ newtype Microseconds

ms2ns :: Milliseconds -> Nanoseconds
ms2ns (Milliseconds x) =
let ns = x * 1000000
let ns = x * 1_000_000
in case ns < x of
True -> Nanoseconds maxBound
False -> Nanoseconds ns
Expand All @@ -485,8 +566,19 @@ ns2td (Nanoseconds w) = case microseconds >= largestIntWord of
largestInt = maxBound @Int
largestIntWord = fromIntegral @Int @Word64 largestInt

nap :: Microseconds -> IO ()
nap (Microseconds x) = threadDelay x

getCurrentTime :: IO Nanoseconds
getCurrentTime = coerce getMonotonicTimeNSec

-- Control.Concurrent.forkIO without the default exception handler
forkIO :: IO () -> IO ThreadId
forkIO (IO action) =
IO \s0 ->
case fork# action s0 of
(# s1, tid #) -> (# s1, ThreadId tid #)

catchAllSynchronous :: IO a -> (SomeException -> IO a) -> IO a
catchAllSynchronous action handler =
action `catch` \e ->
case fromException e of
Just (SomeAsyncException _) -> throwIO e
_ -> handler e

0 comments on commit 0f7f2b6

Please sign in to comment.