Skip to content

Commit

Permalink
fix: json downloader (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
n00m4d authored Dec 22, 2023
1 parent 393bc9e commit 7c5665d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 37 deletions.
59 changes: 27 additions & 32 deletions nft_ingester/src/db_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ pub struct TaskForInsert {
#[derive(Debug, Clone)]
pub struct JsonDownloadTask {
pub metadata_url: String,
pub metadata_url_key: i64,
pub status: TaskStatus,
pub attempts: i16,
pub max_attempts: i16,
}

pub struct UpdatedTask {
pub status: TaskStatus,
pub metadata_url: String,
pub metadata_url_key: i64,
pub attempts: i16,
pub error: String,
}
Expand Down Expand Up @@ -92,27 +93,17 @@ impl DBClient {
}

pub async fn update_tasks(&self, data: Vec<UpdatedTask>) -> Result<(), IngesterError> {
let metadata_id = self
.insert_metadata(
&data
.iter()
.map(|task| task.metadata_url.as_str())
.collect::<Vec<_>>(),
)
.await?;
let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(
"INSERT INTO tasks (tsk_metadata_url, tsk_status, tsk_attempts, tsk_error) ",
);
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("UPDATE tasks SET tsk_status = tmp.tsk_status, tsk_attempts = tmp.tsk_attempts, tsk_error = tmp.tsk_error FROM (");

query_builder.push_values(data, |mut b, off_d| {
b.push_bind(metadata_id.get(&off_d.metadata_url));
b.push_bind(off_d.status);
b.push_bind(off_d.attempts);
b.push_bind(off_d.error);
query_builder.push_values(data, |mut b, key| {
b.push_bind(key.metadata_url_key);
b.push_bind(key.status);
b.push_bind(key.attempts);
b.push_bind(key.error);
});

query_builder.push(" ON CONFLICT (tsk_id) DO UPDATE SET tsk_status = EXCLUDED.tsk_status, tsk_metadata_url = EXCLUDED.tsk_metadata_url, tsk_attempts = EXCLUDED.tsk_attempts,
tsk_error = EXCLUDED.tsk_error;");
query_builder.push(") as tmp (tsk_metadata_url, tsk_status, tsk_attempts, tsk_error) WHERE tasks.tsk_metadata_url = tmp.tsk_metadata_url;");

let query = query_builder.build();
query
Expand All @@ -137,7 +128,9 @@ impl DBClient {
tsk_locked_until = NOW() + INTERVAL '20 seconds'
FROM cte
WHERE t.tsk_id = cte.tsk_id
RETURNING t.tsk_metadata_url, t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");
RETURNING (
SELECT mtd_url FROM metadata m WHERE m.mtd_id = t.tsk_metadata_url) as metadata_url, t.tsk_metadata_url,
t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");

let query = query_builder.build();
let rows = query
Expand All @@ -148,13 +141,15 @@ impl DBClient {
let mut tasks = Vec::new();

for row in rows {
let metadata_url: String = row.get("tsk_metadata_url");
let metadata_url: String = row.get("metadata_url");
let metadata_url_key: i64 = row.get("tsk_metadata_url");
let status: TaskStatus = row.get("tsk_status");
let attempts: i16 = row.get("tsk_attempts");
let max_attempts: i16 = row.get("tsk_max_attempts");

tasks.push(JsonDownloadTask {
metadata_url,
metadata_url_key,
status,
attempts,
max_attempts,
Expand Down Expand Up @@ -203,16 +198,16 @@ impl DBClient {
let mut offchain_data_to_insert = Vec::new();

for offchain_d in data.iter() {
offchain_data_to_insert.push(TaskForInsert {
ofd_metadata_url: ids_keys
.get(&offchain_d.ofd_metadata_url)
.copied()
.unwrap_or_default(),
ofd_locked_until: offchain_d.ofd_locked_until,
ofd_attempts: offchain_d.ofd_attempts,
ofd_max_attempts: offchain_d.ofd_max_attempts,
ofd_error: offchain_d.ofd_error.clone(),
});
// save tasks only for those links which are new
if let Some(id) = ids_keys.get(&offchain_d.ofd_metadata_url) {
offchain_data_to_insert.push(TaskForInsert {
ofd_metadata_url: *id,
ofd_locked_until: offchain_d.ofd_locked_until,
ofd_attempts: offchain_d.ofd_attempts,
ofd_max_attempts: offchain_d.ofd_max_attempts,
ofd_error: offchain_d.ofd_error.clone(),
});
}
}

offchain_data_to_insert.sort_by(|a, b| a.ofd_metadata_url.cmp(&b.ofd_metadata_url));
Expand All @@ -237,7 +232,7 @@ impl DBClient {
b.push_bind(TaskStatus::Pending);
});

query_builder.push("ON CONFLICT (tsk_id) DO NOTHING;");
query_builder.push("ON CONFLICT (tsk_metadata_url) DO NOTHING;");

let query = query_builder.build();
query
Expand Down
9 changes: 4 additions & 5 deletions nft_ingester/src/json_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl JsonDownloader {
debug!("tasks that need to be executed: {}", tasks.len());

for task in tasks {
let cloned_task = task.clone();
let cloned_db_client = self.db_client.clone();
let cloned_metrics = self.metrics.clone();
let cloned_rocks = self.rocks_db.clone();
Expand All @@ -58,7 +57,7 @@ impl JsonDownloader {
.build()
.map_err(|e| format!("Failed to create client: {:?}", e))
.unwrap();
let response = Client::get(&client, cloned_task.metadata_url)
let response = Client::get(&client, task.metadata_url.clone())
.send()
.await
.map_err(|e| format!("Failed to make request: {:?}", e));
Expand All @@ -80,7 +79,7 @@ impl JsonDownloader {
};
let data_to_insert = UpdatedTask {
status,
metadata_url: task.metadata_url,
metadata_url_key: task.metadata_url_key,
attempts: task.attempts + 1,
error: response.status().as_str().to_string(),
};
Expand All @@ -105,7 +104,7 @@ impl JsonDownloader {
.unwrap();
let data_to_insert = UpdatedTask {
status: TaskStatus::Success,
metadata_url: task.metadata_url,
metadata_url_key: task.metadata_url_key,
attempts: task.attempts + 1,
error: "".to_string(),
};
Expand All @@ -119,7 +118,7 @@ impl JsonDownloader {
} else {
let data_to_insert = UpdatedTask {
status: TaskStatus::Failed,
metadata_url: task.metadata_url,
metadata_url_key: task.metadata_url_key,
attempts: task.attempts + 1,
error: "Failed to deserialize metadata body"
.to_string(),
Expand Down

0 comments on commit 7c5665d

Please sign in to comment.