Skip to content

feat: check vss state via lnd channels snapshot #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 6, 2025
Merged
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mutiny-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ argon2 = { version = "0.5.0", features = ["password-hash", "alloc"] }
bincode = "1.3.3"
hex-conservative = "0.1.1"
async-lock = "3.2.0"
once_cell = "1.18.0"

base64 = "0.13.0"
pbkdf2 = "0.11"
Expand Down
3 changes: 1 addition & 2 deletions mutiny-core/src/authclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use bitcoin::hashes::{hex::prelude::*, sha256, Hash};
use bitcoin::key::rand::Rng;
use bitcoin::secp256k1::rand::thread_rng;
use jwt_compact::UntrustedToken;
use lightning::{log_debug, log_error, log_info};
use lightning::{log_trace, util::logger::*};
use lightning::{log_debug, log_error, log_info, log_trace, util::logger::*};
use reqwest::Client;
use reqwest::{Method, StatusCode, Url};
use serde_json::Value;
Expand Down
2 changes: 2 additions & 0 deletions mutiny-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub enum MutinyError {
/// Returned when trying to start Mutiny while it is already running.
#[error("Mutiny is already running.")]
AlreadyRunning,
#[error("The stored LND snapshot is outdated.")]
LndSnapshotOutdated,
/// Returned when trying to stop Mutiny while it is not running.
#[error("Mutiny is not running.")]
NotRunning,
Expand Down
13 changes: 10 additions & 3 deletions mutiny-core/src/ldkstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use crate::node::Router;
use crate::node::{default_user_config, ChainMonitor};
use crate::nodemanager::ChannelClosure;
use crate::storage::{IndexItem, MutinyStorage, VersionedValue};
use crate::utils;
use crate::utils::{sleep, spawn};
use crate::utils::{self, now, sleep, spawn};
use crate::{chain::MutinyChain, scorer::HubPreferentialScorer};
use anyhow::anyhow;
use bitcoin::hashes::hex::FromHex;
Expand Down Expand Up @@ -47,7 +46,7 @@ const CHANNEL_OPENING_PARAMS_PREFIX: &str = "chan_open_params/";
pub const CHANNEL_CLOSURE_PREFIX: &str = "channel_closure/";
pub const CHANNEL_CLOSURE_BUMP_PREFIX: &str = "channel_closure_bump/";
const FAILED_SPENDABLE_OUTPUT_DESCRIPTOR_KEY: &str = "failed_spendable_outputs";
pub const BROADCAST_TX_1_IN_MULTI_OUT: &str = "broadcast_tx_1_in_multi_out/";
pub const ACTIVE_NODE_ID_KEY: &str = "active_node_id";

pub(crate) type PhantomChannelManager<S: MutinyStorage> = LdkChannelManager<
Arc<ChainMonitor<S>>,
Expand Down Expand Up @@ -464,6 +463,14 @@ impl<S: MutinyStorage> MutinyNodePersister<S> {
Ok(())
}

pub(crate) fn persist_node_id(&self, node_id: String) -> Result<(), MutinyError> {
self.storage.write_data(
ACTIVE_NODE_ID_KEY.to_string(),
node_id,
Some(now().as_secs() as u32),
)
}

/// Persists the failed spendable outputs to storage.
/// Previously failed spendable outputs are not overwritten.
///
Expand Down
105 changes: 92 additions & 13 deletions mutiny-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ use crate::error::MutinyError;
pub use crate::gossip::{GOSSIP_SYNC_TIME_KEY, NETWORK_GRAPH_KEY, PROB_SCORER_KEY};
pub use crate::keymanager::generate_seed;
pub use crate::ldkstorage::{
BROADCAST_TX_1_IN_MULTI_OUT, CHANNEL_CLOSURE_BUMP_PREFIX, CHANNEL_CLOSURE_PREFIX,
CHANNEL_MANAGER_KEY, MONITORS_PREFIX_KEY,
ACTIVE_NODE_ID_KEY, CHANNEL_CLOSURE_BUMP_PREFIX, CHANNEL_CLOSURE_PREFIX, CHANNEL_MANAGER_KEY,
MONITORS_PREFIX_KEY,
};
use crate::lsp::lndchannel::fetch_lnd_channels_snapshot;
use crate::messagehandler::CommonLnEventCallback;
use crate::nodemanager::NodeManager;
use crate::nodemanager::{ChannelClosure, MutinyBip21RawMaterials};
pub use crate::onchain::BroadcastTx1InMultiOut;
use crate::storage::get_invoice_by_hash;
use crate::utils::sleep;
use crate::utils::spawn;
use crate::storage::{get_invoice_by_hash, LND_CHANNELS_SNAPSHOT_KEY};
use crate::utils::{now, sleep, spawn, spawn_with_handle, StopHandle};
use crate::vss::VSS_MANAGER;
use crate::{authclient::MutinyAuthClient, logging::MutinyLogger};
use crate::{
event::{HTLCStatus, MillisatAmount, PaymentInfo},
Expand All @@ -67,7 +69,7 @@ use crate::{
PAYMENT_INBOUND_PREFIX_KEY, PAYMENT_OUTBOUND_PREFIX_KEY, TRANSACTION_DETAILS_PREFIX_KEY,
},
};
use anyhow::Context;

use bdk_chain::ConfirmationTime;
use bip39::Mnemonic;
pub use bitcoin;
Expand All @@ -77,6 +79,7 @@ use bitcoin::{bip32::Xpriv, Transaction};
use bitcoin::{hashes::sha256, Network, Txid};
use bitcoin::{hashes::Hash, Address};

use anyhow::Context;
use futures_util::lock::Mutex;
use hex_conservative::{DisplayHex, FromHex};
use itertools::Itertools;
Expand All @@ -87,10 +90,8 @@ use lightning::util::logger::Logger;
use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
pub use lightning_invoice;
use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription};

