Skip to content

Commit 2ea8a35

Browse files
authored
Merge pull request #105 from EthanYuan/fix-keychain-size-issue-vss
Implement Periodic Keychain Compaction to Optimize VSS Sync
2 parents e9a5f88 + c7fbfe9 commit 2ea8a35

File tree

5 files changed

+199
-14
lines changed

5 files changed

+199
-14
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mutiny-core/src/nodemanager.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -689,12 +689,23 @@ impl<S: MutinyStorage> NodeManager<S> {
689689
}
690690
}
691691

692+
// check keychain size
693+
let did_keychain_compact_this_round = match nm.wallet.try_compact_keychain().await {
694+
Ok(did_keychain_compact_this_round) => did_keychain_compact_this_round,
695+
Err(e) => {
696+
log_error!(nm.logger, "Failed to compact keychain: {e}");
697+
false
698+
}
699+
};
700+
692701
// wait for next sync round, checking graceful shutdown check each second.
693-
for _ in 0..sync_interval_secs {
694-
if nm.stop.load(Ordering::Relaxed) {
695-
return;
702+
if !did_keychain_compact_this_round {
703+
for _ in 0..sync_interval_secs {
704+
if nm.stop.load(Ordering::Relaxed) {
705+
return;
706+
}
707+
sleep(1_000).await;
696708
}
697-
sleep(1_000).await;
698709
}
699710
}
700711
});

mutiny-core/src/onchain.rs

Lines changed: 163 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use bdk_wallet::bitcoin::FeeRate;
1313
use bdk_wallet::psbt::PsbtUtils;
1414
use bdk_wallet::template::DescriptorTemplateOut;
1515
use bdk_wallet::{
16-
CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
16+
ChangeSet, CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
1717
};
1818
use bitcoin::bip32::{ChildNumber, DerivationPath, Xpriv};
1919
use bitcoin::consensus::serialize;
@@ -39,9 +39,15 @@ use crate::utils;
3939
use crate::utils::{now, sleep};
4040
use crate::TransactionDetails;
4141

42+
#[cfg(not(target_arch = "wasm32"))]
43+
use std::time::Instant;
44+
#[cfg(target_arch = "wasm32")]
45+
use web_time::Instant;
46+
4247
pub(crate) const FULL_SYNC_STOP_GAP: usize = 150;
4348
pub(crate) const RESTORE_SYNC_STOP_GAP: usize = 50;
4449
const PARALLEL_REQUESTS: usize = 10;
50+
const KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES: usize = 256 * 1024; // 256KB
4551

