diff --git a/Database/PostgreSQL/Simple/Streaming.hs b/Database/PostgreSQL/Simple/Streaming.hs index 60832a8..2c028d0 100644 --- a/Database/PostgreSQL/Simple/Streaming.hs +++ b/Database/PostgreSQL/Simple/Streaming.hs @@ -5,39 +5,81 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +{- | + +There are two main ways to fetch data from PostgreSQL in a streaming +fashion: + +1. Directly, using 'ResourceT' to manage resources. + +2. Using the various bracketed @with*@ functions; these play nicely + with @ContT@ from "Control.Monad.Trans.Cont" or - if you will be + running it all directly in IO with no other transformers on the + stack - the + package. + +-} module Database.PostgreSQL.Simple.Streaming - ( -- * Queries that return results - query + ( -- * Obtaining a connection + withPGConnection + + -- * Queries that return results + , query + , withQuery , query_ + , withQuery_ -- ** Queries taking parser as argument , queryWith + , withQueryWith , queryWith_ + , withQueryWith_ -- * Queries that stream results , stream + , withStream , streamWithOptions + , withStreamWithOptions , stream_ + , withStream_ , streamWithOptions_ + , withStreamWithOptions_ -- ** Queries that stream results taking a parser as an argument , streamWith + , withStreamWith , streamWithOptionsAndParser + , withStreamWithOptionsAndParser , streamWith_ + , withStreamWith_ , streamWithOptionsAndParser_ + , withStreamWithOptionsAndParser_ -- * Streaming data in and out of PostgreSQL with @COPY@ , copyIn , copyOut + , withCopyOut -- * Re-exported symbols , runResourceT + -- * Obtaining a connection + , connectPostgreSQL + , connect + , ConnectInfo(..) + , defaultConnectInfo + , postgreSQLConnectionString + , close + -- ** Possible exceptions + , QueryError(..) + , ResultError(..) + , ManyErrors(..) + , SqlError(..) ) where import Control.Exception.Safe (MonadMask, catch, throwM, mask) import Control.Monad (unless) -import Control.Monad.Catch (onException) +import Control.Monad.Catch (bracket, onException, finally) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Reader (runReaderT) @@ -57,7 +99,9 @@ import qualified Database.PostgreSQL.LibPQ as LibPQ import Database.PostgreSQL.Simple (Connection, QueryError(..), ResultError(..), ToRow, FromRow, formatQuery, FoldOptions(..), FetchQuantity(..), execute_, - defaultFoldOptions, rollback) + defaultFoldOptions, rollback, connect, connectPostgreSQL, + ConnectInfo(..), defaultConnectInfo, postgreSQLConnectionString, + close, SqlError(..)) import qualified Database.PostgreSQL.Simple.Copy as Pg import Database.PostgreSQL.Simple.FromRow (fromRow) import Database.PostgreSQL.Simple.Internal @@ -74,6 +118,13 @@ import Streaming.Internal (Stream(..)) import Streaming.Prelude (Of(..), reread, for, untilRight) import qualified Streaming.Prelude as S +-- | A bracketed method of obtaining a PostgreSQL connection which +-- will close on an exception. +withPGConnection :: (MonadMask m, MonadIO m) => B.ByteString + -> (Connection -> m r) -> m r +withPGConnection connStr = bracket (liftIO (connectPostgreSQL connStr)) + (liftIO . close) + {-| Perform a @SELECT@ or other SQL query that is expected to return results. Uses @@ -125,6 +176,16 @@ query query conn template qs = doQuery fromRow conn template =<< liftIO (formatQuery conn template qs) +-- | A variant of 'query' that takes a continuation. +withQuery + :: (ToRow q, FromRow a, MonadIO m, MonadMask m) + => Connection + -> Query + -> q + -> (Stream (Of a) m () -> m r) + -> m r +withQuery = withQueryWith fromRow + -- | A version of 'query' that does not perform query substitution. query_ :: (FromRow r, MonadResource m) @@ -133,6 +194,15 @@ query_ -> Stream (Of r) m () query_ conn template@(Query que) = doQuery fromRow conn template que +-- | A version of 'withQuery' that does not perform query substitution. +withQuery_ + :: (FromRow a, MonadIO m, MonadMask m) + => Connection + -> Query + -> (Stream (Of a) m () -> m r) + -> m r +withQuery_ = withQueryWith_ fromRow + -- | A version of 'query' taking parser as argument. queryWith :: (ToRow q, MonadResource m) @@ -144,12 +214,35 @@ queryWith queryWith parser conn template qs = doQuery parser conn template =<< liftIO (formatQuery conn template qs) +-- | A version of 'withQuery' taking the row parser as an argument. +withQueryWith + :: (ToRow q, MonadIO m, MonadMask m) + => RowParser a + -> Connection + -> Query + -> q + -> (Stream (Of a) m () -> m r) + -> m r +withQueryWith parser conn template qs f = + liftIO (formatQuery conn template qs) + >>= \que -> doWithQuery parser conn template que f + -- | A version of 'query_' taking parser as argument. queryWith_ :: MonadResource m => RowParser r -> Connection -> Query -> Stream (Of r) m () queryWith_ parser conn template@(Query que) = doQuery parser conn template que +-- | A version of 'withQuery_' taking the row parser as an argument. +withQueryWith_ + :: (MonadIO m, MonadMask m) + => RowParser a + -> Connection + -> Query + -> (Stream (Of a) m () -> m r) + -> m r +withQueryWith_ parser conn template@(Query que) = doWithQuery parser conn template que + doQuery :: (MonadResource m) => RowParser r -> Connection -> Query -> B.ByteString -> Stream (Of r) m () @@ -197,6 +290,50 @@ doQuery parser conn q que = do _ -> streamResult conn parser result +-- @do@ prefix because we don't want to call this 'withQuery' +doWithQuery :: (MonadMask m, MonadIO m) + => RowParser a -> Connection -> Query -> B.ByteString + -> (Stream (Of a) m () -> m r) -> m r +doWithQuery parser conn q que f = do + ok <- + liftIO $ + withConnection conn $ \h -> + LibPQ.sendQuery h que <* LibPQ.setSingleRowMode h + if not ok + then liftIO (withConnection conn LibPQ.errorMessage) + >>= throwM . flip QueryError q . maybe "Unspecified error" C8.unpack + else f yieldResults `finally` drainRemainingResults + where + drainRemainingResults = S.effects (results conn) + + yieldResults = + for (results conn) $ \result -> do + status <- liftIO (LibPQ.resultStatus result) + case status of + LibPQ.EmptyQuery -> + liftIO $ throwM $ QueryError "query: Empty query" q + + LibPQ.CommandOk -> + liftIO $ throwM $ QueryError "query resulted in a command response" q + + LibPQ.CopyOut -> + liftIO $ throwM $ QueryError "query: COPY TO is not supported" q + + LibPQ.CopyIn -> + liftIO $ throwM $ QueryError "query: COPY FROM is not supported" q + + LibPQ.BadResponse -> + liftIO (throwResultError "query" result status) + + LibPQ.NonfatalError -> + liftIO (throwResultError "query" result status) + + LibPQ.FatalError -> + liftIO (throwResultError "query" result status) + + _ -> + streamResult conn parser result + results :: MonadIO m => Connection -> Stream (Of LibPQ.Result) m () results = reread (liftIO . flip withConnection LibPQ.getResult) @@ -255,6 +392,13 @@ stream => Connection -> Query -> params -> Stream (Of row) m () stream = streamWith fromRow +-- | A version of 'stream' that uses a continuation to help handle +-- resource allocation. +withStream :: (FromRow a, ToRow params, MonadMask m, MonadIO m) + => Connection -> Query -> params + -> (Stream (Of a) m () -> m r) -> m r +withStream = withStreamWith fromRow + streamWithOptions :: (FromRow row,ToRow params,MonadResource m,MonadMask m) => FoldOptions -> Connection @@ -263,12 +407,30 @@ streamWithOptions :: (FromRow row,ToRow params,MonadResource m,MonadMask m) -> Stream (Of row) m () streamWithOptions options = streamWithOptionsAndParser options fromRow +-- | A version of 'withStream' taking folding options as an +-- argument. +withStreamWithOptions :: (FromRow a, ToRow params, MonadMask m, MonadIO m) + => FoldOptions + -> Connection + -> Query + -> params + -> (Stream (Of a) m () -> m r) + -> m r +withStreamWithOptions options = withStreamWithOptionsAndParser options fromRow + -- | A version of 'stream' taking a parser as an argument. streamWith :: (ToRow params, MonadMask m, MonadResource m) => RowParser row -> Connection -> Query -> params -> Stream (Of row) m () streamWith = streamWithOptionsAndParser defaultFoldOptions +-- | A version of 'withStream' taking the row parser as an argument. +withStreamWith + :: (ToRow params, MonadMask m, MonadIO m) + => RowParser a -> Connection -> Query -> params + -> (Stream (Of a) m () -> m r) -> m r +withStreamWith = withStreamWithOptionsAndParser defaultFoldOptions + -- | A version of 'streamWithOptions' taking a parser as an argument. streamWithOptionsAndParser :: (ToRow params, MonadMask m, MonadResource m) @@ -282,6 +444,20 @@ streamWithOptionsAndParser options parser conn template qs = do q <- liftIO (formatQuery conn template qs) doFold options parser conn (Query q) +-- | A version of 'withStreamWithOptions' taking the row parser as an +-- argument. +withStreamWithOptionsAndParser + :: (ToRow params, MonadMask m, MonadIO m) + => FoldOptions + -> RowParser a + -> Connection + -> Query + -> params + -> (Stream (Of a) m () -> m r) + -> m r +withStreamWithOptionsAndParser options parser conn template qs f = do + q <- liftIO (formatQuery conn template qs) + withFold options parser conn (Query q) f -- | A version of 'stream' that does not perform query substitution. stream_ @@ -289,6 +465,11 @@ stream_ => Connection -> Query -> Stream (Of row) m () stream_ = streamWith_ fromRow +-- | A version of 'withStream_' that does not perform query substitution. +withStream_ :: (FromRow a, MonadMask m, MonadIO m) + => Connection -> Query -> (Stream (Of a) m () -> m r) -> m r +withStream_ = withStreamWith_ fromRow + streamWithOptions_ :: (FromRow row,MonadResource m,MonadMask m) => FoldOptions -> Connection @@ -296,12 +477,27 @@ streamWithOptions_ :: (FromRow row,MonadResource m,MonadMask m) -> Stream (Of row) m () streamWithOptions_ options = streamWithOptionsAndParser_ options fromRow +-- | A version of 'withStream_' that takes the fold options as an argument. +withStreamWithOptions_ :: (FromRow a, MonadMask m, MonadIO m) + => FoldOptions + -> Connection + -> Query + -> (Stream (Of a) m () -> m r) + -> m r +withStreamWithOptions_ options = withStreamWithOptionsAndParser_ options fromRow + -- | A version of 'stream_' taking a parser as an argument. streamWith_ :: (MonadMask m, MonadResource m) => RowParser row -> Connection -> Query -> Stream (Of row) m () streamWith_ parser conn template = doFold defaultFoldOptions parser conn template +-- | A version of 'withStream_' taking the row parser as an argument. +withStreamWith_ :: (MonadMask m, MonadIO m) + => RowParser a -> Connection -> Query + -> (Stream (Of a) m () -> m r) -> m r +withStreamWith_ = withStreamWithOptionsAndParser_ defaultFoldOptions + -- | A version of 'streamWithOptions_' taking a parser as an argument. streamWithOptionsAndParser_ :: (MonadMask m, MonadResource m) @@ -309,6 +505,14 @@ streamWithOptionsAndParser_ streamWithOptionsAndParser_ options parser conn template = doFold options parser conn template +-- | A version of 'withStreamWithOptions_' taking the fold options and +-- row parser as arguments. +withStreamWithOptionsAndParser_ + :: (MonadMask m, MonadIO m) + => FoldOptions -> RowParser a -> Connection -> Query + -> (Stream (Of a) m () -> m r) -> m r +withStreamWithOptionsAndParser_ = withFold + doFold :: forall row m. (MonadIO m,MonadMask m,MonadResource m) => FoldOptions @@ -320,9 +524,9 @@ doFold FoldOptions{..} parser conn q = do stat <- liftIO (withConnection conn LibPQ.transactionStatus) case stat of LibPQ.TransIdle -> - bracket (liftIO (beginMode transactionMode conn)) - (\_ -> ifInTransaction $ liftIO (commit conn)) - (\_ -> go `onException` ifInTransaction (liftIO (rollback conn))) + bracketStream (liftIO (beginMode transactionMode conn)) + (\_ -> ifInTransaction $ liftIO (commit conn)) + (\_ -> go `onException` ifInTransaction (liftIO (rollback conn))) LibPQ.TransInTrans -> go LibPQ.TransActive -> fail "foldWithOpts FIXME: PQ.TransActive" -- This _shouldn't_ occur in the current incarnation of @@ -349,7 +553,7 @@ doFold FoldOptions{..} parser conn q = do [ "DECLARE ", name, " NO SCROLL CURSOR FOR ", q ] return name - close name = + closeConn name = ifInTransaction $ (execute_ conn ("CLOSE " <> name) >> return ()) `catch` \ex -> -- Don't throw exception if CLOSE failed because the transaction is @@ -358,7 +562,7 @@ doFold FoldOptions{..} parser conn q = do go :: Stream (Of row) m () go = - bracket (liftIO declare) (liftIO . close) $ \(Query name) -> + bracketStream (liftIO declare) (liftIO . closeConn) $ \(Query name) -> let fetchQ = toByteString (byteString "FETCH FORWARD " <> intDec chunkSize <> @@ -392,6 +596,90 @@ doFold FoldOptions{..} parser conn q = do Automatic -> 256 Fixed n -> n +withFold :: forall a m r. (MonadIO m, MonadMask m) + => FoldOptions + -> RowParser a + -> Connection + -> Query + -> (Stream (Of a) m () -> m r) + -> m r +withFold FoldOptions{..} parser conn q f = do + stat <- liftIO (withConnection conn LibPQ.transactionStatus) + case stat of + LibPQ.TransIdle -> + bracket (liftIO (beginMode transactionMode conn)) + (\_ -> ifInTransaction (liftIO (commit conn))) + (\_ -> go `finally` ifInTransaction (liftIO (rollback conn))) + LibPQ.TransInTrans -> go + LibPQ.TransActive -> fail "foldWithOpts FIXME: PQ.TransActive" + -- This _shouldn't_ occur in the current incarnation of + -- the library, as we aren't using libpq asynchronously. + -- However, it could occur in future incarnations of + -- this library or if client code uses the Internal module + -- to use raw libpq commands on postgresql-simple connections. + LibPQ.TransInError -> fail "foldWithOpts FIXME: PQ.TransInError" + -- This should be turned into a better error message. + -- It is probably a bad idea to automatically roll + -- back the transaction and start another. + LibPQ.TransUnknown -> fail "foldWithOpts FIXME: PQ.TransUnknown" + -- Not sure what this means. + where + ifInTransaction m = do + stat <- liftIO (withConnection conn LibPQ.transactionStatus) + case stat of + LibPQ.TransInTrans -> m + _ -> return () + + declare = do + name <- newTempName conn + _ <- execute_ conn $ mconcat + [ "DECLARE ", name, " NO SCROLL CURSOR FOR ", q ] + return name + + closeConn name = + ifInTransaction $ + (execute_ conn ("CLOSE " <> name) >> return ()) `catch` \ex -> + -- Don't throw exception if CLOSE failed because the transaction is + -- aborted. Otherwise, it will throw away the original error. + unless (isFailedTransactionError ex) $ throwM ex + + go = bracket (liftIO declare) + (liftIO . closeConn) + (f . yieldResults) + + yieldResults :: Query -> Stream (Of a) m () + yieldResults (Query name) = + let fetchQ = + toByteString + (byteString "FETCH FORWARD " <> intDec chunkSize <> + byteString " FROM " <> + byteString name) + fetches = untilRight $ do + stat <- liftIO (withConnection conn LibPQ.transactionStatus) + case stat of + LibPQ.TransInTrans -> return () + _ -> fail "Stream transaction prematurely aborted" + result <- liftIO (exec conn fetchQ) + status <- liftIO (LibPQ.resultStatus result) + case status of + LibPQ.TuplesOk -> do + nrows <- liftIO (LibPQ.ntuples result) + if nrows > 0 + then return $ Left result + else return $ Right () + _ -> liftIO (throwResultError "fold" result status) + in for fetches (streamResult conn parser) + + -- FIXME: choose the Automatic chunkSize more intelligently + -- One possibility is to use the type of the results, although this + -- still isn't a perfect solution, given that common types (e.g. text) + -- are of highly variable size. + -- A refinement of this technique is to pick this number adaptively + -- as results are read in from the database. + chunkSize = case fetchQuantity of + Automatic -> 256 + Fixed n -> n + -------------------------------------------------------------------------------- -- @@ -485,10 +773,10 @@ liftMask maskVariant k = do loop $ k unmask -bracket +bracketStream :: (MonadIO m, MonadMask m, MonadResource m) => m a -> (a -> IO ()) -> (a -> Stream (Of b) m c) -> Stream (Of b) m c -bracket before after action = liftMask mask $ \restore -> do +bracketStream before after action = liftMask mask $ \restore -> do h <- lift before r <- restore (action h) `onTermination` after h liftIO (after h) @@ -549,3 +837,12 @@ copyOut conn q params = do case res of Pg.CopyOutRow bytes -> return (Left bytes) Pg.CopyOutDone n -> return (Right n) + +-- | A variant of 'copyOut' that takes a continuation to handle the output. +-- +-- No extra safety is gained with this function, but it can help +-- chaining together with all the other @with*@ functions. +withCopyOut :: (MonadIO m, ToRow params) + => Connection -> Query -> params + -> (Stream (Of B.ByteString) m Int64 -> m r) -> m r +withCopyOut conn q params f = f (copyOut conn q params)