Skip to content

Commit d5ba37b

Browse files
committed
add read bench
1 parent bc31f85 commit d5ba37b

File tree

3 files changed

+192
-0
lines changed

3 files changed

+192
-0
lines changed

hie.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,5 +315,8 @@ cradle:
315315
- path: "hstream-store/app/writeBench.hs"
316316
component: "hstream-store:exe:hstore-bench-writter"
317317

318+
- path: "hstream-store/app/readBench.hs"
319+
component: "hstream-store:exe:hstore-bench-reader"
320+
318321
- path: "hstream-gossip/src"
319322
component: "lib:hstream-gossip"

hstream-store/app/readBench.hs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
{-# LANGUAGE BangPatterns #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
{-# LANGUAGE RecordWildCards #-}
4+
{-# LANGUAGE TypeApplications #-}
5+
6+
import Control.Concurrent (MVar, modifyMVar_, newMVar, readMVar,
7+
threadDelay)
8+
import Control.Concurrent.Async (async, cancel, forConcurrently_)
9+
import Control.Monad (forM_, forever)
10+
import Data.ByteString (ByteString)
11+
import qualified Data.ByteString as BS
12+
import Data.Int (Int64)
13+
import GHC.Stack (HasCallStack)
14+
import qualified HStream.Logger as Log
15+
import HStream.Store (C_LogID, LDReader, newLDClient,
16+
newLDReader, readerReadAllowGap,
17+
readerStartReading)
18+
import qualified HStream.Store as HS
19+
import qualified HStream.Store.Logger as Log
20+
import HStream.Utils (getPOSIXTime)
21+
import Options.Applicative
22+
import Z.Data.CBytes (CBytes)
23+
24+
data SuccessReads = SuccessReads
25+
{ readSize :: Int64
26+
, msgCount :: Int64
27+
} deriving (Show)
28+
29+
mkSuccessReads :: SuccessReads
30+
mkSuccessReads = SuccessReads {readSize=0, msgCount=0}
31+
32+
recordSuccessRead :: SuccessReads -> Int64 -> Int64 -> SuccessReads
33+
recordSuccessRead sc@SuccessReads{..} msgCnt size = sc {readSize = readSize + size, msgCount = msgCount + msgCnt}
34+
35+
readBench :: HasCallStack => ReadConfig -> IO ()
36+
readBench cfg@ReadConfig{..} = do
37+
Log.info $ "read bench config: " <> Log.build (show cfg)
38+
let finalThreads = min threadCount readerCount
39+
logs = [from..to]
40+
chunkSize = length logs `div` finalThreads
41+
logsPerThreads = chunk chunkSize [from..to]
42+
successReads <- newMVar mkSuccessReads
43+
44+
Log.info "------ perf start ------"
45+
printProc <- async $ printStats successReads (reportInterval * 1000000)
46+
ldClient <- newLDClient configPath
47+
forConcurrently_ logsPerThreads $ \logIds -> do
48+
reader <- newLDReader ldClient (fromIntegral . length $ logIds) (Just readBufferSize)
49+
doRead successReads reader logIds maxLog
50+
51+
cancel printProc
52+
53+
doRead :: HasCallStack => MVar SuccessReads -> LDReader-> [C_LogID] -> Int -> IO ()
54+
doRead suc reader logs maxLogs = do
55+
Log.info $ "reader begin to read logs: [" <> Log.build (show logs) <> "]"
56+
forM_ logs $ \log -> readerStartReading reader log HS.LSN_MIN HS.LSN_MAX
57+
58+
forever $ do
59+
res <- readerReadAllowGap @ByteString reader maxLogs
60+
readSuccessRecords suc res
61+
where
62+
readSuccessRecords :: MVar SuccessReads -> Either HS.GapRecord [HS.DataRecord ByteString] -> IO ()
63+
readSuccessRecords _ (Left _gap) = do
64+
-- Log.info $ "reader meet gap: " <> Log.buildString (show gap)
65+
return ()
66+
readSuccessRecords sc (Right dataRecords) = do
67+
let size = sum $ map (BS.length . HS.recordPayload) dataRecords
68+
msgCnt = length dataRecords
69+
-- Log.info $ "reader read " <> Log.build msgCnt <> " records, total size: " <> Log.build size
70+
modifyMVar_ sc $ \sc' -> return $ recordSuccessRead sc' (fromIntegral msgCnt) (fromIntegral size)
71+
72+
printStats :: MVar SuccessReads -> Int -> IO ()
73+
printStats readst interval = do
74+
Log.info "start printStats thread."
75+
curr <- getPOSIXTime
76+
printStats' 0 0 curr
77+
where
78+
printStats' lastCnt lastRead lastTmp = do
79+
threadDelay interval
80+
curr <- getPOSIXTime
81+
SuccessReads{..} <- readMVar readst
82+
let elapsed = floor . (* 1e3) $ (curr - lastTmp)
83+
let messages = (fromIntegral (msgCount - lastCnt) :: Double) * 1000 / fromIntegral elapsed
84+
throughput = (fromIntegral (readSize - lastRead) :: Double) * 1000 / 1024 / 1024 / fromIntegral elapsed
85+
Log.info $ "[Read]: " <> Log.build messages <> " record/s"
86+
<> ", throughput: " <> Log.build throughput <> " MB/s"
87+
<> ", messages: " <> Log.build (msgCount - lastCnt)
88+
<> ", elapsed: " <> Log.buildString' elapsed
89+
printStats' msgCount readSize curr
90+
91+
chunk :: Int -> [a] -> [[a]]
92+
chunk _ [] = []
93+
chunk n xs = go xs
94+
where
95+
go [] = []
96+
go !ys = let (subLs, rest) = splitAt n ys
97+
in subLs : go rest
98+
99+
data ReadConfig = ReadConfig
100+
{ configPath :: CBytes
101+
, from :: C_LogID
102+
, to :: C_LogID
103+
, threadCount :: Int
104+
, readerCount :: Int
105+
, readBufferSize :: Int64
106+
, maxLog :: Int
107+
, reportInterval :: Int
108+
} deriving (Show)
109+
110+
parseConfig :: Parser ReadConfig
111+
parseConfig = ReadConfig
112+
<$> strOption ( long "path"
113+
<> metavar "PATH"
114+
<> showDefault
115+
<> value "/data/store/logdevice.conf"
116+
<> help "Specify the path of LogDevice configuration file."
117+
)
118+
<*> option auto ( long "from"
119+
<> metavar "INT"
120+
<> help "Start logId."
121+
)
122+
<*> option auto ( long "to"
123+
<> metavar "INT"
124+
<> help "End logId"
125+
)
126+
<*> option auto ( long "thread-count"
127+
<> metavar "INT"
128+
<> showDefault
129+
<> value 1
130+
<> help "Number of threads to run readers."
131+
)
132+
<*> option auto ( long "reader-count"
133+
<> metavar "INT"
134+
<> showDefault
135+
<> value 1
136+
<> help "Number of readers to subscribe stream."
137+
)
138+
<*> option auto ( long "read-buffer-size"
139+
<> metavar "INT"
140+
<> showDefault
141+
<> value 10
142+
<> help "reader read buffer size."
143+
)
144+
<*> option auto ( long "max-log"
145+
<> metavar "INT"
146+
<> showDefault
147+
<> value 100
148+
<> help "reader read buffer size."
149+
)
150+
<*> option auto ( long "interval"
151+
<> metavar "INT"
152+
<> showDefault
153+
<> value 3
154+
<> help "Display period of statistical information in seconds."
155+
)
156+
157+
newtype RBenchCmd = ReadBench ReadConfig
158+
159+
commandParser :: Parser RBenchCmd
160+
commandParser = hsubparser
161+
( command "readBench" (info (ReadBench <$> parseConfig) (progDesc "Read bench command.")) )
162+
163+
runCommand :: RBenchCmd -> IO()
164+
runCommand (ReadBench opts) = readBench opts
165+
166+
main :: IO ()
167+
main = do
168+
Log.setLogDeviceDbgLevel Log.C_DBG_WARNING
169+
runCommand =<< customExecParser (prefs showHelpOnEmpty) opts
170+
where
171+
opts = info (helper <*> commandParser) (fullDesc <> progDesc "HStore-Read-Bench-Tool")

hstream-store/hstream-store.cabal

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,3 +270,21 @@ executable hstore-bench-writter
270270

271271
default-language: Haskell2010
272272
ghc-options: -threaded -rtsopts -with-rtsopts=-N
273+
274+
executable hstore-bench-reader
275+
import: shared-properties
276+
main-is: readBench.hs
277+
hs-source-dirs: app
278+
build-depends:
279+
, base
280+
, hstream-store
281+
, bytestring
282+
, optparse-applicative
283+
, stm
284+
, hstream-common
285+
, hstream-common-base
286+
, Z-Data
287+
, async
288+
289+
default-language: Haskell2010
290+
ghc-options: -threaded -rtsopts "-with-rtsopts=-N -A64m -n4m -qg -qn1"

0 commit comments

Comments
 (0)