4652
#[derive(Clone)]
4753
pub struct OnChainWallet<S: MutinyStorage> {
@@ -53,6 +59,10 @@ pub struct OnChainWallet<S: MutinyStorage> {
5359
pub(crate) stop: Arc<AtomicBool>,
5460
logger: Arc<MutinyLogger>,
5561
ln_event_callback: Option<CommonLnEventCallback>,
62+
/// The Bitcoin output descriptors for the wallet’s keychains:
63+
/// 0: receive_descriptor
64+
/// 1: change_descriptor
65+
tr_descriptors: (DescriptorTemplateOut, DescriptorTemplateOut),
5666
}
5767

5868
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
@@ -112,17 +122,23 @@ impl<S: MutinyStorage> OnChainWallet<S> {
112122
None | Some(Ok(None)) => {
113123
// we don't have a bdk wallet, create one
114124
Wallet::create_with_params(
115-
CreateParams::new(receive_descriptor_template, change_descriptor_template)
116-
.network(network),
125+
CreateParams::new(
126+
receive_descriptor_template.clone(),
127+
change_descriptor_template.clone(),
128+
)
129+
.network(network),
117130
)?
118131
}
119132
Some(Err(bdk_wallet::LoadError::Mismatch(_))) => {
120133
// failed to read storage, means we have old encoding and need to delete and re-init wallet
121134
db.delete(&[KEYCHAIN_STORE_KEY])?;
122135
db.write_data(NEED_FULL_SYNC_KEY.to_string(), true, None)?;
123136
Wallet::create_with_params(
124-
CreateParams::new(receive_descriptor_template, change_descriptor_template)
125-
.network(network),
137+
CreateParams::new(
138+
receive_descriptor_template.clone(),
139+
change_descriptor_template.clone(),
140+
)
141+
.network(network),
126142
)?
127143
}
128144
Some(Err(e)) => {
@@ -140,6 +156,7 @@ impl<S: MutinyStorage> OnChainWallet<S> {
140156
stop,
141157
logger,
142158
ln_event_callback,
159+
tr_descriptors: (receive_descriptor_template, change_descriptor_template),
143160
})
144161
}
145162

@@ -253,7 +270,13 @@ impl<S: MutinyStorage> OnChainWallet<S> {
253270
pub async fn sync(&self) -> Result<(), MutinyError> {
254271
// if we need a full sync from a restore
255272
if self.storage.get(NEED_FULL_SYNC_KEY)?.unwrap_or_default() {
273+
let start = Instant::now();
256274
self.full_sync(RESTORE_SYNC_STOP_GAP).await?;
275+
log_info!(
276+
self.logger,
277+
"Full sync took {} seconds",
278+
start.elapsed().as_secs()
279+
);
257280
self.storage.delete(&[NEED_FULL_SYNC_KEY])?;
258281
}
259282
// get first wallet lock that only needs to read
@@ -835,6 +858,141 @@ impl<S: MutinyStorage> OnChainWallet<S> {
835858
log_debug!(self.logger, "Fee bump Transaction broadcast! TXID: {txid}");
836859
Ok(txid)
837860
}
861+
862+
pub fn new_wallet(&self) -> Result<Wallet, MutinyError> {
863+
let wallet = Wallet::create_with_params(
864+
CreateParams::new(self.tr_descriptors.0.clone(), self.tr_descriptors.1.clone())
865+
.network(self.network),
866+
)?;
867+
Ok(wallet)
868+
}
869+
870+
pub async fn try_compact_keychain(&self) -> Result<bool, MutinyError> {
871+
let start = Instant::now();
872+
873+
let changes = self.storage.read_changes()?.unwrap_or_default();
874+
let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
875+
if total_size < KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES {
876+
log_info!(
877+
self.logger,
878+
"Keychain size {} bytes is below threshold {} bytes, not compacting",
879+
total_size,
880+
KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
881+
);
882+
return Ok(false);
883+
}
884+
log_info!(
885+
self.logger,
886+
"Keychain size threshold exceeded {} Bytes, spawning simplified compaction task.",
887+
KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
888+
);
889+
self.log_keychain_size(&changes, false);
890+
891+
let mut new_wallet = self.new_wallet()?;
892+
let update = full_scan(&new_wallet, RESTORE_SYNC_STOP_GAP, self.blockchain.clone()).await?;
893+
894+
new_wallet
895+
.apply_update_at(update, Some(now().as_secs()))
896+
.map_err(|e| {
897+
log_error!(self.logger, "Could not apply wallet update: {e}");
898+
MutinyError::Other(anyhow!("Could not apply update: {e}"))
899+
})?;
900+
let mut wallet = self.wallet.try_write()?;
901+
let index = self.storage.activity_index();
902+
let mut index = index.try_write()?;
903+
let new_changeset = new_wallet.take_staged().ok_or(MutinyError::Other(anyhow!(
904+
"Failed to take staged changeset from new wallet"
905+
)))?;
906+
self.log_keychain_size(&new_changeset, true);
907+
self.storage.restore_changes(&new_changeset)?;
908+
*wallet = new_wallet;
909+
drop(wallet); // drop so we can read from wallet
910+
911+
// update the activity index, just get the list of transactions
912+
// and insert them into the index
913+
let index_items = self
914+
.list_transactions(false)?
915+
.into_iter()
916+
.map(|t| IndexItem {
917+
timestamp: match t.confirmation_time {
918+
ConfirmationTime::Confirmed { time, .. } => Some(time),
919+
ConfirmationTime::Unconfirmed { .. } => None,
920+
},
921+
key: format!("{ONCHAIN_PREFIX}{}", t.internal_id),
922+
})
923+
.collect::<Vec<_>>();
924+
925+
// remove old-onchain txs
926+
index.retain(|i| !i.key.starts_with(ONCHAIN_PREFIX));
927+
index.extend(index_items);
928+
929+
log_info!(self.logger, "Keychain compaction completed successfully.");
930+
log_info!(
931+
self.logger,
932+
"Keychain compaction took {} seconds",
933+
start.elapsed().as_secs()
934+
);
935+
936+
Ok(true)
937+
}
938+
939+
fn log_keychain_size(&self, keychain: &ChangeSet, is_post_compaction: bool) {
940+
let total_size = serde_json::to_vec(keychain).unwrap_or_default().len();
941+
let local_chain_size = serde_json::to_vec(&keychain.local_chain)
942+
.map(|v| v.len())
943+
.unwrap_or(0);
944+
let tx_graph_size = serde_json::to_vec(&keychain.tx_graph)
945+
.map(|v| v.len())
946+
.unwrap_or(0);
947+
let indexer_size = serde_json::to_vec(&keychain.indexer)
948+
.map(|v| v.len())
949+
.unwrap_or(0);
950+
951+
let prefix = if is_post_compaction {
952+
"POST-COMPACTION"
953+
} else {
954+
"PRE-COMPACTION"
955+
};
956+
957+
log_debug!(
958+
self.logger,
959+
"{} size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
960+
prefix,
961+
total_size,
962+
local_chain_size,
963+
tx_graph_size,
964+
indexer_size
965+
);
966+
}
967+
}
968+
969+
async fn full_scan(
970+
wallet: &Wallet,
971+
gap: usize,
972+
blockchain: Arc<AsyncClient>,
973+
) -> Result<Update, MutinyError> {
974+
// get first wallet lock that only needs to read
975+
let spks = wallet.all_unbounded_spk_iters();
976+
977+
let mut request_builder = FullScanRequestBuilder::default();
978+
for (kind, pks) in spks.into_iter() {
979+
request_builder = request_builder.spks_for_keychain(kind, pks)
980+
}
981+
982+
let FullScanResult {
983+
tx_update,
984+
last_active_indices,
985+
chain_update,
986+
} = blockchain
987+
.full_scan(request_builder, gap, PARALLEL_REQUESTS)
988+
.await?;
989+
let update = Update {
990+
last_active_indices,
991+
tx_update,
992+
chain: chain_update,
993+
};
994+
995+
Ok(update)
838996
}
839997

840998
fn get_tr_descriptors_for_extended_key(

mutiny-core/src/storage.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,25 @@ pub trait MutinyStorage: Clone + Sized + Send + Sync + 'static {
674674
}
675675
}
676676

677+
/// Restore changeset to the storage
678+
fn restore_changes(&self, changeset: &ChangeSet) -> Result<(), MutinyError> {
679+
let current_timestamp_secs = now().as_secs();
680+
let backup_key = format!("{}_backup_{}", KEYCHAIN_STORE_KEY, current_timestamp_secs);
681+
let _ = match self.get_data::<VersionedValue>(KEYCHAIN_STORE_KEY) {
682+
Ok(Some(versioned)) => self.write_data(backup_key, versioned, None),
683+
Ok(None) => Ok(()),
684+
Err(e) => {
685+
log_error!(self.logger(), "Error writing backup: {:?}", e);
686+
Err(e)
687+
}
688+
};
689+
690+
let version = current_timestamp_secs as u32;
691+
let value = serde_json::to_value(changeset)?;
692+
let value = VersionedValue { value, version };
693+
self.write_data(KEYCHAIN_STORE_KEY.to_string(), value, Some(version))
694+
}
695+
677696
/// Spawn background task to run db tasks
678697
fn spawn<Fut: Task>(&self, _fut: Fut);
679698
}
@@ -1209,9 +1228,6 @@ pub(crate) fn list_payment_info<S: MutinyStorage>(
12091228
.collect())
12101229
}
12111230

1212-
#[derive(Clone)]
1213-
pub struct OnChainStorage<S: MutinyStorage>(pub(crate) S);
1214-
12151231
pub(crate) fn get_payment_hash_from_key<'a>(key: &'a str, prefix: &str) -> &'a str {
12161232
key.trim_start_matches(prefix)
12171233
.splitn(2, '_') // To support the old format that had `_{node_id}` at the end

mutiny-wasm/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ cargo-features = ["per-package-target"]
22

33
[package]
44
name = "mutiny-wasm"
5-
version = "1.15.0"
5+
version = "1.16.0"
66
edition = "2021"
77
authors = ["utxostack"]
88
forced-target = "wasm32-unknown-unknown"

0 commit comments

Comments
 (0)