Skip to content

Commit d1dd055

Browse files
authored
Update PowerSync core extension (#259)
1 parent 7650661 commit d1dd055

File tree

10 files changed

+100
-56
lines changed

10 files changed

+100
-56
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ jobs:
4646
if: runner.os == 'macOS'
4747
uses: maxim-lobanov/setup-xcode@v1
4848
with:
49-
xcode-version: latest-stable
49+
# TODO: Update to latest-stable once GH installs iOS 26 simulators
50+
xcode-version: '^16.4.0'
5051

5152
- name: Build and run tests with Gradle
5253
run: |

core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import com.powersync.attachments.createAttachmentsTable
1212
import com.powersync.db.getString
1313
import com.powersync.db.schema.Schema
1414
import com.powersync.db.schema.Table
15+
import com.powersync.testutils.ActiveDatabaseTest
1516
import com.powersync.testutils.MockedRemoteStorage
1617
import com.powersync.testutils.UserRow
1718
import com.powersync.testutils.databaseTest
@@ -27,7 +28,9 @@ import dev.mokkery.spy
2728
import dev.mokkery.verifySuspend
2829
import io.kotest.matchers.shouldBe
2930
import io.kotest.matchers.shouldNotBe
31+
import kotlinx.coroutines.flow.Flow
3032
import kotlinx.coroutines.flow.flowOf
33+
import kotlinx.coroutines.flow.onEach
3134
import kotlinx.io.files.Path
3235
import kotlin.test.Test
3336
import kotlin.time.Duration.Companion.seconds
@@ -53,6 +56,12 @@ class AttachmentsTest {
5356
)
5457
}
5558

