Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: store interface #1436

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ library
Simplex.Messaging.Agent.Store.AgentStore
Simplex.Messaging.Agent.Store.Common
Simplex.Messaging.Agent.Store.DB
Simplex.Messaging.Agent.Store.Interface
Simplex.Messaging.Agent.Store.Migrations
Simplex.Messaging.Agent.Store.Shared
Simplex.Messaging.Agent.TRcvQueues
Expand Down
3 changes: 2 additions & 1 deletion src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 18 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 18 in src/Simplex/Messaging/Agent.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

-- |
-- Module : Simplex.Messaging.Agent
Expand Down Expand Up @@ -170,6 +170,7 @@
import Simplex.Messaging.Agent.Store.AgentStore
import Simplex.Messaging.Agent.Store.Common (DBStore)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL)
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
import Simplex.Messaging.Client (SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse)
Expand Down Expand Up @@ -279,7 +280,7 @@
t_ <- atomically (swapTVar acThread Nothing) $>>= (liftIO . deRefWeak)
disconnectAgentClient c
mapM_ killThread t_
liftIO $ closeStore store
liftIO $ closeDBStore store

resumeAgentClient :: AgentClient -> IO ()
resumeAgentClient c = atomically $ writeTVar (active c) True
Expand Down
14 changes: 2 additions & 12 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
Expand Down Expand Up @@ -70,6 +69,7 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store (createStore)
import Simplex.Messaging.Agent.Store.Common (DBStore)
import Simplex.Messaging.Agent.Store.Interface (DBOpts)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
Expand All @@ -87,11 +87,6 @@ import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryA
import System.Mem.Weak (Weak)
import System.Random (StdGen, newStdGen)
import UnliftIO.STM
#if defined(dbPostgres)
import Database.PostgreSQL.Simple (ConnectInfo (..))
#else
import Data.ByteArray (ScrubbedBytes)
#endif

type AM' a = ReaderT Env IO a

Expand Down Expand Up @@ -277,13 +272,8 @@ newSMPAgentEnv config store = do
multicastSubscribers <- newTMVarIO 0
pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers}

#if defined(dbPostgres)
createAgentStore :: ConnectInfo -> String -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createAgentStore :: DBOpts -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createAgentStore = createStore
#else
createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
createAgentStore = createStore
#endif

data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
Expand Down
31 changes: 4 additions & 27 deletions src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
Expand Down Expand Up @@ -26,13 +25,12 @@ import Data.List (find)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (isJust)
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.Store.Common
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.Interface (DBOpts, createDBStore)
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import qualified Simplex.Messaging.Crypto as C
Expand All @@ -54,30 +52,9 @@ import Simplex.Messaging.Protocol
VersionSMPC,
)
import qualified Simplex.Messaging.Protocol as SMP
#if defined(dbPostgres)
import Database.PostgreSQL.Simple (ConnectInfo (..))
import qualified Simplex.Messaging.Agent.Store.Postgres as Store
#else
import Data.ByteArray (ScrubbedBytes)
import qualified Simplex.Messaging.Agent.Store.SQLite as Store
#endif

#if defined(dbPostgres)
createStore :: ConnectInfo -> String -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore connectInfo schema = Store.createDBStore connectInfo schema Migrations.app
#else
createStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
createStore dbFilePath dbKey keepKey = Store.createDBStore dbFilePath dbKey keepKey Migrations.app
#endif

closeStore :: DBStore -> IO ()
closeStore = Store.closeDBStore

reopenStore :: DBStore -> IO ()
reopenStore = Store.reopenDBStore

execSQL :: DB.Connection -> Text -> IO [Text]
execSQL = Store.execSQL

createStore :: DBOpts -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore dbOpts = createDBStore dbOpts Migrations.app

-- * Queue types

