Skip to content

Commit a769ee0

Browse files
Feat/graphman/clear stale call cache (#6186)
* graph,node,store: Add option to remove stale call_cache in graphman Signed-off-by: Maksim Dimitrov <[email protected]> * docs: Update graphman usage documentation Signed-off-by: Maksim Dimitrov <[email protected]> * store: Add clear_stale_call_cache test Signed-off-by: Maksim Dimitrov <[email protected]> * node: Add max contracts option and value validations Signed-off-by: Maksim Dimitrov <[email protected]> * graph, store: Batch stale contracts and handle max limit Signed-off-by: Maksim Dimitrov <[email protected]> * test: Update tests case Signed-off-by: Maksim Dimitrov <[email protected]> * docs: Update graphman docs Signed-off-by: Maksim Dimitrov <[email protected]> * store: Update the cache test Signed-off-by: Maksim Dimitrov <[email protected]> --------- Signed-off-by: Maksim Dimitrov <[email protected]>
1 parent 91865fd commit a769ee0

File tree

7 files changed

+349
-13
lines changed

7 files changed

+349
-13
lines changed

docs/graphman.md

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -371,21 +371,30 @@ Inspect all blocks after block `13000000`:
371371

372372
Remove the call cache of the specified chain.
373373

374-
If block numbers are not mentioned in `--from` and `--to`, then all the call cache will be removed.
374+
Either remove entries in the block range given by `--from` and `--to`, remove the cache of stale contracts that have not been accessed for the number of days given by `--ttl-days`, or remove the entire cache with `--remove-entire-cache`. Removing the entire cache can reduce indexing performance significantly and should generally be avoided.
375375

376-
USAGE:
377-
graphman chain call-cache <CHAIN_NAME> remove [OPTIONS]
376+
Usage: graphman chain call-cache <CHAIN_NAME> remove [OPTIONS]
378377

379-
OPTIONS:
380-
-f, --from <FROM>
381-
Starting block number
378+
Options:
379+
--remove-entire-cache
380+
Remove the entire cache
381+
382+
--ttl-days <TTL_DAYS>
383+
Remove stale contracts based on call_meta table
382384

383-
-h, --help
384-
Print help information
385+
--ttl-max-contracts <TTL_MAX_CONTRACTS>
386+
Limit the number of contracts to consider for stale contract removal
387+
388+
-f, --from <FROM>
389+
Starting block number
385390

386-
-t, --to <TO>
391+
-t, --to <TO>
387392
Ending block number
388393

394+
-h, --help
395+
Print help (see a summary with '-h')
396+
397+
389398
### DESCRIPTION
390399

391400
Remove the call cache of a specified chain.
@@ -404,6 +413,15 @@ the first block number will be used as the starting block number.
404413
The `to` option is used to specify the ending block number of the block range. In the absence of `to` option,
405414
the last block number will be used as the ending block number.
406415

416+
#### `--remove-entire-cache`
417+
The `--remove-entire-cache` option is used to remove the entire call cache of the specified chain.
418+
419+
#### `--ttl-days <TTL_DAYS>`
420+
The `--ttl-days` option is used to remove stale contracts based on the `call_meta.accessed_at` field. For example, if `--ttl-days` is set to 7, all calls to a contract that has not been accessed in the last 7 days will be removed from the call cache.
421+
422+
#### `--ttl-max-contracts <TTL_MAX_CONTRACTS>`
423+
The `--ttl-max-contracts` option is used to limit the maximum number of contracts to be removed when using the `--ttl-days` option. For example, if `--ttl-max-contracts` is set to 100, at most 100 contracts will be removed from the call cache even if more contracts meet the TTL criteria.
424+
407425
### EXAMPLES
408426

409427
Remove the call cache for all blocks numbered from 10 to 20:
@@ -412,5 +430,12 @@ Remove the call cache for all blocks numbered from 10 to 20:
412430

413431
Remove all the call cache of the specified chain:
414432

415-
graphman --config config.toml chain call-cache ethereum remove
433+
graphman --config config.toml chain call-cache ethereum remove --remove-entire-cache
434+
435+
Remove stale contracts from the call cache that have not been accessed in the last 7 days:
436+
437+
graphman --config config.toml chain call-cache ethereum remove --ttl-days 7
438+
439+
Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts:
440+
graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100
416441

graph/src/blockchain/mock.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,13 @@ impl ChainStore for MockChainStore {
571571
async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> {
572572
unimplemented!()
573573
}
574+
/// Mock counterpart of the stale call-cache cleanup.
///
/// Both arguments are ignored (hence the leading underscores); the body is
/// `unimplemented!()`, so calling this on the mock store panics.
async fn clear_stale_call_cache(
575+
&self,
576+
_ttl_days: i32,
577+
_ttl_max_contracts: Option<i64>,
578+
) -> Result<(), Error> {
579+
unimplemented!()
580+
}
574581
fn chain_identifier(&self) -> Result<ChainIdentifier, Error> {
575582
unimplemented!()
576583
}

graph/src/components/store/traits.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,13 @@ pub trait ChainStore: ChainHeadStore {
599599
/// Clears call cache of the chain for the given `from` and `to` block number.
600600
async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>;
601601

602+
/// Clears stale call cache entries for the given TTL in days.
///
/// `ttl_days` is the staleness threshold in days. `ttl_max_contracts`
/// optionally caps how many contracts are cleaned up in one invocation;
/// `None` means no cap.
// NOTE(review): what exactly counts as "stale" is decided by the
// implementation; this declaration only fixes the signature.
603+
async fn clear_stale_call_cache(
604+
&self,
605+
ttl_days: i32,
606+
ttl_max_contracts: Option<i64>,
607+
) -> Result<(), Error>;
608+
602609
/// Return the chain identifier for this store.
603610
fn chain_identifier(&self) -> Result<ChainIdentifier, Error>;
604611

node/src/bin/manager.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,14 +555,21 @@ pub enum ChainCommand {
555555
pub enum CallCacheCommand {
556556
/// Remove the call cache of the specified chain.
557557
///
558-
/// Either remove entries in the range `--from` and `--to`, or remove
559-
/// the entire cache with `--remove-entire-cache`. Removing the entire
558+
/// Either remove entries in the range `--from` and `--to`,
559+
/// remove the cache for contracts that have not been accessed for the specified duration `--ttl-days`,
560+
/// or remove the entire cache with `--remove-entire-cache`. Removing the entire
560561
/// cache can reduce indexing performance significantly and should
561562
/// generally be avoided.
562563
Remove {
563564
/// Remove the entire cache
564565
#[clap(long, conflicts_with_all = &["from", "to"])]
565566
remove_entire_cache: bool,
567+
/// Remove the cache for contracts that have not been accessed in the last <TTL_DAYS> days
568+
#[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))]
569+
ttl_days: Option<i32>,
570+
/// Limits the number of contracts to consider for cache removal when using `--ttl-days`
571+
#[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))]
572+
ttl_max_contracts: Option<i64>,
566573
/// Starting block number
567574
#[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")]
568575
from: Option<i32>,
@@ -1472,8 +1479,19 @@ async fn main() -> anyhow::Result<()> {
14721479
from,
14731480
to,
14741481
remove_entire_cache,
1482+
ttl_days,
1483+
ttl_max_contracts,
14751484
} => {
14761485
let chain_store = ctx.chain_store(&chain_name)?;
1486+
if let Some(ttl_days) = ttl_days {
1487+
return commands::chain::clear_stale_call_cache(
1488+
chain_store,
1489+
ttl_days,
1490+
ttl_max_contracts,
1491+
)
1492+
.await;
1493+
}
1494+
14771495
if !remove_entire_cache && from.is_none() && to.is_none() {
14781496
bail!("you must specify either --from and --to or --remove-entire-cache");
14791497
}

node/src/manager/commands/chain.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,21 @@ pub async fn clear_call_cache(
8181
Ok(())
8282
}
8383

84+
/// graphman command: remove stale entries from the call cache of a chain.
///
/// Prints a one-line notice naming `chain_store.chain`, then forwards
/// `ttl_days` and `ttl_max_contracts` unchanged to
/// `ChainStore::clear_stale_call_cache`.
///
/// # Errors
///
/// Propagates any error returned by the store call.
pub async fn clear_stale_call_cache(
85+
chain_store: Arc<ChainStore>,
86+
ttl_days: i32,
87+
ttl_max_contracts: Option<i64>,
88+
) -> Result<(), Error> {
89+
println!(
90+
"Removing stale entries from the call cache for `{}`",
91+
chain_store.chain
92+
);
93+
chain_store
94+
.clear_stale_call_cache(ttl_days, ttl_max_contracts)
95+
.await?;
96+
Ok(())
97+
}
98+
8499
pub async fn info(
85100
primary: ConnectionPool,
86101
store: Arc<BlockStore>,

store/postgres/src/chain_store.rs

Lines changed: 198 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub use data::Storage;
8383

8484
/// Encapsulate access to the blocks table for a chain.
8585
mod data {
86+
use crate::diesel::dsl::IntervalDsl;
8687
use diesel::sql_types::{Array, Binary, Bool, Nullable};
8788
use diesel::{connection::SimpleConnection, insert_into};
8889
use diesel::{delete, prelude::*, sql_query};
@@ -104,8 +105,10 @@ mod data {
104105
use graph::prelude::transaction_receipt::LightTransactionReceipt;
105106
use graph::prelude::web3::types::H256;
106107
use graph::prelude::{
107-
serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError,
108+
info, serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger,
109+
StoreError,
108110
};
111+
109112
use std::collections::HashMap;
110113
use std::convert::TryFrom;
111114
use std::fmt;
@@ -1398,6 +1401,190 @@ mod data {
13981401
}
13991402
}
14001403

1404+
/// Delete the cached calls and the call-meta rows of contracts whose
/// `accessed_at` is older than `ttl_days` days.
///
/// Work is done in rounds: each round selects up to `contracts_batch_size`
/// stale contract addresses from the meta table, deletes their cached
/// calls in chunks of `cache_batch_size`, and then deletes the meta rows
/// themselves. The loop ends when no stale contracts are left or, if
/// `ttl_max_contracts` is `Some(limit)`, once `limit` meta rows have been
/// deleted. A summary line is logged through `logger` on exit.
///
/// `Storage::Shared` uses the diesel DSL against `public::eth_call_cache`
/// and `public::eth_call_meta`; `Storage::Private` runs equivalent raw SQL
/// against the tables named by `call_cache.qname` and `call_meta.qname`.
///
/// # Errors
///
/// Returns the first database error encountered; every query is
/// propagated with `?`.
// NOTE(review): no transaction is opened in this function, so an error
// part-way through leaves earlier deletions in place unless the caller
// wraps `conn` in one - confirm.
pub fn clear_stale_call_cache(
1405+
&self,
1406+
conn: &mut PgConnection,
1407+
logger: &Logger,
1408+
ttl_days: i32,
1409+
ttl_max_contracts: Option<i64>,
1410+
) -> Result<(), Error> {
1411+
// Running totals, reported in the final log line.
let mut total_calls: usize = 0;
1412+
let mut total_contracts: i64 = 0;
1413+
// We process contracts in batches to avoid loading too many entries into memory
1414+
// at once. Each contract can have many calls, so we also delete calls in batches.
1415+
// Note: The batch sizes were chosen based on experimentation. Potentially, they
1416+
// could be made configurable via ENV vars.
1417+
let contracts_batch_size: i64 = 2000;
1418+
let cache_batch_size: usize = 10000;
1419+
1420+
// Limits the number of contracts to process if ttl_max_contracts is set.
1421+
// Used also to adjust the final batch size, so we don't process more
1422+
// contracts than the set limit.
1423+
// Yields `None` when no limit is set, otherwise `limit - processed`
// clamped by `saturating_sub`.
let remaining_contracts = |processed: i64| -> Option<i64> {
1424+
ttl_max_contracts.map(|limit| limit.saturating_sub(processed))
1425+
};
1426+
1427+
match self {
1428+
Storage::Shared => {
1429+
use public::eth_call_cache as cache;
1430+
use public::eth_call_meta as meta;
1431+
1432+
loop {
1433+
// Stop as soon as the optional contract limit is used up.
if let Some(0) = remaining_contracts(total_contracts) {
1434+
info!(
1435+
logger,
1436+
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
1437+
total_calls,
1438+
total_contracts
1439+
);
1440+
break;
1441+
}
1442+
1443+
// Size of this round: the regular batch size, shrunk to whatever
// is left of the limit when a limit is set.
let batch_limit = remaining_contracts(total_contracts)
1444+
.map(|left| left.min(contracts_batch_size))
1445+
.unwrap_or(contracts_batch_size);
1446+
1447+
// Next batch of contracts whose `accessed_at` lies more than
// `ttl_days` days in the past.
let stale_contracts = meta::table
1448+
.select(meta::contract_address)
1449+
.filter(
1450+
meta::accessed_at
1451+
.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())),
1452+
)
1453+
.limit(batch_limit)
1454+
.get_results::<Vec<u8>>(conn)?;
1455+
1456+
// Nothing stale is left: report and finish.
if stale_contracts.is_empty() {
1457+
info!(
1458+
logger,
1459+
"Finished cleaning call cache: deleted {} entries for {} contracts",
1460+
total_calls,
1461+
total_contracts
1462+
);
1463+
break;
1464+
}
1465+
1466+
// Delete the cached calls of this batch of contracts in chunks:
// select up to `cache_batch_size` ids, then delete exactly those.
loop {
1467+
let next_batch = cache::table
1468+
.select(cache::id)
1469+
.filter(cache::contract_address.eq_any(&stale_contracts))
1470+
.limit(cache_batch_size as i64)
1471+
.get_results::<Vec<u8>>(conn)?;
1472+
let deleted_count =
1473+
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
1474+
.execute(conn)?;
1475+
1476+
total_calls += deleted_count;
1477+
1478+
// Fewer deletions than a full chunk: treat the calls of these
// contracts as exhausted.
if deleted_count < cache_batch_size {
1479+
break;
1480+
}
1481+
}
1482+
1483+
// With their calls gone, remove the meta rows; the number of rows
// actually deleted is what counts toward the limit.
let deleted_contracts = diesel::delete(
1484+
meta::table.filter(meta::contract_address.eq_any(&stale_contracts)),
1485+
)
1486+
.execute(conn)?;
1487+
1488+
total_contracts += deleted_contracts as i64;
1489+
}
1490+
1491+
Ok(())
1492+
}
1493+
Storage::Private(Schema {
1494+
call_cache,
1495+
call_meta,
1496+
..
1497+
}) => {
1498+
// Same algorithm as the shared branch, expressed as raw SQL against
// the tables named by `call_meta.qname` / `call_cache.qname`.
// `ttl_days` is an `i32`, so formatting it into the SQL text cannot
// inject anything; the batch limit is passed as bind parameter `$1`.
let select_query = format!(
1499+
"WITH stale_contracts AS (
1500+
SELECT contract_address
1501+
FROM {}
1502+
WHERE accessed_at < current_date - interval '{} days'
1503+
LIMIT $1
1504+
)
1505+
SELECT contract_address FROM stale_contracts",
1506+
call_meta.qname, ttl_days
1507+
);
1508+
1509+
// Deletes at most `cache_batch_size` cached calls belonging to the
// contract addresses bound as `$1`.
let delete_cache_query = format!(
1510+
"WITH targets AS (
1511+
SELECT id
1512+
FROM {}
1513+
WHERE contract_address = ANY($1)
1514+
LIMIT {}
1515+
)
1516+
DELETE FROM {} USING targets
1517+
WHERE {}.id = targets.id",
1518+
call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname
1519+
);
1520+
1521+
// Deletes the meta rows of the contract addresses bound as `$1`.
let delete_meta_query = format!(
1522+
"DELETE FROM {} WHERE contract_address = ANY($1)",
1523+
call_meta.qname
1524+
);
1525+
1526+
// Row type for loading the result of `select_query`.
#[derive(QueryableByName)]
1527+
struct ContractAddress {
1528+
#[diesel(sql_type = Bytea)]
1529+
contract_address: Vec<u8>,
1530+
}
1531+
1532+
loop {
1533+
// Stop as soon as the optional contract limit is used up.
if let Some(0) = remaining_contracts(total_contracts) {
1534+
info!(
1535+
logger,
1536+
"Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)",
1537+
total_calls,
1538+
total_contracts
1539+
);
1540+
break;
1541+
}
1542+
1543+
// Size of this round, capped by what is left of the limit.
let batch_limit = remaining_contracts(total_contracts)
1544+
.map(|left| left.min(contracts_batch_size))
1545+
.unwrap_or(contracts_batch_size);
1546+
1547+
let stale_contracts: Vec<Vec<u8>> = sql_query(&select_query)
1548+
.bind::<BigInt, _>(batch_limit)
1549+
.load::<ContractAddress>(conn)?
1550+
.into_iter()
1551+
.map(|r| r.contract_address)
1552+
.collect();
1553+
1554+
// Nothing stale is left: report and finish.
if stale_contracts.is_empty() {
1555+
info!(
1556+
logger,
1557+
"Finished cleaning call cache: deleted {} entries for {} contracts",
1558+
total_calls,
1559+
total_contracts
1560+
);
1561+
break;
1562+
}
1563+
1564+
// Delete the cached calls of this batch of contracts chunk by chunk
// until a chunk comes back short.
loop {
1565+
let deleted_count = sql_query(&delete_cache_query)
1566+
.bind::<Array<Bytea>, _>(&stale_contracts)
1567+
.execute(conn)?;
1568+
1569+
total_calls += deleted_count;
1570+
1571+
if deleted_count < cache_batch_size {
1572+
break;
1573+
}
1574+
}
1575+
1576+
// With their calls gone, remove the meta rows; the number of rows
// actually deleted is what counts toward the limit.
let deleted_contracts = sql_query(&delete_meta_query)
1577+
.bind::<Array<Bytea>, _>(&stale_contracts)
1578+
.execute(conn)?;
1579+
1580+
total_contracts += deleted_contracts as i64;
1581+
}
1582+
1583+
Ok(())
1584+
}
1585+
}
1586+
}
1587+
14011588
pub(super) fn update_accessed_at(
14021589
&self,
14031590
conn: &mut PgConnection,
@@ -2508,6 +2695,16 @@ impl ChainStoreTrait for ChainStore {
25082695
Ok(())
25092696
}
25102697

2698+
/// Store-backed implementation of the stale call-cache cleanup.
///
/// Obtains a connection via `self.get_conn()` and delegates to
/// `self.storage.clear_stale_call_cache`, passing the store's logger along
/// with `ttl_days` and `ttl_max_contracts` unchanged.
///
/// # Errors
///
/// Fails if no connection can be obtained or if the storage call fails.
// NOTE(review): the delegated call is synchronous, so this async fn
// presumably blocks the executing task for the whole cleanup - confirm
// that is acceptable for its callers.
async fn clear_stale_call_cache(
2699+
&self,
2700+
ttl_days: i32,
2701+
ttl_max_contracts: Option<i64>,
2702+
) -> Result<(), Error> {
2703+
let conn = &mut *self.get_conn()?;
2704+
self.storage
2705+
.clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts)
2706+
}
2707+
25112708
async fn transaction_receipts_in_block(
25122709
&self,
25132710
block_hash: &H256,

0 commit comments

Comments
 (0)