Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bindings/rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ impl Connection {
conn.set_busy_timeout(duration);
Ok(())
}

/// Create a point-in-time snapshot of the database to the specified path.
/// Note: Does not work with in-memory databases or MVCC mode.
pub fn snapshot(&self, output_path: &str) -> Result<()> {
let conn = self.get_inner_connection()?;
conn.snapshot(output_path)?;
Ok(())
}
}

impl Debug for Connection {
Expand Down
18 changes: 3 additions & 15 deletions cli/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use crate::{
},
config::Config,
helper::LimboHelper,
input::{
get_io, get_writer, ApplyWriter, DbLocation, NoopProgress, OutputMode, ProgressSink,
Settings, StderrProgress,
},
input::{get_io, get_writer, DbLocation, NoopProgress, OutputMode, ProgressSink, Settings},
manual,
opcodes_dictionary::OPCODE_DESCRIPTIONS,
read_state_machine::ReadState,
Expand Down Expand Up @@ -1789,17 +1786,8 @@ impl Limbo {
}

fn clone_database(&mut self, output_file: &str) -> anyhow::Result<()> {
use std::path::Path;
if Path::new(output_file).exists() {
anyhow::bail!("Refusing to overwrite existing file: {output_file}");
}
let io: Arc<dyn turso_core::IO> = Arc::new(turso_core::PlatformIO::new()?);
let db = Database::open_file(io.clone(), output_file)?;
let target = db.connect()?;

let mut applier = ApplyWriter::new(&target);
Self::dump_database_from_conn(false, self.conn.clone(), &mut applier, StderrProgress)?;
applier.finish()?;
self.conn.snapshot(output_file)?;
let _ = self.writeln(format!("Database cloned to '{output_file}'"));
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions cli/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ pub enum Command {
/// Show names of indexes
#[command(name = "indexes", display_name = ".indexes")]
ListIndexes(IndexesArgs),
/// Toggle query execution timing on/off
#[command(name = "timer", display_name = ".timer")]
Timer(TimerArgs),
/// Toggle column headers on/off in list mode
#[command(name = "headers", display_name = ".headers")]
Headers(HeadersArgs),
/// Clone the current database to a new file
#[command(name = "clone", display_name = ".clone")]
Clone(CloneArgs),
/// Display manual pages for features
Expand Down
42 changes: 21 additions & 21 deletions cli/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ pub struct ApplyWriter<'a> {
}

impl<'a> ApplyWriter<'a> {
pub fn new(target: &'a Arc<turso_core::Connection>) -> Self {
Self {
target,
buf: Vec::new(),
}
}
// pub fn new(target: &'a Arc<turso_core::Connection>) -> Self {
// Self {
// target,
// buf: Vec::new(),
// }
// }

// Find the next statement terminator ;\n or ;\r\n in a byte buffer.
// Returns (end_idx_inclusive, drain_len), where drain_len includes the newline(s).
Expand Down Expand Up @@ -231,21 +231,21 @@ impl<'a> ApplyWriter<'a> {
}

// Handle final trailing statement that ends with ';' followed only by ASCII whitespace.
pub fn finish(mut self) -> io::Result<()> {
// Skip if buffer empty or no ';'
if let Some(semicolon_pos) = self.buf.iter().rposition(|&b| b == b';') {
// Are all bytes after ';' ASCII whitespace?
if self.buf[semicolon_pos + 1..]
.iter()
.all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
{
let stmt_bytes = self.buf[..=semicolon_pos].to_vec();
self.buf.clear();
self.exec_stmt_bytes(&stmt_bytes)?;
}
}
Ok(())
}
// pub fn finish(mut self) -> io::Result<()> {
// // Skip if buffer empty or no ';'
// if let Some(semicolon_pos) = self.buf.iter().rposition(|&b| b == b';') {
// // Are all bytes after ';' ASCII whitespace?
// if self.buf[semicolon_pos + 1..]
// .iter()
// .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
// {
// let stmt_bytes = self.buf[..=semicolon_pos].to_vec();
// self.buf.clear();
// self.exec_stmt_bytes(&stmt_bytes)?;
// }
// }
// Ok(())
// }

fn exec_stmt_bytes(&self, stmt_bytes: &[u8]) -> io::Result<()> {
// SQL must be UTF-8. If not, surface a clear error.
Expand Down
103 changes: 103 additions & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2012,6 +2012,109 @@ impl Connection {
}
}

/// Create a point-in-time snapshot of the database by copying the database file.
#[cfg(feature = "fs")]
pub fn snapshot(self: &Arc<Self>, output_path: &str) -> Result<()> {
self._snapshot_copy_file(output_path)
}

/// Create a point-in-time snapshot of the database by copying the database file.
///
/// This function:
/// 1. Performs a TRUNCATE checkpoint to flush all WAL data to the database file
/// 2. Acquires a read lock to prevent other checkpoints during copy
/// 3. Copies the database file (new writes go to WAL, not DB file)
/// 4. Releases the read lock
#[cfg(feature = "fs")]
fn _snapshot_copy_file(self: &Arc<Self>, output_path: &str) -> Result<()> {
use crate::util::MEMORY_PATH;
use std::path::Path;

if self.is_closed() {
return Err(LimboError::InternalError("Connection closed".to_string()));
}

// FIXME: enable mvcc
if self.mvcc_enabled() {
return Err(LimboError::InternalError(
"Snapshot not yet supported with MVCC mode".to_string(),
));
}

// Cannot snapshot in-memory databases
if self.db.path == MEMORY_PATH {
return Err(LimboError::InvalidArgument(
"Cannot snapshot in-memory database".to_string(),
));
}

let path = Path::new(output_path);
if path.exists() {
return Err(LimboError::InvalidArgument(format!(
"Output file already exists: {output_path}"
)));
}
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|e| {
LimboError::InternalError(format!("Failed to create parent directory: {e}"))
})?;
}

let pager = self.pager.load();
let Some(wal) = pager.wal.as_ref() else {
return Err(LimboError::InternalError(
"Cannot snapshot database without WAL".to_string(),
));
};

// There is a race condition after checkpointing and before copying database file.
// Another checkpoint may run in this window and we may end up with corrupted db file
// To prevent checkpoint to run in this window, we will try to acquire read lock with slot 0
// Holding slot 0 blocks ALL checkpoints (they need exclusive read_locks[0]).

// Retry loop: TRUNCATE checkpoint + begin_read_tx until we acquire slot 0.
const MAX_RETRIES: u32 = 10;
for attempt in 0..MAX_RETRIES {
// TRUNCATE checkpoint - flushes all WAL data to DB file and empties WAL
let _ = pager.blocking_checkpoint(
CheckpointMode::Truncate {
upper_bound_inclusive: None,
},
self.get_sync_mode(),
)?;

// Immediately try to acquire read lock
pager.begin_read_tx()?;

// Check if we got slot 0 (WAL was still empty, no writer snuck in)
if wal.get_read_lock_slot() == Some(0) {
break; // Success - slot 0 blocks all checkpoints
}

// A writer snuck in between TRUNCATE and begin_read_tx - we got slot 1-4
// This doesn't fully protect against other checkpoints, so retry
wal.end_read_tx();

if attempt == MAX_RETRIES - 1 {
return Err(LimboError::Busy);
}
}

// Copy the database file
let result = (|| -> Result<()> {
let source_path = Path::new(&self.db.path);
std::fs::copy(source_path, path).map_err(|e| {
LimboError::InternalError(format!("Failed to copy database file: {e}"))
})?;
Ok(())
})();

// Release read lock
wal.end_read_tx();

result
}

/// Close a connection and checkpoint.
pub fn close(&self) -> Result<()> {
if self.is_closed() {
Expand Down
13 changes: 13 additions & 0 deletions core/storage/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ pub trait Wal: Debug + Send + Sync {
/// End a write transaction.
fn end_write_tx(&self);

/// Get the current read lock slot index, or None if no read lock is held.
fn get_read_lock_slot(&self) -> Option<usize>;

/// Find the latest frame containing a page.
///
/// optional frame_watermark parameter can be passed to force WAL to find frame not larger than watermark value
Expand Down Expand Up @@ -1092,6 +1095,16 @@ impl Wal for WalFile {
}
}

/// Get the current read lock slot index, or None if no read lock is held.
fn get_read_lock_slot(&self) -> Option<usize> {
let slot = self.max_frame_read_lock_index.load(Ordering::Acquire);
if slot == NO_LOCK_HELD {
None
} else {
Some(slot)
}
}

/// Begin a write transaction
#[instrument(skip_all, level = Level::DEBUG)]
fn begin_write_tx(&self) -> Result<()> {
Expand Down
10 changes: 9 additions & 1 deletion sdk-kit/src/bindings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* automatically generated by rust-bindgen 0.71.1 */
/* automatically generated by rust-bindgen 0.72.1 */

#[doc = " SAFETY: slice with non-null ptr must points to the valid memory range [ptr..ptr + len)\n ownership of the slice is not transferred - so its either caller owns the data or turso\n as the owner doesn't change - there is no method to free the slice reference - because:\n 1. if tursodb owns it - it will clean it in appropriate time\n 2. if caller owns it - it must clean it in appropriate time with appropriate method and tursodb doesn't know how to properly free the data"]
#[repr(C)]
Expand Down Expand Up @@ -255,6 +255,14 @@ unsafe extern "C" {
error_opt_out: *mut *const ::std::os::raw::c_char,
) -> turso_status_code_t;
}
unsafe extern "C" {
#[doc = " Create a point-in-time snapshot of the database to the specified path\n Note: Does not work with in-memory databases or MVCC mode"]
pub fn turso_connection_snapshot(
self_: *const turso_connection_t,
output_path: *const ::std::os::raw::c_char,
error_opt_out: *mut *const ::std::os::raw::c_char,
) -> turso_status_code_t;
}
unsafe extern "C" {
#[doc = " Execute single statement\n execute returns TURSO_DONE if execution completed\n execute returns TURSO_IO if async_io was set and execution needs IO in order to make progress"]
pub fn turso_statement_execute(
Expand Down
21 changes: 21 additions & 0 deletions sdk-kit/src/capi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,27 @@ pub extern "C" fn turso_connection_close(
}
}

#[no_mangle]
#[signature(c)]
pub extern "C" fn turso_connection_snapshot(
connection: *const c::turso_connection_t,
output_path: *const std::ffi::c_char,
error_opt_out: *mut *const std::ffi::c_char,
) -> c::turso_status_code_t {
let output_path = match unsafe { str_from_c_str(output_path) } {
Ok(path) => path,
Err(err) => return unsafe { err.to_capi(error_opt_out) },
};
let connection = match unsafe { TursoConnection::ref_from_capi(connection) } {
Ok(connection) => connection,
Err(err) => return unsafe { err.to_capi(error_opt_out) },
};
match connection.snapshot(output_path) {
Ok(()) => c::turso_status_code_t::TURSO_OK,
Err(err) => unsafe { err.to_capi(error_opt_out) },
}
}

#[no_mangle]
#[signature(c)]
pub extern "C" fn turso_statement_run_io(
Expand Down
6 changes: 6 additions & 0 deletions sdk-kit/src/rsapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,12 @@ impl TursoConnection {
Ok(())
}

/// Create a point-in-time snapshot of the database to the specified path
pub fn snapshot(&self, output_path: &str) -> Result<(), TursoError> {
self.connection.snapshot(output_path)?;
Ok(())
}

/// helper method to get C raw container to the TursoConnection instance
/// this method is used in the capi wrappers
pub fn to_capi(self: Arc<Self>) -> *mut capi::c::turso_connection_t {
Expand Down
10 changes: 10 additions & 0 deletions sdk-kit/turso.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ turso_status_code_t turso_connection_close(
/** Optional return error parameter (can be null) */
const char **error_opt_out);

/** Create a point-in-time snapshot of the database to the specified path
* Note: Does not work with in-memory databases or MVCC mode
*/
turso_status_code_t turso_connection_snapshot(
const turso_connection_t *self,
/* zero-terminated C string path for output file */
const char *output_path,
/** Optional return error parameter (can be null) */
const char **error_opt_out);

/** Execute single statement
* execute returns TURSO_DONE if execution completed
* execute returns TURSO_IO if async_io was set and execution needs IO in order to make progress
Expand Down
31 changes: 16 additions & 15 deletions testing/cli_tests/cli_test_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,18 +314,22 @@ def test_uri_readonly():


def test_copy_db_file():
sourcepath = "testing/test_source.db"
testpath = "testing/test_copy.db"
if Path(testpath).exists():
os.unlink(Path(testpath))
time.sleep(0.2) # make sure closed
for p in [sourcepath, testpath]:
if Path(p).exists():
os.unlink(Path(p))
time.sleep(0.2) # make sure closed
time.sleep(0.3)
# Must use file-based database, not in-memory (snapshot doesn't support in-memory)
turso = TestTursoShell(init_commands="")
turso.execute_dot(f".open {sourcepath}")
turso.execute_dot("create table testing(a,b,c);")
turso.run_test_fn(".schema", lambda x: "CREATE TABLE testing (a, b, c)" in x, "test-database-has-expected-schema")
for i in range(100):
turso.execute_dot(f"insert into testing (a,b,c) values ({i},{i + 1}, {i + 2});")
turso.run_test_fn("SELECT COUNT(*) FROM testing;", lambda x: "100" == x, "test-database-has-expected-count")
turso.run_test_fn(f".clone {testpath}", lambda res: "testing... done" in res)
turso.run_test_fn(f".clone {testpath}", lambda res: f"Database cloned to '{testpath}'" in res)

turso.execute_dot(f".open {testpath}")
turso.run_test_fn(".schema", lambda x: "CREATE TABLE testing" in x, "test-copied-database-has-expected-schema")
Expand All @@ -334,26 +338,23 @@ def test_copy_db_file():


def test_copy_memory_db_to_file():
"""Test that cloning in-memory database returns an error (not supported)"""
testpath = "testing/memory.db"
if Path(testpath).exists():
os.unlink(Path(testpath))
time.sleep(0.2) # make sure closed

turso = TestTursoShell(init_commands="")
turso.execute_dot("create table testing(a,b,c);")
for i in range(100):
for i in range(10):
turso.execute_dot(f"insert into testing (a, b, c) values ({i},{i + 1}, {i + 2});")
turso.run_test_fn(f".clone {testpath}", lambda res: "testing... done" in res)
turso.quit()
time.sleep(0.3)
sqlite = TestTursoShell(exec_name="sqlite3", flags=f" {testpath}")
sqlite.run_test_fn(
".schema", lambda x: "CREATE TABLE testing (a, b, c)" in x, "test-copied-database-has-expected-schema"
)
sqlite.run_test_fn(
"SELECT COUNT(*) FROM testing;", lambda x: "100" == x, "test-copied-database-has-expected-user-count"
# Snapshot/clone of in-memory database is not supported
turso.run_test_fn(
f".clone {testpath}",
lambda res: "Cannot snapshot in-memory database" in res,
"clone-memory-db-returns-error"
)
sqlite.quit()
turso.quit()


def test_parse_error():
Expand Down
Loading
Loading