diff --git a/CHANGELOG.md b/CHANGELOG.md index 675358e1..dc37f86b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ # Changelog -## 1.2.3 (unreleased) +## 1.3.0 (unreleased) +* Support tables created outside of PowerSync with the `RawTable` API. + For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables). * Fix `runWrapped` catching cancellation exceptions. ## 1.2.2 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 4ef7d80a..ca744539 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -2,6 +2,7 @@ package com.powersync.sync import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException import com.powersync.TestConnector @@ -13,6 +14,9 @@ import com.powersync.bucket.OplogEntry import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.db.PowerSyncDatabaseImpl +import com.powersync.db.schema.PendingStatement +import com.powersync.db.schema.PendingStatementParameter +import com.powersync.db.schema.RawTable import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest @@ -650,7 +654,7 @@ abstract class BaseSyncIntegrationTest( class LegacySyncIntegrationTest : BaseSyncIntegrationTest(false) class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { - // The legacy sync implementation doesn't prefetch credentials. + // The legacy sync implementation doesn't prefetch credentials and doesn't support raw tables. @OptIn(LegacySyncImplementation::class) @Test @@ -688,4 +692,107 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { turbine.cancel() } } + + @Test + @OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) + fun rawTables() = + databaseTest(createInitialDatabase = false) { + val db = + openDatabase( + Schema( + listOf( + RawTable( + name = "lists", + put = + PendingStatement( + "INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)", + listOf(PendingStatementParameter.Id, PendingStatementParameter.Column("name")), + ), + delete = + PendingStatement( + "DELETE FROM lists WHERE id = ?", + listOf(PendingStatementParameter.Id), + ), + ), + ), + ), + ) + + db.execute("CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)") + turbineScope(timeout = 10.0.seconds) { + val query = + db + .watch("SELECT * FROM lists", throttleMs = 0L) { + it.getString(0) to it.getString(1) + }.testIn(this) + query.awaitItem() shouldBe emptyList() + + db.connect(connector, options = options) + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = + JsonUtil.json.encodeToString( + mapOf( + "name" to "custom list", + ), + ), + op = OpType.PUT, + opId = "1", + rowId = "my_list", + rowType = "lists", + ), + ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe listOf("my_list" to "custom list") + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0L, + data = null, + op = OpType.REMOVE, + opId = "2", + rowId = "my_list", + rowType = "lists", + ), + ), + after = null, + nextAfter = null, + ), + ) + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe emptyList() + query.cancelAndIgnoreRemainingEvents() + } + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index b2d980da..adbf2b37 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -114,12 +114,12 @@ internal class ActiveDatabaseTest( everySuspend { invalidateCredentials() } returns Unit } - fun openDatabase(): PowerSyncDatabaseImpl { + fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } val db = createPowerSyncDatabaseImpl( factory = factory, - schema = Schema(UserRow.table), + schema = schema, dbFilename = databaseName, dbDirectory = testDirectory, logger = logger, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index c67056bf..6d804cd2 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -2,6 +2,7 @@ package com.powersync.bucket import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction +import com.powersync.db.schema.SerializableSchema import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch @@ -59,6 +60,7 @@ internal sealed interface PowerSyncControlArguments { @Serializable class Start( val parameters: JsonObject, + val schema: SerializableSchema, ) : PowerSyncControlArguments data object Stop : PowerSyncControlArguments diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 3a868940..51880ee1 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -169,6 +169,7 @@ internal class PowerSyncDatabaseImpl( uploadScope = scope, createClient = createClient, options = options, + schema = schema, ) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index bcbc7e57..af47b95f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -12,7 +12,6 @@ import com.powersync.utils.JsonUtil import com.powersync.utils.throttle import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow @@ -25,7 +24,6 @@ import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlin.time.Duration.Companion.milliseconds -@OptIn(FlowPreview::class) internal class InternalDatabaseImpl( private val factory: DatabaseDriverFactory, private val scope: CoroutineScope, diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt new file mode 100644 index 00000000..ff9118d7 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/BaseTable.kt @@ -0,0 +1,13 @@ +package com.powersync.db.schema + +public sealed interface BaseTable { + /** + * The name of the table. + */ + public val name: String + + /** + * Check that there are no issues in the table definition. + */ + public fun validate() +} diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt new file mode 100644 index 00000000..995402b3 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/RawTable.kt @@ -0,0 +1,94 @@ +package com.powersync.db.schema + +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.sync.SyncOptions +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.put + +/** + * A table that is managed by the user instead of being auto-created and migrated by the PowerSync + * SDK. + * + * These tables give application developers full control over the table (including table and + * column constraints). The [put] and [delete] statements the sync client uses to apply operations + * to the local database also need to be set explicitly. + * + * A main benefit of raw tables is that, since they're not backed by JSON views, complex queries on + * them can be much more efficient. + * However, it's the responsibility of the developer to create these raw tables, migrate them when necessary and to + * write triggers detecting local writes. For more information, see + * [the documentation page](https://docs.powersync.com/usage/use-case-examples/raw-tables). + * + * Note that raw tables are only supported when [SyncOptions.newClientImplementation] is enabled. + */ +@ExperimentalPowerSyncAPI +public class RawTable( + override val name: String, + public val put: PendingStatement, + /** + * The statement to run when the sync client wants to delete a row. + */ + public val delete: PendingStatement, +) : BaseTable { + override fun validate() { + // We don't currently have any validation for raw tables + } + + internal fun serialize(): JsonElement = + buildJsonObject { + put("name", name) + put("put", put.serialize()) + put("delete", delete.serialize()) + } +} + +@ExperimentalPowerSyncAPI +public class PendingStatement( + public val sql: String, + public val parameters: List, +) { + internal fun serialize(): JsonElement = + buildJsonObject { + put("sql", sql) + put( + "params", + buildJsonArray { + for (param in parameters) { + add( + when (param) { + is PendingStatementParameter.Column -> + buildJsonObject { + put("Column", param.name) + } + PendingStatementParameter.Id -> JsonPrimitive("Id") + }, + ) + } + }, + ) + } +} + +/** + * A parameter that can be used in a [PendingStatement]. + */ +@ExperimentalPowerSyncAPI +public sealed interface PendingStatementParameter { + /** + * Resolves to the id of the affected row. + */ + public object Id : PendingStatementParameter + + /** + * Resolves to the value of a column in the added row. + * + * This is only available for [RawTable.put] - in [RawTable.delete] statements, only the [Id] + * can be used as a value. + */ + public class Column( + public val name: String, + ) : PendingStatementParameter +} diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt index 61c92b39..72f592eb 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Schema.kt @@ -1,6 +1,9 @@ package com.powersync.db.schema +import com.powersync.ExperimentalPowerSyncAPI +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement /** * The schema used by the database. @@ -8,17 +11,35 @@ import kotlinx.serialization.Serializable * The implementation uses the schema as a "VIEW" on top of JSON data. * No migrations are required on the client. */ -public data class Schema( +@ConsistentCopyVisibility +@OptIn(ExperimentalPowerSyncAPI::class) +public data class Schema internal constructor( val tables: List, + val rawTables: List, ) { + public constructor(tables: List) : this( + tables.filterIsInstance
(), + tables.filterIsInstance(), + ) + init { validate() } + internal val allTables: Sequence get() = + sequence { + yieldAll(tables) + yieldAll(rawTables) + } + + // Kept for binary compatibility, the new constructor taking a BaseTable vararg will be used when recompiling. + @Deprecated(level = DeprecationLevel.HIDDEN, message = "Use constructor taking BaseTable args instead") + public constructor(vararg tables: Table) : this(tables.asList()) + /** * Secondary constructor to create a schema with a variable number of tables. */ - public constructor(vararg tables: Table) : this(tables.asList()) + public constructor(vararg tables: BaseTable) : this(tables.asList()) /** * Validates the schema by ensuring there are no duplicate table names @@ -28,7 +49,7 @@ public data class Schema( */ public fun validate() { val tableNames = mutableSetOf() - tables.forEach { table -> + allTables.forEach { table -> if (!tableNames.add(table.name)) { throw AssertionError("Duplicate table name: ${table.name}") } @@ -56,9 +77,15 @@ public data class Schema( @Serializable internal data class SerializableSchema( val tables: List, + @SerialName("raw_tables") + val rawTables: List, ) +@OptIn(ExperimentalPowerSyncAPI::class) internal fun Schema.toSerializable(): SerializableSchema = with(this) { - SerializableSchema(tables.map { it.toSerializable() }) + SerializableSchema( + tables = tables.map { it.toSerializable() }, + rawTables = rawTables.map { it.serialize() }, + ) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt index 8e321adf..d8e5a03b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/schema/Table.kt @@ -16,7 +16,7 @@ public data class Table( /** * The synced table name, matching sync rules. */ - var name: String, + override var name: String, /** * List of columns. */ @@ -53,7 +53,7 @@ public data class Table( * CRUD entries. */ val ignoreEmptyUpdates: Boolean = false, -) { +) : BaseTable { init { /** * Need to set the column definition for each index column. @@ -141,10 +141,7 @@ public data class Table( ) ) - /** - * Check that there are no issues in the table definition. - */ - public fun validate() { + public override fun validate() { if (columns.size > MAX_AMOUNT_OF_COLUMNS) { throw AssertionError("Table $name has more than $MAX_AMOUNT_OF_COLUMNS columns, which is not supported") } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 7ba8967d..80b8d7ad 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -12,6 +12,8 @@ import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry +import com.powersync.db.schema.Schema +import com.powersync.db.schema.toSerializable import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig @@ -62,6 +64,7 @@ internal class SyncStream( private val params: JsonObject, private val uploadScope: CoroutineScope, private val options: SyncOptions, + private val schema: Schema, createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient, ) { private var isUploadingCrud = AtomicReference(null) @@ -310,7 +313,12 @@ internal class SyncStream( } suspend fun start() { - invokeControl(PowerSyncControlArguments.Start(params)) + invokeControl( + PowerSyncControlArguments.Start( + parameters = params, + schema = schema.toSerializable(), + ), + ) var hadSyncLine = false for (line in controlInvocations) { @@ -433,6 +441,10 @@ internal class SyncStream( val scope: CoroutineScope, ) { suspend fun streamingSyncIteration() { + check(schema.rawTables.isEmpty()) { + "Raw tables are only supported by the Rust sync client." + } + val bucketEntries = bucketStorage.getBucketStates() val initialBuckets = mutableMapOf() diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 41cee90a..e9dedfe0 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -11,6 +11,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType +import com.powersync.db.schema.Schema import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock @@ -82,6 +83,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema(), ) syncStream.invalidateCredentials() @@ -120,6 +122,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema(), ) syncStream.status.update { copy(connected = true) } @@ -160,6 +163,7 @@ class SyncStreamTest { params = JsonObject(emptyMap()), uploadScope = this, options = SyncOptions(), + schema = Schema(), ) // Launch streaming sync in a coroutine that we'll cancel after verification