diff --git a/CHANGELOG.md b/CHANGELOG.md index 207196f9..f3123235 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Add `PowerSyncDatabase.inMemory` to create an in-memory SQLite database with PowerSync. This may be useful for testing. +- The Supabase connector can now be subclassed to customize how rows are uploaded and how errors are handled. +- Experimental support for sync streams. ## 1.6.1 diff --git a/README.md b/README.md index aa5bdeca..ad0d92ed 100644 --- a/README.md +++ b/README.md @@ -26,18 +26,14 @@ and API documentation [here](https://powersync-ja.github.io/powersync-kotlin/). - This is the Kotlin Multiplatform SDK implementation. -- [connectors](./connectors/) - - - [SupabaseConnector.kt](./connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt) An example connector implementation for Supabase (Postgres). The backend - connector provides the connection between your application backend and the PowerSync managed database. It is used to: - 1. Retrieve a token to connect to the PowerSync service. - 2. Apply local changes on your backend application server (and from there, to your backend database). - - [integrations](./integrations/) - [room](./integrations/room/README.md): Allows using the [Room database library](https://developer.android.com/jetpack/androidx/releases/room) with PowerSync, making it easier to run typed queries on the database. - [sqldelight](./integrations/sqldelight/README.md): Allows using [SQLDelight](https://sqldelight.github.io/sqldelight) with PowerSync, also enabling typed statements on the database. - + - [SupabaseConnector.kt](./integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt) An example connector implementation for Supabase (Postgres). The backend + connector provides the connection between your application backend and the PowerSync managed database. It is used to: + 1. Retrieve a token to connect to the PowerSync service. + 2. Apply local changes on your backend application server (and from there, to your backend database). ## Demo Apps / Example Projects diff --git a/build.gradle.kts b/build.gradle.kts index 967269ec..e5d1df9c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -68,10 +68,10 @@ tasks.getByName("clean") { // Merges individual module docs into a single HTML output dependencies { dokka(project(":core:")) - dokka(project(":connectors:supabase")) dokka(project(":compose:")) dokka(project(":integrations:room")) dokka(project(":integrations:sqldelight")) + dokka(project(":integrations:supabase")) } dokka { diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 810d25d9..97eee155 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,7 +1,5 @@ package com.powersync -import androidx.sqlite.SQLiteConnection -import androidx.sqlite.execSQL import app.cash.turbine.test import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 1a8ed4f4..9f23f9b4 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -163,9 +163,10 @@ class SyncStreamTest : AbstractSyncTest(true) { requestedSyncStreams.clear() val subscription = database.syncStream("a").subscribe() + waitForSyncLinesChannelClosed() // Adding the subscription should reconnect - turbine.waitFor { it.connected && !it.downloading } + turbine.waitFor { it.connected } requestedSyncStreams shouldHaveSingleElement { val streams = it.jsonObject["streams"]!!.jsonObject val subscriptions = streams["subscriptions"]!!.jsonArray diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 0b533cfd..fed689ff 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -23,10 +23,12 @@ import io.ktor.client.HttpClient import io.ktor.client.engine.mock.toByteArray import io.ktor.http.ContentType import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path import kotlinx.serialization.json.JsonElement +import kotlin.coroutines.resume expect val factory: DatabaseDriverFactory @@ -102,6 +104,23 @@ internal class ActiveDatabaseTest( var connector = TestConnector() + suspend fun waitForSyncLinesChannelClosed() { + suspendCancellableCoroutine { continuation -> + var cancelled = false + continuation.invokeOnCancellation { + cancelled = true + } + + syncLines.invokeOnClose { + if (!cancelled) { + continuation.resume(Unit) + } + + syncLines = Channel() + } + } + } + fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } val db = @@ -123,7 +142,7 @@ internal class ActiveDatabaseTest( fun createSyncClient(): HttpClient { val engine = MockSyncService( - lines = syncLines, + lines = { syncLines }, generateCheckpoint = { checkpointResponse() }, syncLinesContentType = { syncLinesContentType }, trackSyncRequest = { diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index ea3cbf77..20c0dc12 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -38,7 +38,7 @@ import kotlinx.serialization.json.JsonElement */ @OptIn(LegacySyncImplementation::class) internal class MockSyncService( - private val lines: ReceiveChannel, + private val lines: () -> ReceiveChannel, private val syncLinesContentType: () -> ContentType, private val generateCheckpoint: () -> WriteCheckpointResponse, private val trackSyncRequest: suspend (HttpRequestData) -> Unit, @@ -60,12 +60,11 @@ internal class MockSyncService( trackSyncRequest(data) val job = scope.writer { - lines.consume { + lines().consume { while (true) { // Wait for a downstream listener being ready before requesting a sync line channel.awaitFreeSpace() - val line = receive() - when (line) { + when (val line = receive()) { is SyncLine -> { val serializedLine = JsonUtil.json.encodeToString(line) channel.writeStringUtf8("$serializedLine\n") diff --git a/demos/android-supabase-todolist/build.gradle.kts b/demos/android-supabase-todolist/build.gradle.kts index 391e8711..737ffd68 100644 --- a/demos/android-supabase-todolist/build.gradle.kts +++ b/demos/android-supabase-todolist/build.gradle.kts @@ -127,7 +127,7 @@ dependencies { // When adopting the PowerSync dependencies into your project, use the latest version available at // https://central.sonatype.com/artifact/com.powersync/core implementation(projects.core) // "com.powersync:core:latest.release" - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase:latest.release" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase:latest.release" implementation(projects.compose) // "com.powersync:compose:latest.release" implementation(libs.uuid) implementation(libs.kermit) diff --git a/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts b/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts index d8144eed..70fad978 100644 --- a/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts +++ b/demos/supabase-todolist/androidBackgroundSync/build.gradle.kts @@ -44,7 +44,7 @@ android { dependencies { // When copying this example, use the the current version available // at: https://central.sonatype.com/artifact/com.powersync/connector-supabase - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase" implementation(projects.demos.supabaseTodolist.shared) diff --git a/demos/supabase-todolist/shared/build.gradle.kts b/demos/supabase-todolist/shared/build.gradle.kts index 43b4a7a7..3a0fe227 100644 --- a/demos/supabase-todolist/shared/build.gradle.kts +++ b/demos/supabase-todolist/shared/build.gradle.kts @@ -42,7 +42,7 @@ kotlin { // When copying this example, use the current version available // at: https://central.sonatype.com/artifact/com.powersync/core api(projects.core) // "com.powersync:core" - implementation(projects.connectors.supabase) // "com.powersync:connector-supabase" + implementation(projects.integrations.supabase) // "com.powersync:connector-supabase" implementation(projects.compose) // "com.powersync:compose" implementation(libs.uuid) implementation(compose.runtime) diff --git a/gradle.properties b/gradle.properties index 72b5be9e..a2776781 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.6.1 +LIBRARY_VERSION=1.7.0 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/ diff --git a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt index e9267193..49b44ec6 100644 --- a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt +++ b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/PowerSyncRoomTest.kt @@ -2,7 +2,9 @@ package com.powersync.integrations.room import androidx.sqlite.driver.bundled.BundledSQLiteDriver import app.cash.turbine.turbineScope +import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity import co.touchlab.kermit.loggerConfigInit import com.powersync.PowerSyncDatabase import com.powersync.db.getString @@ -28,6 +30,7 @@ class PowerSyncRoomTest { @AfterTest fun tearDown() { + logger.i { "Closing Room database" } database.close() } @@ -35,7 +38,6 @@ class PowerSyncRoomTest { fun roomWritePowerSyncRead() = runTest { database.userDao().create(User(id = "test", name = "Test user")) - val logger = Logger(loggerConfigInit()) val powersync = PowerSyncDatabase.opened( @@ -61,7 +63,6 @@ class PowerSyncRoomTest { @Test fun roomWritePowerSyncWatch() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -88,12 +89,13 @@ class PowerSyncRoomTest { turbine.awaitItem() shouldHaveSize 1 turbine.cancel() } + + powersync.close() } @Test fun powersyncWriteRoomRead() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -108,12 +110,12 @@ class PowerSyncRoomTest { database.userDao().getAll() shouldHaveSize 0 powersync.execute("insert into user values (uuid(), ?)", listOf("PowerSync user")) database.userDao().getAll() shouldHaveSize 1 + powersync.close() } @Test fun powersyncWriteRoomWatch() = runTest { - val logger = Logger(loggerConfigInit()) val pool = RoomConnectionPool(database, TestDatabase.schema) val powersync = @@ -133,5 +135,11 @@ class PowerSyncRoomTest { turbine.awaitItem() shouldHaveSize 1 turbine.cancel() } + + powersync.close() } + + companion object { + private val logger = Logger(loggerConfigInit(CommonWriter())) + } } diff --git a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt index 17bc4f6a..1dffff4a 100644 --- a/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt +++ b/integrations/room/src/commonIntegrationTest/kotlin/com/powersync/integrations/room/TestDatabase.kt @@ -37,7 +37,7 @@ interface UserDao { suspend fun delete(user: User) } -@Database(entities = [User::class], version = 1) +@Database(entities = [User::class], version = 1, exportSchema = false) @ConstructedBy(TestDatabaseConstructor::class) abstract class TestDatabase : RoomDatabase() { abstract fun userDao(): UserDao diff --git a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt index 3f8aad30..6da84ca3 100644 --- a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt +++ b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt @@ -5,6 +5,7 @@ import androidx.room.Transactor import androidx.room.execSQL import androidx.room.useReaderConnection import androidx.room.useWriterConnection +import androidx.sqlite.SQLiteException import androidx.sqlite.SQLiteStatement import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.driver.SQLiteConnectionPool @@ -73,7 +74,18 @@ public class RoomConnectionPool( db.getCoroutineScope().launch { val tables = schema.rawTables.map { it.name }.toTypedArray() db.invalidationTracker.createFlow(*tables, emitInitialState = false).collect { - transferPendingRoomUpdatesToPowerSync() + try { + transferPendingRoomUpdatesToPowerSync() + } catch (e: SQLiteException) { + // It can happen that we get an update shortly before the database is closed. Since this is + // asynchronous, we'd then be using the database in a closed state, which fails. We handle that by + // stopping the flow collection. + if (e.message == "Connection pool is closed") { + return@collect + } + + throw e + } } } } @@ -91,7 +103,7 @@ public class RoomConnectionPool( val changed = it.usePrepared("SELECT powersync_update_hooks('get')") { stmt -> check(stmt.step()) - json.decodeFromString>(stmt.getText(0)) + Json.decodeFromString>(stmt.getText(0)) } val userTables = @@ -114,10 +126,6 @@ public class RoomConnectionPool( override suspend fun close() { // Noop, Room database managed independently } - - private companion object { - val json = Json {} - } } private class RoomTransactionLease( diff --git a/connectors/README.md b/integrations/supabase/README.md similarity index 57% rename from connectors/README.md rename to integrations/supabase/README.md index 0c9a09c4..76218687 100644 --- a/connectors/README.md +++ b/integrations/supabase/README.md @@ -1,16 +1,12 @@ -# PowerSync Backend Connectors +# PowerSync Supabase Connector -Convenience implementations of backend connectors that provide the connection between your application backend and the PowerSync managed database. +Convenience implementation of a backend connector that provide the connection between your application backend and the PowerSync managed database +by delegating to Supabase. It is used to: 1. Retrieve a token to connect to the PowerSync service. 2. Apply local changes on your backend application server (and from there, to your backend database). +The connector is fairly basic, and also serves as an example for getting started. -## Provided Connectors - -### Supabase (Postgres) - -A basic implementation of a PowerSync Backend Connector for Supabase, that serves as getting started example. - -See a step-by-step tutorial for connecting to Supabase, [here](https://docs.powersync.com/integration-guides/supabase-+-powersync). \ No newline at end of file +See a step-by-step tutorial for connecting to Supabase, [here](https://docs.powersync.com/integration-guides/supabase-+-powersync). diff --git a/connectors/supabase/build.gradle.kts b/integrations/supabase/build.gradle.kts similarity index 58% rename from connectors/supabase/build.gradle.kts rename to integrations/supabase/build.gradle.kts index 0410af6a..9004bbcb 100644 --- a/connectors/supabase/build.gradle.kts +++ b/integrations/supabase/build.gradle.kts @@ -7,6 +7,7 @@ plugins { alias(libs.plugins.android.library) alias(libs.plugins.kotlinter) id("com.powersync.plugins.sonatype") + id("com.powersync.plugins.sharedbuild") id("dokka-convention") } @@ -22,15 +23,37 @@ kotlin { } explicitApi() + applyDefaultHierarchyTemplate() sourceSets { commonMain.dependencies { - api(project(":core")) + api(projects.core) implementation(libs.kotlinx.coroutines.core) implementation(libs.supabase.client) api(libs.supabase.auth) api(libs.supabase.storage) } + + val commonIntegrationTest by creating { + dependsOn(commonTest.get()) + + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.io) + implementation(libs.test.turbine) + implementation(libs.test.coroutines) + implementation(libs.test.kotest.assertions) + + implementation(libs.sqldelight.coroutines) + } + } + + // The PowerSync SDK links the core extension, so we can just run tests as-is. + jvmTest.get().dependsOn(commonIntegrationTest) + + // We have special setup in this build configuration to make these tests link the PowerSync extension, so they + // can run integration tests along with the executable for unit testing. + nativeTest.orNull?.dependsOn(commonIntegrationTest) } } diff --git a/connectors/supabase/gradle.properties b/integrations/supabase/gradle.properties similarity index 100% rename from connectors/supabase/gradle.properties rename to integrations/supabase/gradle.properties diff --git a/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt b/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt new file mode 100644 index 00000000..14728a6b --- /dev/null +++ b/integrations/supabase/src/commonIntegrationTest/kotlin/com/powersync/connector/supabase/SupabaseConnectorTest.kt @@ -0,0 +1,67 @@ +package com.powersync.connector.supabase + +import com.powersync.PowerSyncDatabase +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudTransaction +import com.powersync.db.schema.Column +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.equals.shouldBeEqual +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.test.runTest +import kotlin.test.Test + +class SupabaseConnectorTest { + @Test + fun errorHandling() = + runTest { + val db = + PowerSyncDatabase.inMemory( + scope = this, + schema = + Schema( + Table( + "users", + listOf( + Column.text("name"), + ), + ), + ), + ) + + try { + db.writeTransaction { tx -> + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("a")) + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("b")) + tx.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("c")) + } + + var calledErrorHandler = false + val connector = + object : SupabaseConnector("", "", "") { + override suspend fun uploadCrudEntry(entry: CrudEntry): Unit = + throw Exception("Expected exception, failing in uploadCrudEntry") + + override suspend fun handleError( + tx: CrudTransaction, + entry: CrudEntry, + exception: Exception, + errorCode: String?, + ) { + calledErrorHandler = true + + tx.crud shouldHaveSize 3 + entry shouldBeEqual tx.crud[0] + exception.message shouldBe "Expected exception, failing in uploadCrudEntry" + tx.complete(null) + } + } + + connector.uploadData(db) + calledErrorHandler shouldBe true + } finally { + db.close() + } + } +} diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt similarity index 66% rename from connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt rename to integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt index 12913cda..15ebbc3e 100644 --- a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt +++ b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseConnector.kt @@ -5,6 +5,7 @@ import com.powersync.PowerSyncDatabase import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudTransaction import com.powersync.db.crud.UpdateType import com.powersync.db.runWrapped import io.github.jan.supabase.SupabaseClient @@ -27,12 +28,13 @@ import io.ktor.utils.io.InternalAPI import kotlinx.coroutines.flow.StateFlow import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonPrimitive +import kotlin.toString /** * Get a Supabase token to authenticate against the PowerSync instance. */ @OptIn(SupabaseInternal::class, InternalAPI::class) -public class SupabaseConnector( +public open class SupabaseConnector( public val supabaseClient: SupabaseClient, public val powerSyncEndpoint: String, private val storageBucket: String? = null, @@ -40,7 +42,7 @@ public class SupabaseConnector( private val json = Json { coerceInputValues = true } private var errorCode: String? = null - private object PostgresFatalCodes { + public companion object PostgresFatalCodes { // Using Regex patterns for Postgres error codes private val FATAL_RESPONSE_CODES = listOf( @@ -52,7 +54,7 @@ public class SupabaseConnector( "^42501$".toRegex(), ) - fun isFatalError(code: String): Boolean = + public fun isFatalError(code: String): Boolean = FATAL_RESPONSE_CODES.any { pattern -> pattern.matches(code) } @@ -172,6 +174,78 @@ public class SupabaseConnector( ) } + /** + * Uses the PostgREST APIs to upload a given [entry] to the backend database. + * + * This method should report errors during the upload as an exception that would be caught by [uploadData]. + */ + public open suspend fun uploadCrudEntry(entry: CrudEntry) { + val table = supabaseClient.from(entry.table) + + when (entry.op) { + UpdateType.PUT -> { + val data = + buildMap { + put("id", JsonPrimitive(entry.id)) + entry.opData?.jsonValues?.let { putAll(it) } + } + table.upsert(data) + } + UpdateType.PATCH -> { + table.update(entry.opData!!.jsonValues) { + filter { + eq("id", entry.id) + } + } + } + UpdateType.DELETE -> { + table.delete { + filter { + eq("id", entry.id) + } + } + } + } + } + + /** + * Handles an error during the upload. This method can be overridden to log errors or customize error handling. + * + * By default, it discards the rest of a transaction when the error code indicates that this is a fatal postgres + * error that can't be retried. Otherwise, it rethrows the exception so that the PowerSync SDK will retry. + * + * @param tx The full [CrudTransaction] we're in the process of uploading. + * @param entry The [CrudEntry] for which an upload has failed. + * @param exception The [Exception] thrown by the Supabase client. + * @param [errorCode] The postgres error code, if any. + * @throws Exception If the upload should be retried. If this method doesn't throw, it should mark [tx] as complete + * by invoking [CrudTransaction.complete]. In that case, the local write would be lost. + */ + public open suspend fun handleError( + tx: CrudTransaction, + entry: CrudEntry, + exception: Exception, + errorCode: String?, + ) { + if (errorCode != null && isFatalError(errorCode)) { + /** + * Instead of blocking the queue with these errors, + * discard the (rest of the) transaction. + * + * Note that these errors typically indicate a bug in the application. + * If protecting against data loss is important, save the failing records + * elsewhere instead of discarding, and/or notify the user. + */ + Logger.e("Data upload error: ${exception.message}") + Logger.e("Discarding entry: $entry") + tx.complete(null) + return + } + + Logger.e("Data upload error - retrying last entry: $entry, $exception") + throw exception + } + /** * Upload local changes to the app backend (in this case Supabase). * @@ -186,54 +260,16 @@ public class SupabaseConnector( try { for (entry in transaction.crud) { lastEntry = entry - - val table = supabaseClient.from(entry.table) - - when (entry.op) { - UpdateType.PUT -> { - val data = - buildMap { - put("id", JsonPrimitive(entry.id)) - entry.opData?.jsonValues?.let { putAll(it) } - } - table.upsert(data) - } - UpdateType.PATCH -> { - table.update(entry.opData!!.jsonValues) { - filter { - eq("id", entry.id) - } - } - } - UpdateType.DELETE -> { - table.delete { - filter { - eq("id", entry.id) - } - } - } - } + uploadCrudEntry(entry) } transaction.complete(null) } catch (e: Exception) { - if (errorCode != null && PostgresFatalCodes.isFatalError(errorCode.toString())) { - /** - * Instead of blocking the queue with these errors, - * discard the (rest of the) transaction. - * - * Note that these errors typically indicate a bug in the application. - * If protecting against data loss is important, save the failing records - * elsewhere instead of discarding, and/or notify the user. - */ - Logger.e("Data upload error: ${e.message}") - Logger.e("Discarding entry: $lastEntry") - transaction.complete(null) - return@runWrapped + if (lastEntry != null) { + handleError(transaction, lastEntry, e, errorCode) + } else { + throw e } - - Logger.e("Data upload error - retrying last entry: $lastEntry, $e") - throw e } } } diff --git a/connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt b/integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt similarity index 100% rename from connectors/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt rename to integrations/supabase/src/commonMain/kotlin/com/powersync/connector/supabase/SupabaseRemoteStorage.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 77204629..3a0d5b12 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -32,12 +32,12 @@ include(":internal:PowerSyncKotlin") include(":core") include(":core-tests-android") -include(":connectors:supabase") include(":integrations:room") include(":static-sqlite-driver") include(":integrations:sqldelight") include(":integrations:sqldelight-test-database") +include(":integrations:supabase") include(":compose")