Skip to content

Commit ff015c7

Browse files
committed
wip
1 parent 02aaaca commit ff015c7

File tree

9 files changed

+217
-12
lines changed

9 files changed

+217
-12
lines changed

cardano-db-sync/cardano-db-sync.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ library
135135
Cardano.DbSync.Threads.Database
136136
Cardano.DbSync.Threads.EpochStake
137137
Cardano.DbSync.Threads.Ledger
138+
Cardano.DbSync.Threads.Rewards
138139
Cardano.DbSync.Threads.Stake
139140
Cardano.DbSync.Threads.TxInResolve
140141
Cardano.DbSync.Tracing.ToObjectOrphans

cardano-db-sync/src/Cardano/DbSync/Config.hs

-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ module Cardano.DbSync.Config (
1515
SyncProtocol (..),
1616
SyncNodeConfig (..),
1717
SyncNodeParams (..),
18-
cardanoLedgerConfig,
1918
genesisProtocolMagicId,
2019
readCardanoGenesisConfig,
2120
readSyncNodeConfig,

cardano-db-sync/src/Cardano/DbSync/Config/Cardano.hs

-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
module Cardano.DbSync.Config.Cardano (
99
GenesisConfig (..),
10-
cardanoLedgerConfig,
1110
genesisProtocolMagicId,
1211
mkTopLevelConfig,
1312
mkProtocolInfoCardano,
@@ -35,7 +34,6 @@ import Ouroboros.Consensus.Cardano (Nonce (..), ProtVer (ProtVer))
3534
import qualified Ouroboros.Consensus.Cardano as Consensus
3635
import Ouroboros.Consensus.Cardano.Node
3736
import Ouroboros.Consensus.Config (TopLevelConfig (..), emptyCheckpointsMap)
38-
import Ouroboros.Consensus.Ledger.Basics (LedgerConfig)
3937
import Ouroboros.Consensus.Node.ProtocolInfo (ProtocolInfo)
4038
import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus
4139
import Ouroboros.Consensus.Shelley.Eras (StandardCrypto)
@@ -73,9 +71,6 @@ readCardanoGenesisConfig enc =
7371

7472
-- -------------------------------------------------------------------------------------------------
7573

76-
cardanoLedgerConfig :: GenesisConfig -> LedgerConfig CardanoBlock
77-
cardanoLedgerConfig = topLevelConfigLedger . mkTopLevelConfig
78-
7974
mkTopLevelConfig :: GenesisConfig -> TopLevelConfig CardanoBlock
8075
mkTopLevelConfig cfg = Consensus.pInfoConfig $ fst $ mkProtocolInfoCardano cfg []
8176

cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Generic/Rewards.hs

+45
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Cardano.DbSync.Era.Shelley.Generic.Rewards (
1010
RewardRests (..),
1111
rewardsCount,
1212
rewardsTotalAda,
13+
getRewardsUpdate,
1314
) where
1415

1516
import Cardano.Db (Ada, RewardSource (..), word64ToAda)
@@ -19,6 +20,19 @@ import Cardano.Prelude
1920
import qualified Data.Map.Strict as Map
2021
import qualified Data.Set as Set
2122
import Ouroboros.Consensus.Cardano.CanHardFork ()
23+
import Cardano.Ledger.BaseTypes (strictMaybeToMaybe)
24+
import Cardano.Ledger.Shelley.LedgerState hiding (LedgerState)
25+
import Data.SOP.Strict.NP
26+
import Ouroboros.Consensus.Shelley.Ledger.Ledger
27+
import Ouroboros.Consensus.Shelley.Ledger (ShelleyBlock)
28+
import Ouroboros.Consensus.Config (TopLevelConfig (..))
29+
import Cardano.Ledger.Crypto (StandardCrypto)
30+
import Ouroboros.Consensus.Cardano.Block (LedgerState (..), EraCrypto)
31+
import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..))
32+
import Ouroboros.Consensus.HardFork.Combinator.PartialConfig
33+
import Ouroboros.Consensus.Shelley.ShelleyHFC
34+
import Ouroboros.Consensus.HardFork.Combinator.Basics
35+
import Ouroboros.Consensus.HardFork.Combinator.AcrossEras
2236

