From 0372ff99ba22b6d56317d12df411aa4b6380b551 Mon Sep 17 00:00:00 2001
From: Jeff <jeff.no.zhao@gmail.com>
Date: Tue, 21 Jan 2025 18:13:36 +0800
Subject: [PATCH] feat: improve scraper startup error handling (#5191)

### Description

- Handle errors and set chain metrics if the scraper fails for a chain
during startup (see the sketch below)
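
A simplified sketch of the new startup pattern (condensed from `build_chain_scrapers` in this PR; the error handling around `MetricsUpdater` and `scrape()` in `run()` follows the same shape):

```rust
for domain in settings.chains_to_scrape.iter() {
    match Self::build_chain_scraper(domain, settings, metrics.clone(), scraper_db.clone()).await {
        Ok(scraper) => {
            // Only chains that built successfully get scraped.
            scrapers.insert(domain.id(), scraper);
        }
        Err(err) => {
            // Flag the chain as unhealthy and keep going instead of
            // aborting startup for every other chain.
            chain_metrics.set_critical_error(domain.name(), true);
            info!(domain = domain.name(), ?err, "Failed to build chain scraper");
        }
    }
}
```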

### Drive-by changes
 - Implement `Clone` for `ScraperDb` manually
- This is needed because `DbConn` doesn't implement `Clone` when the `mock`
feature is enabled (see the sketch below)
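
For reference, the manual implementation added in `db/mod.rs` boils down to the following (the full version matches over every `DatabaseConnection` variant inside `clone_connection`):

```rust
impl Clone for ScraperDb {
    fn clone(&self) -> Self {
        // Clone the underlying connection variant-by-variant, since the
        // derive is unavailable with the `mock` feature enabled.
        Self(self.clone_connection())
    }
}
```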

### Related issues
Fixes https://github.com/hyperlane-xyz/issues/issues/1408

### Backward compatibility

Yes

### Testing

Added a unit test for `Scraper::build_chain_scrapers` (assertions shown
below). There are no tests in place yet for the logic in `Scraper::run()`.
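
The core assertions of the new test (the configured chain builds, while the unconfigured one flips its `critical_error` gauge):

```rust
assert_eq!(scrapers.len(), 1);
assert!(scrapers.contains_key(&(KnownHyperlaneDomain::Arbitrum as u32)));

let ethereum_errors = chain_metrics
    .critical_error
    .get_metric_with_label_values(&["ethereum"])
    .unwrap();
assert_eq!(ethereum_errors.get(), 1);
```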
---
 rust/main/Cargo.lock                          |   4 +
 rust/main/agents/scraper/Cargo.toml           |   5 +
 rust/main/agents/scraper/src/agent.rs         | 377 ++++++++++++++----
 .../agents/scraper/src/db/block_cursor.rs     |   2 +-
 rust/main/agents/scraper/src/db/mod.rs        |  31 +-
 rust/main/agents/scraper/src/settings.rs      |   2 +-
 6 files changed, 344 insertions(+), 77 deletions(-)

diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock
index f13ac028c5..449b00171a 100644
--- a/rust/main/Cargo.lock
+++ b/rust/main/Cargo.lock
@@ -7881,16 +7881,19 @@ dependencies = [
  "console-subscriber",
  "derive_more 0.99.18",
  "ethers",
+ "ethers-prometheus",
  "eyre",
  "futures",
  "hyperlane-base",
  "hyperlane-core",
+ "hyperlane-ethereum",
  "hyperlane-test",
  "itertools 0.12.1",
  "migration",
  "num-bigint 0.4.6",
  "num-traits",
  "prometheus",
+ "reqwest",
  "sea-orm",
  "serde",
  "serde_json",
@@ -7900,6 +7903,7 @@ dependencies = [
  "tokio-test",
  "tracing",
  "tracing-futures",
+ "tracing-test",
 ]
 
 [[package]]
diff --git a/rust/main/agents/scraper/Cargo.toml b/rust/main/agents/scraper/Cargo.toml
index 0b9e429b9b..937d4b09a0 100644
--- a/rust/main/agents/scraper/Cargo.toml
+++ b/rust/main/agents/scraper/Cargo.toml
@@ -34,7 +34,12 @@ hyperlane-core = { path = "../../hyperlane-core", features = ["agent"] }
 migration = { path = "migration" }
 
 [dev-dependencies]
+reqwest.workspace = true
+sea-orm = { workspace = true, features = ["mock"]}
 tokio-test = "0.4"
+tracing-test.workspace = true
+ethers-prometheus = { path = "../../ethers-prometheus", features = ["serde"] }
+hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" }
 hyperlane-test = { path = "../../hyperlane-test" }
 
 [features]
diff --git a/rust/main/agents/scraper/src/agent.rs b/rust/main/agents/scraper/src/agent.rs
index d1ded9010e..2d749a234f 100644
--- a/rust/main/agents/scraper/src/agent.rs
+++ b/rust/main/agents/scraper/src/agent.rs
@@ -56,36 +56,10 @@ impl BaseAgent for Scraper {
         let core = settings.build_hyperlane_core(metrics.clone());
 
         let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
-        let mut scrapers: HashMap<u32, ChainScraper> = HashMap::new();
 
-        for domain in settings.chains_to_scrape.iter() {
-            info!(domain = domain.name(), "create chain scraper for domain");
-            let chain_setup = settings.chain_setup(domain).expect("Missing chain config");
-            info!(domain = domain.name(), "create HyperlaneProvider");
-            let provider = settings
-                .build_provider(domain, &metrics.clone())
-                .await?
-                .into();
-            info!(domain = domain.name(), "create HyperlaneDbStore");
-            let store = HyperlaneDbStore::new(
-                db.clone(),
-                domain.clone(),
-                chain_setup.addresses.mailbox,
-                chain_setup.addresses.interchain_gas_paymaster,
-                provider,
-                &chain_setup.index.clone(),
-            )
-            .await?;
-            info!(domain = domain.name(), "insert chain scraper");
-            scrapers.insert(
-                domain.id(),
-                ChainScraper {
-                    domain: domain.clone(),
-                    store,
-                    index_settings: chain_setup.index.clone(),
-                },
-            );
-        }
+        let scrapers =
+            Self::build_chain_scrapers(&settings, metrics.clone(), &chain_metrics, db.clone())
+                .await;
 
         trace!(domain_count = scrapers.len(), "Created scrapers");
 
@@ -113,11 +87,18 @@ impl BaseAgent for Scraper {
         let server_task = server.run().instrument(info_span!("Relayer server"));
         tasks.push(server_task);
 
-        for (domain, scraper) in self.scrapers.iter() {
-            tasks.push(self.scrape(*domain).await);
+        for scraper in self.scrapers.values() {
+            let chain_conf = match self.settings.chain_setup(&scraper.domain) {
+                Ok(s) => s,
+                Err(err) => {
+                    tracing::error!(?err, ?scraper.domain, "Failed to get chain config");
+                    self.chain_metrics
+                        .set_critical_error(scraper.domain.name(), true);
+                    continue;
+                }
+            };
 
-            let chain_conf = self.settings.chain_setup(&scraper.domain).unwrap();
-            let metrics_updater = MetricsUpdater::new(
+            let metrics_updater = match MetricsUpdater::new(
                 chain_conf,
                 self.core_metrics.clone(),
                 self.agent_metrics.clone(),
@@ -125,7 +106,27 @@ impl BaseAgent for Scraper {
                 Self::AGENT_NAME.to_string(),
             )
             .await
-            .unwrap();
+            {
+                Ok(metrics_updater) => metrics_updater,
+                Err(err) => {
+                    tracing::error!(?err, ?scraper.domain, "Failed to build metrics updater");
+                    self.chain_metrics
+                        .set_critical_error(scraper.domain.name(), true);
+                    continue;
+                }
+            };
+
+            match self.scrape(scraper).await {
+                Ok(scraper_task) => {
+                    tasks.push(scraper_task);
+                }
+                Err(err) => {
+                    tracing::error!(?err, ?scraper.domain, "Failed to scrape domain");
+                    self.chain_metrics
+                        .set_critical_error(scraper.domain.name(), true);
+                    continue;
+                }
+            }
             tasks.push(metrics_updater.spawn());
         }
         if let Err(err) = try_join_all(tasks).await {
@@ -137,8 +138,7 @@ impl BaseAgent for Scraper {
 impl Scraper {
     /// Sync contract data and other blockchain with the current chain state.
     /// This will spawn long-running contract sync tasks
-    async fn scrape(&self, domain_id: u32) -> Instrumented<JoinHandle<()>> {
-        let scraper = self.scrapers.get(&domain_id).unwrap();
+    async fn scrape(&self, scraper: &ChainScraper) -> eyre::Result<Instrumented<JoinHandle<()>>> {
         let store = scraper.store.clone();
         let index_settings = scraper.index_settings.clone();
         let domain = scraper.domain.clone();
@@ -152,20 +152,22 @@ impl Scraper {
                 store.clone(),
                 index_settings.clone(),
             )
-            .await;
+            .await?;
         tasks.push(message_indexer);
-        tasks.push(
-            self.build_delivery_indexer(
+
+        let delivery_indexer = self
+            .build_delivery_indexer(
                 domain.clone(),
                 self.core_metrics.clone(),
                 self.contract_sync_metrics.clone(),
                 store.clone(),
                 index_settings.clone(),
             )
-            .await,
-        );
-        tasks.push(
-            self.build_interchain_gas_payment_indexer(
+            .await?;
+        tasks.push(delivery_indexer);
+
+        let gas_payment_indexer = self
+            .build_interchain_gas_payment_indexer(
                 domain,
                 self.core_metrics.clone(),
                 self.contract_sync_metrics.clone(),
@@ -173,18 +175,75 @@ impl Scraper {
                 index_settings.clone(),
                 BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await,
             )
-            .await,
-        );
+            .await?;
+        tasks.push(gas_payment_indexer);
 
-        tokio::spawn(async move {
+        Ok(tokio::spawn(async move {
             // If any of the tasks panic, we want to propagate it, so we unwrap
             try_join_all(tasks).await.unwrap();
         })
-        .instrument(info_span!("Scraper Tasks"))
+        .instrument(info_span!("Scraper Tasks")))
+    }
+
+    async fn build_chain_scraper(
+        domain: &HyperlaneDomain,
+        settings: &ScraperSettings,
+        metrics: Arc<CoreMetrics>,
+        scraper_db: ScraperDb,
+    ) -> eyre::Result<ChainScraper> {
+        info!(domain = domain.name(), "create chain scraper for domain");
+        let chain_setup = settings.chain_setup(domain)?;
+        info!(domain = domain.name(), "create HyperlaneProvider");
+        let provider = settings
+            .build_provider(domain, &metrics.clone())
+            .await?
+            .into();
+        info!(domain = domain.name(), "create HyperlaneDbStore");
+        let store = HyperlaneDbStore::new(
+            scraper_db,
+            domain.clone(),
+            chain_setup.addresses.mailbox,
+            chain_setup.addresses.interchain_gas_paymaster,
+            provider,
+            &chain_setup.index.clone(),
+        )
+        .await?;
+        Ok(ChainScraper {
+            domain: domain.clone(),
+            store,
+            index_settings: chain_setup.index.clone(),
+        })
+    }
+
+    async fn build_chain_scrapers(
+        settings: &ScraperSettings,
+        metrics: Arc<CoreMetrics>,
+        chain_metrics: &ChainMetrics,
+        scraper_db: ScraperDb,
+    ) -> HashMap<u32, ChainScraper> {
+        let mut scrapers: HashMap<u32, ChainScraper> = HashMap::new();
+
+        for domain in settings.chains_to_scrape.iter() {
+            match Self::build_chain_scraper(domain, settings, metrics.clone(), scraper_db.clone())
+                .await
+            {
+                Ok(scraper) => {
+                    info!(domain = domain.name(), "insert chain scraper");
+                    scrapers.insert(domain.id(), scraper);
+                }
+                Err(err) => {
+                    chain_metrics.set_critical_error(domain.name(), true);
+                    info!(
+                        domain = domain.name(),
+                        ?err,
+                        "Failed to build chain scraper"
+                    );
+                }
+            }
+        }
+        scrapers
     }
-}
 
-impl Scraper {
     async fn build_message_indexer(
         &self,
         domain: HyperlaneDomain,
@@ -192,10 +251,10 @@ impl Scraper {
         contract_sync_metrics: Arc<ContractSyncMetrics>,
         store: HyperlaneDbStore,
         index_settings: IndexSettings,
-    ) -> (
+    ) -> eyre::Result<(
         Instrumented<JoinHandle<()>>,
         Option<BroadcastMpscSender<H512>>,
-    ) {
+    )> {
         let sync = self
             .as_ref()
             .settings
@@ -207,17 +266,20 @@ impl Scraper {
                 true,
             )
             .await
-            .unwrap();
-        let cursor = sync
-            .cursor(index_settings.clone())
-            .await
-            .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
+            .map_err(|err| {
+                tracing::error!(?err, ?domain, "Error syncing sequenced contract");
+                err
+            })?;
+        let cursor = sync.cursor(index_settings.clone()).await.map_err(|err| {
+            tracing::error!(?err, ?domain, "Error getting cursor");
+            err
+        })?;
         let maybe_broadcaser = sync.get_broadcaster();
         let task = tokio::spawn(async move { sync.sync("message_dispatch", cursor.into()).await })
             .instrument(
                 info_span!("ChainContractSync", chain=%domain.name(), event="message_dispatch"),
             );
-        (task, maybe_broadcaser)
+        Ok((task, maybe_broadcaser))
     }
 
     async fn build_delivery_indexer(
@@ -227,7 +289,7 @@ impl Scraper {
         contract_sync_metrics: Arc<ContractSyncMetrics>,
         store: HyperlaneDbStore,
         index_settings: IndexSettings,
-    ) -> Instrumented<JoinHandle<()>> {
+    ) -> eyre::Result<Instrumented<JoinHandle<()>>> {
         let sync = self
             .as_ref()
             .settings
@@ -239,17 +301,22 @@ impl Scraper {
                 true,
             )
             .await
-            .unwrap();
+            .map_err(|err| {
+                tracing::error!(?err, ?domain, "Error syncing contract");
+                err
+            })?;
 
         let label = "message_delivery";
-        let cursor = sync
-            .cursor(index_settings.clone())
-            .await
-            .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
+        let cursor = sync.cursor(index_settings.clone()).await.map_err(|err| {
+            tracing::error!(?err, ?domain, "Error getting cursor");
+            err
+        })?;
         // there is no txid receiver for delivery indexing, since delivery txs aren't batched with
         // other types of indexed txs / events
-        tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await })
-            .instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
+        Ok(tokio::spawn(
+            async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await },
+        )
+        .instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label)))
     }
 
     async fn build_interchain_gas_payment_indexer(
@@ -260,7 +327,7 @@ impl Scraper {
         store: HyperlaneDbStore,
         index_settings: IndexSettings,
         tx_id_receiver: Option<MpscReceiver<H512>>,
-    ) -> Instrumented<JoinHandle<()>> {
+    ) -> eyre::Result<Instrumented<JoinHandle<()>>> {
         let sync = self
             .as_ref()
             .settings
@@ -272,17 +339,181 @@ impl Scraper {
                 true,
             )
             .await
-            .unwrap();
+            .map_err(|err| {
+                tracing::error!(?err, ?domain, "Error syncing contract");
+                err
+            })?;
 
         let label = "gas_payment";
-        let cursor = sync
-            .cursor(index_settings.clone())
-            .await
-            .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
-        tokio::spawn(async move {
+        let cursor = sync.cursor(index_settings.clone()).await.map_err(|err| {
+            tracing::error!(?err, ?domain, "Error getting cursor");
+            err
+        })?;
+        Ok(tokio::spawn(async move {
             sync.sync(label, SyncOptions::new(Some(cursor), tx_id_receiver))
                 .await
         })
-        .instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
+        .instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label)))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::BTreeMap;
+
+    use ethers::utils::hex;
+    use ethers_prometheus::middleware::PrometheusMiddlewareConf;
+    use prometheus::{opts, IntGaugeVec, Registry};
+    use reqwest::Url;
+
+    use hyperlane_base::{
+        settings::{
+            ChainConf, ChainConnectionConf, CoreContractAddresses, Settings, TracingConfig,
+        },
+        BLOCK_HEIGHT_HELP, BLOCK_HEIGHT_LABELS, CRITICAL_ERROR_HELP, CRITICAL_ERROR_LABELS,
+    };
+    use hyperlane_core::{
+        config::OperationBatchConfig, IndexMode, KnownHyperlaneDomain, ReorgPeriod, H256,
+    };
+    use hyperlane_ethereum as h_eth;
+    use sea_orm::{DatabaseBackend, MockDatabase};
+
+    use super::*;
+
+    fn generate_test_scraper_settings() -> ScraperSettings {
+        let chains = [(
+            "arbitrum".to_string(),
+            ChainConf {
+                domain: HyperlaneDomain::Known(KnownHyperlaneDomain::Arbitrum),
+                signer: None,
+                reorg_period: ReorgPeriod::None,
+                addresses: CoreContractAddresses {
+                    mailbox: H256::from_slice(
+                        hex::decode(
+                            "000000000000000000000000598facE78a4302f11E3de0bee1894Da0b2Cb71F8",
+                        )
+                        .unwrap()
+                        .as_slice(),
+                    ),
+                    interchain_gas_paymaster: H256::from_slice(
+                        hex::decode(
+                            "000000000000000000000000c756cFc1b7d0d4646589EDf10eD54b201237F5e8",
+                        )
+                        .unwrap()
+                        .as_slice(),
+                    ),
+                    validator_announce: H256::from_slice(
+                        hex::decode(
+                            "0000000000000000000000001b33611fCc073aB0737011d5512EF673Bff74962",
+                        )
+                        .unwrap()
+                        .as_slice(),
+                    ),
+                    merkle_tree_hook: H256::from_slice(
+                        hex::decode(
+                            "000000000000000000000000AD34A66Bf6dB18E858F6B686557075568c6E031C",
+                        )
+                        .unwrap()
+                        .as_slice(),
+                    ),
+                },
+                connection: ChainConnectionConf::Ethereum(h_eth::ConnectionConf {
+                    rpc_connection: h_eth::RpcConnectionConf::Http {
+                        url: Url::parse("https://sepolia-rollup.arbitrum.io/rpc").unwrap(),
+                    },
+                    transaction_overrides: h_eth::TransactionOverrides {
+                        gas_price: None,
+                        gas_limit: None,
+                        max_fee_per_gas: None,
+                        max_priority_fee_per_gas: None,
+                    },
+                    operation_batch: OperationBatchConfig {
+                        batch_contract_address: None,
+                        max_batch_size: 1,
+                    },
+                }),
+                metrics_conf: PrometheusMiddlewareConf {
+                    contracts: HashMap::new(),
+                    chain: None,
+                },
+                index: IndexSettings {
+                    from: 0,
+                    chunk_size: 1,
+                    mode: IndexMode::Block,
+                },
+            },
+        )];
+
+        ScraperSettings {
+            base: Settings {
+                chains: chains.into_iter().collect(),
+                metrics_port: 5000,
+                tracing: TracingConfig::default(),
+            },
+            db: String::new(),
+            chains_to_scrape: vec![],
+        }
+    }
+
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_failed_build_chain_scrapers() {
+        let mut settings = generate_test_scraper_settings();
+
+        let registry = Registry::new();
+        let core_metrics = CoreMetrics::new("scraper", 4000, registry).unwrap();
+        let chain_metrics = ChainMetrics {
+            block_height: IntGaugeVec::new(
+                opts!("block_height", BLOCK_HEIGHT_HELP),
+                BLOCK_HEIGHT_LABELS,
+            )
+            .unwrap(),
+            gas_price: None,
+            critical_error: IntGaugeVec::new(
+                opts!("critical_error", CRITICAL_ERROR_HELP),
+                CRITICAL_ERROR_LABELS,
+            )
+            .unwrap(),
+        };
+
+        // set the chains we want to scrape
+        settings.chains_to_scrape = vec![
+            HyperlaneDomain::Known(KnownHyperlaneDomain::Arbitrum),
+            HyperlaneDomain::Known(KnownHyperlaneDomain::Ethereum),
+        ];
+
+        // Create MockDatabase with mock query results
+        let db = MockDatabase::new(DatabaseBackend::Postgres).append_query_results([
+            // First query result
+            vec![[("height", sea_orm::Value::BigInt(Some(100)))]
+                .into_iter()
+                .collect::<BTreeMap<_, _>>()],
+        ]);
+        let scraper_db = ScraperDb::with_connection(db.into_connection());
+
+        let scrapers = Scraper::build_chain_scrapers(
+            &settings,
+            Arc::new(core_metrics),
+            &chain_metrics,
+            scraper_db,
+        )
+        .await;
+
+        assert_eq!(scrapers.len(), 1);
+        assert!(scrapers.contains_key(&(KnownHyperlaneDomain::Arbitrum as u32)));
+
+        // Arbitrum chain should not have any errors because its ChainConf exists
+        let metric = chain_metrics
+            .critical_error
+            .get_metric_with_label_values(&["arbitrum"])
+            .unwrap();
+        assert_eq!(metric.get(), 0);
+
+        // Ethereum chain should error because it is missing ChainConf
+        let metric = chain_metrics
+            .critical_error
+            .get_metric_with_label_values(&["ethereum"])
+            .unwrap();
+        assert_eq!(metric.get(), 1);
     }
 }
diff --git a/rust/main/agents/scraper/src/db/block_cursor.rs b/rust/main/agents/scraper/src/db/block_cursor.rs
index 0f4237d4a1..7d8fd7d5cf 100644
--- a/rust/main/agents/scraper/src/db/block_cursor.rs
+++ b/rust/main/agents/scraper/src/db/block_cursor.rs
@@ -101,6 +101,6 @@ impl BlockCursor {
 
 impl ScraperDb {
     pub async fn block_cursor(&self, domain: u32, default_height: u64) -> Result<BlockCursor> {
-        BlockCursor::new(self.0.clone(), domain, default_height).await
+        BlockCursor::new(self.clone_connection(), domain, default_height).await
     }
 }
diff --git a/rust/main/agents/scraper/src/db/mod.rs b/rust/main/agents/scraper/src/db/mod.rs
index 41a99e349d..14db886b33 100644
--- a/rust/main/agents/scraper/src/db/mod.rs
+++ b/rust/main/agents/scraper/src/db/mod.rs
@@ -3,7 +3,7 @@ pub use block_cursor::BlockCursor;
 use eyre::Result;
 pub use message::*;
 pub use payment::*;
-use sea_orm::{Database, DbConn};
+use sea_orm::{Database, DatabaseConnection, DbConn};
 use tracing::instrument;
 pub use txn::*;
 
@@ -19,7 +19,7 @@ mod txn;
 
 /// Database interface to the message explorer database for the scraper. This is
 /// focused on writing data to the database.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct ScraperDb(DbConn);
 
 impl ScraperDb {
@@ -28,4 +28,31 @@ impl ScraperDb {
         let db = Database::connect(url).await?;
         Ok(Self(db))
     }
+
+    #[cfg(test)]
+    pub fn with_connection(db: DbConn) -> Self {
+        Self(db)
+    }
+
+    pub fn clone_connection(&self) -> DbConn {
+        match &self.0 {
+            DatabaseConnection::SqlxPostgresPoolConnection(conn) => {
+                DatabaseConnection::SqlxPostgresPoolConnection(conn.clone())
+            }
+            DatabaseConnection::Disconnected => DatabaseConnection::Disconnected,
+            DatabaseConnection::MockDatabaseConnection(conn) => {
+                DatabaseConnection::MockDatabaseConnection(conn.clone())
+            }
+        }
+    }
+}
+
+/// SeaORM's `DatabaseConnection` does not implement `Clone` when the
+/// `mock` feature is enabled, so we implement `Clone` manually here
+/// instead of using `#[derive(Clone)]`.
+impl Clone for ScraperDb {
+    fn clone(&self) -> Self {
+        let conn = self.clone_connection();
+        Self(conn)
+    }
 }
diff --git a/rust/main/agents/scraper/src/settings.rs b/rust/main/agents/scraper/src/settings.rs
index b6c9eb7e49..6b0b4fe211 100644
--- a/rust/main/agents/scraper/src/settings.rs
+++ b/rust/main/agents/scraper/src/settings.rs
@@ -26,7 +26,7 @@ pub struct ScraperSettings {
     #[as_mut]
     #[deref]
     #[deref_mut]
-    base: Settings,
+    pub base: Settings,
 
     pub db: String,
     pub chains_to_scrape: Vec<HyperlaneDomain>,