Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,31 @@ impl RelationalDB {
}
}

/// Duration after which expired unused views are cleaned up.
/// Value is chosen arbitrarily; can be tuned later if needed.
const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(100);

/// Spawn a background task that periodically cleans up expired views.
pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandle {
tokio::spawn(async move {
let db = &db;
loop {
if let Err(e) = db.with_auto_commit(Workload::Internal, |tx| {
tx.clear_expired_views(VIEWS_EXPIRATION).map_err(DBError::from)
}) {
log::error!(
"[{}] DATABASE: failed to clear expired views: {}",
db.database_identity(),
e
);
}

tokio::time::sleep(VIEWS_EXPIRATION).await;
}
})
.abort_handle()
}

impl RelationalDB {
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
Ok(self.inner.create_table_mut_tx(tx, schema)?)
Expand Down Expand Up @@ -2343,6 +2368,7 @@ mod tests {
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::rc::Rc;
use std::time::Instant;

use super::tests_utils::begin_mut_tx;
use super::*;
Expand All @@ -2362,6 +2388,7 @@ mod tests {
ST_SEQUENCE_ID, ST_TABLE_ID,
};
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::bsatn::to_vec;
use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::Identity;
Expand Down Expand Up @@ -2395,6 +2422,27 @@ mod tests {
TableSchema::from_module_def(&def, table, (), TableId::SENTINEL)
}

fn view_module_def() -> ModuleDef {
let mut builder = RawModuleDefV9Builder::new();

let return_type_ref = builder.add_algebraic_type(
[],
"my_view_return_type",
AlgebraicType::product([("b", AlgebraicType::U8)]),
true,
);
builder.add_view(
"my_view",
0,
true,
false,
ProductType::unit(),
AlgebraicType::array(AlgebraicType::Ref(return_type_ref)),
);
let raw = builder.finish();
raw.try_into().expect("table validation failed")
}

fn table_auto_inc() -> TableSchema {
table(
"MyTable",
Expand Down Expand Up @@ -2479,6 +2527,94 @@ mod tests {
Ok(())
}

#[test]
fn test_views() -> ResultTest<()> {
let stdb = TestDB::durable()?;
let module_def = view_module_def();
let view_def = module_def.view("my_view").unwrap();

let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();

let to_bstan = |pv: &ProductValue| {
Bytes::from(to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed"))
};
let row_pv = |v: u8| ProductValue::from_iter(vec![AlgebraicValue::U8(v)]);

let project_views = |stdb: &TestDB, table_id: TableId, sender: Identity| {
let tx = begin_tx(stdb);
stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into())
.unwrap()
.map(|row| ProductValue {
elements: row.to_product_value().elements.iter().skip(1).cloned().collect(),
})
.collect::<Vec<_>>()
};

// Create the view
let mut tx = begin_mut_tx(&stdb);
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
stdb.commit_tx(tx)?;

let apply_view_update = |sender: Identity, val: u8| -> ResultTest<()> {
let mut tx = begin_mut_tx(&stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bstan(&row_pv(val)), typespace)?;
stdb.commit_tx(tx)?;
Ok(())
};

// Sender 1
let sender1 = Identity::ONE;
apply_view_update(sender1, 42)?;
let mut tx = begin_mut_tx(&stdb);
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender1)?;
stdb.commit_tx(tx)?;

assert_eq!(
project_views(&stdb, table_id, sender1)[0],
row_pv(42),
"Materialized view row does not match inserted row"
);

// Sender 2
let sender2 = Identity::ZERO;
let before_sender2 = Instant::now();
apply_view_update(sender2, 84)?;

assert_eq!(
project_views(&stdb, table_id, sender2)[0],
row_pv(84),
"Materialized view row does not match inserted row"
);

// Clear expired views
let mut tx = begin_mut_tx(&stdb);
tx.clear_expired_views(Instant::now().saturating_duration_since(before_sender2))?;
stdb.commit_tx(tx)?;

assert!(
project_views(&stdb, table_id, sender1).is_empty(),
"Sender 1 rows should be cleared"
);
assert_eq!(
project_views(&stdb, table_id, sender2)[0],
row_pv(84),
"Sender 2 rows should not be cleared"
);

// Validate st_view_subs state
let tx = begin_mut_tx(&stdb);
let st_view_row = tx.lookup_st_view_subs(view_id)?;
assert_eq!(st_view_row.len(), 1, "Sender 1 should be removed from st_view_subs");
assert_eq!(
st_view_row[0].identity.0, sender2,
"Sender 1 should be removed from st_view_subs"
);

Ok(())
}

#[test]
fn test_table_name() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
14 changes: 8 additions & 6 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
use crate::database_logger::DatabaseLogger;
use crate::db::persistence::PersistenceProvider;
use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self, spawn_tx_metrics_recorder};
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
use crate::host::module_host::ModuleRuntime as _;
Expand Down Expand Up @@ -740,6 +740,9 @@ struct Host {
/// Handle to the task responsible for recording metrics for each transaction.
/// The task is aborted when [`Host`] is dropped.
tx_metrics_recorder_task: AbortHandle,
/// Handle to the task responsible for cleaning up old views.
/// The task is aborted when [`Host`] is dropped.
view_cleanup_task: AbortHandle,
}

impl Host {
Expand Down Expand Up @@ -864,19 +867,17 @@ impl Host {

scheduler_starter.start(&module_host)?;
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone());

let module = watch::Sender::new(module_host);
//TODO(shub): Below code interfere with `exit_module` code,
// I suspect channel internally holds a reference to the module,
// even after we drop the sender.
//
// replica_ctx.subscriptions.init(module.subscribe());

Ok(Host {
module,
replica_ctx,
scheduler,
disk_metrics_recorder_task,
tx_metrics_recorder_task,
view_cleanup_task,
})
}

Expand Down Expand Up @@ -1053,6 +1054,7 @@ impl Drop for Host {
fn drop(&mut self) {
self.disk_metrics_recorder_task.abort();
self.tx_metrics_recorder_task.abort();
self.view_cleanup_task.abort();
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ impl CommittedState {
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
}

pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) {
self.read_sets.remove_view(view_id)
pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option<Identity>) {
self.read_sets.remove_view(view_id, sender)
}

pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData {
Expand Down
83 changes: 77 additions & 6 deletions crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ impl ViewReadSets {
}

/// Removes keys for `view_id` from the read set
pub fn remove_view(&mut self, view_id: ViewId) {
pub fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
self.tables.retain(|_, readset| {
readset.remove_view(view_id);
readset.remove_view(view_id, sender);
!readset.is_empty()
});
}
Expand Down Expand Up @@ -144,9 +144,14 @@ impl TableReadSet {
self.table_scans.is_empty()
}

/// Removes keys for `view_id` from the read set
fn remove_view(&mut self, view_id: ViewId) {
self.table_scans.retain(|info| info.view_id != view_id);
/// Removes keys for `view_id` from the read set, optionally filtering by `sender`
fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
if let Some(identity) = sender {
self.table_scans
.retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity)));
} else {
self.table_scans.retain(|call| call.view_id != view_id);
}
}

/// Merge or union two read sets for this table
Expand Down Expand Up @@ -221,7 +226,13 @@ impl MutTxId {
/// Removes keys for `view_id` from the committed read set.
/// Used for dropping views in an auto-migration.
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
self.committed_state_write_lock.drop_view_from_read_sets(view_id)
self.committed_state_write_lock.drop_view_from_read_sets(view_id, None)
}

/// Removes a specific view call from the committed read set.
pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) {
self.committed_state_write_lock
.drop_view_from_read_sets(view_id, Some(sender))
}
}