2337
data Reward = Reward
2438
{ rewardSource :: !RewardSource
@@ -53,3 +67,34 @@ rewardsTotalAda rwds =
5367
. sum
5468
. concatMap (map (unCoin . rewardAmount) . Set.toList)
5569
$ Map.elems (unRewards rwds)
70+
71+
getRewardsUpdate :: TopLevelConfig CardanoBlock -> ExtLedgerState CardanoBlock -> Maybe (RewardUpdate StandardCrypto)
72+
getRewardsUpdate cfg els =
73+
case ledgerState els of
74+
LedgerStateByron _ -> Nothing
75+
LedgerStateShelley sls -> genericRewardUpdate cfg sls
76+
LedgerStateAllegra als -> genericRewardUpdate cfg als
77+
LedgerStateMary mls -> genericRewardUpdate cfg mls
78+
LedgerStateAlonzo als -> genericRewardUpdate cfg als
79+
LedgerStateBabbage bls -> genericRewardUpdate cfg bls
80+
LedgerStateConway cls -> genericRewardUpdate cfg cls
81+
82+
genericRewardUpdate ::
83+
forall era p.
84+
(EraCrypto era ~ StandardCrypto) =>
85+
TopLevelConfig CardanoBlock ->
86+
LedgerState (ShelleyBlock p era) ->
87+
Maybe (RewardUpdate StandardCrypto)
88+
genericRewardUpdate cfg lstate = do
89+
pulsing <- strictMaybeToMaybe mPulsing
90+
case pulsing of
91+
Complete _ -> Nothing
92+
Pulsing _ _ -> do
93+
let Identity (rewardUpdate, _) = runReaderT (completeRupd pulsing) globals
94+
Just rewardUpdate
95+
96+
where
97+
mPulsing = nesRu $ shelleyLedgerState lstate
98+
99+
globals = case getPerEraLedgerConfig $ hardForkLedgerConfigPerEra $ topLevelConfigLedger cfg of
100+
_ :* wplc :* _ -> shelleyLedgerGlobals $ shelleyLedgerConfig $ unwrapPartialLedgerConfig wplc

cardano-db-sync/src/Cardano/DbSync/Ledger/Async.hs

+52-5
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,20 @@
22

33
module Cardano.DbSync.Ledger.Async where
44

5+
import Cardano.DbSync.Types
6+
import Data.Set (Set)
7+
import Data.Map (Map)
58
import Cardano.DbSync.Ledger.Types
69
import Cardano.Ledger.BaseTypes (EpochNo)
710
import Cardano.Ledger.Crypto (StandardCrypto)
811
import qualified Cardano.Ledger.EpochBoundary as Ledger
912
import Control.Concurrent.Class.MonadSTM.Strict
1013
import qualified Control.Concurrent.STM.TBQueue as TBQ
14+
import qualified Cardano.Ledger.Rewards as Ledger
15+
16+
--------------------------------------------------------------------------------
17+
-- EpochStake
18+
--------------------------------------------------------------------------------
1119

1220
newEpochStakeChannels :: IO EpochStakeChannels
1321
newEpochStakeChannels =
@@ -18,9 +26,9 @@ newEpochStakeChannels =
1826
<*> newTVarIO Nothing
1927

2028
-- To be used by the main thread
21-
ensureEpochDone :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> IO ()
22-
ensureEpochDone sQueue epoch snapshot = atomically $ do
23-
mLastEpochDone <- waitFinished sQueue
29+
ensureStakeDone :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot StandardCrypto -> IO ()
30+
ensureStakeDone sQueue epoch snapshot = atomically $ do
31+
mLastEpochDone <- waitStakeFinished sQueue
2432
case mLastEpochDone of
2533
Just lastEpochDone | lastEpochDone == epoch -> pure ()
2634
_ -> do
@@ -29,8 +37,8 @@ ensureEpochDone sQueue epoch snapshot = atomically $ do
2937
retry
3038

3139
-- To be used by the main thread
32-
waitFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo)
33-
waitFinished sQueue = do
40+
waitStakeFinished :: EpochStakeChannels -> STM IO (Maybe EpochNo)
41+
waitStakeFinished sQueue = do
3442
stakeThreadState <- readTVar (epochResult sQueue)
3543
case stakeThreadState of
3644
Just (lastEpoch, Done) -> pure $ Just lastEpoch -- Normal case
@@ -42,3 +50,42 @@ writeEpochStakeAction :: EpochStakeChannels -> EpochNo -> Ledger.SnapShot Standa
4250
writeEpochStakeAction sQueue epoch snapShot checkFirst = do
4351
TBQ.writeTBQueue (estakeQueue sQueue) $ EpochStakeDBAction epoch snapShot checkFirst
4452
writeTVar (epochResult sQueue) $ Just (epoch, Running)
53+
54+
55+
--------------------------------------------------------------------------------
56+
-- Rewards
57+
--------------------------------------------------------------------------------
58+
59+
newRewardsChannels :: IO RewardsChannels
60+
newRewardsChannels =
61+
-- This may never be more than 1. But let's keep it a queue for extensibility shake.
62+
-- This may allow us to parallelize the events workload even further
63+
RewardsChannels
64+
<$> TBQ.newTBQueueIO 1
65+
<*> newTVarIO Nothing
66+
67+
-- To be used by the main thread
68+
ensureRewardsDone :: RewardsChannels -> EpochNo -> EpochNo -> Map StakeCred (Set (Ledger.Reward StandardCrypto)) -> IO ()
69+
ensureRewardsDone sQueue epoch epoch' mp = atomically $ do
70+
mLastEpochDone <- waitRewardsFinished sQueue
71+
case mLastEpochDone of
72+
Just lastEpochDone | lastEpochDone == epoch -> pure ()
73+
_ -> do
74+
-- If last is not already there, put it to list and wait again
75+
writeRewardsAction sQueue epoch epoch' mp True
76+
retry
77+
78+
-- To be used by the main thread
79+
waitRewardsFinished :: RewardsChannels -> STM IO (Maybe EpochNo)
80+
waitRewardsFinished sQueue = do
81+
rewardsThreadState <- readTVar (rewardsResult sQueue)
82+
case rewardsThreadState of
83+
Just (lastEpoch, Done) -> pure $ Just lastEpoch -- Normal case
84+
Just (_, Running) -> retry -- Wait to finish current work.
85+
Nothing -> pure Nothing -- This will happen after a restart
86+
87+
-- To be used by the main thread
88+
writeRewardsAction :: RewardsChannels -> EpochNo -> EpochNo -> Map StakeCred (Set (Ledger.Reward StandardCrypto)) -> Bool -> STM IO ()
89+
writeRewardsAction sQueue epoch epoch' mp checkFirst = do
90+
TBQ.writeTBQueue (rQueue sQueue) $ RewardsDBAction epoch epoch' mp checkFirst
91+
writeTVar (rewardsResult sQueue) $ Just (epoch, Running)

cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs

+11-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ import System.Directory (doesFileExist, listDirectory, removeFile)
133133
import System.FilePath (dropExtension, takeExtension, (</>))
134134
import System.Mem (performMajorGC)
135135
import Prelude (String, id)
136+
import Cardano.Ledger.Shelley.RewardUpdate
137+
import Control.Monad.Extra (whenJust)
136138

137139
-- Note: The decision on whether a ledger-state is written to disk is based on the block number
138140
-- rather than the slot number because while the block number is fully populated (for every block
@@ -176,6 +178,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
176178
intervar <- newTVarIO Strict.Nothing
177179
swQueue <- newTBQueueIO 5 -- Should be relatively shallow.
178180
stakeChans <- newEpochStakeChannels
181+
rewardsChans <- newRewardsChannels
179182
applyQueue <- newTBQueueIO 10
180183
pure
181184
HasLedgerEnv
@@ -194,6 +197,7 @@ mkHasLedgerEnv trce protoInfo dir nw systemStart syncOptions = do
194197
, leStateWriteQueue = swQueue
195198
, leApplyQueue = applyQueue
196199
, leEpochStakeChans = stakeChans
200+
, leRewardsChans = rewardsChans
197201
}
198202

199203
initCardanoLedgerState :: Consensus.ProtocolInfo CardanoBlock -> CardanoLedgerState
@@ -241,6 +245,7 @@ applyBlock env blk = do
241245
let !newEpochBlockNo = applyToEpochBlockNo (isJust $ blockIsEBB blk) (isJust newEpoch) (clsEpochBlockNo oldState)
242246
let !newState = CardanoLedgerState newLedgerState newEpochBlockNo
243247
asyncWriteStakeSnapShot env oldState newState
248+
asyncWriteRewards env newState (sdEpochNo details)
244249
let !ledgerDB' = pushLedgerDB ledgerDB newState
245250
atomically $ writeTVar (leStateVar env) (Strict.Just ledgerDB')
246251
let !appResult =
@@ -321,9 +326,14 @@ asyncWriteStakeSnapShot env oldState newState =
321326
EpochBlockNo n
322327
| n == 0
323328
, Just (snapshot, epoch) <- Generic.getSnapShot (clsState oldState) -> do
324-
ensureEpochDone (leEpochStakeChans env) epoch snapshot
329+
ensureStakeDone (leEpochStakeChans env) epoch snapshot
325330
_ -> pure ()
326331

332+
asyncWriteRewards :: HasLedgerEnv -> CardanoLedgerState -> EpochNo -> IO ()
333+
asyncWriteRewards env newState e =
334+
whenJust (Generic.getRewardsUpdate (getTopLevelconfigHasLedger env) (clsState newState)) $ \ru -> do
335+
atomically $ writeRewardsAction (leRewardsChans env) e e (rs ru) True -- (e-1) (e+1)
336+
327337
getSliceMeta :: Generic.StakeSliceRes -> Maybe (Bool, EpochNo)
328338
getSliceMeta (Generic.Slice (Generic.StakeSlice epochNo _) isFinal) = Just (isFinal, epochNo)
329339
getSliceMeta _ = Nothing

cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs

+16
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Cardano.DbSync.Types (
2121
CardanoPoint,
2222
PoolKeyHash,
2323
SlotDetails,
24+
StakeCred,
2425
)
2526
import Cardano.Ledger.Alonzo.Scripts (Prices)
2627
import qualified Cardano.Ledger.BaseTypes as Ledger
@@ -55,6 +56,7 @@ import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus
5556
import Ouroboros.Consensus.Shelley.Ledger (LedgerState (..), ShelleyBlock)
5657
import Ouroboros.Network.AnchoredSeq (Anchorable (..), AnchoredSeq (..))
5758
import Prelude (fail, id)
59+
import qualified Cardano.Ledger.Rewards as Ledger
5860

5961
--------------------------------------------------------------------------
6062
-- Ledger Types
@@ -76,6 +78,7 @@ data HasLedgerEnv = HasLedgerEnv
7678
, leStateWriteQueue :: !(TBQueue (FilePath, CardanoLedgerState))
7779
, leApplyQueue :: TBQueue LedgerAction
7880
, leEpochStakeChans :: EpochStakeChannels
81+
, leRewardsChans :: RewardsChannels
7982
}
8083

