Follow-up from review on PR #554 #572

Merged
merged 5 commits into from
Feb 13, 2025
8 changes: 6 additions & 2 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
@@ -746,7 +746,8 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels ul
OneShot -> do
let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs))
let !thresh = creditThresholdForLevel conf ln
_leftoverCredits <- MR.supplyCredits mr thresh required
leftoverCredits <- MR.supplyCredits mr thresh required
assert (leftoverCredits == 0) $ return ()
-- This ensures the merge is really completed. However, we don't
-- release the merge yet and only briefly inspect the resulting run.
bracket (MR.expectCompleted mr) releaseRef $ \r ->
@@ -878,6 +879,9 @@ supplyCredits conf c levels =
let !c' = scaleCreditsForMerge mp mr c
let !thresh = creditThresholdForLevel conf ln
_leftoverCredits <- MR.supplyCredits mr thresh c'
--TODO: assert leftoverCredits == 0
-- to assert that we did not finish the merge too early,
-- and thus have spread the work out evenly.
return ()

-- | Scale a number of credits to a number of merge steps to be performed, based
@@ -915,4 +919,4 @@ scaleCreditsForMerge LevelTiering mr (Credits c) =
creditThresholdForLevel :: TableConfig -> LevelNo -> MR.CreditThreshold
creditThresholdForLevel conf (LevelNo _i) =
let AllocNumEntries (NumEntries x) = confWriteBufferAlloc conf
in MR.CreditThreshold (MR.Credits x)
in MR.CreditThreshold (MR.UnspentCredits (MR.Credits x))
46 changes: 27 additions & 19 deletions src/Database/LSMTree/Internal/MergingRun.hs
@@ -231,6 +231,14 @@ duplicateRuns (DeRef mr) =
V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs

-- | Take a snapshot of the state of a merging run.
--
-- TODO: this is not concurrency safe! The input runs to the merging run could
-- be released concurrently by another thread that completes the merge, while
-- the snapshot is taking place. The solution is for snapshot here to duplicate
-- the runs it returns _while_ holding the mergeState MVar (to exclude threads
-- that might concurrently complete the merge). And then the caller of course
-- must be updated to release the extra references.
--
snapshot ::
(PrimMonad m, MonadMVar m)
=> Ref (MergingRun m h)
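
Review note: the fix proposed in the TODO above (duplicate the run references while holding the `mergeState` MVar) can be sketched with a toy reference count. This is only an illustration under stated assumptions: `ToyRef`, `dupToyRef`, `releaseToyRef`, `refCount` and `snapshotRuns` are hypothetical stand-ins, not the library's `Ref`, `dupRef` or `releaseRef`.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for a reference-counted run handle.
newtype ToyRef = ToyRef (IORef Int)

-- A fresh handle starts with a single reference.
newToyRef :: IO ToyRef
newToyRef = ToyRef <$> newIORef 1

-- Take one more reference to the same underlying object.
dupToyRef :: ToyRef -> IO ToyRef
dupToyRef r@(ToyRef c) = atomicModifyIORef' c (\n -> (n + 1, r))

-- Give one reference back.
releaseToyRef :: ToyRef -> IO ()
releaseToyRef (ToyRef c) = atomicModifyIORef' c (\n -> (n - 1, ()))

refCount :: ToyRef -> IO Int
refCount (ToyRef c) = readIORef c

-- Duplicate every input run while the state lock is held, so a thread that
-- wants to complete the merge (and release the inputs) has to wait until the
-- snapshot owns its own references.
snapshotRuns :: MVar [ToyRef] -> IO [ToyRef]
snapshotRuns stateVar = withMVar stateVar (mapM dupToyRef)
```

In this sketch the caller owns the returned references and has to release each of them once the snapshot has been made durable, which matches the follow-up the TODO asks for in the caller.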
@@ -267,7 +275,7 @@ work to do).
The implementation is similar but somewhat more complex. We also accumulate
unspent credits until they reach a threshold at which point we do a batch of
merging work. Unlike the prototype, the implementation tracks both credits
spent credits as yet unspent. We will elaborate on why and how below.
spent and credits as yet unspent. We will elaborate on why and how below.
In the prototype, the credits spent equals the merge steps performed. The
same holds in the real implementation, but making it so is more complicated.
@@ -296,7 +304,8 @@ Thus we track two things:
* credits unspent ('UnspentCredits'): credits supplied that are not yet spent
and are thus available to spend.
The credits supplied is the sum of the credits spent and unspent.
The credits supplied is the sum of the credits spent and unspent. We guarantee
that the supplied credits never exceed the total debt.
The credits spent and the steps performed (or in the process of being
performed) will typically be equal. They are not guaranteed to be equal in the
@@ -330,7 +339,7 @@ numEntriesToTotalDebt (NumEntries n) = Credits n
-- Note that ideally the batch size for different LSM levels should be
-- co-prime so that merge work at different levels is not synchronised.
--
newtype CreditThreshold = CreditThreshold Credits
newtype CreditThreshold = CreditThreshold UnspentCredits

-- | The supplied credits is simply the sum of all the credits that have been
-- (successfully) supplied to a merging run via 'supplyCredits'.
@@ -559,8 +568,8 @@ atomicDepositAndSpendCredits (CreditsVar !var) !totalDebt

-- 2. not case 1, but enough unspent credits have accumulated to do
-- a batch of merge work;
| (\(UnspentCredits x)->x) unspent' >= batchThreshold
= spendBatchCredits spent unspent'
| unspent' >= batchThreshold
= spendBatchCredits spent unspent' batchThreshold

-- 3. not case 1 or 2, not enough credits to do any merge work.
| otherwise
@@ -587,14 +596,15 @@ atomicDepositAndSpendCredits (CreditsVar !var) !totalDebt
assert (leftover >= 0) $
(supplied', UnspentCredits unspent', leftover)

spendBatchCredits (SpentCredits !spent) (UnspentCredits !unspent) =
spendBatchCredits (SpentCredits !spent) (UnspentCredits !unspent)
(UnspentCredits unspentBatchThreshold) =
-- numBatches may be zero, in which case the result will be zero
let !nBatches = unspent `div` batchThreshold
!spend = nBatches * batchThreshold
let !nBatches = unspent `div` unspentBatchThreshold
!spend = nBatches * unspentBatchThreshold
!spent' = spent + spend
!unspent' = unspent - spend
in assert (spend >= 0) $
assert (unspent' < batchThreshold) $
assert (unspent' < unspentBatchThreshold) $
assert (spent' + unspent' == spent + unspent) $
(spend, SpentCredits spent', UnspentCredits unspent')
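
Review note: the arithmetic in this hunk can be checked in isolation with a small pure model. The sketch below is an assumption-laden simplification: plain `Int`s replace the `SpentCredits`, `UnspentCredits` and `CreditThreshold` newtypes, there is no atomic variable, and `deposit` and `spendBatches` are hypothetical names. Capping the deposit at the total debt is inferred from the comment that supplied credits never exceed the total debt. It assumes a positive threshold and non-negative credits.

```haskell
import Control.Exception (assert)

-- Deposit credits without letting supplied credits (spent + unspent)
-- exceed the total debt. Returns (new unspent, leftover credits).
deposit :: Int -> Int -> Int -> Int -> (Int, Int)
deposit totalDebt spent unspent credits =
  let supplied = spent + unspent
      accepted = min credits (totalDebt - supplied)
      leftover = credits - accepted
  in assert (leftover >= 0) (unspent + accepted, leftover)

-- Spend as many whole batches as the unspent credits allow.
-- Returns (credits to spend now, new spent, new unspent).
spendBatches :: Int -> Int -> Int -> (Int, Int, Int)
spendBatches threshold spent unspent =
  let nBatches = unspent `div` threshold   -- may be zero
      spend    = nBatches * threshold
      spent'   = spent + spend
      unspent' = unspent - spend
  in assert (unspent' < threshold) $
     assert (spent' + unspent' == spent + unspent) $
     (spend, spent', unspent')
```

For example, with a total debt of 100, 40 spent and 5 unspent, depositing 70 credits accepts 55 and leaves 15 over: `deposit 100 40 5 70` gives `(60, 15)`. With a threshold of 25, `spendBatches 25 40 60` then gives `(50, 90, 10)`: two batches are spent and 10 credits stay unspent, below the threshold.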

@@ -702,11 +712,10 @@ performMergeSteps ::
-> Credits
-> m Bool
performMergeSteps mergeVar creditsVar (Credits credits) =
assert (credits >= 0) $
withMVar mergeVar $ \case
CompletedMerge{} -> pure False
OngoingMerge _rs m -> do
-- We have dealt with the case of credits <= 0 above,
-- so here we know credits is positive
let stepsToDo = credits
(stepsDone, stepResult) <- Merge.steps m stepsToDo
assert (stepResult == MergeDone || stepsDone >= stepsToDo) (pure ())
@@ -743,8 +752,9 @@ completeMerge mergeVar mergeKnownCompletedVar = do
(OngoingMerge rs m) -> do
-- first try to complete the merge before performing other side effects,
-- in case the completion fails
--TODO: Run.fromMutable claims not to be exception safe
-- may need to use uninteruptible mask
--TODO: Run.fromMutable (used in Merge.complete) claims not to be
-- exception safe so we should probably be using the resource registry
-- and test for exception safety.
r <- Merge.complete m
V.forM_ rs releaseRef
-- Cache the knowledge that we completed the merge
@@ -768,16 +778,14 @@ expectCompleted (DeRef MergingRun {..}) = do
let totalDebt = numEntriesToTotalDebt mergeNumEntries
suppliedCredits = spentCredits + unspentCredits
!credits = assert (suppliedCredits == totalDebt) $
assert (unspentCredits >= 0) $
unspentCredits

--TODO: what about exception safety: check if it is ok to be interrupted
-- between performMergeSteps and completeMerge here, and above.
weFinishedMerge <- performMergeSteps mergeState mergeCreditsVar credits
-- If an async exception happens before we get to perform the
-- completion, then that is fine. The next 'expectCompleted' will
-- complete the merge.
when weFinishedMerge $ completeMerge mergeState mergeKnownCompleted
-- TODO: can we think of a check to see if we did not do too much work
-- here? <-- assert (suppliedCredits == totalDebt) ought to do it!
-- A related question is if we finished the merge too early, could have
-- spread out the work better.
withMVar mergeState $ \case
CompletedMerge r -> dupRef r -- return a fresh reference to the run
OngoingMerge{} -> do
14 changes: 10 additions & 4 deletions src/Database/LSMTree/Internal/Snapshot.hs
@@ -171,6 +171,12 @@ instance NFData r => NFData (SnapMergingRunState r) where
Conversion to levels snapshot format
-------------------------------------------------------------------------------}

--TODO: probably generally all the Ref (Run _) here ought to be fresh
-- references, created as we snapshot the levels, so that the runs don't
-- disappear under our feet during the process of making the snapshot durable.
-- At minimum the volatile runs are the inputs to merging runs, but it may be
-- simpler to duplicate them all, and release them all at the end.

{-# SPECIALISE toSnapLevels :: Levels IO h -> IO (SnapLevels (Ref (Run IO h))) #-}
toSnapLevels ::
(PrimMonad m, MonadMVar m)
@@ -194,14 +200,14 @@ toSnapIncomingRun ::
-> m (SnapIncomingRun (Ref (Run m h)))
toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
toSnapIncomingRun (Merging mergePolicy mergingRun) = do
-- We need to know how many credits were spend and yet unspent so we can
-- restore merge work on snapshot load. No need to snapshot the contents
-- of totalStepsVar here, since we still start counting from 0 again when
-- loading the snapshot.
-- We need to know how many credits were supplied so we can restore merge
-- work on snapshot load.
(mergingRunState,
MR.SuppliedCredits (MR.Credits suppliedCredits),
mergeNumRuns,
mergeNumEntries) <- MR.snapshot mergingRun
-- TODO: MR.snapshot needs to return duplicated run references, and we
-- need to arrange to release them when the snapshotting is done.
let smrs = toSnapMergingRunState mergingRunState
pure $
SnapMergingRun
2 changes: 1 addition & 1 deletion test/Test/Database/LSMTree/Internal/MergingRun.hs
@@ -13,7 +13,7 @@ tests = testGroup "Test.Database.LSMTree.Internal.MergingRun"
]

-- | The representation of CreditsPair should round trip properly. This is
-- non-trivial because it uses a packed bit the representation.
-- non-trivial because it uses a packed bitfield representation.
--
prop_CreditsPair :: SpentCredits -> UnspentCredits -> Property
prop_CreditsPair spentCredits unspentCredits =
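
Review note: to illustrate why a round-trip property is worth having for a packed bitfield, here is a minimal sketch of two counters sharing one `Word64`. The layout (low 24 bits for unspent, the remaining high bits for spent) and the names `pack`, `unpack` and `roundTrips` are assumptions made for the example, not the actual `CreditsPair` representation.

```haskell
import Data.Bits (shiftL, shiftR, (.&.), (.|.))
import Data.Word (Word64)

-- Hypothetical layout: the low 24 bits hold unspent credits.
unspentBits :: Int
unspentBits = 24

unspentMask :: Word64
unspentMask = (1 `shiftL` unspentBits) - 1

-- Pack spent into the high bits and unspent into the low bits.
pack :: Word64 -> Word64 -> Word64
pack spent unspent = (spent `shiftL` unspentBits) .|. (unspent .&. unspentMask)

-- Recover (spent, unspent) from the packed word.
unpack :: Word64 -> (Word64, Word64)
unpack w = (w `shiftR` unspentBits, w .&. unspentMask)

-- The round-trip check; it only holds when both values fit their fields.
roundTrips :: Word64 -> Word64 -> Bool
roundTrips spent unspent = unpack (pack spent unspent) == (spent, unspent)
```

In this toy layout `roundTrips 123456 789` holds, while `roundTrips 0 16777216` fails because 16777216 (2^24) does not fit in the 24-bit field. A property test over such a representation therefore needs generators that respect the field widths.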