Expand Down
14 changes: 14 additions & 0 deletions src/Simplex/Messaging/Agent/Store/Interface.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{-# LANGUAGE CPP #-}

module Simplex.Messaging.Agent.Store.Interface
#if defined(dbPostgres)
( module Simplex.Messaging.Agent.Store.Postgres,
)
where
import Simplex.Messaging.Agent.Store.Postgres
#else
( module Simplex.Messaging.Agent.Store.SQLite,
)
where
import Simplex.Messaging.Agent.Store.SQLite
#endif
41 changes: 22 additions & 19 deletions src/Simplex/Messaging/Agent/Store/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
{-# LANGUAGE ScopedTypeVariables #-}

module Simplex.Messaging.Agent.Store.Postgres
( createDBStore,
( DBOpts (..),
createDBStore,
closeDBStore,
reopenDBStore,
execSQL,
Expand All @@ -14,47 +15,49 @@ where

import Control.Exception (throwIO)
import Control.Monad (unless, void)
import Data.ByteString (ByteString)
import Data.Functor (($>))
import Data.String (fromString)
import Data.Text (Text)
import Database.PostgreSQL.Simple (ConnectInfo (..), Only (..))
import Database.PostgreSQL.Simple (Only (..))
import qualified Database.PostgreSQL.Simple as PSQL
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Simplex.Messaging.Agent.Store.Migrations (migrateSchema)
import Simplex.Messaging.Agent.Store.Postgres.Common
import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB
import Simplex.Messaging.Agent.Store.Postgres.Util (createDBAndUserIfNotExists)
import Simplex.Messaging.Agent.Store.Shared (Migration (..), MigrationConfirmation (..), MigrationError (..))
import Simplex.Messaging.Util (ifM)
import UnliftIO.Exception (bracketOnError, onException)
import UnliftIO.MVar
import UnliftIO.STM

-- | Create a new Postgres DBStore with the given connection info, schema name and migrations.
-- This function creates the user and/or database passed in connectInfo if they do not exist
-- (expects the default 'postgres' user and 'postgres' db to exist).
data DBOpts = DBOpts
{ connstr :: ByteString,
schema :: String
}

-- | Create a new Postgres DBStore with the given connection string, schema name and migrations.
-- If passed schema does not exist in connectInfo database, it will be created.
-- Applies necessary migrations to schema.
-- TODO [postgres] authentication / user password, db encryption (?)
createDBStore :: ConnectInfo -> String -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createDBStore connectInfo schema migrations confirmMigrations = do
createDBAndUserIfNotExists connectInfo
st <- connectPostgresStore connectInfo schema
createDBStore :: DBOpts -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createDBStore DBOpts {connstr, schema} migrations confirmMigrations = do
st <- connectPostgresStore connstr schema
r <- migrateSchema st migrations confirmMigrations True `onException` closeDBStore st
case r of
Right () -> pure $ Right st
Left e -> closeDBStore st $> Left e

connectPostgresStore :: ConnectInfo -> String -> IO DBStore
connectPostgresStore dbConnectInfo dbSchema = do
(dbConn, dbNew) <- connectDB dbConnectInfo dbSchema -- TODO [postgres] analogue for dbBusyLoop?
connectPostgresStore :: ByteString -> String -> IO DBStore
connectPostgresStore dbConnstr dbSchema = do
(dbConn, dbNew) <- connectDB dbConnstr dbSchema -- TODO [postgres] analogue for dbBusyLoop?
dbConnection <- newMVar dbConn
dbClosed <- newTVarIO False
pure DBStore {dbConnectInfo, dbSchema, dbConnection, dbNew, dbClosed}
pure DBStore {dbConnstr, dbSchema, dbConnection, dbNew, dbClosed}

connectDB :: ConnectInfo -> String -> IO (DB.Connection, Bool)
connectDB dbConnectInfo schema = do
db <- PSQL.connect dbConnectInfo
connectDB :: ByteString -> String -> IO (DB.Connection, Bool)
connectDB connstr schema = do
db <- PSQL.connectPostgreSQL connstr
schemaExists <- prepare db `onException` PSQL.close db
let dbNew = not schemaExists
pure (db, dbNew)
Expand Down Expand Up @@ -84,12 +87,12 @@ closeDBStore st@DBStore {dbClosed} =
atomically $ writeTVar dbClosed True

openPostgresStore_ :: DBStore -> IO ()
openPostgresStore_ DBStore {dbConnectInfo, dbSchema, dbConnection, dbClosed} =
openPostgresStore_ DBStore {dbConnstr, dbSchema, dbConnection, dbClosed} =
bracketOnError
(takeMVar dbConnection)
(tryPutMVar dbConnection)
$ \_dbConn -> do
(dbConn, _dbNew) <- connectDB dbConnectInfo dbSchema
(dbConn, _dbNew) <- connectDB dbConnstr dbSchema
atomically $ writeTVar dbClosed False
putMVar dbConnection dbConn

Expand Down
3 changes: 2 additions & 1 deletion src/Simplex/Messaging/Agent/Store/Postgres/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ module Simplex.Messaging.Agent.Store.Postgres.Common
)
where

import Data.ByteString (ByteString)
import qualified Database.PostgreSQL.Simple as PSQL
import UnliftIO.MVar
import UnliftIO.STM

-- TODO [postgres] use log_min_duration_statement instead of custom slow queries (SQLite's Connection type)
data DBStore = DBStore
{ dbConnectInfo :: PSQL.ConnectInfo,
{ dbConnstr :: ByteString,
dbSchema :: String,
dbConnection :: MVar PSQL.Connection,
dbClosed :: TVar Bool,
Expand Down
1 change: 0 additions & 1 deletion src/Simplex/Messaging/Agent/Store/Postgres/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

module Simplex.Messaging.Agent.Store.Postgres.Util
( createDBAndUserIfNotExists,
-- for tests
dropSchema,
dropAllSchemasExceptSystem,
dropDatabaseAndUser,
epoberezkin marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
14 changes: 11 additions & 3 deletions src/Simplex/Messaging/Agent/Store/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}

Check warning on line 24 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields

Check warning on line 24 in src/Simplex/Messaging/Agent/Store/SQLite.hs

View workflow job for this annotation

GitHub Actions / build-ubuntu-20.04-8.10.7

unrecognised warning flag: -fno-warn-ambiguous-fields
{-# OPTIONS_GHC -fno-warn-orphans #-}

module Simplex.Messaging.Agent.Store.SQLite
( createDBStore,
( DBOpts (..),
createDBStore,
closeDBStore,
reopenDBStore,
execSQL,
Expand Down Expand Up @@ -64,8 +65,15 @@

-- * SQLite Store implementation

createDBStore :: FilePath -> ScrubbedBytes -> Bool -> [Migration] -> MigrationConfirmation -> Bool -> IO (Either MigrationError DBStore)
createDBStore dbFilePath dbKey keepKey migrations confirmMigrations vacuum = do
data DBOpts = DBOpts
{ dbFilePath :: FilePath,
dbKey :: ScrubbedBytes,
keepKey :: Bool,
vacuum :: Bool
}

createDBStore :: DBOpts -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createDBStore DBOpts {dbFilePath, dbKey, keepKey, vacuum} migrations confirmMigrations = do
let dbDir = takeDirectory dbFilePath
createDirectoryIfMissing True dbDir
st <- connectSQLiteStore dbFilePath dbKey keepKey
Expand Down
5 changes: 3 additions & 2 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Agent.Store.Common (DBStore (..), withTransaction)
import Simplex.Messaging.Agent.Store.Interface
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), SMPProxyFallback (..), SMPProxyMode (..), TransportSessionMode (..), defaultClientConfig)
Expand Down Expand Up @@ -3107,13 +3108,13 @@ getSMPAgentClient' clientId cfg' initServers dbPath = do

#if defined(dbPostgres)
createStore :: String -> IO (Either MigrationError DBStore)
createStore schema = createAgentStore testDBConnectInfo schema MCError
createStore schema = createAgentStore (DBOpts testDBConnstr schema) MCError

insertUser :: DBStore -> IO ()
insertUser st = withTransaction st (`DB.execute_` "INSERT INTO users DEFAULT VALUES")
#else
createStore :: String -> IO (Either MigrationError DBStore)
createStore dbPath = createAgentStore dbPath "" False MCError True
createStore dbPath = createAgentStore (DBOpts dbPath "" False True) MCError

insertUser :: DBStore -> IO ()
insertUser st = withTransaction st (`DB.execute_` "INSERT INTO users (user_id) VALUES (1)")
Expand Down
22 changes: 17 additions & 5 deletions tests/AgentTests/MigrationTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ import Control.Monad
import Data.Maybe (fromJust)
import Data.Word (Word32)
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import Simplex.Messaging.Agent.Store.Interface
import Simplex.Messaging.Agent.Store.Migrations (migrationsToRun)
import Simplex.Messaging.Agent.Store.Shared
import System.Random (randomIO)
import Test.Hspec
#if defined(dbPostgres)
import Database.PostgreSQL.Simple (fromOnly)
import Fixtures
import Simplex.Messaging.Agent.Store.Postgres (closeDBStore, createDBStore)
import Simplex.Messaging.Agent.Store.Postgres.Util (dropSchema)
import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB
#else
import Database.SQLite.Simple (fromOnly)
import Simplex.Messaging.Agent.Store.SQLite (closeDBStore, createDBStore)
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import System.Directory (removeFile)
#endif
Expand Down Expand Up @@ -203,8 +202,13 @@ testSchema :: Word32 -> String
testSchema randSuffix = "test_migrations_schema" <> show randSuffix

createStore :: Word32 -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore randSuffix migrations confirmMigrations =
createDBStore testDBConnectInfo (testSchema randSuffix) migrations confirmMigrations
createStore randSuffix migrations confirmMigrations = do
let dbOpts =
DBOpts {
connstr = testDBConnstr,
schema = testSchema randSuffix
}
createDBStore dbOpts migrations confirmMigrations

cleanup :: Word32 -> IO ()
cleanup randSuffix = dropSchema testDBConnectInfo (testSchema randSuffix)
Expand All @@ -218,7 +222,15 @@ testDB :: Word32 -> FilePath
testDB randSuffix = "tests/tmp/test_migrations.db" <> show randSuffix

createStore :: Word32 -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore randSuffix migrations migrationConf = createDBStore (testDB randSuffix) "" False migrations migrationConf True
createStore randSuffix migrations confirmMigrations = do
let dbOpts =
DBOpts {
dbFilePath = testDB randSuffix,
dbKey = "",
keepKey = False,
vacuum = True
}
createDBStore dbOpts migrations confirmMigrations

cleanup :: Word32 -> IO ()
cleanup randSuffix = removeFile (testDB randSuffix)
Expand Down
6 changes: 3 additions & 3 deletions tests/AgentTests/NotificationTests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, SENT)
import Simplex.Messaging.Agent.Store.AgentStore (getSavedNtfToken)
import Simplex.Messaging.Agent.Store (closeStore, reopenStore)
import Simplex.Messaging.Agent.Store.Common (withTransaction)
import Simplex.Messaging.Agent.Store.Interface (closeDBStore, reopenDBStore)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
Expand Down Expand Up @@ -501,7 +501,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag

threadDelay 500000
suspendAgent alice 0
closeStore store
closeDBStore store
threadDelay 1000000
putStrLn "before opening the database from another agent"

Expand All @@ -512,7 +512,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag

threadDelay 1000000
putStrLn "after closing the database in another agent"
reopenStore store
reopenDBStore store
foregroundAgent alice
threadDelay 500000

Expand Down
Loading
Loading