diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 0e7d0872a..49f038997 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use bdk_chain::Merge; use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::io::{self, Error, ErrorKind}; use lightning::util::persist::{KVStore, KVStoreSync}; @@ -181,7 +182,7 @@ impl KVStoreSync for VssStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { debug_assert!(false, "Failed to access internal runtime"); @@ -203,6 +204,7 @@ impl KVStoreSync for VssStore { primary_namespace, secondary_namespace, key, + lazy, ) .await }; @@ -275,7 +277,7 @@ impl KVStore for VssStore { }) } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Pin> + Send>> { let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); @@ -292,6 +294,7 @@ impl KVStore for VssStore { primary_namespace, secondary_namespace, key, + lazy, ) .await }) @@ -321,6 +324,7 @@ struct VssStoreInner { // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. // The lock also encapsulates the latest written version per key. locks: Mutex>>>, + pending_lazy_deletes: Mutex>, } impl VssStoreInner { @@ -347,7 +351,8 @@ impl VssStoreInner { let client = VssClient::new_with_headers(base_url, retry_policy, header_provider); let locks = Mutex::new(HashMap::new()); - Self { client, store_id, storable_builder, key_obfuscator, locks } + let pending_lazy_deletes = Mutex::new(Vec::new()); + Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes } } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { @@ -451,6 +456,12 @@ impl VssStoreInner { "write", )?; + let delete_items = self + .pending_lazy_deletes + .try_lock() + .ok() + .and_then(|mut guard| guard.take()) + .unwrap_or_default(); self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); @@ -464,10 +475,15 @@ impl VssStoreInner { version: vss_version, value: storable.encode_to_vec(), }], - delete_items: vec![], + delete_items: delete_items.clone(), }; self.client.put_object(&request).await.map_err(|e| { + // Restore delete items so they'll be retried on next write. + if !delete_items.is_empty() { + self.pending_lazy_deletes.lock().unwrap().extend(delete_items); + } + let msg = format!( "Failed to write to key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -482,7 +498,7 @@ impl VssStoreInner { async fn remove_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: String, secondary_namespace: String, key: String, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, ) -> io::Result<()> { check_namespace_key_validity( &primary_namespace, @@ -491,13 +507,19 @@ impl VssStoreInner { "remove", )?; + let obfuscated_key = + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); + + let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] }; + if lazy { + let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap(); + pending_lazy_deletes.push(key_value); + return Ok(()); + } + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { - let obfuscated_key = - self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); - let request = DeleteObjectRequest { - store_id: self.store_id.clone(), - key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), - }; + let request = + DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) }; self.client.delete_object(&request).await.map_err(|e| { let msg = format!( @@ -644,4 +666,87 @@ mod tests { do_read_write_remove_list_persist(&vss_store); drop(vss_store) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn vss_lazy_delete() { + let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); + let mut rng = rng(); + let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + let mut vss_seed = [0u8; 32]; + rng.fill_bytes(&mut vss_seed); + let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); + let vss_store = + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); + + let primary_namespace = "test_namespace"; + let secondary_namespace = ""; + let key_to_delete = "key_to_delete"; + let key_for_trigger = "key_for_trigger"; + let data_to_delete = b"data_to_delete".to_vec(); + let trigger_data = b"trigger_data".to_vec(); + + // Write the key that we'll later lazily delete + KVStore::write( + &vss_store, + primary_namespace, + secondary_namespace, + key_to_delete, + data_to_delete.clone(), + ) + .await + .unwrap(); + + // Verify the key exists + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) + .await + .unwrap(); + assert_eq!(read_data, data_to_delete); + + // Perform a lazy delete + KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true) + .await + .unwrap(); + + // Verify the key still exists (lazy delete doesn't immediately remove it) + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) + .await + .unwrap(); + assert_eq!(read_data, data_to_delete); + + // Verify the key is still in the list + let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); + assert!(keys.contains(&key_to_delete.to_string())); + + // Trigger the actual deletion by performing a write operation + KVStore::write( + &vss_store, + primary_namespace, + secondary_namespace, + key_for_trigger, + trigger_data.clone(), + ) + .await + .unwrap(); + + // Now verify the key is actually deleted + let read_result = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await; + assert!(read_result.is_err()); + assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound); + + // Verify the key is no longer in the list + let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); + assert!(!keys.contains(&key_to_delete.to_string())); + + // Verify the trigger key still exists + let read_data = + KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger) + .await + .unwrap(); + assert_eq!(read_data, trigger_data); + } }