From cbae24edb12bc5153116737a746eaa0c706b968f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Apr 2025 20:10:52 +0200 Subject: [PATCH 01/10] Fix checkpoints during uploads not being applied --- CHANGELOG.md | 1 + .../com/powersync/SyncIntegrationTest.kt | 109 +++++++++++++++++- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 3 +- .../kotlin/com/powersync/sync/SyncStream.kt | 96 ++++++++++----- .../com/powersync/sync/SyncStreamTest.kt | 13 ++- .../powersync/testutils/MockSyncService.kt | 11 ++ 6 files changed, 196 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca6dfc14..ff760b20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.0.0-BETA29 (unreleased) * Fix potential race condition between jobs in `connect()` and `disconnect()`. +* Fix race condition causing data received during uploads not to be applied. ## 1.0.0-BETA28 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index cfefbf7b..25693f10 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -11,6 +11,8 @@ import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.WriteCheckpointData +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.PowerSyncDatabaseImpl @@ -24,11 +26,15 @@ import com.powersync.testutils.factory import com.powersync.testutils.generatePrintLogWriter import com.powersync.testutils.waitFor import com.powersync.utils.JsonUtil +import dev.mokkery.answering.calls import dev.mokkery.answering.returns import dev.mokkery.everySuspend +import dev.mokkery.matcher.any import dev.mokkery.mock import dev.mokkery.verify +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -61,6 +67,7 @@ class SyncIntegrationTest { private lateinit var database: PowerSyncDatabaseImpl private lateinit var connector: PowerSyncBackendConnector private lateinit var syncLines: Channel + private lateinit var checkpointResponse: () -> WriteCheckpointResponse @BeforeTest fun setup() { @@ -79,6 +86,7 @@ class SyncIntegrationTest { everySuspend { invalidateCredentials() } returns Unit } syncLines = Channel() + checkpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) } runBlocking { database.disconnectAndClear(true) @@ -100,16 +108,18 @@ class SyncIntegrationTest { dbFilename = "testdb", ) as PowerSyncDatabaseImpl + @OptIn(DelicateCoroutinesApi::class) private fun syncStream(): SyncStream { - val client = MockSyncService(syncLines) + val client = MockSyncService(syncLines, { checkpointResponse() }) return SyncStream( bucketStorage = database.bucketStorage, connector = connector, httpEngine = client, - uploadCrud = { }, + uploadCrud = { connector.uploadData(database) }, retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = GlobalScope, ) } @@ -529,4 +539,99 @@ class SyncIntegrationTest { database.close() syncLines.close() } + + @Test + fun `handles checkpoints during uploads`() = runTest { + database.connectInternal(syncStream(), 1000L) + + suspend fun expectUserRows(amount: Int) { + val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } + assertEquals(amount, row.toInt()) + } + + val completeUpload = CompletableDeferred() + val uploadStarted = CompletableDeferred() + everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) -> + val batch = db.getCrudBatch() + if (batch == null) return@calls + + uploadStarted.complete(Unit) + completeUpload.await() + batch.complete.invoke(null) + } + + // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully + // connected). + database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org")) + syncLines.send(SyncLine.KeepAlive(1234)) + expectUserRows(1) + uploadStarted.await() + + // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns. + syncLines.send(SyncLine.FullCheckpoint(Checkpoint( + writeCheckpoint = "1", + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)) + ))) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.downloading } + turbine.cancel() + } + + syncLines.send(SyncLine.SyncDataBucket( + bucket = "a", + data = listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from local", "email": ""}""" + ), + OplogEntry( + checksum = 0, + opId = "2", + op = OpType.PUT, + rowId = "2", + rowType = "users", + data = """{"id": "test1", "name": "additional entry", "email": ""}""" + ), + ), + after = null, + nextAfter = null, + hasMore = false, + )) + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2")) + + // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data. + waitFor { + assertNotNull( + logWriter.logs.find { + it.message.contains("Could not apply checkpoint due to local data") + }, + ) + } + expectUserCount(1) + + // Mark the upload as completed, this should trigger a write_checkpoint.json request + val requestedCheckpoint = CompletableDeferred() + checkpointResponse = { + requestedCheckpoint.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("")) + } + completeUpload.complete(Unit) + requestedCheckpoint.await() + + // This should apply the checkpoint + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { !it.downloading } + turbine.cancel() + } + + // Meaning that the two rows are now visible + expectUserCount(2) + } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 244d9516..929189b8 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -150,6 +150,7 @@ internal class PowerSyncDatabaseImpl( retryDelayMs = retryDelayMs, logger = logger, params = params.toJsonObject(), + scope = scope, ), crudThrottleMs, ) @@ -222,7 +223,7 @@ internal class PowerSyncDatabaseImpl( .filter { it.contains(InternalTable.CRUD.toString()) } .throttle(crudThrottleMs) .collect { - stream.triggerCrudUpload() + stream.triggerCrudUploadAsync().join() } } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 23b3e5ed..b2d8809a 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -1,7 +1,7 @@ package com.powersync.sync import co.touchlab.kermit.Logger -import co.touchlab.stately.concurrency.AtomicBoolean +import co.touchlab.stately.concurrency.AtomicReference import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketRequest import com.powersync.bucket.BucketStorage @@ -29,9 +29,13 @@ import io.ktor.http.contentType import io.ktor.utils.io.ByteReadChannel import io.ktor.utils.io.readUTF8Line import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch import kotlinx.datetime.Clock import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonObject @@ -43,9 +47,10 @@ internal class SyncStream( private val retryDelayMs: Long = 5000L, private val logger: Logger, private val params: JsonObject, + private val scope: CoroutineScope, httpEngine: HttpClientEngine? = null, ) { - private var isUploadingCrud = AtomicBoolean(false) + private var isUploadingCrud = AtomicReference(null) /** * The current sync status. This instance is updated as changes occur @@ -116,13 +121,18 @@ internal class SyncStream( } } - suspend fun triggerCrudUpload() { - if (!status.connected || isUploadingCrud.value) { - return + fun triggerCrudUploadAsync(): Job = scope.launch { + val thisIteration = PendingCrudUpload(CompletableDeferred()) + try { + if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) { + return@launch + } + + uploadAllCrud() + } finally { + isUploadingCrud.set(null) + thisIteration.done.complete(Unit) } - isUploadingCrud.value = true - uploadAllCrud() - isUploadingCrud.value = false } private suspend fun uploadAllCrud() { @@ -153,8 +163,13 @@ internal class SyncStream( break } } catch (e: Exception) { - logger.e { "Error uploading crud: ${e.message}" } status.update(uploading = false, uploadError = e) + + if (e is CancellationException) { + throw e + } + + logger.e { "Error uploading crud: ${e.message}" } delay(retryDelayMs) break } @@ -237,7 +252,6 @@ internal class SyncStream( validatedCheckpoint = null, appliedCheckpoint = null, bucketSet = initialBuckets.keys.toMutableSet(), - retry = false, ) bucketEntries.forEach { entry -> @@ -253,7 +267,12 @@ internal class SyncStream( streamingSyncRequest(req).collect { value -> val line = JsonUtil.json.decodeFromString(value) + state = handleInstruction(line, value, state) + + if (state.abortIteration) { + return@collect + } } status.update(downloading = false) @@ -314,30 +333,40 @@ internal class SyncStream( } private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState { - val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!) + val checkpoint = state.targetCheckpoint!! + var result = bucketStorage.syncLocalDatabase(checkpoint) + val pending = isUploadingCrud.get() + if (!result.checkpointValid) { // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off delay(50) - state.retry = true + state.abortIteration = true // TODO handle retries return state - } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. - // landing here the whole time - } else { - state.appliedCheckpoint = state.targetCheckpoint!!.clone() - logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + } else if (!result.ready && pending != null) { + // We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which + // prevented this checkpoint from applying. Wait for that to complete and try again. + logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." } + pending.done.await() + + result = bucketStorage.syncLocalDatabase(checkpoint) } - state.validatedCheckpoint = state.targetCheckpoint - status.update( - lastSyncedAt = Clock.System.now(), - downloading = false, - hasSynced = true, - clearDownloadError = true, - ) + if (result.checkpointValid && result.ready) { + state.appliedCheckpoint = checkpoint.clone() + logger.i { "validated checkpoint ${state.appliedCheckpoint}" } + + state.validatedCheckpoint = state.targetCheckpoint + status.update( + lastSyncedAt = Clock.System.now(), + downloading = false, + hasSynced = true, + clearDownloadError = true, + ) + } else { + logger.d { "Could not apply checkpoint. Waiting for next sync complete line" } + } return state } @@ -352,12 +381,12 @@ internal class SyncStream( // This means checksums failed. Start again with a new checkpoint. // TODO: better back-off delay(50) - state.retry = true + state.abortIteration = true // TODO handle retries return state } else if (!result.ready) { - // Checksums valid, but need more data for a consistent checkpoint. - // Continue waiting. + // Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this + // once we have a complete checkpoint if the problem persists. } else { logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } } @@ -441,10 +470,11 @@ internal class SyncStream( // Connection would be closed automatically right after this logger.i { "Token expiring reconnect" } connector.invalidateCredentials() - state.retry = true + state.abortIteration = true return state } - triggerCrudUpload() + // Don't await the upload job, we can keep receiving sync lines + triggerCrudUploadAsync() return state } } @@ -454,5 +484,7 @@ internal data class SyncStreamState( var validatedCheckpoint: Checkpoint?, var appliedCheckpoint: Checkpoint?, var bucketSet: MutableSet?, - var retry: Boolean, + var abortIteration: Boolean = false ) + +private class PendingCrudUpload(val done: CompletableDeferred) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 41573e3f..5eeb6d6b 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -11,6 +11,8 @@ import com.powersync.bucket.BucketStorage import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.WriteCheckpointData +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry @@ -28,7 +30,10 @@ import dev.mokkery.verify.VerifyMode.Companion.order import dev.mokkery.verifyNoMoreCalls import dev.mokkery.verifySuspend import io.ktor.client.engine.mock.MockEngine +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -105,6 +110,7 @@ class SyncStreamTest { uploadCrud = {}, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) syncStream.invalidateCredentials() @@ -141,10 +147,11 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) syncStream.status.update(connected = true) - syncStream.triggerCrudUpload() + syncStream.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -180,6 +187,7 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) // Launch streaming sync in a coroutine that we'll cancel after verification @@ -209,7 +217,7 @@ class SyncStreamTest { // TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything // Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point val syncLines = Channel() - val client = MockSyncService(syncLines) + val client = MockSyncService(syncLines, { WriteCheckpointResponse(WriteCheckpointData("1000")) }) syncStream = SyncStream( @@ -220,6 +228,7 @@ class SyncStreamTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), + scope = this, ) val job = launch { syncStream.streamingSync() } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 56ab90ec..42831a11 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -1,6 +1,7 @@ package com.powersync.testutils import app.cash.turbine.ReceiveTurbine +import com.powersync.bucket.WriteCheckpointResponse import com.powersync.sync.SyncLine import com.powersync.sync.SyncStatusData import com.powersync.utils.JsonUtil @@ -33,6 +34,7 @@ import kotlinx.serialization.encodeToString */ internal class MockSyncService( private val lines: ReceiveChannel, + private val generateCheckpoint: () -> WriteCheckpointResponse, ) : HttpClientEngineBase("sync-service") { override val config: HttpClientEngineConfig get() = Config @@ -70,6 +72,15 @@ internal class MockSyncService( job.channel, context, ) + } else if (data.url.encodedPath == "/write-checkpoint2.json") { + HttpResponseData( + HttpStatusCode.OK, + GMTDate(), + headersOf(), + HttpProtocolVersion.HTTP_1_1, + JsonUtil.json.encodeToString(generateCheckpoint()), + context, + ) } else { HttpResponseData( HttpStatusCode.BadRequest, From b3e767eef9b719e1d139947671236bc3253607ae Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Apr 2025 20:12:24 +0200 Subject: [PATCH 02/10] Reformat --- .../com/powersync/SyncIntegrationTest.kt | 174 +++++++++--------- .../kotlin/com/powersync/sync/SyncStream.kt | 29 +-- .../com/powersync/sync/SyncStreamTest.kt | 3 - 3 files changed, 107 insertions(+), 99 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 25693f10..58091f52 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -541,97 +541,105 @@ class SyncIntegrationTest { } @Test - fun `handles checkpoints during uploads`() = runTest { - database.connectInternal(syncStream(), 1000L) + fun `handles checkpoints during uploads`() = + runTest { + database.connectInternal(syncStream(), 1000L) - suspend fun expectUserRows(amount: Int) { - val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } - assertEquals(amount, row.toInt()) - } + suspend fun expectUserRows(amount: Int) { + val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } + assertEquals(amount, row.toInt()) + } - val completeUpload = CompletableDeferred() - val uploadStarted = CompletableDeferred() - everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) -> - val batch = db.getCrudBatch() - if (batch == null) return@calls + val completeUpload = CompletableDeferred() + val uploadStarted = CompletableDeferred() + everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) -> + val batch = db.getCrudBatch() + if (batch == null) return@calls - uploadStarted.complete(Unit) - completeUpload.await() - batch.complete.invoke(null) - } + uploadStarted.complete(Unit) + completeUpload.await() + batch.complete.invoke(null) + } - // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully - // connected). - database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org")) - syncLines.send(SyncLine.KeepAlive(1234)) - expectUserRows(1) - uploadStarted.await() - - // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns. - syncLines.send(SyncLine.FullCheckpoint(Checkpoint( - writeCheckpoint = "1", - lastOpId = "2", - checksums = listOf(BucketChecksum("a", checksum = 0)) - ))) - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.downloading } - turbine.cancel() - } + // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully + // connected). + database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "local@example.org")) + syncLines.send(SyncLine.KeepAlive(1234)) + expectUserRows(1) + uploadStarted.await() - syncLines.send(SyncLine.SyncDataBucket( - bucket = "a", - data = listOf( - OplogEntry( - checksum = 0, - opId = "1", - op = OpType.PUT, - rowId = "1", - rowType = "users", - data = """{"id": "test1", "name": "from local", "email": ""}""" + // Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns. + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + writeCheckpoint = "1", + lastOpId = "2", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), ), - OplogEntry( - checksum = 0, - opId = "2", - op = OpType.PUT, - rowId = "2", - rowType = "users", - data = """{"id": "test1", "name": "additional entry", "email": ""}""" + ) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.downloading } + turbine.cancel() + } + + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from local", "email": ""}""", + ), + OplogEntry( + checksum = 0, + opId = "2", + op = OpType.PUT, + rowId = "2", + rowType = "users", + data = """{"id": "test1", "name": "additional entry", "email": ""}""", + ), + ), + after = null, + nextAfter = null, + hasMore = false, ), - ), - after = null, - nextAfter = null, - hasMore = false, - )) - syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2")) - - // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data. - waitFor { - assertNotNull( - logWriter.logs.find { - it.message.contains("Could not apply checkpoint due to local data") - }, ) - } - expectUserCount(1) + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2")) + + // Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data. + waitFor { + assertNotNull( + logWriter.logs.find { + it.message.contains("Could not apply checkpoint due to local data") + }, + ) + } + expectUserCount(1) - // Mark the upload as completed, this should trigger a write_checkpoint.json request - val requestedCheckpoint = CompletableDeferred() - checkpointResponse = { - requestedCheckpoint.complete(Unit) - WriteCheckpointResponse(WriteCheckpointData("")) - } - completeUpload.complete(Unit) - requestedCheckpoint.await() - - // This should apply the checkpoint - turbineScope { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { !it.downloading } - turbine.cancel() - } + // Mark the upload as completed, this should trigger a write_checkpoint.json request + val requestedCheckpoint = CompletableDeferred() + checkpointResponse = { + requestedCheckpoint.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("")) + } + completeUpload.complete(Unit) + requestedCheckpoint.await() - // Meaning that the two rows are now visible - expectUserCount(2) - } + // This should apply the checkpoint + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { !it.downloading } + turbine.cancel() + } + + // Meaning that the two rows are now visible + expectUserCount(2) + } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index b2d8809a..b4d26eaa 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -121,19 +121,20 @@ internal class SyncStream( } } - fun triggerCrudUploadAsync(): Job = scope.launch { - val thisIteration = PendingCrudUpload(CompletableDeferred()) - try { - if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) { - return@launch - } + fun triggerCrudUploadAsync(): Job = + scope.launch { + val thisIteration = PendingCrudUpload(CompletableDeferred()) + try { + if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) { + return@launch + } - uploadAllCrud() - } finally { - isUploadingCrud.set(null) - thisIteration.done.complete(Unit) + uploadAllCrud() + } finally { + isUploadingCrud.set(null) + thisIteration.done.complete(Unit) + } } - } private suspend fun uploadAllCrud() { var checkedCrudItem: CrudEntry? = null @@ -484,7 +485,9 @@ internal data class SyncStreamState( var validatedCheckpoint: Checkpoint?, var appliedCheckpoint: Checkpoint?, var bucketSet: MutableSet?, - var abortIteration: Boolean = false + var abortIteration: Boolean = false, ) -private class PendingCrudUpload(val done: CompletableDeferred) +private class PendingCrudUpload( + val done: CompletableDeferred, +) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 5eeb6d6b..069139bd 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -30,10 +30,7 @@ import dev.mokkery.verify.VerifyMode.Companion.order import dev.mokkery.verifyNoMoreCalls import dev.mokkery.verifySuspend import io.ktor.client.engine.mock.MockEngine -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest From 67d0be55858eed7765df1240b0d06e06c8617d50 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 09:52:28 +0200 Subject: [PATCH 03/10] Only reset upload lock when held --- .../com/powersync/SyncIntegrationTest.kt | 25 +++++++++--------- .../kotlin/com/powersync/sync/SyncStream.kt | 8 +++++- .../kotlin/com/powersync/TestConnector.kt | 26 +++++++++++++++++++ 3 files changed, 46 insertions(+), 13 deletions(-) create mode 100644 core/src/commonTest/kotlin/com/powersync/TestConnector.kt diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 58091f52..ebf5204c 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -26,15 +26,13 @@ import com.powersync.testutils.factory import com.powersync.testutils.generatePrintLogWriter import com.powersync.testutils.waitFor import com.powersync.utils.JsonUtil -import dev.mokkery.answering.calls import dev.mokkery.answering.returns import dev.mokkery.everySuspend -import dev.mokkery.matcher.any import dev.mokkery.mock import dev.mokkery.verify import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -109,7 +107,7 @@ class SyncIntegrationTest { ) as PowerSyncDatabaseImpl @OptIn(DelicateCoroutinesApi::class) - private fun syncStream(): SyncStream { + private fun CoroutineScope.syncStream(): SyncStream { val client = MockSyncService(syncLines, { checkpointResponse() }) return SyncStream( bucketStorage = database.bucketStorage, @@ -119,7 +117,7 @@ class SyncIntegrationTest { retryDelayMs = 10, logger = logger, params = JsonObject(emptyMap()), - scope = GlobalScope, + scope = this, ) } @@ -543,6 +541,8 @@ class SyncIntegrationTest { @Test fun `handles checkpoints during uploads`() = runTest { + val testConnector = TestConnector() + connector = testConnector database.connectInternal(syncStream(), 1000L) suspend fun expectUserRows(amount: Int) { @@ -552,13 +552,13 @@ class SyncIntegrationTest { val completeUpload = CompletableDeferred() val uploadStarted = CompletableDeferred() - everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) -> - val batch = db.getCrudBatch() - if (batch == null) return@calls - - uploadStarted.complete(Unit) - completeUpload.await() - batch.complete.invoke(null) + testConnector.uploadDataCallback = { db -> + println("upload data callback called") + db.getCrudBatch()?.let { batch -> + uploadStarted.complete(Unit) + completeUpload.await() + batch.complete.invoke(null) + } } // Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully @@ -629,6 +629,7 @@ class SyncIntegrationTest { requestedCheckpoint.complete(Unit) WriteCheckpointResponse(WriteCheckpointData("")) } + println("marking update as completed") completeUpload.complete(Unit) requestedCheckpoint.await() diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index b4d26eaa..60225952 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -124,14 +124,20 @@ internal class SyncStream( fun triggerCrudUploadAsync(): Job = scope.launch { val thisIteration = PendingCrudUpload(CompletableDeferred()) + var holdingUploadLock = false + try { if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) { return@launch } + holdingUploadLock = true uploadAllCrud() } finally { - isUploadingCrud.set(null) + if (holdingUploadLock) { + isUploadingCrud.set(null) + } + thisIteration.done.complete(Unit) } } diff --git a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt new file mode 100644 index 00000000..404fe389 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt @@ -0,0 +1,26 @@ +package com.powersync + +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials + +class TestConnector: PowerSyncBackendConnector() { + var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = { + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + } + var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = { + val tx = it.getNextCrudTransaction() + tx?.complete(null) + } + + override suspend fun fetchCredentials(): PowerSyncCredentials? { + return fetchCredentialsCallback() + } + + override suspend fun uploadData(database: PowerSyncDatabase) { + uploadDataCallback(database) + } +} From 02fbd7c971da4cc8c1955176dd33481419588465 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 09:56:02 +0200 Subject: [PATCH 04/10] Format --- core/src/commonTest/kotlin/com/powersync/TestConnector.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt index 404fe389..1319e637 100644 --- a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt +++ b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt @@ -3,7 +3,7 @@ package com.powersync import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials -class TestConnector: PowerSyncBackendConnector() { +class TestConnector : PowerSyncBackendConnector() { var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = { PowerSyncCredentials( token = "test-token", @@ -16,9 +16,7 @@ class TestConnector: PowerSyncBackendConnector() { tx?.complete(null) } - override suspend fun fetchCredentials(): PowerSyncCredentials? { - return fetchCredentialsCallback() - } + override suspend fun fetchCredentials(): PowerSyncCredentials? = fetchCredentialsCallback() override suspend fun uploadData(database: PowerSyncDatabase) { uploadDataCallback(database) From 9f33764b472e6ba9eac0bb4f71fad85bc9a8c494 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 10:17:34 +0200 Subject: [PATCH 05/10] Close databases in DatabaseTest --- .../commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 7e4614ca..99c76883 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -69,6 +69,7 @@ class DatabaseTest { runBlocking { if (!database.closed) { database.disconnectAndClear(true) + database.close() } } com.powersync.testutils.cleanup("testdb") From 1fd6d18c3b3cc95d0d754cb2247b8a154993f078 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 11:02:45 +0200 Subject: [PATCH 06/10] Remove debug log --- .../kotlin/com/powersync/SyncIntegrationTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index ebf5204c..69f82ef1 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -553,7 +553,6 @@ class SyncIntegrationTest { val completeUpload = CompletableDeferred() val uploadStarted = CompletableDeferred() testConnector.uploadDataCallback = { db -> - println("upload data callback called") db.getCrudBatch()?.let { batch -> uploadStarted.complete(Unit) completeUpload.await() From 5999ad6a61c55b3d0c645072b172b7747f5b431b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 12:13:21 +0200 Subject: [PATCH 07/10] Remove another debug log line --- .../kotlin/com/powersync/SyncIntegrationTest.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 69f82ef1..4a811aa2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -626,9 +626,8 @@ class SyncIntegrationTest { val requestedCheckpoint = CompletableDeferred() checkpointResponse = { requestedCheckpoint.complete(Unit) - WriteCheckpointResponse(WriteCheckpointData("")) + WriteCheckpointResponse(WriteCheckpointData("1")) } - println("marking update as completed") completeUpload.complete(Unit) requestedCheckpoint.await() From 50314606a195ab58def400e0f13eb4703337fc28 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 17:13:53 +0200 Subject: [PATCH 08/10] Also show causes I guess? --- core/build.gradle.kts | 1 + 1 file changed, 1 insertion(+) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 96f77274..ce8c9127 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -345,6 +345,7 @@ tasks.withType { testLogging { events("PASSED", "FAILED", "SKIPPED") exceptionFormat = TestExceptionFormat.FULL + showCauses = true showStandardStreams = true showStackTraces = true } From 482082cdda0af95fcba9874b4825c300a7d75bb7 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 17:23:56 +0200 Subject: [PATCH 09/10] Upload results on failure --- .github/workflows/test.yml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0cc6c341..e9912bc8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,4 +44,14 @@ jobs: ./gradlew \ -PGITHUB_PUBLISH_TOKEN=${{ secrets.GITHUB_TOKEN }} \ ${{ matrix.targets }} - shell: bash \ No newline at end of file + shell: bash + + # Credit: https://github.com/gradle/actions/issues/76#issuecomment-2007584323 + - name: Upload reports on failure + if: failure() + uses: actions/upload-artifact@v4 + with: + name: report-for-${{ matrix.os }} + path: | + **/build/reports/ + **/build/test-results/ From 924c45db554d422193d660ac2ba024e78407ff97 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 4 Apr 2025 17:34:59 +0200 Subject: [PATCH 10/10] Ignore events after cancelling turbine --- .../com/powersync/SyncIntegrationTest.kt | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 4a811aa2..65cfe580 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -139,7 +139,7 @@ class SyncIntegrationTest { database.close() turbine.waitFor { !it.connected } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } // Closing the database should have closed the channel @@ -159,7 +159,7 @@ class SyncIntegrationTest { database.disconnect() turbine.waitFor { !it.connected } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } // Disconnecting should have closed the channel @@ -178,7 +178,7 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.connected } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } assertFailsWith("Cannot update schema while connected") { @@ -276,7 +276,7 @@ class SyncIntegrationTest { turbine.waitFor { it.hasSynced == true } expectUserCount(4) - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } syncLines.close() @@ -349,7 +349,7 @@ class SyncIntegrationTest { syncLines.send(SyncLine.CheckpointComplete(lastOpId = "1")) turbine.waitFor { !it.downloading } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } database.close() @@ -369,7 +369,7 @@ class SyncIntegrationTest { database.disconnect() turbine.waitFor { !it.connecting && !it.connected } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } database.close() @@ -414,7 +414,7 @@ class SyncIntegrationTest { assertEquals(1, rows.size) } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } database.close() @@ -486,8 +486,8 @@ class SyncIntegrationTest { db2.disconnect() turbine2.waitFor { !it.connecting } - turbine1.cancel() - turbine2.cancel() + turbine1.cancelAndIgnoreRemainingEvents() + turbine2.cancelAndIgnoreRemainingEvents() } db2.close() @@ -512,7 +512,7 @@ class SyncIntegrationTest { database.disconnect() turbine.waitFor { !it.connecting } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } database.close() @@ -531,7 +531,7 @@ class SyncIntegrationTest { database.connect(connector, 1000L, retryDelayMs = 5000) turbine.waitFor { it.connecting } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } database.close() @@ -580,7 +580,7 @@ class SyncIntegrationTest { turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.downloading } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } syncLines.send( @@ -635,7 +635,7 @@ class SyncIntegrationTest { turbineScope { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { !it.downloading } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } // Meaning that the two rows are now visible