diff --git a/bindings/rust/src/connection.rs b/bindings/rust/src/connection.rs
index f0f7be8f76..7af8ae9e97 100644
--- a/bindings/rust/src/connection.rs
+++ b/bindings/rust/src/connection.rs
@@ -229,6 +229,14 @@ impl Connection {
         conn.set_busy_timeout(duration);
         Ok(())
     }
+
+    /// Create a point-in-time snapshot of the database at `output_path` (must not exist yet).
+    /// Note: Does not work with in-memory databases or MVCC mode; both are reported as errors.
+    pub fn snapshot(&self, output_path: &str) -> Result<()> {
+        let conn = self.get_inner_connection()?;
+        conn.snapshot(output_path)?;
+        Ok(())
+    }
 }
 
 impl Debug for Connection {
diff --git a/cli/app.rs b/cli/app.rs
index 22eadc924d..15c32bbb40 100644
--- a/cli/app.rs
+++ b/cli/app.rs
@@ -6,10 +6,7 @@ use crate::{
     },
     config::Config,
     helper::LimboHelper,
-    input::{
-        get_io, get_writer, ApplyWriter, DbLocation, NoopProgress, OutputMode, ProgressSink,
-        Settings, StderrProgress,
-    },
+    input::{get_io, get_writer, DbLocation, NoopProgress, OutputMode, ProgressSink, Settings},
     manual,
     opcodes_dictionary::OPCODE_DESCRIPTIONS,
     read_state_machine::ReadState,
@@ -1789,17 +1786,8 @@ impl Limbo {
     }
 
     fn clone_database(&mut self, output_file: &str) -> anyhow::Result<()> {
-        use std::path::Path;
-        if Path::new(output_file).exists() {
-            anyhow::bail!("Refusing to overwrite existing file: {output_file}");
-        }
-        let io: Arc = Arc::new(turso_core::PlatformIO::new()?);
-        let db = Database::open_file(io.clone(), output_file)?;
-        let target = db.connect()?;
-
-        let mut applier = ApplyWriter::new(&target);
-        Self::dump_database_from_conn(false, self.conn.clone(), &mut applier, StderrProgress)?;
-        applier.finish()?;
+        self.conn.snapshot(output_file)?;
+        let _ = self.writeln(format!("Database cloned to '{output_file}'"));
         Ok(())
     }
 
diff --git a/cli/commands/mod.rs b/cli/commands/mod.rs
index bab71f5f87..efc92a0abc 100644
--- a/cli/commands/mod.rs
+++ b/cli/commands/mod.rs
@@ -87,11 +87,13 @@ pub enum Command {
     /// Show names of indexes
     #[command(name = "indexes", display_name = ".indexes")]
     ListIndexes(IndexesArgs),
+    /// Toggle query execution timing on/off
     #[command(name = "timer", display_name = ".timer")]
     Timer(TimerArgs),
     /// Toggle column headers on/off in list mode
     #[command(name = "headers", display_name = ".headers")]
     Headers(HeadersArgs),
+    /// Clone the current database to a new file (the target file must not exist yet)
     #[command(name = "clone", display_name = ".clone")]
     Clone(CloneArgs),
     /// Display manual pages for features
diff --git a/cli/input.rs b/cli/input.rs
index 1a0ce034b2..3667dc377a 100644
--- a/cli/input.rs
+++ b/cli/input.rs
@@ -191,12 +191,12 @@ pub struct ApplyWriter<'a> {
 }
 
 impl<'a> ApplyWriter<'a> {
-    pub fn new(target: &'a Arc) -> Self {
-        Self {
-            target,
-            buf: Vec::new(),
-        }
-    }
+    // pub fn new(target: &'a Arc) -> Self {
+    //     Self {
+    //         target,
+    //         buf: Vec::new(),
+    //     }
+    // }
 
     // Find the next statement terminator ;\n or ;\r\n in a byte buffer.
     // Returns (end_idx_inclusive, drain_len), where drain_len includes the newline(s).
@@ -231,21 +231,21 @@ impl<'a> ApplyWriter<'a> {
     }
 
     // Handle final trailing statement that ends with ';' followed only by ASCII whitespace.
-    pub fn finish(mut self) -> io::Result<()> {
-        // Skip if buffer empty or no ';'
-        if let Some(semicolon_pos) = self.buf.iter().rposition(|&b| b == b';') {
-            // Are all bytes after ';' ASCII whitespace?
-            if self.buf[semicolon_pos + 1..]
-                .iter()
-                .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
-            {
-                let stmt_bytes = self.buf[..=semicolon_pos].to_vec();
-                self.buf.clear();
-                self.exec_stmt_bytes(&stmt_bytes)?;
-            }
-        }
-        Ok(())
-    }
+    // pub fn finish(mut self) -> io::Result<()> {
+    //     // Skip if buffer empty or no ';'
+    //     if let Some(semicolon_pos) = self.buf.iter().rposition(|&b| b == b';') {
+    //         // Are all bytes after ';' ASCII whitespace?
+    //         if self.buf[semicolon_pos + 1..]
+    //             .iter()
+    //             .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
+    //         {
+    //             let stmt_bytes = self.buf[..=semicolon_pos].to_vec();
+    //             self.buf.clear();
+    //             self.exec_stmt_bytes(&stmt_bytes)?;
+    //         }
+    //     }
+    //     Ok(())
+    // }
 
     fn exec_stmt_bytes(&self, stmt_bytes: &[u8]) -> io::Result<()> {
         // SQL must be UTF-8. If not, surface a clear error.
diff --git a/core/lib.rs b/core/lib.rs
index 24538a269c..9d6c1ccc68 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -2012,6 +2012,139 @@ impl Connection {
         }
     }
 
+    /// Create a point-in-time snapshot of the database by copying the database file.
+    ///
+    /// `output_path` must not exist yet; missing parent directories are created.
+    #[cfg(feature = "fs")]
+    pub fn snapshot(self: &Arc<Self>, output_path: &str) -> Result<()> {
+        self._snapshot_copy_file(output_path)
+    }
+
+    /// Create a point-in-time snapshot of the database by copying the database file.
+    ///
+    /// This function:
+    /// 1. Performs a TRUNCATE checkpoint to flush all WAL data to the database file
+    /// 2. Acquires a read lock to prevent other checkpoints during copy
+    /// 3. Copies the database file (new writes go to WAL, not DB file)
+    /// 4. Releases the read lock
+    ///
+    /// # Errors
+    ///
+    /// - `InternalError`: connection closed, MVCC enabled, pager without WAL, or a failure
+    ///   while creating the parent directory or copying the file
+    /// - `InvalidArgument`: in-memory database, or `output_path` already exists
+    /// - `Busy`: read lock slot 0 could not be acquired within `MAX_RETRIES` attempts
+    ///
+    /// Errors from the checkpoint and from `begin_read_tx` are propagated unchanged.
+    #[cfg(feature = "fs")]
+    fn _snapshot_copy_file(self: &Arc<Self>, output_path: &str) -> Result<()> {
+        use crate::util::MEMORY_PATH;
+        use std::path::Path;
+
+        if self.is_closed() {
+            return Err(LimboError::InternalError("Connection closed".to_string()));
+        }
+
+        // FIXME: enable mvcc
+        if self.mvcc_enabled() {
+            return Err(LimboError::InternalError(
+                "Snapshot not yet supported with MVCC mode".to_string(),
+            ));
+        }
+
+        // Cannot snapshot in-memory databases
+        if self.db.path == MEMORY_PATH {
+            return Err(LimboError::InvalidArgument(
+                "Cannot snapshot in-memory database".to_string(),
+            ));
+        }
+
+        // Friendly early error; the copy below re-checks atomically with `create_new`.
+        let path = Path::new(output_path);
+        if path.exists() {
+            return Err(LimboError::InvalidArgument(format!(
+                "Output file already exists: {output_path}"
+            )));
+        }
+
+        // Validate the pager before touching the filesystem so that a failed precondition
+        // does not leave freshly created directories behind.
+        let pager = self.pager.load();
+        let Some(wal) = pager.wal.as_ref() else {
+            return Err(LimboError::InternalError(
+                "Cannot snapshot database without WAL".to_string(),
+            ));
+        };
+
+        if let Some(parent) = path.parent() {
+            std::fs::create_dir_all(parent).map_err(|e| {
+                LimboError::InternalError(format!("Failed to create parent directory: {e}"))
+            })?;
+        }
+
+        // There is a race window between checkpointing and copying the database file:
+        // another checkpoint may run in it and we could end up copying a corrupted db file.
+        // To keep checkpoints out of that window we acquire the read lock in slot 0.
+        // Holding slot 0 blocks ALL checkpoints (they need exclusive read_locks[0]).
+
+        // Retry loop: TRUNCATE checkpoint + begin_read_tx until we acquire slot 0.
+        const MAX_RETRIES: u32 = 10;
+        let mut holds_slot_zero = false;
+        for _ in 0..MAX_RETRIES {
+            // TRUNCATE checkpoint - flushes all WAL data to DB file and empties WAL
+            let _ = pager.blocking_checkpoint(
+                CheckpointMode::Truncate {
+                    upper_bound_inclusive: None,
+                },
+                self.get_sync_mode(),
+            )?;
+
+            // Immediately try to acquire read lock
+            pager.begin_read_tx()?;
+
+            // Check if we got slot 0 (WAL was still empty, no writer snuck in)
+            if wal.get_read_lock_slot() == Some(0) {
+                holds_slot_zero = true;
+                break;
+            }
+
+            // A writer snuck in between TRUNCATE and begin_read_tx - we got slot 1-4
+            // This doesn't fully protect against other checkpoints, so retry
+            wal.end_read_tx();
+        }
+        if !holds_slot_zero {
+            return Err(LimboError::Busy);
+        }
+
+        // Copy the database file. The result is captured instead of propagated with `?`
+        // because the read lock must be released on every path.
+        let result = (|| -> std::io::Result<()> {
+            let mut source = std::fs::File::open(Path::new(&self.db.path))?;
+            // `create_new` fails atomically if the file appeared after the `exists()` check,
+            // whereas `std::fs::copy` would silently overwrite it. (Unlike `fs::copy`, the
+            // source's permission bits are not copied.)
+            let mut target = std::fs::OpenOptions::new()
+                .write(true)
+                .create_new(true)
+                .open(path)?;
+            // Flush to stable storage before reporting success.
+            let copied = std::io::copy(&mut source, &mut target).and_then(|_| target.sync_all());
+            if copied.is_err() {
+                // Never leave a truncated snapshot behind: it is not a valid database and
+                // would make a retry fail with "Output file already exists".
+                drop(target);
+                let _ = std::fs::remove_file(path);
+            }
+            copied
+        })()
+        .map_err(|e| LimboError::InternalError(format!("Failed to copy database file: {e}")));
+
+        // Release read lock
+        wal.end_read_tx();
+
+        result
+    }
+
     /// Close a connection and checkpoint.
     pub fn close(&self) -> Result<()> {
         if self.is_closed() {
diff --git a/core/storage/wal.rs b/core/storage/wal.rs
index cce0e643e2..a4aa362786 100644
--- a/core/storage/wal.rs
+++ b/core/storage/wal.rs
@@ -261,6 +261,9 @@ pub trait Wal: Debug + Send + Sync {
     /// End a write transaction.
     fn end_write_tx(&self);
 
+    /// Get the current read lock slot index, or None if no read lock is held.
+    fn get_read_lock_slot(&self) -> Option;
+
     /// Find the latest frame containing a page.
     ///
     /// optional frame_watermark parameter can be passed to force WAL to find frame not larger than watermark value
@@ -1092,6 +1095,16 @@ impl Wal for WalFile {
         }
     }
 
+    /// Get the current read lock slot index, or None if no read lock is held.
+    fn get_read_lock_slot(&self) -> Option {
+        let slot = self.max_frame_read_lock_index.load(Ordering::Acquire);
+        if slot == NO_LOCK_HELD {
+            None
+        } else {
+            Some(slot)
+        }
+    }
+
     /// Begin a write transaction
     #[instrument(skip_all, level = Level::DEBUG)]
     fn begin_write_tx(&self) -> Result<()> {
diff --git a/sdk-kit/src/bindings.rs b/sdk-kit/src/bindings.rs
index 389625f177..68ed2851bd 100644
--- a/sdk-kit/src/bindings.rs
+++ b/sdk-kit/src/bindings.rs
@@ -1,4 +1,4 @@
-/* automatically generated by rust-bindgen 0.71.1 */
+/* automatically generated by rust-bindgen 0.72.1 */
 
 #[doc = " SAFETY: slice with non-null ptr must points to the valid memory range [ptr..ptr + len)\n ownership of the slice is not transferred - so its either caller owns the data or turso\n as the owner doesn't change - there is no method to free the slice reference - because:\n 1. if tursodb owns it - it will clean it in appropriate time\n 2. if caller owns it - it must clean it in appropriate time with appropriate method and tursodb doesn't know how to properly free the data"]
 #[repr(C)]
@@ -255,6 +255,14 @@ unsafe extern "C" {
         error_opt_out: *mut *const ::std::os::raw::c_char,
     ) -> turso_status_code_t;
 }
+unsafe extern "C" {
+    #[doc = " Create a point-in-time snapshot of the database to the specified path\n Note: Does not work with in-memory databases or MVCC mode"]
+    pub fn turso_connection_snapshot(
+        self_: *const turso_connection_t,
+        output_path: *const ::std::os::raw::c_char,
+        error_opt_out: *mut *const ::std::os::raw::c_char,
+    ) -> turso_status_code_t;
+}
 unsafe extern "C" {
     #[doc = " Execute single statement\n execute returns TURSO_DONE if execution completed\n execute returns TURSO_IO if async_io was set and execution needs IO in order to make progress"]
     pub fn turso_statement_execute(
diff --git a/sdk-kit/src/capi.rs b/sdk-kit/src/capi.rs
index 1ec8df80a7..12641393b2 100644
--- a/sdk-kit/src/capi.rs
+++ b/sdk-kit/src/capi.rs
@@ -207,6 +207,27 @@ pub extern "C" fn turso_connection_close(
     }
 }
 
+#[no_mangle]
+#[signature(c)]
+pub extern "C" fn turso_connection_snapshot(
+    connection: *const c::turso_connection_t,
+    output_path: *const std::ffi::c_char,
+    error_opt_out: *mut *const std::ffi::c_char,
+) -> c::turso_status_code_t {
+    let output_path = match unsafe { str_from_c_str(output_path) } {
+        Ok(path) => path,
+        Err(err) => return unsafe { err.to_capi(error_opt_out) },
+    };
+    let connection = match unsafe { TursoConnection::ref_from_capi(connection) } {
+        Ok(connection) => connection,
+        Err(err) => return unsafe { err.to_capi(error_opt_out) },
+    };
+    match connection.snapshot(output_path) {
+        Ok(()) => c::turso_status_code_t::TURSO_OK,
+        Err(err) => unsafe { err.to_capi(error_opt_out) },
+    }
+}
+
 #[no_mangle]
 #[signature(c)]
 pub extern "C" fn turso_statement_run_io(
diff --git a/sdk-kit/src/rsapi.rs b/sdk-kit/src/rsapi.rs
index 4756141e29..096dac7147 100644
--- a/sdk-kit/src/rsapi.rs
+++ b/sdk-kit/src/rsapi.rs
@@ -696,6 +696,12 @@ impl TursoConnection {
         Ok(())
     }
 
+    /// Create a point-in-time snapshot of the database at `output_path` via the inner connection
+    pub fn snapshot(&self, output_path: &str) -> Result<(), TursoError> {
+        self.connection.snapshot(output_path)?;
+        Ok(())
+    }
+
     /// helper method to get C raw container to the TursoConnection instance
     /// this method is used in the capi wrappers
     pub fn to_capi(self: Arc) -> *mut capi::c::turso_connection_t {
diff --git a/sdk-kit/turso.h b/sdk-kit/turso.h
index 7062ba6d1e..9943c609bc 100644
--- a/sdk-kit/turso.h
+++ b/sdk-kit/turso.h
@@ -194,6 +194,16 @@ turso_status_code_t turso_connection_close(
     /** Optional return error parameter (can be null) */
     const char **error_opt_out);
 
+/** Create a point-in-time snapshot of the database to the specified path
+ * Note: Does not work with in-memory databases or MVCC mode
+ */
+turso_status_code_t turso_connection_snapshot(
+    const turso_connection_t *self,
+    /* zero-terminated C string: path of the snapshot file to create */
+    const char *output_path,
+    /** Optional return error parameter (can be null) */
+    const char **error_opt_out);
+
 /** Execute single statement
  * execute returns TURSO_DONE if execution completed
  * execute returns TURSO_IO if async_io was set and execution needs IO in order to make progress
diff --git a/testing/cli_tests/cli_test_cases.py b/testing/cli_tests/cli_test_cases.py
index 10ad907658..db6103abc1 100755
--- a/testing/cli_tests/cli_test_cases.py
+++ b/testing/cli_tests/cli_test_cases.py
@@ -314,18 +314,22 @@ def test_uri_readonly():
 
 
 def test_copy_db_file():
+    sourcepath = "testing/test_source.db"
     testpath = "testing/test_copy.db"
-    if Path(testpath).exists():
-        os.unlink(Path(testpath))
-        time.sleep(0.2)  # make sure closed
+    for p in [sourcepath, testpath]:
+        if Path(p).exists():
+            os.unlink(Path(p))
+            time.sleep(0.2)  # make sure closed
     time.sleep(0.3)
+    # The source must be a file-based database: snapshot (used by .clone) rejects in-memory ones
     turso = TestTursoShell(init_commands="")
+    turso.execute_dot(f".open {sourcepath}")
     turso.execute_dot("create table testing(a,b,c);")
     turso.run_test_fn(".schema", lambda x: "CREATE TABLE testing (a, b, c)" in x, "test-database-has-expected-schema")
     for i in range(100):
         turso.execute_dot(f"insert into testing (a,b,c) values ({i},{i + 1}, {i + 2});")
     turso.run_test_fn("SELECT COUNT(*) FROM testing;", lambda x: "100" == x, "test-database-has-expected-count")
-    turso.run_test_fn(f".clone {testpath}", lambda res: "testing... done" in res)
+    turso.run_test_fn(f".clone {testpath}", lambda res: f"Database cloned to '{testpath}'" in res)
 
     turso.execute_dot(f".open {testpath}")
     turso.run_test_fn(".schema", lambda x: "CREATE TABLE testing" in x, "test-copied-database-has-expected-schema")
@@ -334,6 +338,7 @@ def test_copy_db_file():
 
 
 def test_copy_memory_db_to_file():
+    """Test that cloning an in-memory database returns an error (not supported)"""
     testpath = "testing/memory.db"
     if Path(testpath).exists():
         os.unlink(Path(testpath))
@@ -341,19 +346,15 @@ def test_copy_memory_db_to_file():
 
     turso = TestTursoShell(init_commands="")
     turso.execute_dot("create table testing(a,b,c);")
-    for i in range(100):
+    for i in range(10):
         turso.execute_dot(f"insert into testing (a, b, c) values ({i},{i + 1}, {i + 2});")
-    turso.run_test_fn(f".clone {testpath}", lambda res: "testing... done" in res)
-    turso.quit()
-    time.sleep(0.3)
-    sqlite = TestTursoShell(exec_name="sqlite3", flags=f" {testpath}")
-    sqlite.run_test_fn(
-        ".schema", lambda x: "CREATE TABLE testing (a, b, c)" in x, "test-copied-database-has-expected-schema"
-    )
-    sqlite.run_test_fn(
-        "SELECT COUNT(*) FROM testing;", lambda x: "100" == x, "test-copied-database-has-expected-user-count"
+    # Snapshot/clone of an in-memory database is not supported, so .clone must report the error
+    turso.run_test_fn(
+        f".clone {testpath}",
+        lambda res: "Cannot snapshot in-memory database" in res,
+        "clone-memory-db-returns-error"
     )
-    sqlite.quit()
+    turso.quit()
 
 
 def test_parse_error():
diff --git a/tests/integration/functions/mod.rs b/tests/integration/functions/mod.rs
index 3ba68c8334..e4237674dc 100644
--- a/tests/integration/functions/mod.rs
+++ b/tests/integration/functions/mod.rs
@@ -1,4 +1,5 @@
 mod test_cdc;
 mod test_function_rowid;
+mod test_snapshot;
 mod test_sum;
 mod test_wal_api;
diff --git a/tests/integration/functions/test_snapshot.rs b/tests/integration/functions/test_snapshot.rs
new file mode 100644
index 0000000000..f418079b5f
--- /dev/null
+++ b/tests/integration/functions/test_snapshot.rs
@@ -0,0 +1,64 @@
+use tempfile::TempDir;
+
+use crate::common::{ExecRows, TempDatabase};
+
+#[turso_macros::test()]
+fn test_snapshot_basic(db: TempDatabase) {
+    let conn = db.connect_limbo();
+
+    // Create table and insert data
+    conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y TEXT)")
+        .unwrap();
+    conn.execute("INSERT INTO t VALUES (1, 'hello'), (2, 'world')")
+        .unwrap();
+
+    // Create snapshot in a fresh temp dir, so the output path cannot exist yet
+    let temp_dir = TempDir::new().unwrap();
+    let snapshot_path = temp_dir.path().join("snapshot.db");
+
+    conn.snapshot(snapshot_path.to_str().unwrap()).unwrap();
+
+    // Verify snapshot exists
+    assert!(snapshot_path.exists());
+
+    // Open snapshot as its own database and verify data
+    let snapshot_db = TempDatabase::new_with_existent(&snapshot_path);
+    let snapshot_conn = snapshot_db.connect_limbo();
+    let rows: Vec<(i64, String)> = snapshot_conn.exec_rows("SELECT x, y FROM t ORDER BY x");
+    assert_eq!(
+        rows,
+        vec![(1, "hello".to_string()), (2, "world".to_string())]
+    );
+}
+
+#[turso_macros::test()]
+fn test_snapshot_with_pending_wal_data(db: TempDatabase) {
+    let conn = db.connect_limbo();
+    conn.execute("CREATE TABLE t(x INTEGER PRIMARY KEY, y BLOB)")
+        .unwrap();
+
+    // Insert enough data to create multiple WAL frames
+    for i in 0..100 {
+        conn.execute(format!("INSERT INTO t VALUES ({i}, randomblob(1000))"))
+            .unwrap();
+    }
+
+    // Verify WAL has frames before snapshot, so the snapshot has to checkpoint them first
+    let wal_state = conn.wal_state().unwrap();
+    assert!(
+        wal_state.max_frame > 0,
+        "WAL should have frames before snapshot"
+    );
+
+    // Create snapshot in a fresh temp dir, so the output path cannot exist yet
+    let temp_dir = TempDir::new().unwrap();
+    let snapshot_path = temp_dir.path().join("snapshot.db");
+
+    conn.snapshot(snapshot_path.to_str().unwrap()).unwrap();
+
+    // Verify all data is in snapshot
+    let snapshot_db = TempDatabase::new_with_existent(&snapshot_path);
+    let snapshot_conn = snapshot_db.connect_limbo();
+    let count: Vec<(i64,)> = snapshot_conn.exec_rows("SELECT COUNT(*) FROM t");
+    assert_eq!(count[0].0, 100);
+}