Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### Changed

- [BREAKING] Output of marlowe-finder has changed. It now outputs JSON objects with the following event types:
- `"NewBlock"` when a new block in the chain is encountered.
- `"NewContract"` when a new contract is found.
- `"InputsApplied"` when inputs are applied to a contract.
- `"PayoutsWithdrawn"` when role payouts are withdrawn from a payout validator.
- Performance of marlowe-finder improved significantly.
148 changes: 84 additions & 64 deletions marlowe-apps/finder/Main.hs
Original file line number Diff line number Diff line change
@@ -1,76 +1,106 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

module Main (
main,
) where

import Data.Text (Text)
import Language.Marlowe.Runtime.App.Parser (getConfigParser)
import Language.Marlowe.Runtime.App.Stream (ContractStream (..), TChanEOF)
import Language.Marlowe.Runtime.App.Types (
Config,
FinishOnClose (FinishOnClose),
FinishOnWait (FinishOnWait),
PollingFrequency (PollingFrequency),
FinishOnWait (..),
)
import Language.Marlowe.Runtime.Core.Api (ContractId, MarloweVersionTag (V1))
import Observe.Event (EventBackend, addField)
import Observe.Event.Dynamic (DynamicEventSelector (..))
import Observe.Event.Render.JSON (DefaultRenderSelectorJSON (defaultRenderSelectorJSON))
import Observe.Event.Render.JSON.Handle (simpleJsonStderrBackend)
import Observe.Event.Syntax ((≔))