59+
private fun ActiveDatabaseTest.watchAttachmentsTable(): Flow<List<Attachment>> =
60+
database
61+
.watch("SELECT * FROM attachments") {
62+
Attachment.fromCursor(it)
63+
}.onEach { logger.i { "attachments table results: $it" } }
64+
5665
suspend fun updateSchema(db: PowerSyncDatabase) {
5766
db.updateSchema(
5867
Schema(
@@ -76,11 +85,7 @@ class AttachmentsTest {
7685
val remote = spy<RemoteStorage>(MockedRemoteStorage())
7786

7887
// Monitor the attachments table for testing
79-
val attachmentQuery =
80-
database
81-
// language=SQL
82-
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
83-
.testIn(this)
88+
val attachmentQuery = watchAttachmentsTable().testIn(this)
8489

8590
val queue =
8691
AttachmentQueue(
@@ -93,6 +98,7 @@ class AttachmentsTest {
9398
* immediately be deleted.
9499
*/
95100
archivedCacheLimit = 0,
101+
logger = logger,
96102
)
97103

98104
doOnCleanup {
@@ -175,7 +181,7 @@ class AttachmentsTest {
175181
val exists = queue.localStorage.fileExists(localUri)
176182
exists shouldBe false
177183

178-
attachmentQuery.cancel()
184+
attachmentQuery.cancelAndIgnoreRemainingEvents()
179185
}
180186
}
181187

@@ -187,11 +193,7 @@ class AttachmentsTest {
187193
val remote = spy<RemoteStorage>(MockedRemoteStorage())
188194

189195
// Monitor the attachments table for testing
190-
val attachmentQuery =
191-
database
192-
// language=SQL
193-
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
194-
.testIn(this)
196+
val attachmentQuery = watchAttachmentsTable().testIn(this)
195197

196198
val queue =
197199
AttachmentQueue(
@@ -204,6 +206,7 @@ class AttachmentsTest {
204206
* immediately be deleted.
205207
*/
206208
archivedCacheLimit = 0,
209+
logger = logger,
207210
)
208211

209212
doOnCleanup {
@@ -284,7 +287,7 @@ class AttachmentsTest {
284287
// The file should have been deleted from storage
285288
queue.localStorage.fileExists(localUri) shouldBe false
286289

287-
attachmentQuery.cancel()
290+
attachmentQuery.cancelAndIgnoreRemainingEvents()
288291
}
289292
}
290293

@@ -296,11 +299,7 @@ class AttachmentsTest {
296299
val remote = spy<RemoteStorage>(MockedRemoteStorage())
297300

298301
// Monitor the attachments table for testing
299-
val attachmentQuery =
300-
database
301-
// language=SQL
302-
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
303-
.testIn(this)
302+
val attachmentQuery = watchAttachmentsTable().testIn(this)
304303

305304
val queue =
306305
AttachmentQueue(
@@ -314,6 +313,7 @@ class AttachmentsTest {
314313
*/
315314
archivedCacheLimit = 0,
316315
syncThrottleDuration = 0.seconds,
316+
logger = logger,
317317
)
318318

319319
doOnCleanup {
@@ -382,7 +382,7 @@ class AttachmentsTest {
382382
)
383383
}
384384

385-
attachmentQuery.cancel()
385+
attachmentQuery.cancelAndIgnoreRemainingEvents()
386386
}
387387
}
388388

@@ -395,11 +395,7 @@ class AttachmentsTest {
395395
val remote = spy<RemoteStorage>(MockedRemoteStorage())
396396

397397
// Monitor the attachments table for testing
398-
val attachmentQuery =
399-
database
400-
// language=SQL
401-
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
402-
.testIn(this)
398+
val attachmentQuery = watchAttachmentsTable().testIn(this)
403399

404400
val queue =
405401
AttachmentQueue(
@@ -411,6 +407,7 @@ class AttachmentsTest {
411407
* Keep some items in the cache
412408
*/
413409
archivedCacheLimit = 10,
410+
logger = logger,
414411
)
415412

416413
doOnCleanup {
@@ -493,7 +490,7 @@ class AttachmentsTest {
493490
attachmentRecord = attachmentQuery.awaitItem().first()
494491
attachmentRecord.state shouldBe AttachmentState.SYNCED
495492

496-
attachmentQuery.cancel()
493+
attachmentQuery.cancelAndIgnoreRemainingEvents()
497494
}
498495
}
499496

@@ -509,10 +506,7 @@ class AttachmentsTest {
509506
}
510507

511508
// Monitor the attachments table for testing
512-
val attachmentQuery =
513-
database
514-
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
515-
.testIn(this)
509+
val attachmentQuery = watchAttachmentsTable().testIn(this)
516510

517511
val queue =
518512
AttachmentQueue(
@@ -537,6 +531,7 @@ class AttachmentsTest {
537531
exception: Exception,
538532
): Boolean = false
539533
},
534+
logger = logger,
540535
)
541536
doOnCleanup {
542537
queue.stopSyncing()
@@ -574,7 +569,7 @@ class AttachmentsTest {
574569

575570
attachmentRecord.state shouldBe AttachmentState.ARCHIVED
576571

577-
attachmentQuery.cancel()
572+
attachmentQuery.cancelAndIgnoreRemainingEvents()
578573
}
579574
}
580575
}

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,4 +913,19 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
913913
query.cancelAndIgnoreRemainingEvents()
914914
}
915915
}
916+
917+
@Test
918+
fun `ends iteration on http close`() =
919+
databaseTest {
920+
turbineScope(timeout = 10.0.seconds) {
921+
val turbine = database.currentStatus.asFlow().testIn(this)
922+
database.connect(TestConnector(), options = getOptions())
923+
turbine.waitFor { it.connected }
924+
925+
syncLines.close()
926+
turbine.waitFor { !it.connected }
927+
928+
turbine.cancelAndIgnoreRemainingEvents()
929+
}
930+
}
916931
}

core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ public open class AttachmentQueue(
327327
* Use a lock here to prevent conflicting state updates.
328328
*/
329329
attachmentsService.withContext { attachmentsContext ->
330+
logger.v { "processWatchedAttachments($items)" }
331+
330332
/**
331333
* Need to get all the attachments which are tracked in the DB.
332334
* We might need to restore an archived attachment.

core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package com.powersync.attachments.implementation
22

33
import co.touchlab.kermit.Logger
4+
import com.powersync.ExperimentalPowerSyncAPI
45
import com.powersync.PowerSyncDatabase
56
import com.powersync.attachments.Attachment
67
import com.powersync.attachments.AttachmentContext
78
import com.powersync.attachments.AttachmentState
89
import com.powersync.db.getString
910
import com.powersync.db.internal.ConnectionContext
10-
import kotlinx.serialization.json.Json
11+
import com.powersync.db.runWrapped
1112
import kotlin.time.Clock
1213

1314
/**
@@ -65,6 +66,8 @@ public open class AttachmentContextImpl(
6566
}
6667

6768
db.writeTransaction { tx ->
69+
logger.v { "saveAttachments($attachments)" }
70+
6871
for (attachment in attachments) {
6972
upsertAttachment(attachment, tx)
7073
}
@@ -131,6 +134,7 @@ public open class AttachmentContextImpl(
131134
* @returns true if all items have been deleted. Returns false if there might be more archived
132135
* items remaining.
133136
*/
137+
@OptIn(ExperimentalPowerSyncAPI::class)
134138
public override suspend fun deleteArchivedAttachments(callback: suspend (attachments: List<Attachment>) -> Unit): Boolean {
135139
// First fetch the attachments in order to allow other cleanup
136140
val limit = 1000
@@ -154,12 +158,21 @@ public open class AttachmentContextImpl(
154158
),
155159
) { Attachment.fromCursor(it) }
156160
callback(attachments)
157-
db.execute(
158-
"DELETE FROM $table WHERE id IN (SELECT value FROM json_each(?));",
159-
listOf(
160-
Json.encodeToString(attachments.map { it.id }),
161-
),
162-
)
161+
162+
runWrapped {
163+
db.useConnection(readOnly = false) { conn ->
164+
conn.usePrepared("DELETE FROM $table WHERE id = ?") { stmt ->
165+
for (attachment in attachments) {
166+
stmt.bindText(1, attachment.id)
167+
stmt.step()
168+
169+
stmt.reset()
170+
stmt.clearBindings()
171+
}
172+
}
173+
}
174+
}
175+
163176
return attachments.size < limit
164177
}
165178

core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public open class SyncingService(
7878
merge(
7979
// Handles manual triggers for sync events
8080
syncTriggerFlow.asSharedFlow(),
81-
// Triggers the sync process whenever an underlaying change to the
81+
// Triggers the sync process whenever an underlying change to the
8282
// attachments table happens
8383
attachmentsService
8484
.watchActiveAttachments(),

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.powersync.sync.Instruction
88
import com.powersync.sync.LegacySyncImplementation
99
import com.powersync.sync.SyncDataBatch
1010
import com.powersync.sync.SyncLocalDatabaseResult
11+
import com.powersync.utils.JsonUtil
1112
import kotlinx.serialization.Serializable
1213
import kotlinx.serialization.json.JsonObject
1314

@@ -55,25 +56,49 @@ internal interface BucketStorage {
5556
}
5657

5758
internal sealed interface PowerSyncControlArguments {
59+
/**
60+
* Returns the arguments for the `powersync_control` SQL invocation.
61+
*/
62+
val sqlArguments: Pair<String, Any?>
63+
5864
@Serializable
5965
class Start(
6066
val parameters: JsonObject,
6167
val schema: SerializableSchema,
62-
) : PowerSyncControlArguments
68+
) : PowerSyncControlArguments {
69+
override val sqlArguments: Pair<String, Any?>
70+
get() = "start" to JsonUtil.json.encodeToString(this)
71+
}
6372

64-
data object Stop : PowerSyncControlArguments
73+
data object Stop : PowerSyncControlArguments {
74+
override val sqlArguments: Pair<String, Any?> = "stop" to null
75+
}
6576

6677
data class TextLine(
6778
val line: String,
68-
) : PowerSyncControlArguments
79+
) : PowerSyncControlArguments {
80+
override val sqlArguments: Pair<String, Any?> = "line_text" to line
81+
}
6982

7083
class BinaryLine(
71-
val line: ByteArray,
84+
line: ByteArray,
7285
) : PowerSyncControlArguments {
7386
override fun toString(): String = "BinaryLine"
87+
88+
override val sqlArguments: Pair<String, Any?> = "line_binary" to line
89+
}
90+
91+
data object CompletedUpload : PowerSyncControlArguments {
92+
override val sqlArguments: Pair<String, Any?> = "completed_upload" to null
7493
}
7594

76-
data object CompletedUpload : PowerSyncControlArguments
95+
data object ConnectionEstablished : PowerSyncControlArguments {
96+
override val sqlArguments: Pair<String, Any?> = "connection" to "established"
97+
}
98+
99+
data object ResponseStreamEnd : PowerSyncControlArguments {
100+
override val sqlArguments: Pair<String, Any?> = "connection" to "end"
101+
}
77102
}
78103

79104
@Serializable

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -358,17 +358,7 @@ internal class BucketStorageImpl(
358358
db.writeTransaction { tx ->
359359
logger.v { "powersync_control: $args" }
360360

361-
val (op: String, data: Any?) =
362-
when (args) {
363-
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
364-
PowerSyncControlArguments.Stop -> "stop" to null
365-
366-
PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null
367-
368-
is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
369-
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
370-
}
371-
361+
val (op: String, data: Any?) = args.sqlArguments
372362
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
373363
}
374364
}

0 commit comments

Comments
 (0)