Skip to content

Commit 0e4878f

Browse files
committed
refactor: DRY block-lookup logic in atomic-data-publisher
Extract shared helpers block_boundary_for_table and next_block_for_table to consolidate duplicated SQL queries across correction job functions. Simplify test helpers with run_block_range_query.
1 parent 8374c90 commit 0e4878f

2 files changed

Lines changed: 68 additions & 126 deletions

File tree

utils/atomic-data-publisher/src/database.rs

Lines changed: 53 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,15 +1009,6 @@ impl DatabaseClient {
10091009
&self,
10101010
query_name: &str,
10111011
parameters: &serde_json::Value,
1012-
) -> Result<Option<u64>> {
1013-
self.get_block_for_query(query_name, parameters, true).await
1014-
}
1015-
1016-
async fn get_block_for_query(
1017-
&self,
1018-
query_name: &str,
1019-
parameters: &serde_json::Value,
1020-
use_max: bool,
10211012
) -> Result<Option<u64>> {
10221013
let pool = self.get_pool().await;
10231014
let block = match query_name {
@@ -1031,26 +1022,12 @@ impl DatabaseClient {
10311022
))
10321023
})?;
10331024

1034-
let (block, has_data) = match hotspot_type {
1025+
let query = match hotspot_type {
10351026
"mobile" => {
1036-
let query = if use_max {
1037-
r#"SELECT COALESCE((SELECT MAX(last_block) FROM mobile_hotspot_infos), -1)::bigint as block"#
1038-
} else {
1039-
r#"SELECT COALESCE((SELECT MIN(last_block) FROM mobile_hotspot_infos), -1)::bigint as block"#
1040-
};
1041-
let row = sqlx::query(query).fetch_one(&*pool).await?;
1042-
let block_value = row.get::<i64, _>("block");
1043-
(block_value, block_value >= 0)
1027+
r#"SELECT COALESCE((SELECT MAX(last_block) FROM mobile_hotspot_infos), -1)::bigint as block"#
10441028
}
10451029
"iot" => {
1046-
let query = if use_max {
1047-
r#"SELECT COALESCE((SELECT MAX(last_block) FROM iot_hotspot_infos), -1)::bigint as block"#
1048-
} else {
1049-
r#"SELECT COALESCE((SELECT MIN(last_block) FROM iot_hotspot_infos), -1)::bigint as block"#
1050-
};
1051-
let row = sqlx::query(query).fetch_one(&*pool).await?;
1052-
let block_value = row.get::<i64, _>("block");
1053-
(block_value, block_value >= 0)
1030+
r#"SELECT COALESCE((SELECT MAX(last_block) FROM iot_hotspot_infos), -1)::bigint as block"#
10541031
}
10551032
_ => {
10561033
return Err(anyhow::anyhow!(
@@ -1059,68 +1036,41 @@ impl DatabaseClient {
10591036
))
10601037
}
10611038
};
1039+
let row = sqlx::query(query).fetch_one(&*pool).await?;
1040+
let block_value = row.get::<i64, _>("block");
10621041

1063-
let operation = if use_max { "Max" } else { "Min" };
10641042
debug!(
1065-
"{} last_block for {} hotspots: {}",
1066-
operation, hotspot_type, block
1043+
"Max last_block for {} hotspots: {}",
1044+
hotspot_type, block_value
10671045
);
10681046

1069-
if has_data {
1070-
Some(block as u64)
1047+
if block_value >= 0 {
1048+
Some(block_value as u64)
10711049
} else {
10721050
None
10731051
}
10741052
}
1075-
"construct_entity_ownership_changes" => {
1076-
let query = if use_max {
1077-
r#"SELECT COALESCE((SELECT MAX(last_block) FROM asset_owners), -1)::bigint as block"#
1078-
} else {
1079-
r#"SELECT COALESCE((SELECT MIN(last_block) FROM asset_owners WHERE last_block >= 0), -1)::bigint as block"#
1053+
"construct_entity_ownership_changes"
1054+
| "construct_welcome_pack_ownership"
1055+
| "construct_welcome_pack_reward_destination" => {
1056+
let table = match query_name {
1057+
"construct_entity_ownership_changes" => "asset_owners",
1058+
_ => "welcome_packs",
10801059
};
1081-
let row = sqlx::query(query).fetch_one(&*pool).await?;
1082-
1083-
let block: i64 = row.get("block");
1084-
if block >= 0 {
1085-
Some(block as u64)
1086-
} else {
1087-
None
1088-
}
1060+
Self::max_block_for_table(&pool, table).await?
10891061
}
10901062
"construct_entity_reward_destination_changes" => {
1091-
let query = if use_max {
1063+
let row = sqlx::query(
10921064
r#"
10931065
SELECT GREATEST(
10941066
COALESCE((SELECT MAX(last_block) FROM asset_owners), -1),
10951067
COALESCE((SELECT MAX(last_block) FROM recipients), -1),
10961068
COALESCE((SELECT MAX(last_block) FROM mini_fanouts), -1)
10971069
)::bigint as block
1098-
"#
1099-
} else {
1100-
r#"
1101-
SELECT LEAST(
1102-
COALESCE((SELECT MIN(last_block) FROM asset_owners WHERE last_block >= 0), -1),
1103-
COALESCE((SELECT MIN(last_block) FROM recipients WHERE last_block >= 0), -1),
1104-
COALESCE((SELECT MIN(last_block) FROM mini_fanouts WHERE last_block >= 0), -1)
1105-
)::bigint as block
1106-
"#
1107-
};
1108-
let row = sqlx::query(query).fetch_one(&*pool).await?;
1109-
1110-
let block: i64 = row.get("block");
1111-
if block >= 0 {
1112-
Some(block as u64)
1113-
} else {
1114-
None
1115-
}
1116-
}
1117-
"construct_welcome_pack_ownership" | "construct_welcome_pack_reward_destination" => {
1118-
let query = if use_max {
1119-
r#"SELECT COALESCE((SELECT MAX(last_block) FROM welcome_packs), -1)::bigint as block"#
1120-
} else {
1121-
r#"SELECT COALESCE((SELECT MIN(last_block) FROM welcome_packs WHERE last_block >= 0), -1)::bigint as block"#
1122-
};
1123-
let row = sqlx::query(query).fetch_one(&*pool).await?;
1070+
"#,
1071+
)
1072+
.fetch_one(&*pool)
1073+
.await?;
11241074

11251075
let block: i64 = row.get("block");
11261076
if block >= 0 {
@@ -1135,15 +1085,36 @@ impl DatabaseClient {
11351085
}
11361086
};
11371087

1138-
let operation = if use_max { "Max" } else { "Min" };
11391088
debug!(
1140-
"{} last_block for query '{}': {:?}",
1141-
operation, query_name, block
1089+
"Max last_block for query '{}': {:?}",
1090+
query_name, block
11421091
);
11431092

11441093
Ok(block)
11451094
}
11461095

1096+
async fn max_block_for_table(pool: &PgPool, table: &str) -> Result<Option<u64>> {
1097+
let query =
1098+
format!(r#"SELECT COALESCE((SELECT MAX(last_block) FROM {table}), -1)::bigint as block"#);
1099+
let row = sqlx::query(&query).fetch_one(pool).await?;
1100+
let block: i64 = row.get("block");
1101+
Ok(if block >= 0 { Some(block as u64) } else { None })
1102+
}
1103+
1104+
async fn next_block_for_table(
1105+
pool: &PgPool,
1106+
table: &str,
1107+
after_block: u64,
1108+
) -> Result<Option<u64>> {
1109+
let query =
1110+
format!(r#"SELECT MIN(last_block)::bigint as block FROM {table} WHERE last_block > $1"#);
1111+
let row = sqlx::query(&query)
1112+
.bind(after_block as i64)
1113+
.fetch_one(pool)
1114+
.await?;
1115+
Ok(row.get::<Option<i64>, _>("block").map(|b| b as u64))
1116+
}
1117+
11471118
async fn get_next_data_block_after(
11481119
&self,
11491120
job: &PollingJob,
@@ -1187,17 +1158,14 @@ impl DatabaseClient {
11871158

11881159
row.get::<Option<i64>, _>("block").map(|b| b as u64)
11891160
}
1190-
"construct_entity_ownership_changes" => {
1191-
let query = r#"
1192-
SELECT MIN(last_block)::bigint as block FROM asset_owners WHERE last_block > $1
1193-
"#;
1194-
1195-
let row = sqlx::query(query)
1196-
.bind(after_block as i64)
1197-
.fetch_one(&*pool)
1198-
.await?;
1199-
1200-
row.get::<Option<i64>, _>("block").map(|b| b as u64)
1161+
"construct_entity_ownership_changes"
1162+
| "construct_welcome_pack_ownership"
1163+
| "construct_welcome_pack_reward_destination" => {
1164+
let table = match job.query_name.as_str() {
1165+
"construct_entity_ownership_changes" => "asset_owners",
1166+
_ => "welcome_packs",
1167+
};
1168+
Self::next_block_for_table(&pool, table, after_block).await?
12011169
}
12021170
"construct_entity_reward_destination_changes" => {
12031171
let query = r#"
@@ -1217,17 +1185,6 @@ impl DatabaseClient {
12171185

12181186
row.get::<Option<i64>, _>("block").map(|b| b as u64)
12191187
}
1220-
"construct_welcome_pack_ownership" | "construct_welcome_pack_reward_destination" => {
1221-
let query =
1222-
r#"SELECT MIN(last_block)::bigint as block FROM welcome_packs WHERE last_block > $1"#;
1223-
1224-
let row = sqlx::query(query)
1225-
.bind(after_block as i64)
1226-
.fetch_one(&*pool)
1227-
.await?;
1228-
1229-
row.get::<Option<i64>, _>("block").map(|b| b as u64)
1230-
}
12311188
_ => {
12321189
debug!(
12331190
"Unknown query name for next block lookup: {}",

utils/atomic-data-publisher/tests/common/mod.rs

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -177,36 +177,37 @@ pub async fn seed_mini_fanout(
177177

178178
// --- Query runners ---
179179

180-
pub async fn run_ownership_query(
180+
async fn run_block_range_query(
181181
pool: &PgPool,
182+
sql: &str,
182183
last_processed_block: i64,
183184
max_block: i64,
184185
) -> Vec<Value> {
185-
sqlx::query(OWNERSHIP_SQL)
186+
sqlx::query(sql)
186187
.bind(last_processed_block)
187188
.bind(max_block)
188189
.fetch_all(pool)
189190
.await
190-
.expect("run ownership query")
191+
.expect("run query")
191192
.into_iter()
192193
.map(|row| row.get::<Value, _>("atomic_data"))
193194
.collect()
194195
}
195196

197+
pub async fn run_ownership_query(
198+
pool: &PgPool,
199+
last_processed_block: i64,
200+
max_block: i64,
201+
) -> Vec<Value> {
202+
run_block_range_query(pool, OWNERSHIP_SQL, last_processed_block, max_block).await
203+
}
204+
196205
pub async fn run_reward_query(
197206
pool: &PgPool,
198207
last_processed_block: i64,
199208
max_block: i64,
200209
) -> Vec<Value> {
201-
sqlx::query(REWARD_SQL)
202-
.bind(last_processed_block)
203-
.bind(max_block)
204-
.fetch_all(pool)
205-
.await
206-
.expect("run reward query")
207-
.into_iter()
208-
.map(|row| row.get::<Value, _>("atomic_data"))
209-
.collect()
210+
run_block_range_query(pool, REWARD_SQL, last_processed_block, max_block).await
210211
}
211212

212213
pub async fn run_hotspot_query(
@@ -232,29 +233,13 @@ pub async fn run_wp_ownership_query(
232233
last_processed_block: i64,
233234
max_block: i64,
234235
) -> Vec<Value> {
235-
sqlx::query(WP_OWNERSHIP_SQL)
236-
.bind(last_processed_block)
237-
.bind(max_block)
238-
.fetch_all(pool)
239-
.await
240-
.expect("run wp ownership query")
241-
.into_iter()
242-
.map(|row| row.get::<Value, _>("atomic_data"))
243-
.collect()
236+
run_block_range_query(pool, WP_OWNERSHIP_SQL, last_processed_block, max_block).await
244237
}
245238

246239
pub async fn run_wp_reward_query(
247240
pool: &PgPool,
248241
last_processed_block: i64,
249242
max_block: i64,
250243
) -> Vec<Value> {
251-
sqlx::query(WP_REWARD_SQL)
252-
.bind(last_processed_block)
253-
.bind(max_block)
254-
.fetch_all(pool)
255-
.await
256-
.expect("run wp reward query")
257-
.into_iter()
258-
.map(|row| row.get::<Value, _>("atomic_data"))
259-
.collect()
244+
run_block_range_query(pool, WP_REWARD_SQL, last_processed_block, max_block).await
260245
}

0 commit comments

Comments
 (0)