Reduce allocations #22

Open · wants to merge 6 commits into base: main

Changes from 1 commit
165 changes: 92 additions & 73 deletions src/System/IO/BlockIO.hs
@@ -34,7 +34,7 @@ import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Concurrent.Chan
import Control.Exception (mask_, throw, ArrayException(UndefinedElement),
finally, assert, throwIO)
finally, assert, throwIO, onException)
import System.IO.Error
import GHC.IO.Exception (IOErrorType(ResourceVanished, InvalidArgument))

@@ -187,93 +187,109 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !
-- at least the target number in flight at once.
--
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
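(A hedged usage sketch, added for review discussion only and not part of the diff: it assumes an already-initialised IOCtx and an open Fd, and readOneBlock is a made-up name. The buffer must be pinned, which is what the guardPinned check below enforces.)

readOneBlock :: IOCtx -> Fd -> IO IOResult
readOneBlock ioctx fd = do
  -- newPinnedByteArray is from Data.Primitive; submitIO rejects unpinned buffers
  buf <- newPinnedByteArray 4096
  -- one read of 4096 bytes at file offset 0 into the start of the buffer
  results <- submitIO ioctx (V.singleton (IOOpRead fd 0 buf 0 4096))
  return (VU.head results)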
submitIO IOCtx {
ioctxBatchSizeLimit',
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
ioops
| iobatchOpCount == 0 = return VU.empty

| iobatchOpCount > ioctxBatchSizeLimit' = do
-- create completion mvars for each sub-batch
batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do
submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops
-- Typical small case. We can be more direct.
| V.length ioops > 0 && V.length ioops <= ioctxBatchSizeLimit'
= mask_ $ do
iobatchCompletion <- newEmptyMVar
prepAndSubmitIOBatch ioctx ioops iobatchCompletion
takeMVar iobatchCompletion

submitIO ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops0 =
-- General case. Needs multiple batches and combining results.
Collaborator comment: or it needs no batch at all, if V.null ioops. The code handles this case correctly, but the comment only mentions the V.length ioops > ioctxBatchSizeLimit' case.

mask_ $ do
iobatchCompletions <- prepAndSubmitIOBatches [] ioops0
awaitIOBatches iobatchCompletions
where
prepAndSubmitIOBatches acc !ioops
| V.null ioops = return acc
| otherwise = do
let batch = V.take ioctxBatchSizeLimit' ioops
iobatchCompletion <- newEmptyMVar
return (b, iobatchCompletion)

forM_ batches $ \(batch, iobatchCompletion) -> do
let !iobatchOpCount' = V.length batch
waitQSemN ioctxQSemN iobatchOpCount'
iobatchIx <- readChan ioctxChanIOBatchIx
let iobatchKeepAlives = batch
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount = iobatchOpCount',
iobatchCompletion,
iobatchKeepAlives
}

submitBatch iobatchIx batch

waitAndCombine batches

| otherwise = do
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
prepAndSubmitIOBatch ioctx batch iobatchCompletion
prepAndSubmitIOBatches (iobatchCompletion:acc)
(V.drop ioctxBatchSizeLimit' ioops)

awaitIOBatches iobatchCompletions =
VU.concat <$> mapM takeMVar (reverse iobatchCompletions)
Comment on lines +213 to +214 (Collaborator): I was giving some thought to whether it is okay that takeMVar is interruptible here. I suppose if the submitting thread gets interrupted and doesn't take the MVars, that should be fine because the MVars can be GC'd. No explicit cleanup required.

Comment on lines +213 to +214 (Collaborator): In terms of allocations/number of allocated objects, would reverse on the vector be better than reverse on the list?
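One shape a vector-based alternative could take (a sketch for discussion only, not part of the diff; submitIO' and numBatches are made-up names, everything else reuses the PR's definitions): compute the number of batches up front and collect the completion MVars into a boxed vector in submission order, so neither a list reverse nor a vector reverse is needed, at the cost of one n-element boxed vector instead of n cons cells plus the reversed list.

submitIO' :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
submitIO' ioctx@IOCtx {ioctxBatchSizeLimit'} !ioops =
  mask_ $ do
    completions <- V.generateM numBatches $ \i -> do
      let batch = V.take ioctxBatchSizeLimit'
                         (V.drop (i * ioctxBatchSizeLimit') ioops)
      iobatchCompletion <- newEmptyMVar
      prepAndSubmitIOBatch ioctx batch iobatchCompletion
      return iobatchCompletion
    -- completions is already in submission order, so no reverse is required
    VU.concat . V.toList <$> V.mapM takeMVar completions
  where
    numBatches = (V.length ioops + ioctxBatchSizeLimit' - 1)
                   `div` ioctxBatchSizeLimit'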


-- Must be called with async exceptions masked. See mask_ above in submitIO.
prepAndSubmitIOBatch :: IOCtx
-> V.Vector (IOOp IO)
-> MVar (VU.Vector IOResult)
-> IO ()
prepAndSubmitIOBatch IOCtx {
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
!iobatch !iobatchCompletion = do
let !iobatchOpCount = V.length iobatch
-- We're called with async exceptions masked, but 'waitQSemN' can block and
-- receive exceptions. That's ok. But once we acquire the semaphore
Comment on lines +229 to +230 (Collaborator): Because waitQSemN is interruptible, right?
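(A minimal sketch of the interruptibility point, for illustration only; acquireThen and sem are made-up names. Even under mask_, waitQSemN can receive an async exception while it blocks, so nothing needs undoing if it throws, but anything acquired after it returns must be given back on exception.)

acquireThen :: QSemN -> Int -> IO a -> IO a
acquireThen sem n action =
  mask_ $ do
    -- interruptible: may throw while blocked, before anything is held
    waitQSemN sem n
    -- once the quantity is held, any exception must release it again
    action `onException` signalQSemN sem n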

-- quantity we must eventually return it. There are two cases for returning:
-- 1. we successfully submit the I/O and pass the information off to the
-- completionThread which will signal the semaphore upon completion, or
-- 2. we encounter an exception here in which case we need to undo the
-- semaphore acquisition.
-- For the latter case we use 'onException'. We also need to obtain a
-- batch index. This should never block because we have as many tokens as
-- QSemN initial quantity, and the batch ix is released before the QSemN
-- is signaled in the completionThread.
waitQSemN ioctxQSemN iobatchOpCount
!iobatchIx <- readChan ioctxChanIOBatchIx
-- Thus undoing the acquisition involves releasing the batch index and
-- semaphore quantity (which themselves cannot block).
let undoAcquisition = do writeChan ioctxChanIOBatchIx iobatchIx
signalQSemN ioctxQSemN iobatchOpCount
flip onException undoAcquisition $ do
-- We can receive an async exception if takeMVar blocks. That's ok, we'll
-- undo the acquisition.
muring <- takeMVar ioctxURing
-- From here on we cannot receive any async exceptions, because we do not
-- do any more blocking operations. But we can encounter sync exceptions,
-- so we may still need to release the mvar on exception.
flip onException (putMVar ioctxURing muring) $ do
uring <- maybe (throwIO closed) pure muring
V.iforM_ iobatch $ \ioopix ioop -> case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt (packIOOpId iobatchIx ioopix)
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt (packIOOpId iobatchIx ioopix)
-- TODO: if submitIO or guardPinned throws an exception, we need to
-- undo / clear the SQEs that we prepared.
URing.submitIO uring

-- More async exception safety: we want to inform the completionThread
-- /if and only if/ we successfully submitted a bathc of IO. So now that
Collaborator suggested change:
- -- /if and only if/ we successfully submitted a bathc of IO. So now that
+ -- /if and only if/ we successfully submitted a batch of IO. So now that

-- we have submitted a batch we need to inform the completionThread
-- without interruptions. We're still masked, but writeChan does not
-- throw exceptions and never blocks (unbounded channel) so we should
-- not get async or sync exceptions.
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
iobatchKeepAlives = iobatch
}
submitBatch iobatchIx ioops
takeMVar iobatchCompletion
putMVar ioctxURing muring
where
!iobatchOpCount = V.length ioops

guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing

{-# INLINE submitBatch #-}
submitBatch iobatchIx batch =
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Just uring -> do
V.iforM_ batch $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix in
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
IOOpWrite fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring

waitAndCombine :: [(a, MVar (VU.Vector IOResult))]
-> IO (VU.Vector IOResult)
waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd)

chunksOf :: Int -> V.Vector a -> [V.Vector a]
chunksOf n xs
| V.length xs == 0 = []
| otherwise = V.take n xs : chunksOf n (V.drop n xs)

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
iobatchOpCount :: !Int,
iobatchCompletion :: MVar (VU.Vector IOResult),
iobatchCompletion :: !(MVar (VU.Vector IOResult)),
-- | The list of I\/O operations is sent to the completion
-- thread so that the buffers are kept alive while the kernel
-- is using them.
@@ -341,6 +357,9 @@ completionThread !uring !done !maxc !qsem !chaniobatch !chaniobatchix = do
VM.write keepAlives iobatchix invalidEntry
result' <- VU.unsafeFreeze result
putMVar completion (result' :: VU.Vector IOResult)
-- Important: release batch index _before_ we signal the QSemN.
-- The other side needs the guarantee that the index is available
-- once it acquires the QSemN.
writeChan chaniobatchix iobatchix
let !qrelease = VU.length result'
signalQSemN qsem qrelease