Skip to content

Commit 6f32934

Browse files
wip: table update hooks
1 parent dbd9c09 commit 6f32934

File tree

4 files changed

+221
-13
lines changed

4 files changed

+221
-13
lines changed

Sources/PowerSync/Kotlin/KotlinSQLiteConnectionPool.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ final class SwiftSQLiteConnectionPoolAdapter: PowerSyncKotlin.SwiftPoolAdapter {
99
self.pool = pool
1010
}
1111

12+
func getPendingUpdates() -> Set<String> {
13+
return pool.getPendingUpdates()
14+
}
15+
1216
func __closePool() async throws {
1317
do {
1418
try pool.close()

Sources/PowerSync/Protocol/SQLiteConnectionPool.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import Foundation
33
/// An implementation of a connection pool providing asynchronous access to a single writer and multiple readers.
44
/// This is the underlying pool implementation on which the higher-level PowerSync Swift SDK is built on.
55
public protocol SQLiteConnectionPoolProtocol {
6+
func getPendingUpdates() -> Set<String>
7+
68
/// Calls the callback with a read-only connection temporarily leased from the pool.
79
func read(
810
onConnection: @Sendable @escaping (OpaquePointer) -> Void,

Sources/PowerSyncGRDB/GRDBPool.swift

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,58 @@ func configurePowerSync(
8282
}
8383
}
8484

85-
class GRDBConnectionPool: SQLiteConnectionPoolProtocol {
85+
final class PowerSyncTransactionObserver: TransactionObserver {
86+
let onChange: (_ tableName: String) -> Void
87+
88+
init(
89+
onChange: @escaping (_ tableName: String) -> Void
90+
) {
91+
self.onChange = onChange
92+
}
93+
94+
func observes(eventsOfKind _: DatabaseEventKind) -> Bool {
95+
// We want all the events for the PowerSync SDK
96+
return true
97+
}
98+
99+
func databaseDidChange(with event: DatabaseEvent) {
100+
onChange(event.tableName)
101+
}
102+
103+
func databaseDidCommit(_: GRDB.Database) {}
104+
105+
func databaseDidRollback(_: GRDB.Database) {}
106+
}
107+
108+
final class GRDBConnectionPool: SQLiteConnectionPoolProtocol {
86109
let pool: DatabasePool
110+
var pendingUpdates: Set<String>
111+
private let pendingUpdatesQueue = DispatchQueue(
112+
label: "co.powersync.pendingUpdatesQueue"
113+
)
87114

88115
init(
89116
pool: DatabasePool
90117
) {
91118
self.pool = pool
119+
self.pendingUpdates = Set()
120+
pool.add(
121+
transactionObserver: PowerSyncTransactionObserver { tableName in
122+
// push the update
123+
self.pendingUpdatesQueue.sync {
124+
self.pendingUpdates.insert(tableName)
125+
}
126+
},
127+
extent: .databaseLifetime
128+
)
129+
}
130+
131+
func getPendingUpdates() -> Set<String> {
132+
self.pendingUpdatesQueue.sync {
133+
let copy = self.pendingUpdates
134+
self.pendingUpdates.removeAll()
135+
return copy
136+
}
92137
}
93138

94139
func read(

Tests/PowerSyncGRDBTests/BasicTest.swift

Lines changed: 169 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@
44

55
import XCTest
66

7+
struct User: Codable, Identifiable, FetchableRecord, PersistableRecord {
8+
var id: String
9+
var name: String
10+
11+
static var databaseTableName = "users"
12+
13+
enum Columns {
14+
static let name = Column(CodingKeys.name)
15+
}
16+
}
17+
718
final class GRDBTests: XCTestCase {
819
private var database: PowerSyncDatabaseProtocol!
920
private var schema: Schema!
@@ -47,7 +58,7 @@ final class GRDBTests: XCTestCase {
4758
try await super.tearDown()
4859
}
4960

50-
func testValidValues() async throws {
61+
func testBasicOperations() async throws {
5162
// Create users with the PowerSync SDK
5263
let initialUserName = "Bob"
5364

@@ -66,17 +77,6 @@ final class GRDBTests: XCTestCase {
6677
XCTAssertTrue(initialUserNames.first == initialUserName)
6778

6879
// Now define a GRDB struct for query purposes
69-
struct User: Codable, Identifiable, FetchableRecord, PersistableRecord {
70-
var id: String
71-
var name: String
72-
73-
static var databaseTableName = "users"
74-
75-
enum Columns {
76-
static let name = Column(CodingKeys.name)
77-
}
78-
}
79-
8080
// Query the Users with GRDB, this should have the same result as with PowerSync
8181
let grdbUserNames = try await pool.read { database in
8282
try User.fetchAll(database)
@@ -98,4 +98,161 @@ final class GRDBTests: XCTestCase {
9898
XCTAssert(grdbUserNames2.count == 2)
9999
XCTAssert(grdbUserNames2[1].name == "another")
100100
}
101+
102+
func testPowerSyncUpdates() async throws {
103+
let expectation = XCTestExpectation(description: "Watch changes")
104+
105+
// Create an actor to handle concurrent mutations
106+
actor ResultsStore {
107+
private var results: Set<String> = []
108+
109+
func append(_ names: [String]) {
110+
results.formUnion(names)
111+
}
112+
113+
func getResults() -> Set<String> {
114+
results
115+
}
116+
117+
func count() -> Int {
118+
results.count
119+
}
120+
}
121+
122+
let resultsStore = ResultsStore()
123+
124+
let watchTask = Task {
125+
let stream = try database.watch(
126+
options: WatchOptions(
127+
sql: "SELECT name FROM users ORDER BY id",
128+
mapper: { cursor in
129+
try cursor.getString(index: 0)
130+
}
131+
))
132+
for try await names in stream {
133+
await resultsStore.append(names)
134+
if await resultsStore.count() == 2 {
135+
expectation.fulfill()
136+
}
137+
}
138+
}
139+
140+
try await database.execute(
141+
sql: "INSERT INTO users(id, name) VALUES(uuid(), ?)",
142+
parameters: ["one"]
143+
)
144+
145+
try await database.execute(
146+
sql: "INSERT INTO users(id, name) VALUES(uuid(), ?)",
147+
parameters: ["two"]
148+
)
149+
await fulfillment(of: [expectation], timeout: 5)
150+
watchTask.cancel()
151+
}
152+
153+
func testPowerSyncUpdatesFromGRDB() async throws {
154+
let expectation = XCTestExpectation(description: "Watch changes")
155+
156+
// Create an actor to handle concurrent mutations
157+
actor ResultsStore {
158+
private var results: Set<String> = []
159+
160+
func append(_ names: [String]) {
161+
results.formUnion(names)
162+
}
163+
164+
func getResults() -> Set<String> {
165+
results
166+
}
167+
168+
func count() -> Int {
169+
results.count
170+
}
171+
}
172+
173+
let resultsStore = ResultsStore()
174+
175+
let watchTask = Task {
176+
let stream = try database.watch(
177+
options: WatchOptions(
178+
sql: "SELECT name FROM users ORDER BY id",
179+
mapper: { cursor in
180+
try cursor.getString(index: 0)
181+
}
182+
))
183+
for try await names in stream {
184+
await resultsStore.append(names)
185+
if await resultsStore.count() == 2 {
186+
expectation.fulfill()
187+
}
188+
}
189+
}
190+
191+
try await pool.write { database in
192+
try User(
193+
id: UUID().uuidString,
194+
name: "one",
195+
).insert(database)
196+
}
197+
198+
try await pool.write { database in
199+
try User(
200+
id: UUID().uuidString,
201+
name: "two",
202+
).insert(database)
203+
}
204+
205+
await fulfillment(of: [expectation], timeout: 5)
206+
watchTask.cancel()
207+
}
208+
209+
func testGRDBUpdatesFromPowerSync() async throws {
210+
let expectation = XCTestExpectation(description: "Watch changes")
211+
212+
// Create an actor to handle concurrent mutations
213+
actor ResultsStore {
214+
private var results: Set<String> = []
215+
216+
func append(_ names: [String]) {
217+
results.formUnion(names)
218+
}
219+
220+
func getResults() -> Set<String> {
221+
results
222+
}
223+
224+
func count() -> Int {
225+
results.count
226+
}
227+
}
228+
229+
let resultsStore = ResultsStore()
230+
231+
let watchTask = Task {
232+
let observation = ValueObservation.tracking {
233+
try User.order(User.Columns.name.asc).fetchAll($0)
234+
}
235+
236+
for try await users in observation.values(in: pool) {
237+
print("users \(users)")
238+
await resultsStore.append(users.map { $0.name })
239+
if await resultsStore.count() == 2 {
240+
expectation.fulfill()
241+
}
242+
}
243+
}
244+
245+
try await database.execute(
246+
sql: "INSERT INTO users(id, name) VALUES(uuid(), ?)",
247+
parameters: ["one"]
248+
)
249+
250+
try await database.execute(
251+
sql: "INSERT INTO users(id, name) VALUES(uuid(), ?)",
252+
parameters: ["two"]
253+
)
254+
255+
await fulfillment(of: [expectation], timeout: 5)
256+
watchTask.cancel()
257+
}
101258
}

0 commit comments

Comments
 (0)