Skip to content

Commit

Permalink
Remove resendPendingSubmitRequests in websocket internals
Browse files Browse the repository at this point in the history
  • Loading branch information
marcusbfs committed Feb 13, 2025
1 parent 06a5955 commit 99b98a0
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 147 deletions.
153 changes: 7 additions & 146 deletions src/Internal/QueryM/Ogmios/Mempool.purs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import Aeson
, Aeson
, JsonDecodeError(UnexpectedValue, TypeMismatch)
, decodeAeson
, encodeAeson
, getField
, parseJsonStringToAeson
, stringifyAeson
Expand All @@ -48,7 +47,6 @@ import Ctl.Internal.QueryM.Ogmios.Mempool.Dispatcher
, Dispatcher
, GenericPendingRequests
, PendingRequests
, PendingSubmitTxRequests
, RequestBody
, WebsocketDispatch
, mkWebsocketDispatch
Expand Down Expand Up @@ -79,7 +77,6 @@ import Ctl.Internal.QueryM.Ogmios.Types
, decodeOgmios
, decodeResult
, ogmiosDecodeErrorToError
, submitSuccessPartialResp
)
import Ctl.Internal.Service.Helpers (aesonNull, aesonObject, aesonString)
import Data.Argonaut.Encode.Encoders as Argonaut
Expand All @@ -89,14 +86,12 @@ import Data.Foldable (foldl)
import Data.Generic.Rep (class Generic)
import Data.Log.Level (LogLevel(Error, Debug))
import Data.Map as Map
import Data.Maybe (Maybe(Just, Nothing), maybe)
import Data.Newtype (class Newtype, unwrap, wrap)
import Data.Maybe (Maybe(Nothing, Just))
import Data.Newtype (class Newtype, wrap)
import Data.Show.Generic (genericShow)
import Data.Traversable (for_, traverse_)
import Data.Tuple (fst)
import Data.Tuple.Nested (type (/\), (/\))
import Effect (Effect)
import Effect.Aff (Aff, Canceler(Canceler), delay, launchAff_, makeAff, runAff_)
import Effect.Aff (Aff, Canceler(Canceler), makeAff)
import Effect.Class (liftEffect)
import Effect.Exception (Error, error)
import Effect.Ref as Ref
Expand All @@ -109,25 +104,6 @@ type Logger = LogLevel -> String -> Effect Unit
-- Ogmios Local Tx Monitor Protocol
--------------------------------------------------------------------------------

acquireMempoolSnapshotAff
:: OgmiosWebSocket -> Logger -> Aff MempoolSnapshotAcquired
acquireMempoolSnapshotAff ogmiosWs logger =
mkOgmiosRequestAff ogmiosWs logger acquireMempoolSnapshotCall
_.acquireMempool
unit

mempoolSnapshotHasTxAff
:: OgmiosWebSocket
-> Logger
-> MempoolSnapshotAcquired
-> TransactionHash
-> Aff Boolean
mempoolSnapshotHasTxAff ogmiosWs logger ms txh =
unwrap <$> mkOgmiosRequestAff ogmiosWs logger
(mempoolSnapshotHasTxCall ms)
_.mempoolHasTx
txh

acquireMempoolSnapshotCall
:: JsonRpc2Call Unit MempoolSnapshotAcquired
acquireMempoolSnapshotCall =
Expand Down Expand Up @@ -160,20 +136,6 @@ releaseMempoolCall
releaseMempoolCall _ =
mkOgmiosCallTypeNoArgs "releaseMempool"

withMempoolSnapshot
:: OgmiosWebSocket
-> Logger
-> (Maybe MempoolSnapshotAcquired -> Aff Unit)
-> Effect Unit
withMempoolSnapshot ogmiosWs logger cont =
flip runAff_ (acquireMempoolSnapshotAff ogmiosWs logger) $ case _ of
Left err -> do
logger Error $
"Failed to acquire a mempool snapshot: Error: " <> show err
launchAff_ (cont Nothing)
Right mempoolSnapshot ->
launchAff_ (cont $ Just mempoolSnapshot)

--------------------------------------------------------------------------------
-- Helpers
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -224,12 +186,11 @@ listeners (WebSocket _ ls) = ls
type IsTxConfirmed = TransactionHash -> Aff Boolean

mkOgmiosWebSocketAff
:: IsTxConfirmed
-> Logger
:: Logger
-> String
-> Aff OgmiosWebSocket
mkOgmiosWebSocketAff isTxConfirmed logger serverUrl = do
lens <- liftEffect $ mkOgmiosWebSocketLens logger isTxConfirmed
mkOgmiosWebSocketAff logger serverUrl = do
lens <- liftEffect $ mkOgmiosWebSocketLens logger
makeAff $ mkServiceWebSocket lens serverUrl

mkServiceWebSocket
Expand Down Expand Up @@ -262,7 +223,6 @@ mkServiceWebSocket lens url continue = do
lens.logger Debug $
lens.serviceName <>
" WebSocket connection re-established, resending pending requests..."
lens.resendPendingRequests ws
false -> do
lens.logger Debug $ "Connection to " <> lens.serviceName <> " established"
Ref.write true hasConnectedOnceRef
Expand All @@ -279,79 +239,6 @@ mkServiceWebSocket lens url continue = do
_wsClose ws
continue $ Left $ err

--------------------------------------------------------------------------------
-- Resend pending `SubmitTx` requests
--------------------------------------------------------------------------------

