tests: add integration tests with full dbs sync scenario
n00m4d committed Jan 22, 2025
1 parent a10db36 commit e711ef3
Showing 31 changed files with 874 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/rust.yml
@@ -78,11 +78,15 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 5
--mount type=bind,source=${{ github.workspace }}/rocks_dump,target=/rocks_dump,readonly
steps:
- name: Check out
uses: actions/checkout@v3

- name: Prepare directories
run: mkdir -p db-data rocks_dump

- name: Install dependencies
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler

2 changes: 2 additions & 0 deletions .gitignore
@@ -36,3 +36,5 @@ my_rocksdb/
_rocksdb_backup/
_rocksdb_backup_archives/
/.project
# Used by integration tests
rocks_dump
4 changes: 3 additions & 1 deletion integration_tests/README.md
@@ -7,7 +7,9 @@ This Cargo package helps us run multi-package tests in our workspace. This setup

## Setup

First setup a local Postgres database and export the postgres database URL as follows:
First, set up a local PostgreSQL database. This can be done using the `run_postgres.sh` script. It is highly recommended to use this script as it creates and mounts the required directory, `rocks_dump`. This directory is essential for tests involving full database synchronization.

Then export the Postgres database URL as follows:
```export DATABASE_TEST_URL=postgres://postgres@localhost/<database_name>```

Also gain access to mainnet RPCs and devnet RPCs and export the URLs as follows. Currently,
34 changes: 34 additions & 0 deletions integration_tests/run_postgres.sh
@@ -0,0 +1,34 @@
#!/bin/bash

CONTAINER_NAME="test_db"
IMAGE_NAME="postgres:14"
DB_USER="solana"
DB_PASSWORD="solana"
DB_NAME="solana"
DB_PATH="./db-data"
ROCKS_DUMP_PATH="./rocks_dump"
HOST_PORT="5432"
CONTAINER_PORT="5432"

mkdir -p "$DB_PATH"
mkdir -p "$ROCKS_DUMP_PATH"

docker run -d \
--name $CONTAINER_NAME \
-e POSTGRES_USER=$DB_USER \
-e POSTGRES_PASSWORD=$DB_PASSWORD \
-e POSTGRES_DB=$DB_NAME \
-v "$DB_PATH:/var/lib/postgresql/data:rw" \
-v "$ROCKS_DUMP_PATH:/rocks_dump:ro" \
-p $HOST_PORT:$CONTAINER_PORT \
--shm-size=1g \
$IMAGE_NAME \
postgres -c log_statement=none \
-c log_destination=stderr

if [ $? -eq 0 ]; then
echo "PostgreSQL container '$CONTAINER_NAME' is running."
else
echo "Failed to start the PostgreSQL container."
exit 1
fi
2 changes: 1 addition & 1 deletion integration_tests/src/account_update_tests.rs
Expand Up @@ -106,7 +106,7 @@ async fn index_account_update(setup: &TestSetup, pubkey: Pubkey, update: Account
let is_startup = false;

let fbb = serialize_account(fbb, &account_info, slot, is_startup);
index_account_bytes(setup, fbb.finished_data().to_vec()).await;
index_and_sync_account_bytes(setup, fbb.finished_data().to_vec()).await;
}

#[tokio::test]
54 changes: 42 additions & 12 deletions integration_tests/src/common.rs
@@ -65,8 +65,10 @@ const API_MAX_PAGE_LIMIT: usize = 100;

const DUMP_SYNCHRONIZER_BATCH_SIZE: usize = 1000;
const SYNCHRONIZER_PARALLEL_TASKS: usize = 1;
const SYNCHRONIZER_DUMP_PATH: &str = "rocks_dump";

const POSTGRE_MIGRATIONS_PATH: &str = "../migrations";
const POSTGRE_BASE_DUMP_PATH: &str = "/";

pub struct TestSetup {
pub name: String,
@@ -105,7 +107,7 @@ impl TestSetup {
red_metrics.clone(),
MIN_PG_CONNECTIONS,
POSTGRE_MIGRATIONS_PATH,
Some(PathBuf::from_str("./dump").unwrap()),
Some(PathBuf::from_str(POSTGRE_BASE_DUMP_PATH).unwrap()),
None,
)
.await
@@ -185,7 +187,7 @@ impl TestSetup {
storage.clone(),
index_storage.clone(),
DUMP_SYNCHRONIZER_BATCH_SIZE,
"./dump".to_string(),
SYNCHRONIZER_DUMP_PATH.to_string(),
metrics_state.synchronizer_metrics.clone(),
SYNCHRONIZER_PARALLEL_TASKS,
);
@@ -395,7 +397,16 @@ pub async fn get_token_largest_account(client: &RpcClient, mint: Pubkey) -> anyh
}
}

pub async fn index_account_bytes(setup: &TestSetup, account_bytes: Vec<u8>) {
pub async fn index_and_sync_account_bytes(setup: &TestSetup, account_bytes: Vec<u8>) {
process_and_save_accounts_to_rocks(setup, account_bytes).await;

let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
// copy data to Postgres
setup.synchronizer.synchronize_nft_asset_indexes(&shutdown_rx, 1000).await.unwrap();
setup.synchronizer.synchronize_fungible_asset_indexes(&shutdown_rx, 1000).await.unwrap();
}

async fn process_and_save_accounts_to_rocks(setup: &TestSetup, account_bytes: Vec<u8>) {
let parsed_acc = setup.message_parser.parse_account(account_bytes, false).unwrap();
let ready_to_process = parsed_acc
.into_iter()
@@ -425,11 +436,6 @@ pub async fn index_account_bytes(setup: &TestSetup, account_bytes: Vec<u8>) {
.await;

let _ = batch_storage.flush();

let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
setup.synchronizer.synchronize_nft_asset_indexes(&shutdown_rx, 1000).await.unwrap();

setup.synchronizer.synchronize_fungible_asset_indexes(&shutdown_rx, 1000).await.unwrap();
}

pub async fn cached_fetch_account(
@@ -539,11 +545,12 @@ pub enum Order {
AllPermutations,
}

/// Data will be indexed, saved to RocksDB, and copied to Postgres.
pub async fn index_seed_events(setup: &TestSetup, events: Vec<&SeedEvent>) {
for event in events {
match event {
SeedEvent::Account(account) => {
index_account_with_ordered_slot(setup, *account).await;
index_and_sync_account_with_ordered_slot(setup, *account).await;
},
SeedEvent::Nft(mint) => {
index_nft(setup, *mint).await;
@@ -558,6 +565,23 @@ pub async fn index_seed_events(setup: &TestSetup, events: Vec<&SeedEvent>) {
}
}

/// Data will be indexed and saved to a single DB: RocksDB.
///
/// To sync the data to Postgres, the synchronizer methods must be called separately.
pub async fn single_db_index_seed_events(setup: &TestSetup, events: Vec<&SeedEvent>) {
for event in events {
match event {
SeedEvent::Account(account) => {
index_account_with_ordered_slot(setup, *account).await;
},
_ => {
// TODO: add handling for more seed event types if needed
panic!("This SeedEvent variant is not supported for single-DB processing")
},
}
}
}

#[allow(unused)]
pub fn seed_account(str: &str) -> SeedEvent {
SeedEvent::Account(Pubkey::from_str(str).unwrap())
@@ -616,7 +640,7 @@ pub async fn index_account(setup: &TestSetup, account: Pubkey) {
// they are "stale".
let slot = Some(DEFAULT_SLOT);
let account_bytes = cached_fetch_account(setup, account, slot).await;
index_account_bytes(setup, account_bytes).await;
index_and_sync_account_bytes(setup, account_bytes).await;
}

#[derive(Clone, Copy)]
@@ -632,10 +656,16 @@ pub async fn get_nft_accounts(setup: &TestSetup, mint: Pubkey) -> NftAccounts {
NftAccounts { mint, metadata: metadata_account, token: token_account }
}

async fn index_and_sync_account_with_ordered_slot(setup: &TestSetup, account: Pubkey) {
let slot = None;
let account_bytes = cached_fetch_account(setup, account, slot).await;
index_and_sync_account_bytes(setup, account_bytes).await;
}

async fn index_account_with_ordered_slot(setup: &TestSetup, account: Pubkey) {
let slot = None;
let account_bytes = cached_fetch_account(setup, account, slot).await;
index_account_bytes(setup, account_bytes).await;
process_and_save_accounts_to_rocks(setup, account_bytes).await;
}

async fn index_token_mint(setup: &TestSetup, mint: Pubkey) {
@@ -650,7 +680,7 @@ async fn index_token_mint(setup: &TestSetup, mint: Pubkey) {
let metadata_account = Metadata::find_pda(&mint).0;
match cached_fetch_account_with_error_handling(setup, metadata_account, slot).await {
Ok(account_bytes) => {
index_account_bytes(setup, account_bytes).await;
index_and_sync_account_bytes(setup, account_bytes).await;
},
Err(_) => {
// If we can't find the metadata account, then we assume that the mint is not an NFT.
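For context, a minimal sketch of how a test could exercise the split introduced above: `single_db_index_seed_events` stages data in RocksDB only, and the synchronizer is then invoked explicitly to copy it to Postgres, the same two calls `index_and_sync_account_bytes` makes internally. The helper names, `TestSetup` fields, and synchronizer methods come from the diff; the wrapping function, its name, the `tokio::sync::broadcast` import, and the example address (taken from the snapshot below) are illustrative only.

```rust
use tokio::sync::broadcast;

// Illustrative helper; `setup` is assumed to be built the same way the existing tests
// build their `TestSetup`.
async fn index_to_rocks_then_sync(setup: &TestSetup) {
    // 1. Write the seed account to RocksDB only; Postgres is not touched yet.
    let seeds = vec![seed_account("9tsHoBrkSqBW5uMxKZyvxL6m9CCaz1a7sGEg8SuckUj")];
    single_db_index_seed_events(setup, seeds.iter().collect()).await;

    // 2. Explicitly run the synchronizer to copy the indexes into Postgres,
    //    mirroring what `index_and_sync_account_bytes` does after saving to RocksDB.
    let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
    setup.synchronizer.synchronize_nft_asset_indexes(&shutdown_rx, 1000).await.unwrap();
    setup.synchronizer.synchronize_fungible_asset_indexes(&shutdown_rx, 1000).await.unwrap();
}
```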
19 binary files not shown.
1 change: 1 addition & 0 deletions integration_tests/src/lib.rs
@@ -6,3 +6,4 @@ mod common;
mod general_scenario_tests;
mod mpl_core_tests;
mod regular_nft_tests;
mod synchronizer_tests;
@@ -0,0 +1,147 @@
---
source: integration_tests/src/synchronizer_tests.rs
assertion_line: 183
expression: response
snapshot_kind: text
---
{
"total": 2,
"limit": 50,
"page": 1,
"items": [
{
"interface": "MplCoreAsset",
"id": "9tsHoBrkSqBW5uMxKZyvxL6m9CCaz1a7sGEg8SuckUj",
"content": {
"$schema": "https://schema.metaplex.com/nft1.0.json",
"json_uri": "https://example.com/asset",
"files": [],
"metadata": {
"name": "Test Asset",
"symbol": ""
},
"links": {}
},
"authorities": [
{
"address": "APrZTeVysBJqAznfLXS71NAzjr2fCVTSF1A66MeErzM7",
"scopes": [
"full"
]
}
],
"compression": {
"eligible": false,
"compressed": false,
"data_hash": "",
"creator_hash": "",
"asset_hash": "",
"tree": "",
"seq": 0,
"leaf_id": 0
},
"grouping": [
{
"group_key": "collection",
"group_value": "4FFhh184GNqh3LEK8UhMY7KBuCdNvvhU7C23ZKrKnofb",
"verified": true
}
],
"royalty": {
"royalty_model": "creators",
"target": null,
"percent": 0.0,
"basis_points": 0,
"primary_sale_happened": false,
"locked": false
},
"creators": [],
"ownership": {
"frozen": false,
"delegated": false,
"delegate": null,
"ownership_model": "single",
"owner": "7uScVQiT4vArB88dHrZoeVKWbtsRJmNp9r5Gce5VQpXS"
},
"supply": null,
"mutable": true,
"burnt": false,
"lamports": 3156480,
"executable": false,
"rent_epoch": 18446744073709551615,
"plugins": {},
"mpl_core_info": {
"plugins_json_version": 1
},
"external_plugins": []
},
{
"interface": "V1_NFT",
"id": "8qbRNh9Q9pcksZVnmQemoh7is2NqsRNTx4jmpv75knC6",
"content": {
"$schema": "https://schema.metaplex.com/nft1.0.json",
"json_uri": "https://arweave.net/zt75zlealvVfW7mFy7VHmr_bs3OJFbZIXiORXSka-I4",
"files": [],
"metadata": {
"name": "Thug",
"symbol": "TL",
"token_standard": "NonFungible"
},
"links": {}
},
"authorities": [
{
"address": "3VvLDXqJbw3heyRwFxv8MmurPznmDVUJS9gPMX2BDqfM",
"scopes": [
"full"
]
}
],
"compression": {
"eligible": false,
"compressed": false,
"data_hash": "",
"creator_hash": "",
"asset_hash": "",
"tree": "",
"seq": 0,
"leaf_id": 0
},
"grouping": [],
"royalty": {
"royalty_model": "creators",
"target": null,
"percent": 0.0,
"basis_points": 0,
"primary_sale_happened": false,
"locked": false
},
"creators": [],
"ownership": {
"frozen": false,
"delegated": false,
"delegate": null,
"ownership_model": "single",
"owner": "7uScVQiT4vArB88dHrZoeVKWbtsRJmNp9r5Gce5VQpXS"
},
"supply": {
"print_max_supply": 1000,
"print_current_supply": 0,
"edition_nonce": 255
},
"mutable": true,
"burnt": false,
"lamports": 5616720,
"executable": false,
"metadata_owner": "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s",
"rent_epoch": 18446744073709551615,
"token_info": {
"supply": 1,
"decimals": 0,
"token_program": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
"mint_authority": "HEsxPaf6QFNBaN3LiVQAke99WaFMhT8JC2bWityF7mwZ",
"freeze_authority": "HEsxPaf6QFNBaN3LiVQAke99WaFMhT8JC2bWityF7mwZ"
}
}
]
}
