From b8740a16a8b66b79493c565200ef6ea9c9ef6bba Mon Sep 17 00:00:00 2001 From: Oleg Grenrus Date: Thu, 13 Jun 2024 17:17:52 +0300 Subject: [PATCH 1/3] Test submitting big batches --- test/test.hs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/test/test.hs b/test/test.hs index 5d25b59..c9a7e2b 100644 --- a/test/test.hs +++ b/test/test.hs @@ -1,6 +1,12 @@ +{-# LANGUAGE ViewPatterns #-} module Main (main) where -import Control.Exception (SomeException, try) +import Control.Exception (SomeException, try) +import qualified Data.Primitive.ByteArray as P +import qualified Data.Vector as V +import GHC.IO.FD (FD (..)) +import GHC.IO.Handle.FD (handleToFd) +import System.IO import System.IO.BlockIO import Test.Tasty import Test.Tasty.HUnit @@ -11,6 +17,7 @@ main = defaultMain tests tests :: TestTree tests = testGroup "test" [ testCase "example_initClose" example_initClose + , testCase "example_initReadClose" example_initReadClose , testCase "example_closeIsIdempotent" example_closeIsIdempotent ] @@ -19,6 +26,17 @@ example_initClose = do ctx <- initIOCtx defaultIOCtxParams closeIOCtx ctx +example_initReadClose :: Assertion +example_initReadClose = do + ctx <- initIOCtx defaultIOCtxParams + withFile "blockio-uring.cabal" ReadMode $ \hdl -> do + -- handleToFd is available since base-4.16.0.0 + FD { fdFD = fromIntegral -> fd } <- handleToFd hdl + mba <- P.newPinnedByteArray 10 -- TODO: shouldn't use the same array for all ops :) + submitIO ctx $ V.replicate 96 $ + IOOpRead fd 0 mba 0 10 + closeIOCtx ctx + example_closeIsIdempotent :: Assertion example_closeIsIdempotent = do ctx <- initIOCtx defaultIOCtxParams From 03af1447418bf8dbb8c53f1980c85ad3359da639 Mon Sep 17 00:00:00 2001 From: Oleg Grenrus Date: Thu, 13 Jun 2024 19:12:53 +0300 Subject: [PATCH 2/3] Split big batches into subbatches This removes the limit previously imposed on callers to keep their batch sizes below a limit. Splitting into subbatches, submitting them (in a pipelined fashion) and combining the results is now done automatically. This simplifies the interface for users of the library. --- src/System/IO/BlockIO.hs | 100 +++++++++++++++++++++++++++------------ test/test.hs | 10 ++-- 2 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/System/IO/BlockIO.hs b/src/System/IO/BlockIO.hs index e941564..11c5592 100644 --- a/src/System/IO/BlockIO.hs +++ b/src/System/IO/BlockIO.hs @@ -52,6 +52,9 @@ import System.IO.BlockIO.URing (IOResult(..)) -- | IO context: a handle used by threads submitting IO batches. -- data IOCtx = IOCtx { + -- | This is initialised from the 'ioctxBatchSizeLimit' from the 'IOCtxParams'. + ioctxBatchSizeLimit' :: !Int, + -- | IO concurrency control: used by writers to reserve the -- right to submit an IO batch of a given size, and by the -- completion thread to return it on batch completion. @@ -111,6 +114,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do initialBatchIxs = [0 .. ioctxConcurrencyLimit-1] writeList2Chan ioctxChanIOBatchIx initialBatchIxs return IOCtx { + ioctxBatchSizeLimit' = ioctxBatchSizeLimit, ioctxQSemN, ioctxURing, ioctxChanIOBatch, @@ -184,33 +188,67 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int ! -- submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult) submitIO IOCtx { + ioctxBatchSizeLimit', ioctxQSemN, ioctxURing, ioctxChanIOBatch, ioctxChanIOBatchIx } - ioops = do - let !iobatchOpCount = V.length ioops - waitQSemN ioctxQSemN iobatchOpCount - iobatchIx <- readChan ioctxChanIOBatchIx - iobatchCompletion <- newEmptyMVar - let iobatchKeepAlives = ioops - writeChan ioctxChanIOBatch - IOBatch { - iobatchIx, - iobatchOpCount, - iobatchCompletion, - iobatchKeepAlives - } - withMVar ioctxURing $ \case - Nothing -> throwIO closed - Just uring -> do --- print ("submitIO", iobatchOpCount) - V.iforM_ ioops $ \ioopix ioop -> - let !ioopid = packIOOpId iobatchIx ioopix - in - --print ioop >> - case ioop of + ioops + | iobatchOpCount == 0 = return VU.empty + + | iobatchOpCount > ioctxBatchSizeLimit' = do + -- create completion mvars for each sub-batch + batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do + iobatchCompletion <- newEmptyMVar + return (b, iobatchCompletion) + + forM_ batches $ \(batch, iobatchCompletion) -> do + let !iobatchOpCount' = V.length batch + waitQSemN ioctxQSemN iobatchOpCount' + iobatchIx <- readChan ioctxChanIOBatchIx + let iobatchKeepAlives = batch + writeChan ioctxChanIOBatch + IOBatch { + iobatchIx, + iobatchOpCount = iobatchOpCount', + iobatchCompletion, + iobatchKeepAlives + } + + submitBatch iobatchIx batch + + waitAndCombine batches + + | otherwise = do + waitQSemN ioctxQSemN iobatchOpCount + iobatchIx <- readChan ioctxChanIOBatchIx + iobatchCompletion <- newEmptyMVar + let iobatchKeepAlives = ioops + writeChan ioctxChanIOBatch + IOBatch { + iobatchIx, + iobatchOpCount, + iobatchCompletion, + iobatchKeepAlives + } + submitBatch iobatchIx ioops + takeMVar iobatchCompletion + where + !iobatchOpCount = V.length ioops + + guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned + closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing + notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing + + {-# INLINE submitBatch #-} + submitBatch iobatchIx batch = + withMVar ioctxURing $ \case + Nothing -> throwIO closed + Just uring -> do + V.iforM_ batch $ \ioopix ioop -> + let !ioopid = packIOOpId iobatchIx ioopix in + case ioop of IOOpRead fd off buf bufOff cnt -> do guardPinned buf URing.prepareRead uring fd off @@ -221,14 +259,16 @@ submitIO IOCtx { URing.prepareWrite uring fd off (mutableByteArrayContents buf `plusPtr` bufOff) cnt ioopid - URing.submitIO uring --- print ("submitIO", "submitting done") - takeMVar iobatchCompletion - where - closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing - guardPinned mba = do - unless (isMutableByteArrayPinned mba) $ throwIO notPinned - notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing + URing.submitIO uring + + waitAndCombine :: [(a, MVar (VU.Vector IOResult))] + -> IO (VU.Vector IOResult) + waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd) + +chunksOf :: Int -> V.Vector a -> [V.Vector a] +chunksOf n xs + | V.length xs == 0 = [] + | otherwise = V.take n xs : chunksOf n (V.drop n xs) data IOBatch = IOBatch { iobatchIx :: !IOBatchIx, diff --git a/test/test.hs b/test/test.hs index c9a7e2b..3aa122c 100644 --- a/test/test.hs +++ b/test/test.hs @@ -17,7 +17,9 @@ main = defaultMain tests tests :: TestTree tests = testGroup "test" [ testCase "example_initClose" example_initClose - , testCase "example_initReadClose" example_initReadClose + , testCase "example_initReadClose 32" $ example_initReadClose 32 + , testCase "example_initReadClose 96" $ example_initReadClose 96 + , testCase "example_initReadClose 200" $ example_initReadClose 200 , testCase "example_closeIsIdempotent" example_closeIsIdempotent ] @@ -26,14 +28,14 @@ example_initClose = do ctx <- initIOCtx defaultIOCtxParams closeIOCtx ctx -example_initReadClose :: Assertion -example_initReadClose = do +example_initReadClose :: Int -> Assertion +example_initReadClose size = do ctx <- initIOCtx defaultIOCtxParams withFile "blockio-uring.cabal" ReadMode $ \hdl -> do -- handleToFd is available since base-4.16.0.0 FD { fdFD = fromIntegral -> fd } <- handleToFd hdl mba <- P.newPinnedByteArray 10 -- TODO: shouldn't use the same array for all ops :) - submitIO ctx $ V.replicate 96 $ + submitIO ctx $ V.replicate size $ IOOpRead fd 0 mba 0 10 closeIOCtx ctx From 385c84a7064a6e0f0bbdc222efcac4066d429674 Mon Sep 17 00:00:00 2001 From: Oleg Grenrus Date: Thu, 13 Jun 2024 16:05:38 +0300 Subject: [PATCH 3/3] Test submitting empty batches --- test/test.hs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/test.hs b/test/test.hs index 3aa122c..8e19559 100644 --- a/test/test.hs +++ b/test/test.hs @@ -20,6 +20,7 @@ tests = testGroup "test" , testCase "example_initReadClose 32" $ example_initReadClose 32 , testCase "example_initReadClose 96" $ example_initReadClose 96 , testCase "example_initReadClose 200" $ example_initReadClose 200 + , testCase "example_initEmptyClose" example_initEmptyClose , testCase "example_closeIsIdempotent" example_closeIsIdempotent ] @@ -39,6 +40,12 @@ example_initReadClose size = do IOOpRead fd 0 mba 0 10 closeIOCtx ctx +example_initEmptyClose :: Assertion +example_initEmptyClose = do + ctx <- initIOCtx defaultIOCtxParams + _ <- submitIO ctx V.empty + closeIOCtx ctx + example_closeIsIdempotent :: Assertion example_closeIsIdempotent = do ctx <- initIOCtx defaultIOCtxParams