Expand Down Expand Up @@ -1959,6 +1970,66 @@ impl MutTxId {
Ok(())
}

/// Clean up Views that have no subscribers and haven't been called within the expiration duration.
///
/// It looks for rows in `st_view_sub` where:
/// - `has_subscribers` is `false`,
/// - `last_called` timestamp is older than the expiration threshold.
///
/// for each such row, it clears the backing table, readset entry and deletes the subscription row.
pub fn clear_expired_views(&mut self, expiration_duration: Duration) -> Result<()> {
let now = Timestamp::now();
let expiration_threshold = now - expiration_duration;

// Collect rows that meet expiration criteria
let expired_sub_rows: Vec<(StViewSubRow, RowPointer)> = self
.iter_by_col_eq(
ST_VIEW_SUB_ID,
StViewSubFields::HasSubscribers,
&AlgebraicValue::from(false),
)?
.map(|row_ref| {
StViewSubRow::try_from(row_ref)
.map(|row| (row, row_ref.pointer()))
.expect("Failed to deserialize st_view_sub row")
})
.filter(|(row, _)| {
!row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold
})
.collect();

// For each expired view subscription, clear the backing table and delete the subscription
for (row, sub_row_ptr) in expired_sub_rows {
let view_id = row.view_id;
let sender: Identity = row.identity.into();
// Get the view's backing table_id from st_view
let StViewRow {
table_id, is_anonymous, ..
} = self.lookup_st_view(view_id)?;
let table_id = table_id.expect("views have backing table");

if is_anonymous {
self.clear_table(table_id)?;
self.drop_view_from_committed_read_set(view_id);
} else {
let rows_to_delete = self
.iter_by_col_eq(table_id, 0, &sender.into())?
.map(|res| res.pointer())
.collect::<Vec<_>>();

for row_ptr in rows_to_delete {
self.delete(table_id, row_ptr)?;
}

self.drop_view_with_sender_from_committed_read_set(view_id, sender);
}

// Finally, delete the st_view_sub row
self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?;
}
Ok(())
}

/// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view.
pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> {
use StViewSubFields::*;
Expand Down
Loading