Skip to content

Commit 8968e75

Browse files
authored
Avoid block when query (#388)
* Avoid block when query * fmt * Add timeout to rpc channel * Add timeout to rpc channel * fmt
1 parent 3b3e3b9 commit 8968e75

File tree

4 files changed

+50
-29
lines changed

4 files changed

+50
-29
lines changed

apps/indexer-proxy/proxy/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "subql-indexer-proxy"
3-
version = "2.0.4"
3+
version = "2.0.5"
44
edition = "2021"
55

66
[dependencies]

apps/indexer-proxy/proxy/src/metrics.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::time::Instant;
3030
use subql_indexer_utils::request::REQUEST_CLIENT;
3131
use sysinfo::{System, SystemExt};
3232
use tokio::sync::Mutex;
33+
use tokio::time::{sleep, Duration};
3334

3435
use crate::cli::COMMAND;
3536
use crate::primitives::METRICS_LOOP_TIME;
@@ -101,7 +102,7 @@ struct Labels {
101102
pub fn listen() {
102103
tokio::spawn(async {
103104
loop {
104-
tokio::time::sleep(std::time::Duration::from_secs(METRICS_LOOP_TIME)).await;
105+
sleep(Duration::from_secs(METRICS_LOOP_TIME)).await;
105106

106107
let lock = OWNER_COUNTER.lock().await;
107108
let current = lock.clone();

apps/indexer-proxy/proxy/src/p2p.rs

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use tdn::{
4040
},
4141
};
4242
use tokio::sync::{mpsc::Sender, RwLock};
43+
use tokio::time::{sleep, Duration};
4344

4445
use crate::{
4546
account::{get_indexer, indexer_healthy},
@@ -57,7 +58,9 @@ pub static P2P_SENDER: Lazy<RwLock<Vec<ChannelRpcSender>>> = Lazy::new(|| RwLock
5758
pub async fn send(method: &str, params: Vec<RpcParam>, gid: GroupId) {
5859
let senders = P2P_SENDER.read().await;
5960
if !senders.is_empty() {
60-
senders[0].send(rpc_request(0, method, params, gid)).await;
61+
senders[0]
62+
.send_timeout(rpc_request(0, method, params, gid), 100)
63+
.await;
6164
}
6265
}
6366

@@ -69,44 +72,53 @@ pub async fn stop_network() {
6972
debug!("RESTART NEW P2P NETWORK");
7073
senders[0].send(rpc_request(0, "p2p-stop", vec![], 0)).await;
7174
drop(senders);
72-
tokio::time::sleep(std::time::Duration::from_secs(P2P_RESTART_TIME)).await;
75+
sleep(Duration::from_secs(P2P_RESTART_TIME)).await;
7376
}
7477
}
7578

76-
pub async fn report_conflict(deployment: &str, channel: &str, conflict: i32, start: i64, end: i64) {
79+
pub async fn report_conflict(
80+
deployment: String,
81+
channel: String,
82+
conflict: i32,
83+
start: i64,
84+
end: i64,
85+
) {
7786
let senders = P2P_SENDER.read().await;
7887
if senders.is_empty() {
7988
warn!("NONE NETWORK WHEN REPORT CONFLICT");
8089
} else {
8190
senders[0]
82-
.send(rpc_request(
83-
0,
84-
"payg-report-conflict",
85-
vec![
86-
deployment.into(),
87-
channel.into(),
88-
conflict.into(),
89-
start.into(),
90-
end.into(),
91-
],
92-
0,
93-
))
91+
.send_timeout(
92+
rpc_request(
93+
0,
94+
"payg-report-conflict",
95+
vec![
96+
deployment.into(),
97+
channel.into(),
98+
conflict.into(),
99+
start.into(),
100+
end.into(),
101+
],
102+
0,
103+
),
104+
100,
105+
)
94106
.await;
95107
drop(senders);
96108
}
97109
}
98110

99111
async fn check_stable() {
100112
loop {
101-
tokio::time::sleep(std::time::Duration::from_secs(P2P_STABLE_TIME)).await;
113+
sleep(Duration::from_secs(P2P_STABLE_TIME)).await;
102114
let senders = P2P_SENDER.read().await;
103115
if senders.is_empty() {
104116
drop(senders);
105117
continue;
106118
} else {
107119
debug!("Check stable connections");
108120
senders[0]
109-
.send(rpc_request(0, "p2p-stable", vec![], 0))
121+
.send_timeout(rpc_request(0, "p2p-stable", vec![], 0), 100)
110122
.await;
111123
}
112124
drop(senders);
@@ -115,7 +127,7 @@ async fn check_stable() {
115127

116128
async fn report_metrics() {
117129
loop {
118-
tokio::time::sleep(std::time::Duration::from_secs(P2P_METRICS_TIME)).await;
130+
sleep(Duration::from_secs(P2P_METRICS_TIME)).await;
119131
let senders = P2P_SENDER.read().await;
120132
if senders.is_empty() {
121133
drop(senders);
@@ -132,15 +144,15 @@ async fn report_metrics() {
132144

133145
async fn report_status() {
134146
loop {
135-
tokio::time::sleep(std::time::Duration::from_secs(P2P_METRICS_STATUS_TIME)).await;
147+
sleep(Duration::from_secs(P2P_METRICS_STATUS_TIME)).await;
136148
let senders = P2P_SENDER.read().await;
137149
if senders.is_empty() {
138150
drop(senders);
139151
continue;
140152
} else {
141153
debug!("Report projects status");
142154
senders[0]
143-
.send(rpc_request(0, "project-report-status", vec![], 0))
155+
.send_timeout(rpc_request(0, "project-report-status", vec![], 0), 100)
144156
.await;
145157
}
146158
drop(senders);
@@ -149,15 +161,15 @@ async fn report_status() {
149161

150162
async fn broadcast_healthy() {
151163
loop {
152-
tokio::time::sleep(std::time::Duration::from_secs(P2P_BROADCAST_HEALTHY_TIME)).await;
164+
sleep(Duration::from_secs(P2P_BROADCAST_HEALTHY_TIME)).await;
153165
let senders = P2P_SENDER.read().await;
154166
if senders.is_empty() {
155167
drop(senders);
156168
continue;
157169
} else {
158170
debug!("Report projects healthy");
159171
senders[0]
160-
.send(rpc_request(0, "project-broadcast-healthy", vec![], 0))
172+
.send_timeout(rpc_request(0, "project-broadcast-healthy", vec![], 0), 100)
161173
.await;
162174
}
163175
drop(senders);

apps/indexer-proxy/proxy/src/payg.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ pub async fn query_state(
298298
state: &Value,
299299
network_type: MetricsNetwork,
300300
) -> Result<(Vec<u8>, String, String)> {
301+
debug!("Start handle query channel");
301302
let project = get_project(project_id).await?;
302303
let mut state = QueryState::from_json(state)?;
303304

@@ -308,6 +309,7 @@ pub async fn query_state(
308309

309310
// check channel state
310311
let (mut state_cache, keyname) = fetch_channel_cache(state.channel_id).await?;
312+
debug!("Got channel cache");
311313

312314
// check signer
313315
if !state_cache.signer.contains(&signer) {
@@ -351,22 +353,28 @@ pub async fn query_state(
351353
if local_next > remote_next + price {
352354
// mark conflict is happend
353355
let times = ((local_next - remote_next) / price).as_u32() as i32;
354-
let now = Utc::now().timestamp();
355356
if times <= 1 {
356-
state_cache.conflict = now;
357+
state_cache.conflict = Utc::now().timestamp();
357358
}
358-
let channel = format!("{:#x}", state.channel_id);
359-
report_conflict(&project.id, &channel, times, state_cache.conflict, now).await;
360359
}
361360

362361
if local_next > remote_next + price * conflict {
363362
warn!(
364363
"CONFLICT: local_next: {}, remote_next: {}, price: {}, conflict: {}",
365364
local_next, remote_next, price, conflict
366365
);
366+
367+
let project_id = project.id.clone();
368+
let channel = format!("{:#x}", state.channel_id);
369+
let times = conflict.as_u32() as i32;
370+
let start = state_cache.conflict;
371+
let end = Utc::now().timestamp();
372+
tokio::spawn(report_conflict(project_id, channel, times, start, end));
373+
367374
// overflow the conflict
368375
return Err(Error::PaygConflict(1050));
369376
}
377+
debug!("Verified channel, start query");
370378

371379
// query the data.
372380
let (data, signature) = project
@@ -418,7 +426,7 @@ pub async fn query_state(
418426
.map_err(|e| error!("{:?}", e));
419427
});
420428

421-
state.remote = state_cache.spent;
429+
state.remote = local_next;
422430
debug!("Handle query channel success");
423431
let state_bytes = serde_json::to_vec(&state.to_json()).unwrap_or(vec![]);
424432
let state_string = general_purpose::STANDARD.encode(&state_bytes);

0 commit comments

Comments
 (0)