8184
data CardanoLedgerState = CardanoLedgerState
@@ -213,6 +216,19 @@ data EpochStakeChannels = EpochStakeChannels
213216
, epochResult :: StrictTVar IO (Maybe (EpochNo, EpochState))
214217
}
215218

219+
data EpochRewardState = RewRunning | Inserted | Cleaned
220+
data RewardsDBAction = RewardsDBAction
221+
{ raEpochNo :: EpochNo
222+
, raEpochNo' :: EpochNo
223+
, raSnapShot :: Map StakeCred (Set (Ledger.Reward StandardCrypto))
224+
, raCheckFirst :: Bool -- Check if the data is already there before inserting
225+
}
226+
227+
data RewardsChannels = RewardsChannels
228+
{ rQueue :: TBQueue RewardsDBAction
229+
, rewardsResult :: StrictTVar IO (Maybe (EpochNo, EpochState))
230+
}
231+
216232
-- | Per-era pure getters and setters on @NewEpochState@. Note this is a bit of an abuse
217233
-- of the cardano-ledger/ouroboros-consensus public APIs, because ledger state is not
218234
-- designed to be updated this way. We are only replaying the chain, so this should be
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
module Cardano.DbSync.Prepare.Tx.Types where
2+
3+
data Tx = Tx
4+
{ tx :: DB.Tx
5+
, txCbor :: Maybe DB.TxCbor
6+
, txGrouped :: Grouped
7+
}
8+
9+
data Grouped = Grouped
10+
{ txInputs :: [DB.TxIn]
11+
, txOutputs :: [DB.TxOutW]
12+
, txMetadata :: [DB.TxMetadata]
13+
, txMint :: [DB.MaTxMint]
14+
, txRedeemer :: [DB.Redeemer]
15+
, txCollateralInputs :: [DB.CollateralTxIn]
16+
, txReferenceInputs :: [DB.ReferenceTxIn]
17+
, txCollateralOutputs :: [DB.CollateralTxOut]
18+
, txCertificates :: [DBTxCertificate]
19+
, txWithdrawals :: [DB.Withdrawal]
20+
, txParamProposal :: [DB.ParamProposal]
21+
, txExtraKeyWitness :: [DB.ExtraKeyWitness]
22+
}
23+
24+
data DBTxCertificate =
25+
StakeRegistration DB.StakeRegistration
26+
| StakeDeregistration DB.StakeDeregistration
27+
| Delegation DB.Delegation
28+
| PoolRegister DB.PoolMetadataRef DB.PoolUpdate [DB.PoolOwner] [DB.PoolRelay]
29+
| PoolRetire DB.PoolRetire
30+
| MirCert MirCert
31+
| DrepRegistration (Maybe DB.VotingAnchor) DB.DrepRegistration
32+
| DrepRegistration DB.DrepRegistration
33+
| CommitteeRegistration DB.CommitteeRegistration
34+
| CommitteeDeRegistration (Maybe DB.VotingAnchor) DB.CommitteeDeRegistration
35+
36+
data MirCert = Reserve DB.Reserve | Treasury DB.Treasury | PotTransfer DB.PotTransfer
37+
38+
data GovProposal = GovProposal DB.VotingAnchor DB.ParamProposal
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
4+
module Cardano.DbSync.Threads.Rewards where
5+
6+
import Cardano.BM.Trace (logInfo)
7+
import qualified Cardano.Db as DB
8+
import Cardano.DbSync.Api
9+
import Cardano.DbSync.Api.Types
10+
import Cardano.DbSync.Era.Shelley.Generic.StakeDist
11+
import Cardano.DbSync.Era.Universal.Epoch
12+
import Cardano.DbSync.Error
13+
import Cardano.DbSync.Ledger.Types
14+
import Cardano.DbSync.Util
15+
import Cardano.Slotting.Slot (EpochNo (..))
16+
import Control.Concurrent.Class.MonadSTM.Strict
17+
import qualified Control.Concurrent.STM.TBQueue as TBQ
18+
import Control.Monad
19+
import Control.Monad.IO.Class (liftIO)
20+
import Control.Monad.Trans.Class
21+
import Control.Monad.Trans.Except.Extra (runExceptT)
22+
import Database.Persist.Postgresql (IsolationLevel (..), runSqlConnWithIsolation, withPostgresqlConn)
23+
24+
runRewardsThread ::
25+
SyncEnv ->
26+
IO ()
27+
runRewardsThread syncEnv =
28+
case envLedgerEnv syncEnv of
29+
NoLedger _ -> pure ()
30+
HasLedger le -> do
31+
logInfo trce "Running Rewards thread"
32+
logException trce "runRewardsThread: " (runRewLoop syncEnv le)
33+
logInfo trce "Shutting Rewards thread"
34+
where
35+
trce = getTrace syncEnv
36+
37+
runRewLoop :: SyncEnv -> HasLedgerEnv -> IO ()
38+
runRewLoop syncEnv lenv =
39+
DB.runIohkLogging trce $
40+
withPostgresqlConn (envConnectionString syncEnv) loop
41+
where
42+
loop backend = do
43+
runOrThrowIO $ runSqlConnWithIsolation (runExceptT loopAction) backend Serializable
44+
loop backend
45+
46+
loopAction = do
47+
rAction <- liftIO $ atomically $ TBQ.readTBQueue (rQueue rewardsChan)
48+
case rAction of
49+
RewardsDBAction epoch epoch' mp shouldCheck -> do
50+
insertRewards syncEnv epoch epoch' (toList $ convertPoolRewards mp)
51+
liftIO $ atomically $ writeTVar (rewardsResult rewardsChan) $ Just (epoch', Done)
52+
53+
rewardsChan = leRewardsChans lenv
54+
trce = getTrace syncEnv

0 commit comments

Comments
 (0)