
[MTG-972] synchronizer optimized #333

Open. Wants to merge 41 commits into base branch feature/MTG-868-slots-storage.

Changes from 30 commits (41 commits total)

Commits
9f68ff5
dumper binary measuring rate of dump
StanChe Nov 26, 2024
9492a41
dumper with sharding
StanChe Nov 27, 2024
8ff64a7
fix for messaging
StanChe Nov 27, 2024
ef2daae
a more appropriate sharding
StanChe Nov 27, 2024
c9790f2
concurrency safety via locks
StanChe Nov 27, 2024
410fd3e
refactoring of synchronization, collecting metadata to dump it in the…
StanChe Nov 27, 2024
00435a8
fix after merge
StanChe Nov 28, 2024
7211371
simplified synchronizer files a bit and refactored dumper after the m…
StanChe Nov 28, 2024
b2917ba
removed unused file
StanChe Nov 28, 2024
3f4556a
dumper dumping everything: added fungibles and the last known keys
StanChe Nov 28, 2024
4d9647b
synchronizer rework
StanChe Nov 29, 2024
21d9605
cleanup
StanChe Nov 29, 2024
e53b9ae
removed incorrect SQL constraint disablement
StanChe Dec 2, 2024
e8b51af
fix for messed up minus in sql table name
StanChe Dec 2, 2024
dd1c1f6
handle cases when the previous dump was not successful - if exists check
StanChe Dec 2, 2024
14e8a80
using less min connections and really using that value
StanChe Dec 2, 2024
18055dc
synchronizing calls to tasks table with a semaphore
StanChe Dec 2, 2024
0be380f
fixing the synchronizer locking
StanChe Dec 3, 2024
e86bcf9
metrics names
StanChe Dec 3, 2024
116ab14
a more adequate number of min connections for postgres
StanChe Dec 3, 2024
34f6711
init the logger first
StanChe Dec 3, 2024
8637ef2
reverted logger init
StanChe Dec 3, 2024
bbb62a7
fucking config read in the json worker
StanChe Dec 3, 2024
82e6a55
rework of the PG load order
StanChe Dec 3, 2024
995ba0e
trying to catch, why docker is restarting
StanChe Dec 3, 2024
c7edea1
no clue what's happening inside docker
StanChe Dec 3, 2024
1fc1dda
commented out turning table to unlogged to experiment with batch load…
StanChe Dec 4, 2024
3d331f5
MTG-929 fix for the docker build
StanChe Dec 4, 2024
d326991
MTG-929 fix for makefile
StanChe Dec 4, 2024
a67fb9a
MTG-929 fix for makefile
StanChe Dec 4, 2024
4556bdf
using a simple method of loading directly without temp tables
StanChe Dec 4, 2024
a5006c8
fix for handling some out of order updates potentially leaving us wit…
StanChe Dec 5, 2024
150b789
fix for assetindex cleaner
StanChe Dec 6, 2024
2a94704
Merge pull request #334 from metaplex-foundation/feature/MTG-1018-fix…
StanChe Dec 6, 2024
24eb52a
backfill through docker compose
StanChe Dec 6, 2024
2caedad
backfiller in Makefile
StanChe Dec 6, 2024
2efeace
fix separator
StanChe Dec 6, 2024
16c10b8
fix for Makefile
StanChe Dec 6, 2024
5ba69be
makefile fix
StanChe Dec 6, 2024
2d34f03
semaphored the load data requests in order to avoid lengthy locks
StanChe Dec 9, 2024
9a74ad1
shards number change
StanChe Dec 9, 2024
1 change: 1 addition & 0 deletions Cargo.lock

11 changes: 6 additions & 5 deletions Makefile
@@ -1,9 +1,10 @@
.PHONY: build start build-integrity-verification start-integrity-verification start-raw-backfiller start-core-indexing dev stop clippy test

.PHONY: build start build-integrity-verification start-integrity-verification start-core-indexing dev stop clippy test
# start-backfiller
SHELL := /bin/bash

build:
@docker compose -f docker-compose.yaml build ingester raw-backfiller das-api synchronizer core-indexing slot-persister
@docker compose -f docker-compose.yaml build ingester das-api synchronizer core-indexing slot-persister
# backfiller

start:
@docker compose -f docker-compose.yaml up -d ingester
@@ -26,8 +27,8 @@ build-integrity-verification:
start-integrity-verification:
@docker compose -f docker-compose.yaml up -d integrity-verification

start-raw-backfiller:
@docker compose -f docker-compose.yaml up -d raw-backfiller
# start-backfiller:
# @docker compose -f docker-compose.yaml up -d backfiller

start-core-indexing:
@docker compose -f docker-compose.yaml up -d core-indexing
77 changes: 35 additions & 42 deletions docker-compose.yaml
@@ -3,7 +3,7 @@ services:
ingester:
container_name: ingester
restart: always
entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; else exec ./profiling_ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; fi"
entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; else exec ./profiling_ingester --slots-db-path $$INGESTER_SLOTS_DB_PATH --secondary-slots-db-path $$INGESTER_SECONDARY_SLOTS_DB_PATH; fi"
env_file:
- .env
network_mode: host
@@ -34,7 +34,7 @@ services:
das-api:
container_name: das-api
restart: always
entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./api; else exec ./profiling_api; fi"
entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./api; else exec ./profiling_api; fi"
env_file:
- .env
network_mode: host
@@ -57,7 +57,7 @@ services:
synchronizer:
container_name: synchronizer
restart: always
entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./synchronizer; else exec ./profiling_synchronizer; fi"
entrypoint: sh -c "if [ -z '${MALLOC_CONF}' ]; then exec ./synchronizer; else exec ./profiling_synchronizer; fi"
env_file:
- .env
network_mode: host
@@ -97,53 +97,46 @@ services:
options:
max-size: "2048m"

raw-backfiller:
container_name: raw-backfiller
restart: always
entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./raw_backfiller; else exec ./profiling_raw_backfiller; fi"
env_file:
- .env
network_mode: host
volumes:
- ${INGESTER_ROCKS_DB_PATH}:${INGESTER_ROCKS_DB_PATH_CONTAINER}:rw
- ${INGESTER_PROFILING_FILE_PATH}:${INGESTER_PROFILING_FILE_PATH_CONTAINER}:rw
- ./creds.json:/usr/src/app/creds.json
- ./heaps:/usr/src/app/heaps:rw
stop_grace_period: 5m
build:
context: .
dockerfile: ingester.Dockerfile
logging:
options:
max-size: "2048m"
# TODO: pass the arguments to the container, check the mounts as we have 2 dbs - slots and the main one
# backfill:
# container_name: backfill
# restart: always
# entrypoint: sh -c "if [ -z '$$MALLOC_CONF' ]; then exec ./backfill; else exec ./profiling_backfill; fi"
# env_file:
# - .env
# network_mode: host
# volumes:
# - ${INGESTER_ROCKS_DB_PATH}:${INGESTER_ROCKS_DB_PATH_CONTAINER}:rw
# - ${INGESTER_PROFILING_FILE_PATH}:${INGESTER_PROFILING_FILE_PATH_CONTAINER}:rw
# - ./creds.json:/usr/src/app/creds.json
# - ./heaps:/usr/src/app/heaps:rw
# stop_grace_period: 5m
# build:
# context: .
# dockerfile: ingester.Dockerfile
# logging:
# options:
# max-size: "2048m"

slot-persister:
container_name: slot-persister
restart: always
entrypoint: |
sh -c "
ARGS=\"--target-db-path $target_db_path\"
ARGS=\"$$ARGS --rpc-host $rpc_host\"
[ -n \"$start_slot\" ] && ARGS=\"$$ARGS --start-slot $start_slot\"
[ -n \"$big_table_credentials\" ] && ARGS=\"$$ARGS --big-table-credentials $big_table_credentials\"
[ -n \"$big_table_timeout\" ] && ARGS=\"$$ARGS --big-table-timeout $big_table_timeout\"
[ -n \"$metrics_port\" ] && ARGS=\"$$ARGS --metrics-port $metrics_port\"
[ -n \"$chunk_size\" ] && ARGS=\"$$ARGS --chunk-size $chunk_size\"
[ -n \"$max_concurrency\" ] && ARGS=\"$$ARGS --max-concurrency $max_concurrency\"

if [ -z \"$MALLOC_CONF\" ]; then
exec ./slot_persister $$ARGS
else
exec ./profiling_slot_persister $$ARGS
fi"
entrypoint: >
sh -c "if [ -f /usr/src/app/creds.json ]; then
export BIG_TABLE_CREDENTIALS=/usr/src/app/creds.json;
fi;
exec ./slot_persister"
env_file:
- .env
environment:
SLOTS_DB_PRIMARY_PATH: ${INGESTER_SLOTS_DB_PATH}
RPC_HOST: ${INGESTER_RPC_HOST}
# BIG_TABLE_CREDENTIALS: /usr/src/app/creds.json # refactored this to account for the file doesn't exist case
METRICS_PORT: 9090
network_mode: host
volumes:
- ${target_db_path}:${target_db_path}:rw
- ${INGESTER_PROFILING_FILE_PATH}:${INGESTER_PROFILING_FILE_PATH_CONTAINER}:rw
- ${big_table_credentials:-/tmp/creds.json}:${big_table_credentials:-/tmp/creds.json}
- ./heaps:/usr/src/app/heaps:rw
- ${INGESTER_SLOTS_DB_PATH}:${INGESTER_SLOTS_DB_PATH}:rw
- ./creds.json:/usr/src/app/creds.json
stop_grace_period: 5m
build:
context: .
9 changes: 5 additions & 4 deletions ingester.Dockerfile
@@ -18,6 +18,7 @@ COPY rocks-db ./rocks-db
COPY tests/setup ./tests/setup
COPY usecase ./usecase
COPY integrity_verification ./integrity_verification
COPY integration_tests ./integration_tests

RUN cargo chef prepare --recipe-path recipe.json

@@ -36,12 +37,12 @@ RUN cargo chef cook --release --recipe-path recipe.json
# Building the services
FROM cacher AS builder
COPY . .
RUN cargo build --release --bin ingester --bin api --bin raw_backfiller --bin synchronizer --bin slot_persister
RUN cargo build --release --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister

# Building the profiling feature services
FROM cacher AS builder-with-profiling
COPY . .
RUN cargo build --release --features profiling --bin ingester --bin api --bin raw_backfiller --bin synchronizer --bin slot_persister
RUN cargo build --release --features profiling --bin ingester --bin api --bin backfill --bin synchronizer --bin slot_persister

# Final image
FROM rust:1.76-slim-bullseye AS runtime
@@ -52,12 +53,12 @@ ENV TZ=Etc/UTC APP_USER=appuser LD_PRELOAD="/usr/local/lib/libjemalloc.so.2"
RUN groupadd $APP_USER && useradd -g $APP_USER $APP_USER && mkdir -p ${APP}

COPY --from=builder /rust/target/release/ingester ${APP}/ingester
COPY --from=builder /rust/target/release/raw_backfiller ${APP}/raw_backfiller
COPY --from=builder /rust/target/release/backfill ${APP}/backfill
COPY --from=builder /rust/target/release/api ${APP}/api
COPY --from=builder /rust/target/release/synchronizer ${APP}/synchronizer
COPY --from=builder /rust/target/release/slot_persister ${APP}/slot_persister
COPY --from=builder-with-profiling /rust/target/release/ingester ${APP}/profiling_ingester
COPY --from=builder-with-profiling /rust/target/release/raw_backfiller ${APP}/profiling_raw_backfiller
COPY --from=builder-with-profiling /rust/target/release/backfill ${APP}/profiling_backfill
COPY --from=builder-with-profiling /rust/target/release/api ${APP}/profiling_api
COPY --from=builder-with-profiling /rust/target/release/synchronizer ${APP}/profiling_synchronizer
COPY --from=builder-with-profiling /rust/target/release/slot_persister ${APP}/profiling_slot_persister
2 changes: 1 addition & 1 deletion integration_tests/src/common.rs
@@ -35,7 +35,6 @@ use tokio::{
};
use usecase::proofs::MaybeProofChecker;

use tracing::{error, info};
use serde::de::DeserializeOwned;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
@@ -51,6 +50,7 @@ use solana_sdk::{
};
use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding};
use std::{fmt, time::Duration};
use tracing::{error, info};

use std::path::PathBuf;

1 change: 0 additions & 1 deletion interface/src/price_fetcher.rs
@@ -1,6 +1,5 @@
use crate::error::UsecaseError;
use async_trait::async_trait;
use solana_program::pubkey::Pubkey;
use std::collections::HashMap;

#[async_trait]
6 changes: 5 additions & 1 deletion nft_ingester/Cargo.toml
@@ -33,7 +33,7 @@ stretto = { workspace = true }
async-channel = { workspace = true }
tokio-util = { workspace = true }
tracing-subscriber = { workspace = true }
clap = { workspace = true }
clap = { workspace = true, features = ["env"] }
bincode = { workspace = true }
metrics-utils = { path = "../metrics_utils" }
rocks-db = { path = "../rocks-db" }
@@ -84,6 +84,7 @@ indicatif = { workspace = true }
tokio-retry = { workspace = true }
axum = { workspace = true }
rocksdb = { workspace = true }
num-bigint = "0.4"

[dev-dependencies]
setup = { path = "../tests/setup" }
@@ -145,3 +146,6 @@ name = "explorer"

[[bin]]
name = "synchronizer_utils"

[[bin]]
name = "dumper"
8 changes: 5 additions & 3 deletions nft_ingester/src/api/dapi/asset.rs
@@ -18,7 +18,7 @@ use interface::processing_possibility::ProcessingPossibilityChecker;
use itertools::Itertools;
use metrics_utils::ApiMetricsConfig;
use rocks_db::asset::{AssetLeaf, AssetSelectedMaps};
use rocks_db::{AssetAuthority, Storage};
use rocks_db::Storage;
use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};

@@ -140,7 +140,9 @@ fn asset_selected_maps_into_full_asset(
if let Some(asset_static_details) = &asset_complete_details.static_details {
// collection itself cannot have a collection
// TODO!: should we also include in this check FungibleToken?
if &asset_static_details.specification_asset_class != &SpecificationAssetClass::MplCoreCollection {
if &asset_static_details.specification_asset_class
!= &SpecificationAssetClass::MplCoreCollection
{
if let Some(collection_details) = &asset_complete_details.collection {
if !collection_details.is_collection_verified.value {
return None;
@@ -294,7 +296,7 @@ pub async fn get_by_ids<
},
);
}
Err(e) => {}
Err(_) => {}
@kstepanovdev (Contributor), Dec 4, 2024:

The parameter is unused, indeed. However, completely swallowing the error looks dubious. Maybe it should at least be logged?
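The reviewer's suggestion can be sketched as follows. This is a hedged, self-contained illustration, not the PR's actual code: `fetch_assets` and its error type are hypothetical stand-ins for the real call inside `get_by_ids`.

```rust
// Sketch: log the error instead of silently discarding it in the `Err` arm.
// `fetch_assets` is an illustrative stand-in for the real fallible call.
fn fetch_assets() -> Result<u32, String> {
    Err("downstream unavailable".to_string())
}

fn main() {
    match fetch_assets() {
        Ok(count) => println!("fetched {count} assets"),
        // In the real code this would likely be `tracing::error!(...)`;
        // stderr is used here to keep the sketch dependency-free.
        Err(e) => eprintln!("get_by_ids failed: {e}"),
    }
}
```

Logging before dropping the error preserves the current control flow while leaving a trace for debugging.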

}
}

7 changes: 6 additions & 1 deletion nft_ingester/src/api/dapi/rpc_asset_convertors.rs
@@ -279,7 +279,12 @@ pub fn to_authority(
// even if there is no authority for asset we should not set Pubkey::default(), just empty string
let auth_key = update_authority
.map(|update_authority| update_authority.to_string())
.unwrap_or(authority.as_ref().map(|auth| auth.authority.to_string()).unwrap_or("".to_string()));
.unwrap_or(
authority
.as_ref()
.map(|auth| auth.authority.to_string())
.unwrap_or("".to_string()),
Reviewer comment (Contributor):

Nit: Default?
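The nit refers to replacing `.unwrap_or("".to_string())` with `.unwrap_or_default()`, since `String::default()` is the empty string. A minimal sketch with an illustrative helper name (not the PR's actual function):

```rust
// `.unwrap_or_default()` expresses the same "fall back to empty string"
// intent without spelling out the allocation of "".to_string().
fn authority_key(update_authority: Option<String>) -> String {
    update_authority.unwrap_or_default()
}

fn main() {
    assert_eq!(authority_key(Some("auth111".to_string())), "auth111");
    assert_eq!(authority_key(None), "");
}
```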

);

vec![Authority {
address: auth_key,
5 changes: 1 addition & 4 deletions nft_ingester/src/backfiller.rs
@@ -1,4 +1,4 @@
use crate::config::{BackfillerConfig, BackfillerSourceMode, BigTableConfig};
use crate::config::{BackfillerSourceMode, BigTableConfig};
use crate::error::IngesterError;
use async_trait::async_trait;
use backfill_rpc::rpc::BackfillRPC;
@@ -118,7 +118,6 @@ impl BlockProducer for BackfillSource {

#[derive(Clone)]
pub struct TransactionsParser<C: BlockConsumer, P: BlockProducer, S: SlotGetter> {
rocks_client: Arc<rocks_db::Storage>,
slot_getter: Arc<S>,
consumer: Arc<C>,
producer: Arc<P>,
@@ -134,7 +133,6 @@
S: SlotGetter,
{
pub fn new(
rocks_client: Arc<rocks_db::Storage>,
slot_getter: Arc<S>,
consumer: Arc<C>,
producer: Arc<P>,
@@ -143,7 +141,6 @@
chunk_size: usize,
) -> TransactionsParser<C, P, S> {
TransactionsParser {
rocks_client,
slot_getter,
consumer,
producer,
6 changes: 4 additions & 2 deletions nft_ingester/src/bin/api/main.rs
@@ -30,10 +30,10 @@ pub const DEFAULT_SECONDARY_ROCKSDB_PATH: &str = "./my_rocksdb_secondary";

#[tokio::main(flavor = "multi_thread")]
pub async fn main() -> Result<(), IngesterError> {
tracing_subscriber::fmt::init();
info!("Starting API server...");

let config: ApiConfig = setup_config("API_");
init_logger(&config.get_log_level());
// init_logger(&config.get_log_level());
Reviewer comment (Contributor):

Please delete it if it will not be used.


let guard = if config.run_profiling {
Some(
@@ -73,6 +73,7 @@ pub async fn main() -> Result<(), IngesterError> {
config.database_config.get_database_url()?.as_str(),
min_connections,
max_connections,
None,
red_metrics.clone(),
)
.await?;
@@ -120,6 +121,7 @@ pub async fn main() -> Result<(), IngesterError> {
rocks_storage.clone(),
json_downloader_metrics.clone(),
red_metrics.clone(),
nft_ingester::config::default_parallel_json_downloaders(),
)
.await,
))