Skip to content

Commit 23d52b8

Browse files
committed
Use jsonb to store oplog and ps_data
1 parent 8a1efcc commit 23d52b8

File tree

4 files changed

+114
-23
lines changed

4 files changed

+114
-23
lines changed

crates/core/src/migrations.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError};
1212
use crate::fix_data::apply_v035_fix;
1313
use crate::sync::BucketPriority;
1414

15-
pub const LATEST_VERSION: i32 = 10;
15+
pub const LATEST_VERSION: i32 = 11;
1616

1717
pub fn powersync_migrate(
1818
ctx: *mut sqlite::context,
@@ -384,5 +384,26 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
384384
.into_db_result(local_db)?;
385385
}
386386

387+
if current_version < 11 && target_version >= 11 {
388+
local_db.exec_safe("PRAGMA writable_schema = ON;")?;
389+
local_db
390+
.exec_safe("UPDATE sqlite_schema SET sql = replace(sql, 'data TEXT', 'data ANY') WHERE name = 'ps_oplog'")?;
391+
local_db.exec_safe("PRAGMA writable_schema = RESET;")?;
392+
393+
local_db
394+
.exec_safe(
395+
"\
396+
INSERT INTO ps_migration(id, down_migrations) VALUES (11, json_array(
397+
json_object('sql', 'PRAGMA writable_schema = ON;'),
398+
json_object('sql', 'UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';'),
399+
json_object('sql', 'UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';'),
400+
json_object('sql', 'PRAGMA writable_schema = OFF;'),
401+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
402+
));
403+
",
404+
)
405+
.into_db_result(local_db)?;
406+
}
407+
387408
Ok(())
388409
}

crates/core/src/sync/operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ RETURNING op_id, hash",
4545

4646
// language=SQLite
4747
let insert_statement = db.prepare_v2("\
48-
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
48+
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, jsonb(?), ?)")?;
4949
insert_statement.bind_int64(1, bucket_id)?;
5050

5151
let updated_row_statement = db.prepare_v2(

crates/core/src/sync_local.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -145,33 +145,43 @@ impl<'a> SyncOperation<'a> {
145145
let mut untyped_delete_statement: Option<ManagedStmt> = None;
146146
let mut untyped_insert_statement: Option<ManagedStmt> = None;
147147

148+
let mut json2json: Option<ManagedStmt> = None;
149+
148150
while statement.step().into_db_result(self.db)? == ResultCode::ROW {
149151
let type_name = statement.column_text(0)?;
150152
let id = statement.column_text(1)?;
151-
let data = statement.column_text(2);
153+
// data can either be null (delete), or a JSON object as text or blob (put)
154+
let data_value = statement.column_value(2)?;
155+
let has_data = data_value.value_type() != ColumnType::Null;
152156

153157
if let Some(known) = self.schema.tables.get_mut(type_name) {
154158
if let Some(raw) = &mut known.raw {
155-
match data {
156-
Ok(data) => {
157-
let stmt = raw.put_statement(self.db)?;
158-
let parsed: serde_json::Value = serde_json::from_str(data)
159-
.map_err(PowerSyncError::json_local_error)?;
160-
stmt.bind_for_put(id, &parsed)?;
161-
stmt.stmt.exec()?;
162-
}
163-
Err(_) => {
164-
let stmt = raw.delete_statement(self.db)?;
165-
stmt.bind_for_delete(id)?;
166-
stmt.stmt.exec()?;
167-
}
159+
if has_data {
160+
// data_value could be jsonb, but we need json to parse it for raw statements.
161+
let convert_stmt = match &json2json {
162+
Some(stmt) => stmt,
163+
None => json2json.insert(self.db.prepare_v2("SELECT json(?)")?),
164+
};
165+
166+
convert_stmt.reset()?;
167+
convert_stmt.bind_value(1, data_value)?;
168+
convert_stmt.step()?;
169+
let data = convert_stmt.column_text(0)?;
170+
171+
let stmt = raw.put_statement(self.db)?;
172+
let parsed: serde_json::Value =
173+
serde_json::from_str(data).map_err(PowerSyncError::json_local_error)?;
174+
stmt.bind_for_put(id, &parsed)?;
175+
stmt.stmt.exec()?;
176+
} else {
177+
let stmt = raw.delete_statement(self.db)?;
178+
stmt.bind_for_delete(id)?;
179+
stmt.stmt.exec()?;
168180
}
169181
} else {
170182
let quoted = quote_internal_name(type_name, false);
171183

172-
// is_err() is essentially a NULL check here.
173-
// NULL data means no PUT operations found, so we delete the row.
174-
if data.is_err() {
184+
if !has_data {
175185
// DELETE
176186
if last_delete_table.as_deref() != Some(&quoted) {
177187
// Prepare statement when the table changed
@@ -204,12 +214,12 @@ impl<'a> SyncOperation<'a> {
204214
let insert_statement = last_insert_statement.as_mut().unwrap();
205215
insert_statement.reset()?;
206216
insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
207-
insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
217+
insert_statement.bind_value(2, data_value)?;
208218
insert_statement.exec()?;
209219
}
210220
}
211221
} else {
212-
if data.is_err() {
222+
if !has_data {
213223
// DELETE
214224
if untyped_delete_statement.is_none() {
215225
// Prepare statement on first use
@@ -240,7 +250,7 @@ impl<'a> SyncOperation<'a> {
240250
insert_statement.reset()?;
241251
insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
242252
insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
243-
insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
253+
insert_statement.bind_value(3, data_value)?;
244254
insert_statement.exec()?;
245255
}
246256
}

dart/test/utils/migration_fixtures.dart

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// The current database version
2-
const databaseVersion = 10;
2+
const databaseVersion = 11;
33

44
/// This is the base database state that we expect at various schema versions.
55
/// Generated by loading the specific library version, and exporting the schema.
@@ -354,6 +354,54 @@ const expectedState = <int, String>{
354354
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
355355
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
356356
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
357+
''',
358+
11: r'''
359+
;CREATE TABLE ps_buckets(
360+
id INTEGER PRIMARY KEY,
361+
name TEXT NOT NULL,
362+
last_applied_op INTEGER NOT NULL DEFAULT 0,
363+
last_op INTEGER NOT NULL DEFAULT 0,
364+
target_op INTEGER NOT NULL DEFAULT 0,
365+
add_checksum INTEGER NOT NULL DEFAULT 0,
366+
op_checksum INTEGER NOT NULL DEFAULT 0,
367+
pending_delete INTEGER NOT NULL DEFAULT 0
368+
, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT
369+
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
370+
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
371+
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
372+
;CREATE TABLE ps_oplog(
373+
bucket INTEGER NOT NULL,
374+
op_id INTEGER NOT NULL,
375+
row_type TEXT,
376+
row_id TEXT,
377+
key TEXT,
378+
data ANY,
379+
hash INTEGER NOT NULL) STRICT
380+
;CREATE TABLE ps_sync_state (
381+
priority INTEGER NOT NULL PRIMARY KEY,
382+
last_synced_at TEXT NOT NULL
383+
) STRICT
384+
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
385+
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
386+
;CREATE TABLE ps_updated_rows(
387+
row_type TEXT,
388+
row_id TEXT,
389+
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
390+
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
391+
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
392+
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
393+
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
394+
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
395+
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
396+
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
397+
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
398+
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
399+
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
400+
;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]')
401+
;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]')
402+
;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]')
403+
;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]')
404+
;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"PRAGMA writable_schema = ON;"},{"sql":"UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';"},{"sql":"UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';"},{"sql":"PRAGMA writable_schema = OFF;"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]')
357405
''',
358406
};
359407

@@ -456,6 +504,17 @@ const data1 = <int, String>{
456504
(2, 3, 'lists', 'l1', '', '{}', 3)
457505
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
458506
('lists', 'l2')
507+
''',
508+
11: r'''
509+
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES
510+
(1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0),
511+
(2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0)
512+
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
513+
(1, 1, 'todos', 't1', '', '{}', 100),
514+
(1, 2, 'todos', 't2', '', '{}', 20),
515+
(2, 3, 'lists', 'l1', '', '{}', 3)
516+
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
517+
('lists', 'l2')
459518
'''
460519
};
461520

@@ -501,6 +560,7 @@ final dataDown1 = <int, String>{
501560
7: data1[5]!,
502561
8: data1[5]!,
503562
9: data1[9]!,
563+
10: data1[9]!,
504564
};
505565

506566
final finalData1 = data1[databaseVersion]!;

0 commit comments

Comments
 (0)