diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 440548d..faed173 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -12,7 +12,7 @@ use crate::error::{PSResult, PowerSyncError}; use crate::fix_data::apply_v035_fix; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 10; +pub const LATEST_VERSION: i32 = 11; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -384,5 +384,26 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array( .into_db_result(local_db)?; } + if current_version < 11 && target_version >= 11 { + local_db.exec_safe("PRAGMA writable_schema = ON;")?; + local_db + .exec_safe("UPDATE sqlite_schema SET sql = replace(sql, 'data TEXT', 'data ANY') WHERE name = 'ps_oplog'")?; + local_db.exec_safe("PRAGMA writable_schema = RESET;")?; + + local_db + .exec_safe( + "\ +INSERT INTO ps_migration(id, down_migrations) VALUES (11, json_array( + json_object('sql', 'PRAGMA writable_schema = ON;'), + json_object('sql', 'UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';'), + json_object('sql', 'UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';'), + json_object('sql', 'PRAGMA writable_schema = OFF;'), + json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11') +)); + ", + ) + .into_db_result(local_db)?; + } + Ok(()) } diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index 5e4ee01..7ae3651 100644 --- a/crates/core/src/sync/operations.rs +++ b/crates/core/src/sync/operations.rs @@ -45,7 +45,7 @@ RETURNING op_id, hash", // language=SQLite let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; +INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, jsonb(?), ?)")?; insert_statement.bind_int64(1, bucket_id)?; let updated_row_statement = db.prepare_v2( diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index ec4a886..ef4c33c 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -145,33 +145,43 @@ impl<'a> SyncOperation<'a> { let mut untyped_delete_statement: Option = None; let mut untyped_insert_statement: Option = None; + let mut jsonb2json: Option = None; + while statement.step().into_db_result(self.db)? == ResultCode::ROW { let type_name = statement.column_text(0)?; let id = statement.column_text(1)?; - let data = statement.column_text(2); + // data can either be null (delete), or a JSON object as text or blob (put) + let data_value = statement.column_value(2)?; + let has_data = data_value.value_type() != ColumnType::Null; if let Some(known) = self.schema.tables.get_mut(type_name) { if let Some(raw) = &mut known.raw { - match data { - Ok(data) => { - let stmt = raw.put_statement(self.db)?; - let parsed: serde_json::Value = serde_json::from_str(data) - .map_err(PowerSyncError::json_local_error)?; - stmt.bind_for_put(id, &parsed)?; - stmt.stmt.exec()?; - } - Err(_) => { - let stmt = raw.delete_statement(self.db)?; - stmt.bind_for_delete(id)?; - stmt.stmt.exec()?; - } + if has_data { + // data_value could be jsonb, but we need json to parse it for raw statements. + let convert_stmt = match &jsonb2json { + Some(stmt) => stmt, + None => jsonb2json.insert(self.db.prepare_v2("SELECT json(?)")?), + }; + + convert_stmt.reset()?; + convert_stmt.bind_value(1, data_value)?; + convert_stmt.step()?; + let data = convert_stmt.column_text(0)?; + + let stmt = raw.put_statement(self.db)?; + let parsed: serde_json::Value = + serde_json::from_str(data).map_err(PowerSyncError::json_local_error)?; + stmt.bind_for_put(id, &parsed)?; + stmt.stmt.exec()?; + } else { + let stmt = raw.delete_statement(self.db)?; + stmt.bind_for_delete(id)?; + stmt.stmt.exec()?; } } else { let quoted = quote_internal_name(type_name, false); - // is_err() is essentially a NULL check here. - // NULL data means no PUT operations found, so we delete the row. - if data.is_err() { + if !has_data { // DELETE if last_delete_table.as_deref() != Some("ed) { // Prepare statement when the table changed @@ -204,12 +214,12 @@ impl<'a> SyncOperation<'a> { let insert_statement = last_insert_statement.as_mut().unwrap(); insert_statement.reset()?; insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?; + insert_statement.bind_value(2, data_value)?; insert_statement.exec()?; } } } else { - if data.is_err() { + if !has_data { // DELETE if untyped_delete_statement.is_none() { // Prepare statement on first use @@ -240,7 +250,7 @@ impl<'a> SyncOperation<'a> { insert_statement.reset()?; insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; + insert_statement.bind_value(3, data_value)?; insert_statement.exec()?; } } diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 29a338c..73ae33c 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 10; +const databaseVersion = 11; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. @@ -354,6 +354,54 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +''', + 11: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data ANY, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"PRAGMA writable_schema = ON;"},{"sql":"UPDATE ps_oplog SET data = json(data) WHERE typeof(data) = ''blob'';"},{"sql":"UPDATE sqlite_schema SET sql = replace(sql, ''data ANY'', ''data TEXT'') WHERE name = ''ps_oplog'';"},{"sql":"PRAGMA writable_schema = OFF;"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') ''', }; @@ -456,6 +504,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 11: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -501,6 +560,7 @@ final dataDown1 = { 7: data1[5]!, 8: data1[5]!, 9: data1[9]!, + 10: data1[9]!, }; final finalData1 = data1[databaseVersion]!;