Skip to content

Commit 18e041c

Browse files
authored
Merge pull request Blockstream#45 from mempool/junderw/perf-precache-speed
Increase performance for precache operation
2 parents e663693 + d49f752 commit 18e041c

File tree

12 files changed

+223
-11591
lines changed

12 files changed

+223
-11591
lines changed

contrib/popular-scripts.txt

Lines changed: 0 additions & 11365 deletions
This file was deleted.

electrs-start-liquid

Lines changed: 0 additions & 33 deletions
This file was deleted.

electrs-start-liquidtestnet

Lines changed: 0 additions & 33 deletions
This file was deleted.

electrs-start-mainnet

Lines changed: 0 additions & 44 deletions
This file was deleted.

electrs-start-signet

Lines changed: 0 additions & 45 deletions
This file was deleted.

electrs-start-testnet

Lines changed: 0 additions & 45 deletions
This file was deleted.

src/bin/electrs.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
6969
&metrics,
7070
));
7171

72-
if let Some(ref precache_file) = config.precache_scripts {
73-
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
74-
.expect("cannot load scripts to precache");
75-
precache::precache(&chain, precache_scripthashes);
76-
}
77-
7872
let mempool = Arc::new(RwLock::new(Mempool::new(
7973
Arc::clone(&chain),
8074
&metrics,
@@ -102,6 +96,16 @@ fn run_server(config: Arc<Config>) -> Result<()> {
10296
let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query));
10397
let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics);
10498

99+
if let Some(ref precache_file) = config.precache_scripts {
100+
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
101+
.expect("cannot load scripts to precache");
102+
precache::precache(
103+
Arc::clone(&chain),
104+
precache_scripthashes,
105+
config.precache_threads,
106+
);
107+
}
108+
105109
loop {
106110
if let Err(err) = signal.wait(Duration::from_millis(500), true) {
107111
info!("stopping server: {}", err);

src/config.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub struct Config {
4949
pub index_unspendables: bool,
5050
pub cors: Option<String>,
5151
pub precache_scripts: Option<String>,
52+
pub precache_threads: usize,
5253
pub utxos_limit: usize,
5354
pub electrum_txs_limit: usize,
5455
pub electrum_banner: String,
@@ -188,6 +189,12 @@ impl Config {
188189
.help("Path to file with list of scripts to pre-cache")
189190
.takes_value(true)
190191
)
192+
.arg(
193+
Arg::with_name("precache_threads")
194+
.long("precache-threads")
195+
.help("Non-zero number of threads to use for precache threadpool. [default: 4 * CORE_COUNT]")
196+
.takes_value(true)
197+
)
191198
.arg(
192199
Arg::with_name("utxos_limit")
193200
.long("utxos-limit")
@@ -483,6 +490,22 @@ impl Config {
483490
index_unspendables: m.is_present("index_unspendables"),
484491
cors: m.value_of("cors").map(|s| s.to_string()),
485492
precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
493+
precache_threads: m.value_of("precache_threads").map_or_else(
494+
|| {
495+
std::thread::available_parallelism()
496+
.expect("Can't get core count")
497+
.get()
498+
* 4
499+
},
500+
|s| match s.parse::<usize>() {
501+
Ok(v) if v > 0 => v,
502+
_ => clap::Error::value_validation_auto(format!(
503+
"The argument '{}' isn't a valid value",
504+
s
505+
))
506+
.exit(),
507+
},
508+
),
486509

487510
#[cfg(feature = "liquid")]
488511
parent_network,

src/new_index/precache.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,52 @@ use hex;
88
use std::fs::File;
99
use std::io;
1010
use std::io::prelude::*;
11+
use std::sync::{atomic::AtomicUsize, Arc};
12+
use std::time::Instant;
1113

12-
pub fn precache(chain: &ChainQuery, scripthashes: Vec<FullHash>) {
14+
pub fn precache(chain: Arc<ChainQuery>, scripthashes: Vec<FullHash>, threads: usize) {
1315
let total = scripthashes.len();
14-
info!("Pre-caching stats and utxo set for {} scripthashes", total);
16+
info!(
17+
"Pre-caching stats and utxo set on {} threads for {} scripthashes",
18+
threads, total
19+
);
1520

1621
let pool = rayon::ThreadPoolBuilder::new()
17-
.num_threads(16)
22+
.num_threads(threads)
1823
.thread_name(|i| format!("precache-{}", i))
1924
.build()
2025
.unwrap();
21-
pool.install(|| {
22-
scripthashes
23-
.par_iter()
24-
.enumerate()
25-
.for_each(|(i, scripthash)| {
26-
if i % 5 == 0 {
27-
info!("running pre-cache for scripthash {}/{}", i + 1, total);
28-
}
29-
chain.stats(&scripthash[..]);
30-
//chain.utxo(&scripthash[..]);
31-
})
26+
let now = Instant::now();
27+
let counter = AtomicUsize::new(0);
28+
std::thread::spawn(move || {
29+
pool.install(|| {
30+
scripthashes
31+
.par_iter()
32+
.for_each(|scripthash| {
33+
// First, cache
34+
chain.stats(&scripthash[..], crate::new_index::db::DBFlush::Disable);
35+
let _ = chain.utxo(&scripthash[..], usize::MAX, crate::new_index::db::DBFlush::Disable);
36+
37+
// Then, increment the counter
38+
let pre_increment = counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
39+
let post_increment_counter = pre_increment + 1;
40+
41+
// Then, log
42+
if post_increment_counter % 500 == 0 {
43+
let now_millis = now.elapsed().as_millis();
44+
info!("{post_increment_counter}/{total} Processed in {now_millis} ms running pre-cache for scripthash");
45+
}
46+
47+
// Every 10k counts, flush the DB to disk
48+
if post_increment_counter % 10000 == 0 {
49+
info!("Flushing cache_db... {post_increment_counter}");
50+
chain.store().cache_db().flush();
51+
info!("Done Flushing cache_db!!! {post_increment_counter}");
52+
}
53+
})
54+
});
55+
// After everything is done, flush the cache
56+
chain.store().cache_db().flush();
3257
});
3358
}
3459

src/new_index/query.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ impl Query {
8888
}
8989

9090
pub fn utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> {
91-
let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?;
91+
let mut utxos = self.chain.utxo(
92+
scripthash,
93+
self.config.utxos_limit,
94+
super::db::DBFlush::Enable,
95+
)?;
9296
let mempool = self.mempool();
9397
utxos.retain(|utxo| !mempool.has_spend(&OutPoint::from(utxo)));
9498
utxos.extend(mempool.utxo(scripthash));
@@ -111,7 +115,7 @@ impl Query {
111115

112116
pub fn stats(&self, scripthash: &[u8]) -> (ScriptStats, ScriptStats) {
113117
(
114-
self.chain.stats(scripthash),
118+
self.chain.stats(scripthash, super::db::DBFlush::Enable),
115119
self.mempool().stats(scripthash),
116120
)
117121
}

0 commit comments

Comments
 (0)