Skip to content

Commit e7d0f0f

Browse files
authored
Make CI more reliable (#261)
1 parent 22acdd7 commit e7d0f0f

File tree

12 files changed

+184
-108
lines changed

12 files changed

+184
-108
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* Remove internal SQLDelight and SQLiter dependencies.
66
* Add `rawConnection` getter to `ConnectionContext`, which is a `SQLiteConnection` instance from
77
`androidx.sqlite` that can be used to step through statements in a custom way.
8+
* Fix an issue where `watch()` would run queries more often than intended.
89

910
## 1.5.1
1011

core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import com.powersync.testutils.MockedRemoteStorage
1717
import com.powersync.testutils.UserRow
1818
import com.powersync.testutils.databaseTest
1919
import com.powersync.testutils.getTempDir
20-
import com.powersync.testutils.waitFor
2120
import dev.mokkery.answering.throws
2221
import dev.mokkery.everySuspend
2322
import dev.mokkery.matcher.ArgMatchersScope
@@ -29,6 +28,7 @@ import dev.mokkery.verifySuspend
2928
import io.kotest.matchers.shouldBe
3029
import io.kotest.matchers.shouldNotBe
3130
import kotlinx.coroutines.flow.Flow
31+
import kotlinx.coroutines.flow.distinctUntilChanged
3232
import kotlinx.coroutines.flow.flowOf
3333
import kotlinx.coroutines.flow.onEach
3434
import kotlinx.io.files.Path
@@ -60,7 +60,17 @@ class AttachmentsTest {
6060
database
6161
.watch("SELECT * FROM attachments") {
6262
Attachment.fromCursor(it)
63-
}.onEach { logger.i { "attachments table results: $it" } }
63+
}
64+
// Because tests run on slow machines, it's possible for a schedule like the following
65+
// to happen:
66+
// 1. the attachment is initially saved with QUEUED_DOWNLOAD, triggering a query.
67+
// 2. the attachment is downloaded fast, but the query flow is paused.
68+
// 3. only now is the query scheduled by 1 actually running, reporting SYNCED.
69+
// 4. we delete the attachment.
70+
// 5. thanks to 2, the query runs again, again reporting SYNCED.
71+
// 6. Our test now fails because the second event should be ARCHIVED.
72+
.distinctUntilChanged()
73+
.onEach { logger.i { "attachments table results: $it" } }
6474

6575
suspend fun updateSchema(db: PowerSyncDatabase) {
6676
db.updateSchema(
@@ -278,10 +288,13 @@ class AttachmentsTest {
278288
""",
279289
)
280290

281-
waitFor {
282-
var nextRecord: Attachment? = attachmentQuery.awaitItem().firstOrNull()
283-
// The record should have been deleted
284-
nextRecord shouldBe null
291+
while (true) {
292+
val item = attachmentQuery.awaitItem()
293+
if (item.isEmpty()) {
294+
break
295+
}
296+
297+
logger.v { "Waiting for attachment record to be deleted (current $item)" }
285298
}
286299

287300
// The file should have been deleted from storage
@@ -346,10 +359,13 @@ class AttachmentsTest {
346359
database.get("SELECT photo_id FROM users") { it.getString("photo_id") }
347360

348361
// Wait for the record to be synced (mocked backend will allow it)
349-
waitFor {
350-
val record = attachmentQuery.awaitItem().first()
351-
record shouldNotBe null
352-
record.state shouldBe AttachmentState.SYNCED
362+
while (true) {
363+
val item = attachmentQuery.awaitItem().firstOrNull()
364+
if (item != null && item.state == AttachmentState.SYNCED) {
365+
break
366+
}
367+
368+
logger.v { "Waiting for attachment record to be synced (current $item)" }
353369
}
354370

355371
queue.deleteFile(
@@ -369,10 +385,14 @@ class AttachmentsTest {
369385
)
370386
}
371387

372-
waitFor {
373-
// Record should be deleted
374-
val record = attachmentQuery.awaitItem().firstOrNull()
375-
record shouldBe null
388+
// Record should be deleted
389+
while (true) {
390+
val item = attachmentQuery.awaitItem()
391+
if (item.isEmpty()) {
392+
break
393+
}
394+
395+
logger.v { "Waiting for attachment record to be deleted (current $item)" }
376396
}
377397

378398
// A delete should have been attempted for this file
@@ -559,14 +579,16 @@ class AttachmentsTest {
559579
""",
560580
)
561581

582+
// Depending on when the query updates, we'll see the attachment as queued for
583+
// download or archived.
562584
var attachmentRecord = attachmentQuery.awaitItem().first()
563585
attachmentRecord shouldNotBe null
564586

565-
attachmentRecord.state shouldBe AttachmentState.QUEUED_DOWNLOAD
587+
if (attachmentRecord.state == AttachmentState.QUEUED_DOWNLOAD) {
588+
attachmentRecord = attachmentQuery.awaitItem().first()
589+
}
566590

567591
// The download should fail. We don't specify a retry. The record should be archived.
568-
attachmentRecord = attachmentQuery.awaitItem().first()
569-
570592
attachmentRecord.state shouldBe AttachmentState.ARCHIVED
571593

572594
attachmentQuery.cancelAndIgnoreRemainingEvents()

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,10 @@ class DatabaseTest {
211211

212212
var changeSet = query.awaitItem()
213213
// The initial result
214-
changeSet.count() shouldBe 0
214+
changeSet shouldHaveSize 0
215215

216216
changeSet = query.awaitItem()
217-
changeSet.count() shouldBe 1
218-
changeSet.contains("users") shouldBe true
217+
changeSet shouldBe setOf("users")
219218

220219
query.cancel()
221220
}

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,11 @@ abstract class BaseSyncIntegrationTest(
105105

106106
database.close()
107107
turbine.waitFor { !it.connected }
108-
turbine.cancel()
108+
turbine.cancelAndIgnoreRemainingEvents()
109109
}
110110

111111
// Closing the database should have closed the channel.
112+
logger.v { "Database is closed, waiting to close HTTP stream" }
112113
waitFor { syncLines.isClosedForSend shouldBe true }
113114
}
114115

core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public open class SyncingService(
9696
*/
9797
try {
9898
val attachments = context.getActiveAttachments()
99+
logger.v { "Processing active attachments: $attachments" }
100+
99101
// Performs pending operations and updates attachment states
100102
handleSync(attachments, context)
101103

@@ -298,6 +300,8 @@ public open class SyncingService(
298300
*/
299301
public suspend fun deleteArchivedAttachments(context: AttachmentContext): Boolean =
300302
context.deleteArchivedAttachments { pendingDelete ->
303+
logger.v { "Deleting archived attachments: $pendingDelete" }
304+
301305
for (attachment in pendingDelete) {
302306
if (attachment.localUri == null) {
303307
continue

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ internal class PowerSyncDatabaseImpl(
8181

8282
private val resource = activeDatabaseGroup.first
8383

84-
private val internalDb = InternalDatabaseImpl(pool)
84+
private val internalDb = InternalDatabaseImpl(pool, logger)
8585

8686
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
8787

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.powersync.db.internal
22

3+
import co.touchlab.kermit.Logger
34
import com.powersync.ExperimentalPowerSyncAPI
4-
import com.powersync.PowerSyncException
55
import com.powersync.db.SqlCursor
66
import com.powersync.db.ThrowableLockCallback
77
import com.powersync.db.ThrowableTransactionCallback
@@ -15,8 +15,9 @@ import kotlinx.coroutines.Dispatchers
1515
import kotlinx.coroutines.IO
1616
import kotlinx.coroutines.flow.Flow
1717
import kotlinx.coroutines.flow.SharedFlow
18-
import kotlinx.coroutines.flow.channelFlow
19-
import kotlinx.coroutines.flow.filter
18+
import kotlinx.coroutines.flow.emitAll
19+
import kotlinx.coroutines.flow.flow
20+
import kotlinx.coroutines.flow.map
2021
import kotlinx.coroutines.flow.onSubscription
2122
import kotlinx.coroutines.flow.transform
2223
import kotlinx.coroutines.withContext
@@ -25,6 +26,7 @@ import kotlin.time.Duration.Companion.milliseconds
2526
@OptIn(ExperimentalPowerSyncAPI::class)
2627
internal class InternalDatabaseImpl(
2728
private val pool: SQLiteConnectionPool,
29+
private val logger: Logger,
2830
) : InternalDatabase {
2931
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
3032
private val dbContext = Dispatchers.IO
@@ -79,74 +81,74 @@ internal class InternalDatabaseImpl(
7981
tables: Set<String>,
8082
throttleMs: Long,
8183
triggerImmediately: Boolean,
82-
): Flow<Set<String>> =
83-
channelFlow {
84-
// Match all possible internal table combinations
85-
val watchedTables =
86-
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()
87-
88-
// Accumulate updates between throttles
89-
val batchedUpdates = AtomicMutableSet<String>()
84+
): Flow<Set<String>> {
85+
// Match all possible internal table combinations
86+
val watchedTables =
87+
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()
9088

91-
updatesOnTables()
92-
.onSubscription {
93-
if (triggerImmediately) {
94-
// Emit an initial event (if requested). No changes would be detected at this point
95-
send(setOf())
96-
}
97-
}.transform { updates ->
98-
val intersection = updates.intersect(watchedTables)
99-
if (intersection.isNotEmpty()) {
100-
// Transform table names using friendlyTableName
101-
val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet()
102-
batchedUpdates.addAll(friendlyTableNames)
103-
emit(Unit)
104-
}
105-
}
106-
// Throttling here is a feature which prevents watch queries from spamming updates.
107-
// Throttling by design discards and delays events within the throttle window. Discarded events
108-
// still trigger a trailing edge update.
109-
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
110-
.throttle(throttleMs.milliseconds)
111-
.collect {
112-
// Emit the transformed tables which have changed
113-
val copy = batchedUpdates.toSetAndClear()
114-
send(copy)
115-
}
89+
return rawChangedTables(watchedTables, throttleMs, triggerImmediately).map {
90+
it.mapTo(mutableSetOf(), ::friendlyTableName)
11691
}
92+
}
11793

11894
override fun <RowType : Any> watch(
11995
sql: String,
12096
parameters: List<Any?>?,
12197
throttleMs: Long,
12298
mapper: (SqlCursor) -> RowType,
12399
): Flow<List<RowType>> =
124-
// Use a channel flow here since we throttle (buffer used under the hood)
125-
// This causes some emissions to be from different scopes.
126-
channelFlow {
100+
flow {
127101
// Fetch the tables asynchronously with getAll
128102
val tables =
129103
getSourceTables(sql, parameters)
130104
.filter { it.isNotBlank() }
131105
.toSet()
132106

107+
val queries =
108+
rawChangedTables(tables, throttleMs, triggerImmediately = true).map {
109+
logger.v { "Fetching watch() query: $sql" }
110+
val rows = getAll(sql, parameters = parameters, mapper = mapper)
111+
logger.v { "watch query $sql done, emitting downstream" }
112+
rows
113+
}
114+
emitAll(queries)
115+
}
116+
117+
private fun rawChangedTables(
118+
tableNames: Set<String>,
119+
throttleMs: Long,
120+
triggerImmediately: Boolean,
121+
): Flow<Set<String>> =
122+
flow {
123+
val batchedUpdates = AtomicMutableSet<String>()
124+
133125
updatesOnTables()
134-
// onSubscription here is very important.
135-
// This ensures that the initial result and all updates are emitted.
136126
.onSubscription {
137-
send(getAll(sql, parameters = parameters, mapper = mapper))
138-
}.filter {
139-
// Only trigger updates on relevant tables
140-
it.intersect(tables).isNotEmpty()
127+
if (triggerImmediately) {
128+
// Emit an initial event (if requested). No changes would be detected at this point
129+
emit(initialUpdateSentinel)
130+
}
131+
}.transform { updates ->
132+
if (updates === initialUpdateSentinel) {
133+
// This should always be emitted despite being empty and not intersecting with
134+
// the tables we care about.
135+
emit(Unit)
136+
} else {
137+
val intersection = updates.intersect(tableNames)
138+
if (intersection.isNotEmpty()) {
139+
batchedUpdates.addAll(intersection)
140+
emit(Unit)
141+
}
142+
}
141143
}
142144
// Throttling here is a feature which prevents watch queries from spamming updates.
143145
// Throttling by design discards and delays events within the throttle window. Discarded events
144146
// still trigger a trailing edge update.
145147
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
146-
// Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results.
147148
.throttle(throttleMs.milliseconds)
148149
.collect {
149-
send(getAll(sql, parameters = parameters, mapper = mapper))
150+
val entries = batchedUpdates.toSetAndClear()
151+
emit(entries)
150152
}
151153
}
152154

@@ -259,6 +261,10 @@ internal class InternalDatabaseImpl(
259261
val p2: Long,
260262
val p3: Long,
261263
)
264+
265+
private companion object {
266+
val initialUpdateSentinel = emptySet<String>()
267+
}
262268
}
263269

264270
/**

core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class AtomicMutableSet<T> : SynchronizedObject() {
1111
return set.add(element)
1212
}
1313

14-
public fun addAll(elements: Set<T>): Boolean =
14+
public fun addAll(elements: Collection<T>): Boolean =
1515
synchronized(this) {
1616
return set.addAll(elements)
1717
}

core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,34 @@ import kotlinx.coroutines.flow.Flow
66
import kotlinx.coroutines.flow.buffer
77
import kotlinx.coroutines.flow.flow
88
import kotlin.time.Duration
9+
import kotlin.time.TimeSource
910

1011
/**
11-
* Throttles a flow with emissions on the leading and trailing edge.
12-
* Events, from the incoming flow, during the throttle window are discarded.
13-
* Events are discarded by using a conflated buffer.
14-
* This throttle method acts as a slow consumer, but backpressure is not a concern
15-
* due to the conflated buffer dropping events during the throttle window.
12+
* Throttles an upstream flow.
13+
*
14+
* When a new event is emitted on this (upstream) flow, it is passed on downstream. For each value
15+
* passed downstream, the resulting flow will pause for at least [window] (or longer if emitting
16+
* the value downstream takes longer).
17+
*
18+
* While this flow is paused, no further events are passed downstream. The latest upstream event
19+
* emitted during the pause state is buffered and handled once the pause is over.
20+
*
21+
* In other words, this flow will _drop events_, so it should only be used when the upstream flow
22+
* serves as a notification marker (meaning that something downstream needs to run in response to
23+
* events, but the actual event does not matter).
1624
*/
1725
internal fun <T> Flow<T>.throttle(window: Duration): Flow<T> =
1826
flow {
1927
// Use a buffer before throttle (ensure only the latest event is kept)
2028
val bufferedFlow = this@throttle.buffer(Channel.CONFLATED)
2129

2230
bufferedFlow.collect { value ->
23-
// Emit the event immediately (leading edge)
31+
// Pause for the downstream emit or the delay window, whatever is longer
32+
val pauseUntil = TimeSource.Monotonic.markNow() + window
2433
emit(value)
2534

26-
// Delay for the throttle window to avoid emitting too frequently
27-
delay(window)
35+
// Negating the duration because we want to pause until pauseUntil has passed.
36+
delay(-pauseUntil.elapsedNow())
2837

2938
// The next incoming event will be provided from the buffer.
3039
// The next collect will emit the trailing edge

core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ internal suspend inline fun waitFor(
1515
try {
1616
test()
1717
return
18-
} catch (_: Error) {
18+
} catch (e: Error) {
1919
// Treat exceptions as failed
20+
println("waitFor: failed with $e")
2021
}
2122
delay(interval)
2223
} while (begin.elapsedNow() < timeout)
2324

24-
throw Exception("Timeout reached")
25+
throw Exception("waitFor() Timeout reached")
2526
}

0 commit comments

Comments
 (0)