use messagehandler::CommonLnEventCallback;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use utils::{spawn_with_handle, StopHandle};

use std::collections::HashMap;
use std::collections::HashSet;
Expand All @@ -99,7 +100,6 @@ use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;

#[cfg(target_arch = "wasm32")]
use web_time::Instant;

Expand Down Expand Up @@ -548,6 +548,7 @@ pub struct MutinyWalletConfigBuilder {
skip_device_lock: bool,
pub safe_mode: bool,
skip_hodl_invoices: bool,
check_lnd_snapshot: bool,
}

impl MutinyWalletConfigBuilder {
Expand All @@ -572,6 +573,7 @@ impl MutinyWalletConfigBuilder {
skip_device_lock: false,
safe_mode: false,
skip_hodl_invoices: true,
check_lnd_snapshot: false,
}
}

Expand Down Expand Up @@ -647,6 +649,10 @@ impl MutinyWalletConfigBuilder {
self.skip_hodl_invoices = false;
}

pub fn do_check_lnd_snapshot(&mut self) {
self.check_lnd_snapshot = true;
}

pub fn build(self) -> MutinyWalletConfig {
let network = self.network.expect("network is required");

Expand All @@ -670,6 +676,7 @@ impl MutinyWalletConfigBuilder {
skip_device_lock: self.skip_device_lock,
safe_mode: self.safe_mode,
skip_hodl_invoices: self.skip_hodl_invoices,
check_lnd_snapshot: false,
}
}
}
Expand All @@ -695,6 +702,7 @@ pub struct MutinyWalletConfig {
skip_device_lock: bool,
pub safe_mode: bool,
skip_hodl_invoices: bool,
check_lnd_snapshot: bool,
}

pub struct MutinyWalletBuilder<S: MutinyStorage> {
Expand Down Expand Up @@ -817,7 +825,7 @@ impl<S: MutinyStorage> MutinyWalletBuilder<S> {
let network = self
.network
.map_or_else(|| Err(MutinyError::InvalidArgumentsError), Ok)?;
let config = self.config.unwrap_or(
let config = self.config.clone().unwrap_or(
MutinyWalletConfigBuilder::new(self.xprivkey)
.with_network(network)
.build(),
Expand All @@ -844,12 +852,15 @@ impl<S: MutinyStorage> MutinyWalletBuilder<S> {

// Need to prevent other devices from running at the same time
log_debug!(logger, "checking device lock");
let lsp_url = config.lsp_url.clone();
if !config.skip_device_lock {
let start = Instant::now();
if let Some(lock) = self.storage.get_device_lock()? {
log_info!(logger, "Current device lock: {lock:?}");
}
self.storage.set_device_lock(&logger)?;
self.storage
.set_device_lock(&logger, lsp_url.clone(), config.check_lnd_snapshot)
.await?;
log_debug!(
logger,
"Device lock set: took {}ms",
Expand Down Expand Up @@ -905,15 +916,83 @@ impl<S: MutinyStorage> MutinyWalletBuilder<S> {
loop {
if stop_signal.stopping() {
log_debug!(logger_clone, "stopping claim device lock");
while VSS_MANAGER.has_in_progress() {
log_debug!(logger_clone, "waiting for VSS to finish");
sleep(300).await;
}
if let Err(e) = storage_clone.release_device_lock(&logger_clone) {
log_error!(logger_clone, "Error releasing device lock: {e}");
}
break;
}
if let Err(e) = storage_clone.set_device_lock(&logger_clone) {

let config = self.config.as_ref().expect("config is required");
if let (Some(lsp_url), Ok(Some(node_id))) =
(config.lsp_url.as_ref(), storage_clone.get_node_id())
{
match fetch_lnd_channels_snapshot(
&Client::new(),
lsp_url,
&node_id,
&logger_clone,
)
.await
{
Ok(first_lnd_snapshot) => {
log_debug!(
logger_clone,
"First fetched lnd snapshot: {:?}",
first_lnd_snapshot
);
if !VSS_MANAGER.has_in_progress() {
if let Ok(second_lnd_snapshot) = fetch_lnd_channels_snapshot(
&Client::new(),
lsp_url,
&node_id,
&logger_clone,
)
.await
{
log_debug!(
logger_clone,
"Second fetched lnd snapshot: {:?}",
second_lnd_snapshot
);
if first_lnd_snapshot.snapshot == second_lnd_snapshot.snapshot {
log_debug!(logger_clone, "Saving lnd snapshot");
if let Err(e) = storage_clone.write_data(
LND_CHANNELS_SNAPSHOT_KEY.to_string(),
&second_lnd_snapshot,
Some(now().as_secs() as u32),
) {
log_error!(
logger_clone,
"Error saving lnd snapshot: {e}"
);
}
}
}
}
}
Err(e) => {
log_error!(logger_clone, "Error fetching lnd channels: {e}");
}
}
}

if let Err(e) = storage_clone
.set_device_lock(&logger_clone, lsp_url.clone(), config.check_lnd_snapshot)
.await
{
log_error!(logger_clone, "Error setting device lock: {e}");
}

log_debug!(
logger_clone,
"Vss pending writes: {:?}",
VSS_MANAGER.get_pending_writes()
);

let mut remained_sleep_ms = (DEVICE_LOCK_INTERVAL_SECS * 1000) as i32;
while !stop_signal.stopping() && remained_sleep_ms > 0 {
let sleep_ms = 300;
Expand Down
Loading
Loading