1+ {-# LANGUAGE DataKinds #-}
12{-# LANGUAGE PatternSynonyms #-}
3+ {-# LANGUAGE UnboxedTuples #-}
24
35{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
46{-# OPTIONS_GHC -Wno-partial-fields #-}
2729module ScheduledMerges (
2830 -- * Main API
2931 LSM ,
32+ TableId (.. ),
3033 LSMConfig (.. ),
3134 Key (K ), Value (V ), resolveValue , Blob (B ),
3235 new ,
@@ -100,25 +103,35 @@ module ScheduledMerges (
100103import Prelude hiding (lookup )
101104
102105import Data.Foldable (for_ , toList , traverse_ )
106+ import Data.Functor.Contravariant
103107import Data.Map.Strict (Map )
104108import qualified Data.Map.Strict as Map
105109import Data.Maybe (catMaybes )
110+ import Data.Primitive.Types
106111import Data.STRef
107112
108113import qualified Control.Exception as Exc (assert )
109114import Control.Monad (foldM , forM , when )
110115import Control.Monad.ST
111116import qualified Control.Monad.Trans.Except as E
112- import Control.Tracer ( Tracer , contramap , traceWith )
117+ import Control.Tracer
113118import GHC.Stack (HasCallStack , callStack )
114119
115120import Text.Printf (printf )
116121
117122import qualified Test.QuickCheck as QC
118123
119- data LSM s = LSMHandle ! (STRef s Counter )
120- ! LSMConfig
121- ! (STRef s (LSMContent s ))
124+ data LSM s = LSMHandle {
125+ tableId :: ! TableId
126+ , _tableCounter :: ! (STRef s Counter )
127+ , _tableConfig :: ! LSMConfig
128+ , _tableContents :: ! (STRef s (LSMContent s ))
129+ }
130+
131+ -- | Identifiers for 'LSM' tables
132+ newtype TableId = TableId Int
133+ deriving stock (Show , Eq , Ord )
134+ deriving newtype (Enum , Prim )
122135
123136-- | Configuration options for individual LSM tables.
124137data LSMConfig = LSMConfig {
@@ -960,8 +973,8 @@ suppliedCreditMergingRun (MergingRun _ d ref) =
960973-- LSM handle
961974--
962975
963- new :: ST s (LSM s )
964- new = newWith conf
976+ new :: Tracer ( ST s ) Event -> TableId -> ST s (LSM s )
977+ new tr tid = newWith tr tid conf
965978 where
966979 -- 4 was the default for both the max write buffer size and size ratio
967980 -- before they were made configurable
@@ -970,16 +983,17 @@ new = newWith conf
970983 , configSizeRatio = 4
971984 }
972985
973- newWith :: LSMConfig -> ST s (LSM s )
974- newWith conf
986+ newWith :: Tracer ( ST s ) Event -> TableId -> LSMConfig -> ST s (LSM s )
987+ newWith tr tid conf
975988 | configMaxWriteBufferSize conf <= 0 =
976989 error " newWith: configMaxWriteBufferSize should be positive"
977990 | configSizeRatio conf <= 1 =
978991 error " newWith: configSizeRatio should be larger than 1"
979992 | otherwise = do
993+ traceWith tr $ NewTableEvent tid conf
980994 c <- newSTRef 0
981995 lsm <- newSTRef (LSMContent Map. empty [] NoUnion )
982- pure (LSMHandle c conf lsm)
996+ pure (LSMHandle tid c conf lsm)
983997
984998inserts :: Tracer (ST s ) Event -> LSM s -> [(Key , Value , Maybe Blob )] -> ST s ()
985999inserts tr lsm kvbs = updates tr lsm [ (k, Insert v b) | (k, v, b) <- kvbs ]
@@ -1009,7 +1023,8 @@ updates :: Tracer (ST s) Event -> LSM s -> [(Key, Entry)] -> ST s ()
10091023updates tr lsm = mapM_ (uncurry (update tr lsm))
10101024
10111025update :: Tracer (ST s ) Event -> LSM s -> Key -> Entry -> ST s ()
1012- update tr (LSMHandle scr conf lsmr) k entry = do
1026+ update tr (LSMHandle tid scr conf lsmr) k entry = do
1027+ traceWith tr $ UpdateEvent tid k entry
10131028 sc <- readSTRef scr
10141029 content@ (LSMContent wb ls unionLevel) <- readSTRef lsmr
10151030 modifySTRef' scr (+ 1 )
@@ -1018,15 +1033,15 @@ update tr (LSMHandle scr conf lsmr) k entry = do
10181033 let wb' = Map. insertWith combine k entry wb
10191034 if bufferSize wb' >= maxWriteBufferSize conf
10201035 then do
1021- ls' <- increment tr sc conf (bufferToRun wb') ls unionLevel
1036+ ls' <- increment ( LevelEvent tid >$< tr) sc conf (bufferToRun wb') ls unionLevel
10221037 let content' = LSMContent Map. empty ls' unionLevel
10231038 invariant conf content'
10241039 writeSTRef lsmr content'
10251040 else
10261041 writeSTRef lsmr (LSMContent wb' ls unionLevel)
10271042
10281043supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
1029- supplyMergeCredits (LSMHandle scr conf lsmr) credits = do
1044+ supplyMergeCredits (LSMHandle _ scr conf lsmr) credits = do
10301045 content@ (LSMContent _ ls _) <- readSTRef lsmr
10311046 modifySTRef' scr (+ 1 )
10321047 supplyCreditsLevels credits ls
@@ -1038,22 +1053,24 @@ data LookupResult v b =
10381053 deriving stock (Eq , Show )
10391054
10401055lookups :: LSM s -> [Key ] -> ST s [LookupResult Value Blob ]
1041- lookups (LSMHandle _ _conf lsmr) ks = do
1056+ lookups (LSMHandle _ _ _conf lsmr) ks = do
10421057 LSMContent wb ls ul <- readSTRef lsmr
10431058 runs <- concat <$> flattenLevels ls
10441059 traverse (doLookup wb runs ul) ks
10451060
1046- lookup :: LSM s -> Key -> ST s (LookupResult Value Blob )
1047- lookup (LSMHandle _ _conf lsmr) k = do
1061+ lookup :: Tracer (ST s ) Event -> LSM s -> Key -> ST s (LookupResult Value Blob )
1062+ lookup tr (LSMHandle tid _ _conf lsmr) k = do
1063+ traceWith tr $ LookupEvent tid k
10481064 LSMContent wb ls ul <- readSTRef lsmr
10491065 runs <- concat <$> flattenLevels ls
10501066 doLookup wb runs ul k
10511067
1052- duplicate :: LSM s -> ST s (LSM s )
1053- duplicate (LSMHandle _scr conf lsmr) = do
1068+ duplicate :: Tracer (ST s ) Event -> TableId -> LSM s -> ST s (LSM s )
1069+ duplicate tr childTid (LSMHandle parentTid _scr conf lsmr) = do
1070+ traceWith tr $ DuplicateEvent childTid parentTid
10541071 scr' <- newSTRef 0
10551072 lsmr' <- newSTRef =<< readSTRef lsmr
1056- pure (LSMHandle scr' conf lsmr')
1073+ pure (LSMHandle childTid scr' conf lsmr')
10571074 -- it's that simple here, because we share all the pure values and all the
10581075 -- STRefs and there's no ref counting to be done
10591076
@@ -1064,9 +1081,12 @@ duplicate (LSMHandle _scr conf lsmr) = do
10641081-- merge that can be performed incrementally (somewhat similar to a thunk).
10651082--
10661083-- The more merge work remains, the more expensive are lookups on the table.
1067- unions :: [LSM s ] -> ST s (LSM s )
1068- unions lsms = do
1069- (confs, trees) <- fmap unzip $ forM lsms $ \ (LSMHandle _ conf lsmr) ->
1084+ unions :: Tracer (ST s ) Event -> TableId -> [LSM s ] -> ST s (LSM s )
1085+ unions tr childTid lsms = do
1086+ traceWith tr $
1087+ let parentTids = fmap tableId lsms
1088+ in UnionsEvent childTid parentTids
1089+ (confs, trees) <- fmap unzip $ forM lsms $ \ (LSMHandle _ _ conf lsmr) ->
10701090 (conf,) <$> (contentToMergingTree =<< readSTRef lsmr)
10711091 -- Check that the configurations are equal
10721092 conf <- case confs of
@@ -1081,7 +1101,7 @@ unions lsms = do
10811101 Union tree <$> newSTRef debt
10821102 lsmr <- newSTRef (LSMContent Map. empty [] unionLevel)
10831103 c <- newSTRef 0
1084- pure (LSMHandle c conf lsmr)
1104+ pure (LSMHandle childTid c conf lsmr)
10851105
10861106-- | The /current/ upper bound on the number of 'UnionCredits' that have to be
10871107-- supplied before a 'union' is completed.
@@ -1097,7 +1117,7 @@ newtype UnionDebt = UnionDebt Debt
10971117-- | Return the current union debt. This debt can be reduced until it is paid
10981118-- off using 'supplyUnionCredits'.
10991119remainingUnionDebt :: LSM s -> ST s UnionDebt
1100- remainingUnionDebt (LSMHandle _ _conf lsmr) = do
1120+ remainingUnionDebt (LSMHandle _ _ _conf lsmr) = do
11011121 LSMContent _ _ ul <- readSTRef lsmr
11021122 UnionDebt <$> case ul of
11031123 NoUnion -> pure 0
@@ -1123,7 +1143,7 @@ newtype UnionCredits = UnionCredits Credit
11231143-- a union has finished. In particular, if the returned number of credits is
11241144-- positive, then the union is finished.
11251145supplyUnionCredits :: LSM s -> UnionCredits -> ST s UnionCredits
1126- supplyUnionCredits (LSMHandle scr conf lsmr) (UnionCredits credits)
1146+ supplyUnionCredits (LSMHandle _ scr conf lsmr) (UnionCredits credits)
11271147 | credits <= 0 = pure (UnionCredits 0 )
11281148 | otherwise = do
11291149 content@ (LSMContent _ _ ul) <- readSTRef lsmr
@@ -1399,7 +1419,7 @@ depositNominalCredit (NominalDebt nominalDebt)
13991419-- Updates
14001420--
14011421
1402- increment :: forall s . Tracer (ST s ) Event
1422+ increment :: forall s . Tracer (ST s ) ( EventAt EventDetail )
14031423 -> Counter
14041424 -> LSMConfig
14051425 -> Run -> Levels s -> UnionLevel s -> ST s (Levels s )
@@ -1411,19 +1431,21 @@ increment tr sc conf run0 ls0 ul = do
14111431
14121432 go :: Int -> [Run ] -> Levels s -> ST s (Levels s )
14131433 go ! ln incoming [] = do
1414- let mergePolicy = mergePolicyForLevel ln [] ul
14151434 traceWith tr' AddLevelEvent
1435+ let mergePolicy = mergePolicyForLevel ln [] ul
14161436 ir <- newLevelMerge tr' conf ln mergePolicy (mergeTypeFor [] ) incoming
14171437 pure (Level ir [] : [] )
14181438 where
14191439 tr' = contramap (EventAt sc ln) tr
14201440
14211441 go ! ln incoming (Level ir rs : ls) = do
14221442 r <- case ir of
1423- Single r -> pure r
1443+ Single r -> do
1444+ traceWith tr' $ SingleRunCompletedEvent r
1445+ pure r
14241446 Merging mergePolicy _ _ mr -> do
14251447 r <- expectCompletedMergingRun mr
1426- traceWith tr' MergeCompletedEvent {
1448+ traceWith tr' LevelMergeCompletedEvent {
14271449 mergePolicy,
14281450 mergeType = let MergingRun mt _ _ = mr in mt,
14291451 mergeSize = runSize r
@@ -1436,6 +1458,8 @@ increment tr sc conf run0 ls0 ul = do
14361458 -- If r is still too small for this level then keep it and merge again
14371459 -- with the incoming runs.
14381460 LevelTiering | runTooSmallForLevel LevelTiering conf ln r -> do
1461+ traceWith tr' $ RunTooSmallForLevelEvent LevelTiering r
1462+
14391463 ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) (incoming ++ [r])
14401464 pure (Level ir' rs : ls)
14411465
@@ -1444,29 +1468,37 @@ increment tr sc conf run0 ls0 ul = do
14441468 -- as a bundle and move them down to the level below. We start a merge
14451469 -- for the new incoming runs. This level is otherwise empty.
14461470 LevelTiering | levelIsFullTiering conf ln incoming resident -> do
1471+ traceWith tr' $ LevelIsFullEvent LevelTiering
1472+
14471473 ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
14481474 ls' <- go (ln+ 1 ) resident ls
14491475 pure (Level ir' [] : ls')
14501476
14511477 -- This tiering level is not yet full. We move the completed merged run
14521478 -- into the level proper, and start the new merge for the incoming runs.
14531479 LevelTiering -> do
1480+ traceWith tr' $ LevelIsNotFullEvent LevelTiering
1481+
14541482 ir' <- newLevelMerge tr' conf ln LevelTiering (mergeTypeFor ls) incoming
1455- traceWith tr' (AddRunEvent ( length resident) )
1483+ traceWith tr' (AddRunEvent resident)
14561484 pure (Level ir' resident : ls)
14571485
14581486 -- The final level is using levelling. If the existing completed merge
14591487 -- run is too large for this level, we promote the run to the next
14601488 -- level and start merging the incoming runs into this (otherwise
14611489 -- empty) level.
14621490 LevelLevelling | levelIsFullLevelling conf ln incoming r -> do
1491+ traceWith tr' $ LevelIsFullEvent LevelLevelling
1492+
14631493 assert (null rs && null ls) $ pure ()
14641494 ir' <- newLevelMerge tr' conf ln LevelTiering MergeMidLevel incoming
14651495 ls' <- go (ln+ 1 ) [r] []
14661496 pure (Level ir' [] : ls')
14671497
14681498 -- Otherwise we start merging the incoming runs into the run.
14691499 LevelLevelling -> do
1500+ traceWith tr' $ LevelIsNotFullEvent LevelLevelling
1501+
14701502 assert (null rs && null ls) $ pure ()
14711503 ir' <- newLevelMerge tr' conf ln LevelLevelling (mergeTypeFor ls)
14721504 (incoming ++ [r])
@@ -1479,17 +1511,19 @@ newLevelMerge :: Tracer (ST s) EventDetail
14791511 -> LSMConfig
14801512 -> Int -> MergePolicyForLevel -> LevelMergeType
14811513 -> [Run ] -> ST s (IncomingRun s )
1482- newLevelMerge _ _ _ _ _ [r] = pure (Single r)
1514+ newLevelMerge tr _ _ _ _ [r] = do
1515+ traceWith tr $ NewSingleRunEvent r
1516+ pure (Single r)
14831517newLevelMerge tr conf@ LSMConfig {.. } level mergePolicy mergeType rs = do
1484- assertST (length rs `elem` [configSizeRatio, configSizeRatio + 1 ])
14851518 mergingRun@ (MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
1486- assertWithMsgM $ leq (totalDebt physicalDebt) maxPhysicalDebt
1487- traceWith tr MergeStartedEvent {
1519+ traceWith tr NewLevelMergeEvent {
14881520 mergePolicy,
14891521 mergeType,
1490- mergeDebt = totalDebt physicalDebt,
1491- mergeRunsSize = map runSize rs
1522+ mergeDebt = totalDebt physicalDebt,
1523+ mergeRuns = rs
14921524 }
1525+ assertST (length rs `elem` [configSizeRatio, configSizeRatio + 1 ])
1526+ assertWithMsgM $ leq (totalDebt physicalDebt) maxPhysicalDebt
14931527 nominalCreditVar <- newSTRef (NominalCredit 0 )
14941528 pure (Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
14951529 where
@@ -1766,7 +1800,7 @@ data MTree r = MLeaf r
17661800 deriving stock (Eq , Foldable , Functor , Show )
17671801
17681802allLevels :: LSM s -> ST s (Buffer , [[Run ]], Maybe (MTree Run ))
1769- allLevels (LSMHandle _ _conf lsmr) = do
1803+ allLevels (LSMHandle _ _ _conf lsmr) = do
17701804 LSMContent wb ls ul <- readSTRef lsmr
17711805 rs <- flattenLevels ls
17721806 tree <- case ul of
@@ -1836,7 +1870,7 @@ type LevelRepresentation =
18361870 [Run ])
18371871
18381872dumpRepresentation :: LSM s -> ST s Representation
1839- dumpRepresentation (LSMHandle _ _conf lsmr) = do
1873+ dumpRepresentation (LSMHandle _ _ _conf lsmr) = do
18401874 LSMContent wb ls ul <- readSTRef lsmr
18411875 levels <- mapM dumpLevel ls
18421876 tree <- case ul of
@@ -1877,7 +1911,15 @@ representationShape (wb, levels, tree) =
18771911
18781912-- TODO: these events are incomplete; in particular, we should also trace what
18791913-- happens in the union level.
1880- type Event = EventAt EventDetail
1914+ data Event =
1915+ NewTableEvent TableId LSMConfig
1916+ | UpdateEvent TableId Key Entry
1917+ | LookupEvent TableId Key
1918+ | DuplicateEvent TableId TableId
1919+ | UnionsEvent TableId [TableId ]
1920+ | LevelEvent TableId (EventAt EventDetail )
1921+ deriving stock Show
1922+
18811923data EventAt e = EventAt {
18821924 eventAtStep :: Counter ,
18831925 eventAtLevel :: Int ,
@@ -1886,21 +1928,27 @@ data EventAt e = EventAt {
18861928 deriving stock Show
18871929
18881930data EventDetail =
1889- AddLevelEvent
1890- | AddRunEvent {
1891- runsAtLevel :: Int
1892- }
1893- | MergeStartedEvent {
1894- mergePolicy :: MergePolicyForLevel ,
1895- mergeType :: LevelMergeType ,
1896- mergeDebt :: Debt ,
1897- mergeRunsSize :: [Int ]
1898- }
1899- | MergeCompletedEvent {
1900- mergePolicy :: MergePolicyForLevel ,
1901- mergeType :: LevelMergeType ,
1902- mergeSize :: Int
1903- }
1931+ AddLevelEvent
1932+ | AddRunEvent {
1933+ runsAtLevel :: [Run ]
1934+ }
1935+ | NewLevelMergeEvent {
1936+ mergePolicy :: MergePolicyForLevel ,
1937+ mergeType :: LevelMergeType ,
1938+ mergeDebt :: Debt ,
1939+ mergeRuns :: [Run ]
1940+ }
1941+ | NewSingleRunEvent Run
1942+ | LevelMergeCompletedEvent {
1943+ mergePolicy :: MergePolicyForLevel ,
1944+ mergeType :: LevelMergeType ,
1945+ mergeSize :: Int
1946+ }
1947+ | SingleRunCompletedEvent Run
1948+
1949+ | RunTooSmallForLevelEvent MergePolicyForLevel Run
1950+ | LevelIsFullEvent MergePolicyForLevel
1951+ | LevelIsNotFullEvent MergePolicyForLevel
19041952 deriving stock Show
19051953
19061954-------------------------------------------------------------------------------
0 commit comments