-- | For each pending `SubmitTx` request, checks whether the transaction has
-- | been added to the mempool or has been included in a block before retrying
-- | the request.
resendPendingSubmitRequests
:: OgmiosWebSocket
-> IsTxConfirmed
-> Logger
-> (RequestBody -> Effect Unit)
-> Dispatcher
-> PendingSubmitTxRequests
-> Effect Unit
resendPendingSubmitRequests
ogmiosWs
isTxConfirmed
logger
sendRequest
dispatcher
pr = do
submitTxPendingRequests <- Ref.read pr
unless (Map.isEmpty submitTxPendingRequests) do
-- Acquiring a mempool snapshot should never fail and,
-- after ws reconnection, should be instantaneous.
withMempoolSnapshot ogmiosWs logger case _ of
Nothing ->
liftEffect $ traverse_ (sendRequest <<< fst) submitTxPendingRequests
Just ms -> do
-- A delay of 5 sec for transactions to be processed by the node
-- and added to the mempool:
delay (wrap 5000.0)
let (pr' :: Array _) = Map.toUnfoldable submitTxPendingRequests
for_ pr' \(listenerId /\ requestBody /\ txHash) ->
handlePendingSubmitRequest ms listenerId requestBody txHash
where
log :: String -> Boolean -> TransactionHash -> Aff Unit
log label value txHash =
liftEffect $ logger Debug $
label <> ": " <> show value <> " TransactionHash: " <> show txHash

handlePendingSubmitRequest
:: MempoolSnapshotAcquired
-> ListenerId
-> RequestBody
-> TransactionHash
-> Aff Unit
handlePendingSubmitRequest ms listenerId requestBody txHash = do
-- Check if the transaction was added to the mempool:
txInMempool <- mempoolSnapshotHasTxAff ogmiosWs logger ms txHash
log "Tx in the mempool" txInMempool txHash
retrySubmitTx <-
if txInMempool then pure false
else do
-- Check if the transaction was included in the block:
txConfirmed <- isTxConfirmed txHash
log "Tx confirmed" txConfirmed txHash
unless txConfirmed $ liftEffect do
sendRequest requestBody
pure (not txConfirmed)
-- Manually dispatch `SubmitTx` response if resending is not required:
unless retrySubmitTx $ liftEffect do
Ref.modify_ (Map.delete listenerId) pr
dispatchMap <- Ref.read dispatcher
Ref.modify_ (Map.delete listenerId) dispatcher
Map.lookup listenerId dispatchMap #
maybe (pure unit) (_ $ submitSuccessPartialRespInner)
where
submitSuccessPartialRespInner :: Aeson
submitSuccessPartialRespInner =
encodeAeson $ submitSuccessPartialResp txHash

--------------------------------------------------------------------------------
-- `MkServiceWebSocketLens` for ogmios
--------------------------------------------------------------------------------
Expand All @@ -361,17 +248,14 @@ type MkServiceWebSocketLens (listeners :: Type) =
, dispatcher :: Dispatcher
, logger :: Logger
, typedWebSocket :: JsWebSocket -> WebSocket listeners
, resendPendingRequests :: JsWebSocket -> Effect Unit
}

mkOgmiosWebSocketLens
:: Logger
-> IsTxConfirmed
-> Effect (MkServiceWebSocketLens OgmiosListeners)
mkOgmiosWebSocketLens logger isTxConfirmed = do
mkOgmiosWebSocketLens logger = do
dispatcher <- newDispatcher
pendingRequests <- newPendingRequests
pendingSubmitTxRequests <- newPendingRequests
pure $
let
ogmiosWebSocket :: JsWebSocket -> OgmiosWebSocket
Expand All @@ -388,21 +272,11 @@ mkOgmiosWebSocketLens logger isTxConfirmed = do
mkListenerSet dispatcher pendingRequests
}

resendPendingRequests :: JsWebSocket -> Effect Unit
resendPendingRequests ws = do
let sendRequest = _wsSend ws (logger Debug)
Ref.read pendingRequests >>= traverse_ sendRequest
resendPendingSubmitRequests (ogmiosWebSocket ws) isTxConfirmed
logger
sendRequest
dispatcher
pendingSubmitTxRequests
in
{ serviceName: "ogmios"
, dispatcher
, logger
, typedWebSocket: ogmiosWebSocket
, resendPendingRequests
}

--------------------------------------------------------------------------------
Expand Down Expand Up @@ -472,19 +346,6 @@ mkListenerSet dispatcher pendingRequests =
Ref.modify_ (Map.insert reflection requestBody) pendingRequests
}

-- | Builds an Ogmios request action using `Aff`
mkOgmiosRequestAff
:: forall (request :: Type) (response :: Type)
. OgmiosWebSocket
-> Logger
-> JsonRpc2.JsonRpc2Call request response
-> (OgmiosListeners -> ListenerSet request response)
-> request
-> Aff response
mkOgmiosRequestAff ogmiosWs = mkRequestAff
(listeners ogmiosWs)
(underlyingWebSocket ogmiosWs)

mkRequestAff
:: forall (request :: Type) (response :: Type) (listeners :: Type)
. listeners
Expand Down
2 changes: 1 addition & 1 deletion test/Testnet/Contract/OgmiosMempool.purs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mkWebsocket = do
ogmiosConfig <- case config.backend of
CtlBackend ctlBackend _ -> pure ctlBackend.ogmiosConfig
_ -> throwError $ error "Ogmios backend not supported"
liftAff $ mkOgmiosWebSocketAff (const $ pure true)
liftAff $ mkOgmiosWebSocketAff
(mkLogger config.logLevel config.customLogger)
(mkWsUrl ogmiosConfig)

Expand Down

0 comments on commit 99b98a0

Please sign in to comment.