import Data.Time.Units (Second)
import Language.Marlowe.Runtime.App.Channel (RequeueFrequency (RequeueFrequency))
import qualified Language.Marlowe.Runtime.App.Channel as App (
LastSeen (..),
runContractAction,
runDetection,
runDiscovery',
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Aeson (ToJSON (..))
import Data.Foldable (fold, for_)
import qualified Data.Map as Map
import Data.Time.Units (Second, TimeUnit (toMicroseconds))
import Language.Marlowe.Protocol.BulkSync.Client hiding (runMarloweBulkSyncClient)
import Language.Marlowe.Runtime.App.Run (runClientWithConfig)
import Language.Marlowe.Runtime.ChainSync.Api (BlockHeader (..), TxOutRef (..))
import Language.Marlowe.Runtime.Client (runMarloweBulkSyncClient)
import Language.Marlowe.Runtime.Core.Api (MarloweVersion (..), Transaction (..))
import Language.Marlowe.Runtime.History.Api (
CreateStep (..),
MarloweApplyInputsTransaction (..),
MarloweBlock (..),
MarloweCreateTransaction (..),
MarloweWithdrawTransaction (..),
SomeCreateStep (..),
)
import Observe.Event.Dynamic
import Observe.Event.Explicit (addField, withEvent)
import Observe.Event.Render.JSON (DefaultRenderSelectorJSON (defaultRenderSelectorJSON), SomeJSONException)
import Observe.Event.Render.JSON.Handle (jsonHandleBackend)
import qualified Options.Applicative as O

runDetection
:: EventBackend IO r DynamicEventSelector
-> Config
-> PollingFrequency
-> TChanEOF ContractId
-> IO (TChanEOF (ContractStream 'V1))
runDetection eventBackend config pollingFrequency = do
let finishOnWait = FinishOnWait True
finishOnClose = FinishOnClose True
App.runDetection (const True) eventBackend config pollingFrequency finishOnClose finishOnWait

runFinder
:: EventBackend IO r DynamicEventSelector
-> RequeueFrequency
-> FinishOnWait
-> TChanEOF (ContractStream 'V1)
-> TChanEOF ContractId
-> IO ()
runFinder eventBackend =
App.runContractAction "FinderProcess" eventBackend $
\event App.LastSeen{..} ->
addField event $ ("transactionId" :: Text) ≔ lastTxId
import System.IO (stdout)

main :: IO ()
main =
do
Command{..} <- O.execParser =<< commandParser
let pollingFrequency' = PollingFrequency pollingFrequency
requeueFrequency' = RequeueFrequency requeueFrequency
eventBackend <- simpleJsonStderrBackend defaultRenderSelectorJSON
discoveryChannel <- App.runDiscovery' eventBackend config pollingFrequency' endOnWait
detectionChannel <- runDetection eventBackend config pollingFrequency' discoveryChannel
runFinder eventBackend requeueFrequency' endOnWait detectionChannel discoveryChannel
main = commandParser >>= O.execParser >>= runFinder

runFinder :: Command -> IO ()
runFinder Command{..} = do
backend <- jsonHandleBackend stdout (toJSON @SomeJSONException) defaultRenderSelectorJSON
let idle = SendMsgRequestNext 255 next
next =
ClientStNext
{ recvMsgRollForward = \blocks tip -> do
for_ blocks \MarloweBlock{..} -> liftIO do
let BlockHeader{..} = blockHeader
withEvent backend (DynamicEventSelector "NewBlock") \ev -> do
addField ev $ DynamicField "blockNo" $ toJSON blockNo
addField ev $ DynamicField "slotNo" $ toJSON slotNo
addField ev $ DynamicField "blockHeaderHash" $ toJSON headerHash
addField ev $ DynamicField "tip" $ toJSON tip
for_ createTransactions \MarloweCreateTransaction{..} ->
for_ @_ @_ @_ @() (Map.toAscList newContracts) \(txIx, SomeCreateStep MarloweV1 CreateStep{..}) ->
withEvent backend (DynamicEventSelector "NewContract") \ev -> do
addField ev $ DynamicField "contractId" $ toJSON $ TxOutRef txId txIx
addField ev $ DynamicField "marloweVersion" $ toJSON MarloweV1
addField ev $ DynamicField "marloweOutput" $ toJSON createOutput
addField ev $ DynamicField "metadata" $ toJSON metadata
addField ev $ DynamicField "payoutValidatorHash" $ toJSON payoutValidatorHash
for_ @_ @_ @_ @() applyInputsTransactions \MarloweApplyInputsTransaction{..} -> do
MarloweV1 <- pure marloweVersion
let Transaction{blockHeader = _, ..} = marloweTransaction
withEvent backend (DynamicEventSelector "InputsApplied") \ev -> do
addField ev $ DynamicField "contractId" $ toJSON contractId
addField ev $ DynamicField "marloweVersion" $ toJSON MarloweV1
addField ev $ DynamicField "transactionId" $ toJSON transactionId
addField ev $ DynamicField "metadata" $ toJSON metadata
addField ev $ DynamicField "validityLowerBound" $ toJSON validityLowerBound
addField ev $ DynamicField "validityUpperBound" $ toJSON validityUpperBound
addField ev $ DynamicField "previousOutput" $ toJSON marloweInput
addField ev $ DynamicField "inputs" $ toJSON inputs
addField ev $ DynamicField "output" $ toJSON output
for_ withdrawTransactions \MarloweWithdrawTransaction{..} ->
withEvent backend (DynamicEventSelector "PayoutsWithdrawn") \ev -> do
addField ev $ DynamicField "transactionId" $ toJSON consumingTx
addField ev $ DynamicField "payoutsWithdrawn" $ toJSON $ fold consumedPayouts
pure idle
, recvMsgRollBackward = \point tip -> liftIO do
withEvent backend (DynamicEventSelector "RolledBack") \ev -> do
addField ev $ DynamicField "newPoint" $ toJSON point
addField ev $ DynamicField "tip" $ toJSON tip
pure idle
, recvMsgWait =
if unFinishOnWait endOnWait
then pure $ SendMsgCancel $ SendMsgDone ()
else liftIO do
threadDelay $ fromIntegral $ toMicroseconds pollingFrequency
pure $ SendMsgPoll next
}
runClientWithConfig config $ runMarloweBulkSyncClient $ MarloweBulkSyncClient $ pure idle

data Command = Command
{ config :: Config
, pollingFrequency :: Second
, requeueFrequency :: Second
, endOnWait :: FinishOnWait
}
deriving (Show)
Expand All @@ -88,24 +118,14 @@ commandParser =
O.auto
(O.long "polling" <> O.value 5 <> O.metavar "SECONDS" <> O.help "The polling frequency for waiting on Marlowe Runtime.")
)
<*> fmap
fromInteger
( O.option
O.auto
( O.long "requeue"
<> O.value 20
<> O.metavar "SECONDS"
<> O.help "The requeuing frequency for reviewing the progress of contracts on Marlowe Runtime."
)
)
<*> O.flag
(FinishOnWait False)
(FinishOnWait True)
(O.long "end-at-tip" <> O.help "Stop the process when the tip of all contracts has been reached.")
pure $
O.info
(O.helper {- <*> O.versionOption -} <*> commandOptions)
(O.helper <*> commandOptions)
( O.fullDesc
<> O.progDesc "This command-line tool watches the blockchain for Marlowe contracts for active Marlowe contracts."
<> O.header "marlowe-finder : find active Marlowe contracts"
<> O.progDesc "This command-line tool watches the blockchain for Marlowe contract transactions."
<> O.header "marlowe-finder : find Marlowe contract transactions"
)
8 changes: 5 additions & 3 deletions marlowe-apps/marlowe-apps.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,16 @@ executable marlowe-finder
main-is: Main.hs
hs-source-dirs: finder
build-depends:
, aeson
, base >=4.9 && <5
, containers
, eventuo11y >=0.9 && <0.11
, eventuo11y-dsl ^>=0.2
, eventuo11y-json ^>=0.3
, marlowe-apps
, marlowe-runtime
, marlowe-chain-sync
, marlowe-client
, marlowe-runtime:{marlowe-runtime, history-api, sync-api}
, optparse-applicative
, text
, time-units

ghc-options:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ newtype TxId = TxId {unTxId :: ByteString}

newtype TxIx = TxIx {unTxIx :: Word16}
deriving stock (Show, Eq, Ord, Generic)
deriving newtype (Num, Integral, Real, Enum, Bounded, Binary, ToJSON, Variations, Hashable)
deriving newtype (Num, Integral, Real, Enum, Bounded, Binary, ToJSON, ToJSONKey, Variations, Hashable)

newtype CertIx = CertIx {unCertIx :: Word64}
deriving stock (Show, Eq, Ord, Generic)
Expand Down
5 changes: 5 additions & 0 deletions marlowe-client/src/Control/Monad/Trans/Marlowe/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Data.Coerce (coerce)
import Data.Foldable (asum)
import Data.Set (Set)
import Data.Time (UTCTime)
import Language.Marlowe.Protocol.BulkSync.Client (MarloweBulkSyncClient)
import Language.Marlowe.Protocol.Client (MarloweRuntimeClient (..), hoistMarloweRuntimeClient)
import Language.Marlowe.Protocol.HeaderSync.Client (MarloweHeaderSyncClient)
import Language.Marlowe.Protocol.Load.Client (MarloweLoadClient, pushContract)
Expand Down Expand Up @@ -174,6 +175,10 @@ runMarloweSyncClient = runMarloweRuntimeClient . RunMarloweSyncClient
runMarloweHeaderSyncClient :: (MonadMarlowe m) => MarloweHeaderSyncClient m a -> m a
runMarloweHeaderSyncClient = runMarloweRuntimeClient . RunMarloweHeaderSyncClient

-- | Run a MarloweBulkSyncClient. Used to synchronize with contract transactions.
runMarloweBulkSyncClient :: (MonadMarlowe m) => MarloweBulkSyncClient m a -> m a
runMarloweBulkSyncClient = runMarloweRuntimeClient . RunMarloweBulkSyncClient

-- | Run a MarloweQueryClient.
runMarloweQueryClient :: (MonadMarlowe m) => MarloweQueryClient m a -> m a
runMarloweQueryClient = runMarloweRuntimeClient . RunMarloweQueryClient
Expand Down
Loading