Skip to content

Commit

Permalink
Prototype: change credit scaling, using nominal vs physical credit
Browse files Browse the repository at this point in the history
Previously we've had a somewhat complex method to scale credits from
those supplied in a level (1 per update) to the credits used in merging.

The existing scheme has a number of downsides:
* no clear distinction between scaled and unscaled credits, what does
  each one measure?
* complex, needs scaling that depends on the merge policy
* always uses worst case supply of credits so often finishes early
* rounding errors compound the problem of credit over-supply
* no satisfactory way to check we do not over-supply credits
* contributing credits to a merge from multiple handles will cause the
  merge to finish earlier than any of the handles needs it to finish,
  thus doing work more eagerly than necessary.

The new scheme involves:
* distinguishing physical vs nominal credits, with a clear definition
  for each measure
* converting between nominal and physical without merge policy
* physical debt is no longer worst case but matches actual cost
* integer rounding errors do not compound
* we can assert that we reach the nominal and physical debt totals at
  the same moment (when the merge is done) and thus we can assert no
  over-supply of physical credits.
* we can avoid supplying credits too quickly to a merge that is shared
  between multiple handles, each one only pushes it as far as it needs.
  • Loading branch information
dcoutts committed Feb 12, 2025
1 parent 535fdb5 commit 702031b
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 67 deletions.
184 changes: 120 additions & 64 deletions prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ module ScheduledMerges (
LevelMergeType(..),
MergeCredit(..),
MergeDebt(..),
NominalCredit(..),
NominalDebt(..),
Run,
runSize,
supplyCreditsMergingTree,
Expand Down Expand Up @@ -123,7 +125,9 @@ data Level s = Level !(IncomingRun s) ![Run]
-- single run without having to read the 'STRef', and secondly to make it easier
-- to avoid supplying merge credits. It's not essential, but simplifies things
-- somewhat.
data IncomingRun s = Merging !MergePolicy !(MergingRun LevelMergeType s)
data IncomingRun s = Merging !MergePolicy
!NominalDebt !(STRef s NominalCredit)
!(MergingRun LevelMergeType s)
| Single !Run

-- | The merge policy for a LSM level can be either tiering or levelling.
Expand Down Expand Up @@ -321,7 +325,7 @@ invariant (LSMContent _ levels ul) = do
mrs <- case ir of
Single r ->
return (CompletedMerge r)
Merging mp (MergingRun mt _ ref) -> do
Merging mp _ _ (MergingRun mt _ ref) -> do
assertST $ mp == mergePolicyForLevel ln ls ul
&& mt == mergeTypeForLevel ls ul
readSTRef ref
Expand Down Expand Up @@ -520,7 +524,9 @@ assertST p = assert p $ return ()
--

-- | Credits for keeping track of merge progress. These credits
-- correspond directly to merge steps performed.
-- correspond directly to merge steps performed. We also call these \"physical\"
-- credits (since they correspond to steps done), as opposed to the \"nominal\"
-- credits in 'NominalCredit' and 'NominalDebt'.
type Credit = Int

-- | Debt for keeping track of the total merge work to do.
Expand Down Expand Up @@ -586,6 +592,8 @@ paydownMergeDebt :: MergeDebt -> MergeCredit -> Credit -> MergeDebtPaydown
paydownMergeDebt MergeDebt {totalDebt}
MergeCredit {spentCredits, unspentCredits}
c
| assert (c >= 0) False = undefined

| let !suppliedCredits' = suppliedCredits + c
, suppliedCredits' >= totalDebt
, let !leftover = suppliedCredits' - totalDebt
Expand Down Expand Up @@ -633,19 +641,18 @@ mergeBatchSize = 32
-- Merging run abstraction
--

newMergingRun :: IsMergeType t => Maybe Debt -> t -> [Run] -> ST s (MergingRun t s)
newMergingRun mdebt mergeType runs = do
newMergingRun :: IsMergeType t => t -> [Run] -> ST s (MergingRun t s)
newMergingRun mergeType runs = do
assertST $ length runs > 1
-- in some cases, no merging is required at all
(debt, state) <- case filter (\r -> runSize r > 0) runs of
[] -> let (r:_) = runs -- just re-use the empty input
in return (runSize r, CompletedMerge r)
[r] -> return (runSize r, CompletedMerge r)
rs -> do
let !cost = sum (map runSize rs)
!debt = case mdebt of
Nothing -> cost
Just d -> assert (d >= cost) d
-- The (physical) debt is always exactly the cost (merge steps),
-- which is the sum of run lengths in elements.
let !debt = sum (map runSize rs)
let merged = mergek mergeType rs -- deliberately lazy
return (debt, OngoingMerge zeroMergeCredit rs merged)
MergingRun mergeType (MergeDebt debt) <$> newSTRef state
Expand Down Expand Up @@ -708,6 +715,15 @@ supplyCreditsMergingRun =
writeSTRef ref (OngoingMerge mergeCredit' rs r)
return 0

-- | The total amount of physical credit that has been supplied to a merging
-- run so far.
--
-- For a completed merge this is the run's total (physical) debt. For an
-- ongoing merge it is the sum of the credits already spent and the credits
-- supplied but not yet spent.
suppliedCreditMergingRun :: MergingRun t s -> ST s Credit
suppliedCreditMergingRun (MergingRun _ debt stateVar) = do
    state <- readSTRef stateVar
    case state of
      -- A finished merge has, by definition, had its whole debt paid.
      CompletedMerge{} ->
        return (totalDebt debt)
      OngoingMerge credit _ _ ->
        return (spentCredits credit + unspentCredits credit)

-------------------------------------------------------------------------------
-- LSM handle
--
Expand Down Expand Up @@ -750,7 +766,7 @@ update tr (LSMHandle scr lsmr) k op = do
sc <- readSTRef scr
content@(LSMContent wb ls unionLevel) <- readSTRef lsmr
modifySTRef' scr (+1)
supplyCreditsLevels 1 ls
supplyCreditsLevels (NominalCredit 1) ls
invariant content
let wb' = Map.insertWith combine k op wb
if bufferSize wb' >= maxBufferSize
Expand All @@ -762,7 +778,7 @@ update tr (LSMHandle scr lsmr) k op = do
else
writeSTRef lsmr (LSMContent wb' ls unionLevel)

supplyMergeCredits :: LSM s -> Credit -> ST s ()
supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
supplyMergeCredits (LSMHandle scr lsmr) credits = do
content@(LSMContent _ ls _) <- readSTRef lsmr
modifySTRef' scr (+1)
Expand Down Expand Up @@ -965,48 +981,79 @@ lookupsTree k = go
lookupBatch' = lookupBatch Nothing k

-------------------------------------------------------------------------------
-- Updates
-- Nominal credits
--

-- | An amount of nominal credit, as opposed to the physical 'Credit' that
-- counts merge steps. 'update' supplies one nominal credit to the levels per
-- call; 'scaleNominalToPhysicalCredit' converts an accumulated nominal credit
-- into the corresponding physical credit.
newtype NominalCredit = NominalCredit Credit
deriving stock Show

-- | The total nominal credit associated with a level merge. The nominal
-- credit accumulated for a merge is capped at this value by
-- 'depositNominalCredit', and it is the denominator used when scaling nominal
-- credit to physical credit.
newtype NominalDebt = NominalDebt Credit
deriving stock Show

-- TODO: If there is a UnionLevel, there is no (more expensive) last level merge
-- in the regular levels, so a little less merging work is required than if
-- there was no UnionLevel. It might be a good idea to spend this "saved" work
-- on the UnionLevel instead. This makes future lookups cheaper and ensures that
-- we can get rid of the UnionLevel at some point, even if a user just keeps
-- inserting without calling 'supplyUnionCredits'.
supplyCreditsLevels :: Credit -> Levels s -> ST s ()
supplyCreditsLevels unscaled =
supplyCreditsLevels :: NominalCredit -> Levels s -> ST s ()
supplyCreditsLevels nominalDeposit =
traverse_ $ \(Level ir _rs) -> do
case ir of
Single{} -> return ()
Merging mp mr -> do
factor <- creditsForMerge mp mr
let credits = ceiling (fromIntegral unscaled * factor)
when (credits > 0) $ do
_ <- supplyCreditsMergingRun credits mr
-- we don't mind leftover credits, each level completes independently
return ()

-- | The general case (and thus worst case) of how many merge credits we need
-- for a level. This is based on the merging policy at the level.
--
creditsForMerge :: MergePolicy -> MergingRun t s -> ST s Rational

-- A levelling merge has 1 input run and one resident run, which is (up to) 4x
-- bigger than the others.
-- It needs to be completed before another run comes in.
creditsForMerge MergePolicyLevelling _ =
return $ (1 + 4) / 1
Merging _mp nominalDebt nominalCreditVar
mr@(MergingRun _ physicalDebt _) -> do

nominalCredit <- depositNominalCredit
nominalDebt nominalCreditVar nominalDeposit
physicalCredit <- suppliedCreditMergingRun mr
let !physicalCredit' = scaleNominalToPhysicalCredit
nominalDebt physicalDebt nominalCredit
-- Our target physicalCredit' could actually be less than the
-- actual current physicalCredit if other tables were contributing
-- credits to the shared merge.
!physicalDeposit = physicalCredit' - physicalCredit

-- So we may have a zero or negative deposit, which we ignore.
when (physicalDeposit > 0) $ do
leftoverCredits <- supplyCreditsMergingRun physicalDeposit mr
-- For merges at ordinary levels (not unions) we expect to hit the
-- debt limit exactly and never exceed it.
assert (leftoverCredits == 0) $ return ()

-- | Scale an accumulated amount of nominal credit to the matching amount of
-- physical credit (merge steps):
--
-- > floor (nominalCredit * physicalDebt / nominalDebt)
--
-- Rounding down is applied once, to the final quotient. When the nominal
-- credit equals the nominal debt the result is exactly the physical debt.
-- The nominal debt must be non-zero, since it is used as the divisor.
scaleNominalToPhysicalCredit ::
     NominalDebt
  -> MergeDebt
  -> NominalCredit
  -> Credit
scaleNominalToPhysicalCredit (NominalDebt nomDebt)
                             (MergeDebt physDebt)
                             (NominalCredit nomCredit) =
    floor (creditQ * physDebtQ / nomDebtQ)
  where
    -- This specification uses Rational as an exact intermediate
    -- representation; it can be implemented efficiently using only integer
    -- operations.
    creditQ, physDebtQ, nomDebtQ :: Rational
    creditQ   = fromIntegral nomCredit
    physDebtQ = fromIntegral physDebt
    nomDebtQ  = fromIntegral nomDebt

-- | Add a deposit to the nominal credit held in the given variable and
-- return the new total.
--
-- The new total is capped at the nominal debt: any nominal credit beyond the
-- debt is dropped. The capped total is written back to the variable before it
-- is returned.
depositNominalCredit ::
     NominalDebt
  -> STRef s NominalCredit
  -> NominalCredit
  -> ST s NominalCredit
depositNominalCredit (NominalDebt debtCap) creditVar (NominalCredit amount) = do
    NominalCredit current <- readSTRef creditVar
    -- The uncapped sum _could_ exceed the debt: under-full runs sometimes
    -- mean runs are not shuffled down the levels as quickly as in the worst
    -- case. Excess nominal credit is simply discarded here.
    let !capped = NominalCredit (min debtCap (current + amount))
    writeSTRef creditVar capped
    return capped

-- A tiering merge has 5 runs at most (one could be held back to be merged again)
-- and must be completed before the level is full (once 4 more runs come in).
creditsForMerge MergePolicyTiering (MergingRun _ _ ref) = do
readSTRef ref >>= \case
CompletedMerge _ -> return 0
OngoingMerge _ rs _ -> do
let numRuns = length rs
assertST $ numRuns `elem` [4, 5]
return $ fromIntegral numRuns / 4
-------------------------------------------------------------------------------
-- Updates
--

increment :: forall s. Tracer (ST s) Event
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s)
Expand All @@ -1028,7 +1075,7 @@ increment tr sc run0 ls0 ul = do
go !ln incoming (Level ir rs : ls) = do
r <- case ir of
Single r -> return r
Merging mergePolicy mr -> do
Merging mergePolicy _ _ mr -> do
r <- expectCompletedMergingRun mr
traceWith tr' MergeCompletedEvent {
mergePolicy,
Expand Down Expand Up @@ -1087,26 +1134,37 @@ newLevelMerge :: Tracer (ST s) EventDetail
-> [Run] -> ST s (IncomingRun s)
newLevelMerge _ _ _ _ [r] = return (Single r)
newLevelMerge tr level mergePolicy mergeType rs = do
assertST (length rs `elem` [4, 5])
mergingRun@(MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
assertST (totalDebt physicalDebt <= maxPhysicalDebt)
traceWith tr MergeStartedEvent {
mergePolicy,
mergeType,
mergeDebt = debt,
mergeDebt = totalDebt physicalDebt,
mergeRunsSize = map runSize rs
}
assertST (length rs `elem` [4, 5])
Merging mergePolicy <$> newMergingRun (Just debt) mergeType rs
nominalCreditVar <- newSTRef (NominalCredit 0)
pure (Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
where
-- How much we need to discharge before the merge can be guaranteed
-- complete. More precisely, this is the maximum amount a merge at this
-- level could need. While the real @cost@ of a merge would lead to merges
-- finishing early, the overestimation @debt@ means that in this prototype
-- merges will only complete at the last possible moment.
-- Note that for levelling this includes the single run in the current
-- level.
debt = case mergePolicy of
MergePolicyLevelling -> 4 * tieringRunSize (level-1)
+ levellingRunSize level
MergePolicyTiering -> length rs * tieringRunSize (level-1)
-- The nominal debt equals the minimum number of credits we will supply before we
-- expect the merge to complete. This is the same as the number of updates.
nominalDebt = NominalDebt (tieringRunSize level)

-- The physical debt is the number of actual merge steps we will need to
-- perform before the merge is complete. This is always the sum of the
-- lengths of the input runs.
--
-- As we supply nominal credit, we scale them and supply physical credits,
-- such that we pay off the physical and nominal debts at the same time.
--
-- We can bound the worst case physical debt: this is the maximum number of
-- steps a merge at this level could need. Note that for levelling this
-- includes the single run in the current level.
maxPhysicalDebt =
case mergePolicy of
MergePolicyLevelling -> 4 * tieringRunSize (level-1)
+ levellingRunSize level
MergePolicyTiering -> length rs * tieringRunSize (level-1)

-- | Only based on run count, not their sizes.
tieringLevelIsFull :: Int -> [Run] -> [Run] -> Bool
Expand Down Expand Up @@ -1172,8 +1230,8 @@ newPendingLevelMerge irs tree = do
st = PendingTreeMerge (PendingLevelMerge prs tree)
Just . MergingTree <$> newSTRef st
where
incomingToPreExistingRun (Single r) = PreExistingRun r
incomingToPreExistingRun (Merging _ mr) = PreExistingMergingRun mr
incomingToPreExistingRun (Single r) = PreExistingRun r
incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr

-- | Ensures that the merge contains more than one input.
newPendingUnionMerge :: [MergingTree s] -> ST s (Maybe (MergingTree s))
Expand Down Expand Up @@ -1286,12 +1344,10 @@ supplyCreditsMergingTreeState credits !state = do
else do
-- all children must be done, create new merge!
(mergeType, rs) <- expectCompletedChildren pm
-- no reason to claim a larger debt than sum of run sizes
let debt = Nothing
case rs of
[r] -> return (c', CompletedTreeMerge r)
_ -> do
state' <- OngoingTreeMerge <$> newMergingRun debt mergeType rs
state' <- OngoingTreeMerge <$> newMergingRun mergeType rs
-- use any remaining credits to progress the new merge
supplyCreditsMergingTreeState c' state'

Expand Down Expand Up @@ -1368,8 +1424,8 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir

flattenIncomingRun :: IncomingRun s -> ST s [Run]
flattenIncomingRun = \case
Single r -> return [r]
Merging _ mr -> flattenMergingRun mr
Single r -> return [r]
Merging _ _ _ mr -> flattenMergingRun mr

flattenMergingRun :: MergingRun t s -> ST s [Run]
flattenMergingRun (MergingRun _ _ ref) = do
Expand Down Expand Up @@ -1431,7 +1487,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do
dumpLevel :: Level s -> ST s LevelRepresentation
dumpLevel (Level (Single r) rs) =
return (Nothing, (r:rs))
dumpLevel (Level (Merging mp (MergingRun mt _ ref)) rs) = do
dumpLevel (Level (Merging mp _nd _nc (MergingRun mt _ ref)) rs) = do
mrs <- readSTRef ref
return (Just (mp, mt, mrs), rs)

Expand Down
6 changes: 3 additions & 3 deletions prototypes/ScheduledMergesTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ test_regression_empty_run =
]

-- finish merge
LSM.supplyMergeCredits lsm 16
LSM.supplyMergeCredits lsm (NominalCredit 16)

expectShape lsm
0
Expand Down Expand Up @@ -143,7 +143,7 @@ test_merge_again_with_incoming =
]

-- complete the merge (20 entries, but credits get scaled up by 1.25)
LSM.supplyMergeCredits lsm 16
LSM.supplyMergeCredits lsm (NominalCredit 16)

expectShape lsm
0
Expand Down Expand Up @@ -272,7 +272,7 @@ fromP (PMergingRun m) = PreExistingMergingRun <$> fromM m
fromM :: IsMergeType t => M t -> ST s (MergingRun t s)
fromM m = do
let (mergeType, mergeDebt, state) = case m of
MCompleted mt md r -> (mt, md, CompletedMerge r)
MCompleted mt md r -> (mt, md, CompletedMerge r)
MOngoing mt md mc rs -> (mt, md, OngoingMerge mc rs' (mergek mt rs'))
where rs' = map getNonEmptyRun rs
MergingRun mergeType mergeDebt <$> newSTRef state
Expand Down

0 comments on commit 702031b

Please sign in to comment.