From bc6cc85e2be95c6fed995645b29d14758ac65434 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 5 Jul 2025 20:35:10 -0600 Subject: [PATCH 1/3] Support raw tables --- CHANGELOG.md | 3 +- .../com/powersync/sync/SyncIntegrationTest.kt | 90 ++++++++++++++++++- .../com/powersync/testutils/TestUtils.kt | 4 +- .../com/powersync/bucket/BucketStorage.kt | 2 + .../com/powersync/db/PowerSyncDatabaseImpl.kt | 1 + .../db/internal/InternalDatabaseImpl.kt | 1 - .../com/powersync/db/schema/BaseTable.kt | 13 +++ .../com/powersync/db/schema/RawTable.kt | 88 ++++++++++++++++++ .../kotlin/com/powersync/db/schema/Schema.kt | 27 +++++- .../kotlin/com/powersync/db/schema/Table.kt | 9 +- .../kotlin/com/powersync/sync/SyncStream.kt | 9 +- .../com/powersync/sync/SyncStreamTest.kt | 4 + 12 files changed, 236 insertions(+), 15 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 675358e1..63ca89af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ # Changelog -## 1.2.3 (unreleased) +## 1.3.0 (unreleased) +* Support tables created outside of PowerSync with the `RawTable` API. * Fix `runWrapped` catching cancellation exceptions. ## 1.2.2 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 4ef7d80a..ad9f8f92 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -2,6 +2,7 @@ package com.powersync.sync import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException import com.powersync.TestConnector @@ -13,6 +14,9 @@ import com.powersync.bucket.OplogEntry import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.db.PowerSyncDatabaseImpl +import com.powersync.db.schema.PendingStatement +import com.powersync.db.schema.PendingStatementParameter +import com.powersync.db.schema.RawTable import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest @@ -650,7 +654,7 @@ abstract class BaseSyncIntegrationTest( class LegacySyncIntegrationTest : BaseSyncIntegrationTest(false) class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { - // The legacy sync implementation doesn't prefetch credentials. + // The legacy sync implementation doesn't prefetch credentials and doesn't support raw tables. @OptIn(LegacySyncImplementation::class) @Test @@ -688,4 +692,88 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { turbine.cancel() } } + + @Test + @OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) + fun rawTables() = databaseTest(createInitialDatabase = false) { + val db = openDatabase(Schema(listOf( + RawTable( + name = "lists", + put = PendingStatement( + "INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)", + listOf(PendingStatementParameter.Id, PendingStatementParameter.Column("name")) + ), + delete = PendingStatement( + "DELETE FROM lists WHERE id = ?", listOf(PendingStatementParameter.Id) + ) + ) + ))) + + db.execute("CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)") + turbineScope(timeout = 10.0.seconds) { + val query = db.watch("SELECT * FROM lists", throttleMs = 0L) { + it.getString(0) to it.getString(1) + }.testIn(this) + query.awaitItem() shouldBe emptyList() + + db.connect(connector, options = options) + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ))) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = + JsonUtil.json.encodeToString( + mapOf( + "name" to "custom list", + ), + ), + op = OpType.PUT, + opId = "1", + rowId = "my_list", + rowType = "lists", + ), + ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe listOf("my_list" to "custom list") + + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ))) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = null, + op = OpType.REMOVE, + opId = "2", + rowId = "my_list", + rowType = "lists", + ), + ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe emptyList() + query.cancelAndIgnoreRemainingEvents() + } + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index b2d980da..adbf2b37 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -114,12 +114,12 @@ internal class ActiveDatabaseTest( everySuspend { invalidateCredentials() } returns Unit } - fun openDatabase(): PowerSyncDatabaseImpl { + fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } val db = createPowerSyncDatabaseImpl( factory = factory, - schema = Schema(UserRow.table), + schema = schema, dbFilename = databaseName, dbDirectory = testDirectory, logger = logger, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index c67056bf..d838bde6 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -2,6 +2,7 @@ package com.powersync.bucket import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction +import com.powersync.db.schema.SerializableSchema import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch @@ -59,6 +60,7 @@ internal sealed interface PowerSyncControlArguments { @Serializable class Start( val parameters: JsonObject, + val schema: SerializableSchema ) : PowerSyncControlArguments data object Stop : PowerSyncControlArguments diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 3a868940..51880ee1 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -169,6 +169,7 @@ internal class PowerSyncDatabaseImpl( uploadScope = scope, createClient = createClient, options = options, + schema = schema, ) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index bcbc7e57..f5ed675e 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -25,7 +25,6 @@ import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlin.time.Duration.Companion.milliseconds -@OptIn(FlowPreview::class) internal class InternalDatabaseImpl( private val factory: DatabaseDriverFactory, private val scope: CoroutineScope, diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt new file mode 100644 index 00000000..ff9118d7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt @@ -0,0 +1,13 @@ +package com.powersync.db.schema + +public sealed interface BaseTable { + /** + * The name of the table. + */ + public val name: String + + /** + * Check that there are no issues in the table definition. + */ + public fun validate() +} diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt new file mode 100644 index 00000000..3e218da5 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt @@ -0,0 +1,88 @@ +package com.powersync.db.schema + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.sync.SyncOptions +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonObjectBuilder +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.put + +/** + * A table that is managed by the user instead of being auto-created and migrated by the PowerSync + * SDK. + * + * These tables give application developers full control over the table (including table and + * column constraints). The [put] and [delete] statements the sync client uses to apply operations + * to the local database also need to be set explicitly. + * + * A main benefit of raw tables is that, since they're not backed by JSON views, complex queries on + * them can be much more efficient. + * + * Note that raw tables are only supported when [SyncOptions.newClientImplementation] is enabled. + */ +@ExperimentalPowerSyncAPI +public class RawTable( + override val name: String, + public val put: PendingStatement, + /** + * The statement to run when the sync client wants to delete a row. + */ + public val delete: PendingStatement, +): BaseTable { + override fun validate() { + // We don't currently have any validation for raw tables + } + + internal fun serialize(): JsonElement { + return buildJsonObject { + put("name", name) + put("put", put.serialize()) + put("delete", delete.serialize()) + } + } +} + +@ExperimentalPowerSyncAPI +public class PendingStatement( + public val sql: String, + public val parameters: List, +) { + internal fun serialize(): JsonElement { + return buildJsonObject { + put("sql", sql) + put("params", buildJsonArray { + for (param in parameters) { + add(when(param) { + is PendingStatementParameter.Column -> buildJsonObject { + put("Column", param.name) + } + PendingStatementParameter.Id -> JsonPrimitive("Id") + }) + } + }) + } + } +} + +/** + * A parameter that can be used in a [PendingStatement]. + */ +@ExperimentalPowerSyncAPI +public sealed interface PendingStatementParameter { + /** + * Resolves to the id of the affected row. + */ + public object Id: PendingStatementParameter + + /** + * Resolves to the value of a column in the added row. + * + * This is only available for [RawTable.put] - in [RawTable.delete] statements, only the [Id] + * can be used as a value. + */ + public class Column(public val name: String): PendingStatementParameter +} + diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt index 61c92b39..a99e8c95 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt @@ -1,6 +1,9 @@ package com.powersync.db.schema +import com.powersync.ExperimentalPowerSyncAPI +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement /** * The schema used by the database. @@ -8,13 +11,25 @@ import kotlinx.serialization.Serializable * The implementation uses the schema as a "VIEW" on top of JSON data. * No migrations are required on the client. */ -public data class Schema( +@OptIn(ExperimentalPowerSyncAPI::class) +public data class Schema internal constructor( val tables: List, + val rawTables: List, ) { + public constructor(tables: List): this( + tables.filterIsInstance
(), + tables.filterIsInstance() + ) + init { validate() } + internal val allTables: Sequence get() = sequence { + yieldAll(tables) + yieldAll(rawTables) + } + /** * Secondary constructor to create a schema with a variable number of tables. */ @@ -28,7 +43,7 @@ public data class Schema( */ public fun validate() { val tableNames = mutableSetOf() - tables.forEach { table -> + allTables.forEach { table -> if (!tableNames.add(table.name)) { throw AssertionError("Duplicate table name: ${table.name}") } @@ -56,9 +71,15 @@ public data class Schema( @Serializable internal data class SerializableSchema( val tables: List, + @SerialName("raw_tables") + val rawTables: List, ) +@OptIn(ExperimentalPowerSyncAPI::class) internal fun Schema.toSerializable(): SerializableSchema = with(this) { - SerializableSchema(tables.map { it.toSerializable() }) + SerializableSchema( + tables = tables.map { it.toSerializable() }, + rawTables = rawTables.map { it.serialize() } + ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt index 8e321adf..92d7f7bc 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt @@ -16,7 +16,7 @@ public data class Table( /** * The synced table name, matching sync rules. */ - var name: String, + override var name: String, /** * List of columns. */ @@ -53,7 +53,7 @@ public data class Table( * CRUD entries. */ val ignoreEmptyUpdates: Boolean = false, -) { +): BaseTable { init { /** * Need to set the column definition for each index column. @@ -141,10 +141,7 @@ public data class Table( ) ) - /** - * Check that there are no issues in the table definition. - */ - public fun validate() { + public override fun validate() { if (columns.size > MAX_AMOUNT_OF_COLUMNS) { throw AssertionError("Table $name has more than $MAX_AMOUNT_OF_COLUMNS columns, which is not supported") } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 7ba8967d..cf4c2c25 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -12,6 +12,9 @@ import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry +import com.powersync.db.schema.Schema +import com.powersync.db.schema.SerializableSchema +import com.powersync.db.schema.toSerializable import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig @@ -62,6 +65,7 @@ internal class SyncStream( private val params: JsonObject, private val uploadScope: CoroutineScope, private val options: SyncOptions, + private val schema: Schema, createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient, ) { private var isUploadingCrud = AtomicReference(null) @@ -310,7 +314,10 @@ internal class SyncStream( } suspend fun start() { - invokeControl(PowerSyncControlArguments.Start(params)) + invokeControl(PowerSyncControlArguments.Start( + parameters = params, + schema = schema.toSerializable(), + )) var hadSyncLine = false for (line in controlInvocations) { diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 41cee90a..b111cd40 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -11,6 +11,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType +import com.powersync.db.schema.Schema import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock @@ -82,6 +83,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema(), ) syncStream.invalidateCredentials() @@ -120,6 +122,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema() ) syncStream.status.update { copy(connected = true) } @@ -160,6 +163,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema() ) // Launch streaming sync in a coroutine that we'll cancel after verification From b27f52afa3787c4078ece5f5bfa488d2564841f5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 11 Jul 2025 16:48:35 +0200 Subject: [PATCH 2/3] Reformat --- .../com/powersync/sync/SyncIntegrationTest.kt | 163 ++++++++++-------- .../com/powersync/bucket/BucketStorage.kt | 2 +- .../db/internal/InternalDatabaseImpl.kt | 1 - .../com/powersync/db/schema/RawTable.kt | 47 ++--- .../kotlin/com/powersync/db/schema/Schema.kt | 15 +- .../kotlin/com/powersync/db/schema/Table.kt | 2 +- .../kotlin/com/powersync/sync/SyncStream.kt | 11 +- .../com/powersync/sync/SyncStreamTest.kt | 4 +- 8 files changed, 134 insertions(+), 111 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index ad9f8f92..ca744539 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -695,85 +695,104 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { @Test @OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) - fun rawTables() = databaseTest(createInitialDatabase = false) { - val db = openDatabase(Schema(listOf( - RawTable( - name = "lists", - put = PendingStatement( - "INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)", - listOf(PendingStatementParameter.Id, PendingStatementParameter.Column("name")) - ), - delete = PendingStatement( - "DELETE FROM lists WHERE id = ?", listOf(PendingStatementParameter.Id) - ) - ) - ))) - - db.execute("CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)") - turbineScope(timeout = 10.0.seconds) { - val query = db.watch("SELECT * FROM lists", throttleMs = 0L) { - it.getString(0) to it.getString(1) - }.testIn(this) - query.awaitItem() shouldBe emptyList() - - db.connect(connector, options = options) - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId = "1", - checksums = listOf(BucketChecksum("a", checksum = 0)), - ))) - syncLines.send( - SyncLine.SyncDataBucket( - bucket = "a", - data = + fun rawTables() = + databaseTest(createInitialDatabase = false) { + val db = + openDatabase( + Schema( listOf( - OplogEntry( - checksum = 0L, - data = - JsonUtil.json.encodeToString( - mapOf( - "name" to "custom list", - ), + RawTable( + name = "lists", + put = + PendingStatement( + "INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)", + listOf(PendingStatementParameter.Id, PendingStatementParameter.Column("name")), + ), + delete = + PendingStatement( + "DELETE FROM lists WHERE id = ?", + listOf(PendingStatementParameter.Id), ), - op = OpType.PUT, - opId = "1", - rowId = "my_list", - rowType = "lists", ), ), - after = null, - nextAfter = null, - ), - ) - syncLines.send(SyncLine.CheckpointComplete("1")) - - query.awaitItem() shouldBe listOf("my_list" to "custom list") + ), + ) - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - lastOpId = "2", - checksums = listOf(BucketChecksum("a", checksum = 0)), - ))) - syncLines.send( - SyncLine.SyncDataBucket( - bucket = "a", - data = - listOf( - OplogEntry( - checksum = 0L, - data = null, - op = OpType.REMOVE, - opId = "2", - rowId = "my_list", - rowType = "lists", + db.execute("CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)") + turbineScope(timeout = 10.0.seconds) { + val query = + db + .watch("SELECT * FROM lists", throttleMs = 0L) { + it.getString(0) to it.getString(1) + }.testIn(this) + query.awaitItem() shouldBe emptyList() + + db.connect(connector, options = options) + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = + JsonUtil.json.encodeToString( + mapOf( + "name" to "custom list", + ), + ), + op = OpType.PUT, + opId = "1", + rowId = "my_list", + rowType = "lists", + ), ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe listOf("my_list" to "custom list") + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)), ), - after = null, - nextAfter = null, - ), - ) - syncLines.send(SyncLine.CheckpointComplete("1")) + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = null, + op = OpType.REMOVE, + opId = "2", + rowId = "my_list", + rowType = "lists", + ), + ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) - query.awaitItem() shouldBe emptyList() - query.cancelAndIgnoreRemainingEvents() + query.awaitItem() shouldBe emptyList() + query.cancelAndIgnoreRemainingEvents() + } } - } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index d838bde6..6d804cd2 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -60,7 +60,7 @@ internal sealed interface PowerSyncControlArguments { @Serializable class Start( val parameters: JsonObject, - val schema: SerializableSchema + val schema: SerializableSchema, ) : PowerSyncControlArguments data object Stop : PowerSyncControlArguments diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index f5ed675e..af47b95f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -12,7 +12,6 @@ import com.powersync.utils.JsonUtil import com.powersync.utils.throttle import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt index 3e218da5..387d7375 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt @@ -2,9 +2,7 @@ package com.powersync.db.schema import com.powersync.ExperimentalPowerSyncAPI import com.powersync.sync.SyncOptions -import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement -import kotlinx.serialization.json.JsonObjectBuilder import kotlinx.serialization.json.JsonPrimitive import kotlinx.serialization.json.buildJsonArray import kotlinx.serialization.json.buildJsonObject @@ -31,18 +29,17 @@ public class RawTable( * The statement to run when the sync client wants to delete a row. */ public val delete: PendingStatement, -): BaseTable { +) : BaseTable { override fun validate() { // We don't currently have any validation for raw tables } - internal fun serialize(): JsonElement { - return buildJsonObject { + internal fun serialize(): JsonElement = + buildJsonObject { put("name", name) put("put", put.serialize()) put("delete", delete.serialize()) } - } } @ExperimentalPowerSyncAPI @@ -50,21 +47,26 @@ public class PendingStatement( public val sql: String, public val parameters: List, ) { - internal fun serialize(): JsonElement { - return buildJsonObject { + internal fun serialize(): JsonElement = + buildJsonObject { put("sql", sql) - put("params", buildJsonArray { - for (param in parameters) { - add(when(param) { - is PendingStatementParameter.Column -> buildJsonObject { - put("Column", param.name) - } - PendingStatementParameter.Id -> JsonPrimitive("Id") - }) - } - }) + put( + "params", + buildJsonArray { + for (param in parameters) { + add( + when (param) { + is PendingStatementParameter.Column -> + buildJsonObject { + put("Column", param.name) + } + PendingStatementParameter.Id -> JsonPrimitive("Id") + }, + ) + } + }, + ) } - } } /** @@ -75,7 +77,7 @@ public sealed interface PendingStatementParameter { /** * Resolves to the id of the affected row. */ - public object Id: PendingStatementParameter + public object Id : PendingStatementParameter /** * Resolves to the value of a column in the added row. @@ -83,6 +85,7 @@ public sealed interface PendingStatementParameter { * This is only available for [RawTable.put] - in [RawTable.delete] statements, only the [Id] * can be used as a value. */ - public class Column(public val name: String): PendingStatementParameter + public class Column( + public val name: String, + ) : PendingStatementParameter } - diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt index a99e8c95..8b1c5f37 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt @@ -16,19 +16,20 @@ public data class Schema internal constructor( val tables: List
, val rawTables: List, ) { - public constructor(tables: List): this( + public constructor(tables: List) : this( tables.filterIsInstance
(), - tables.filterIsInstance() + tables.filterIsInstance(), ) init { validate() } - internal val allTables: Sequence get() = sequence { - yieldAll(tables) - yieldAll(rawTables) - } + internal val allTables: Sequence get() = + sequence { + yieldAll(tables) + yieldAll(rawTables) + } /** * Secondary constructor to create a schema with a variable number of tables. @@ -80,6 +81,6 @@ internal fun Schema.toSerializable(): SerializableSchema = with(this) { SerializableSchema( tables = tables.map { it.toSerializable() }, - rawTables = rawTables.map { it.serialize() } + rawTables = rawTables.map { it.serialize() }, ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt index 92d7f7bc..d8e5a03b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt @@ -53,7 +53,7 @@ public data class Table( * CRUD entries. */ val ignoreEmptyUpdates: Boolean = false, -): BaseTable { +) : BaseTable { init { /** * Need to set the column definition for each index column. diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index cf4c2c25..0143ab5d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -13,7 +13,6 @@ import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.db.schema.Schema -import com.powersync.db.schema.SerializableSchema import com.powersync.db.schema.toSerializable import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient @@ -314,10 +313,12 @@ internal class SyncStream( } suspend fun start() { - invokeControl(PowerSyncControlArguments.Start( - parameters = params, - schema = schema.toSerializable(), - )) + invokeControl( + PowerSyncControlArguments.Start( + parameters = params, + schema = schema.toSerializable(), + ), + ) var hadSyncLine = false for (line in controlInvocations) { diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index b111cd40..e9dedfe0 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -122,7 +122,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), - schema = Schema() + schema = Schema(), ) syncStream.status.update { copy(connected = true) } @@ -163,7 +163,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), - schema = Schema() + schema = Schema(), ) // Launch streaming sync in a coroutine that we'll cancel after verification From fb816c2b11453af895d817a18013e8def53b29de Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 15 Jul 2025 12:06:51 +0200 Subject: [PATCH 3/3] Docs update --- CHANGELOG.md | 1 + .../commonMain/kotlin/com/powersync/db/schema/RawTable.kt | 3 +++ .../commonMain/kotlin/com/powersync/db/schema/Schema.kt | 7 ++++++- .../src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 4 ++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63ca89af..dc37f86b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.3.0 (unreleased) * Support tables created outside of PowerSync with the `RawTable` API. + For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables). * Fix `runWrapped` catching cancellation exceptions. ## 1.2.2 diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt index 387d7375..995402b3 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt @@ -18,6 +18,9 @@ import kotlinx.serialization.json.put * * A main benefit of raw tables is that, since they're not backed by JSON views, complex queries on * them can be much more efficient. + * However, it's the responsibility of the developer to create these raw tables, migrate them when necessary and to + * write triggers detecting local writes. For more information, see + * [the documentation page](https://docs.powersync.com/usage/use-case-examples/raw-tables). * * Note that raw tables are only supported when [SyncOptions.newClientImplementation] is enabled. */ diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt index 8b1c5f37..72f592eb 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt @@ -11,6 +11,7 @@ import kotlinx.serialization.json.JsonElement * The implementation uses the schema as a "VIEW" on top of JSON data. * No migrations are required on the client. */ +@ConsistentCopyVisibility @OptIn(ExperimentalPowerSyncAPI::class) public data class Schema internal constructor( val tables: List
, @@ -31,10 +32,14 @@ public data class Schema internal constructor( yieldAll(rawTables) } + // Kept for binary compatibility, the new constructor taking a BaseTable vararg will be used when recompiling. + @Deprecated(level = DeprecationLevel.HIDDEN, message = "Use constructor taking BaseTable args instead") + public constructor(vararg tables: Table) : this(tables.asList()) + /** * Secondary constructor to create a schema with a variable number of tables. */ - public constructor(vararg tables: Table) : this(tables.asList()) + public constructor(vararg tables: BaseTable) : this(tables.asList()) /** * Validates the schema by ensuring there are no duplicate table names diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 0143ab5d..80b8d7ad 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -441,6 +441,10 @@ internal class SyncStream( val scope: CoroutineScope, ) { suspend fun streamingSyncIteration() { + check(schema.rawTables.isEmpty()) { + "Raw tables are only supported by the Rust sync client." + } + val bucketEntries = bucketStorage.getBucketStates() val initialBuckets = mutableMapOf()