Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/target
/book
.DS_Store

# devenv
.devenv/
.devenv.flake.nix
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod pcap;
mod qmdl_store;
mod server;
mod stats;
mod time_correction;

use std::net::SocketAddr;
use std::sync::Arc;
Expand All @@ -22,7 +23,8 @@ use crate::notifications::{NotificationService, run_notification_worker};
use crate::pcap::get_pcap;
use crate::qmdl_store::RecordingStore;
use crate::server::{
ServerState, debug_set_display_state, get_config, get_qmdl, get_zip, serve_static, set_config,
ServerState, debug_set_display_state, get_config, get_qmdl, get_zip, get_time_correction,
serve_static, set_config, sync_time_from_browser,
};
use crate::stats::{get_qmdl_manifest, get_system_stats};

Expand Down Expand Up @@ -68,6 +70,8 @@ fn get_router() -> AppRouter {
.route("/api/analysis/{name}", post(start_analysis))
.route("/api/config", get(get_config))
.route("/api/config", post(set_config))
.route("/api/time-correction", get(get_time_correction))
.route("/api/time-correction/sync", post(sync_time_from_browser))
.route("/api/debug/display-state", post(debug_set_display_state))
.route("/", get(|| async { Redirect::permanent("/index.html") }))
.route("/{*path}", get(serve_static))
Expand Down Expand Up @@ -287,6 +291,7 @@ async fn run_with_config(
analysis_sender: analysis_tx,
daemon_restart_token: restart_token.clone(),
ui_update_sender: Some(ui_update_tx),
time_correction: Arc::new(RwLock::new(time_correction::TimeCorrection::new())),
});
run_server(&task_tracker, state, shutdown_token.clone()).await;

Expand Down
14 changes: 12 additions & 2 deletions daemon/src/pcap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::http::header::CONTENT_TYPE;
use axum::response::{IntoResponse, Response};
use chrono::Duration;
use log::error;
use rayhunter::diag::DataType;
use rayhunter::gsmtap_parser;
Expand All @@ -23,6 +24,9 @@ pub async fn get_pcap(
Path(mut qmdl_name): Path<String>,
) -> Result<Response, (StatusCode, String)> {
let qmdl_store = state.qmdl_store_lock.read().await;
let time_correction = state.time_correction.read().await;
let offset_seconds = time_correction.offset_seconds();

if qmdl_name.ends_with("pcapng") {
qmdl_name = qmdl_name.trim_end_matches(".pcapng").to_string();
}
Expand All @@ -46,7 +50,7 @@ pub async fn get_pcap(
let (reader, writer) = duplex(1024);

tokio::spawn(async move {
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes).await {
if let Err(e) = generate_pcap_data(writer, qmdl_file, qmdl_size_bytes, offset_seconds).await {
error!("failed to generate PCAP: {e:?}");
}
});
Expand All @@ -60,6 +64,7 @@ pub async fn generate_pcap_data<R, W>(
writer: W,
qmdl_file: R,
qmdl_size_bytes: usize,
offset_seconds: i64,
) -> Result<(), Error>
where
W: AsyncWrite + Unpin + Send,
Expand All @@ -68,6 +73,8 @@ where
let mut pcap_writer = GsmtapPcapWriter::new(writer).await?;
pcap_writer.write_iface_header().await?;

let time_offset = Duration::seconds(offset_seconds);

let mut reader = QmdlReader::new(qmdl_file, Some(qmdl_size_bytes));
while let Some(container) = reader.get_next_messages_container().await? {
if container.data_type != DataType::UserSpace {
Expand All @@ -79,8 +86,11 @@ where
Ok(msg) => {
let maybe_gsmtap_msg = gsmtap_parser::parse(msg)?;
if let Some((timestamp, gsmtap_msg)) = maybe_gsmtap_msg {
// Apply time correction to the timestamp
let datetime = timestamp.to_datetime();
let corrected_datetime = datetime + time_offset;
pcap_writer
.write_gsmtap_message(gsmtap_msg, timestamp)
.write_gsmtap_message_with_datetime(gsmtap_msg, corrected_datetime)
.await?;
}
}
Expand Down
101 changes: 100 additions & 1 deletion daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use axum::http::header::{self, CONTENT_LENGTH, CONTENT_TYPE};
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use log::{error, warn};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::fs::write;
use tokio::io::{AsyncReadExt, copy, duplex};
Expand All @@ -25,6 +26,7 @@ use crate::config::Config;
use crate::display::DisplayState;
use crate::pcap::generate_pcap_data;
use crate::qmdl_store::RecordingStore;
use crate::time_correction::{TimeCorrection, TimeCorrectionState};

pub struct ServerState {
pub config_path: String,
Expand All @@ -35,6 +37,7 @@ pub struct ServerState {
pub analysis_sender: Sender<AnalysisCtrlMessage>,
pub daemon_restart_token: CancellationToken,
pub ui_update_sender: Option<Sender<DisplayState>>,
pub time_correction: TimeCorrectionState,
}

pub async fn get_qmdl(
Expand Down Expand Up @@ -159,6 +162,10 @@ pub async fn get_zip(
};

let qmdl_store_lock = state.qmdl_store_lock.clone();
let offset_seconds = {
let time_correction = state.time_correction.read().await;
time_correction.offset_seconds()
};

let (reader, writer) = duplex(8192);

Expand Down Expand Up @@ -202,7 +209,7 @@ pub async fn get_zip(
};

if let Err(e) =
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes).await
generate_pcap_data(&mut entry_writer, qmdl_file_for_pcap, qmdl_size_bytes, offset_seconds).await
{
// if we fail to generate the PCAP file, we should still continue and give the
// user the QMDL.
Expand Down Expand Up @@ -250,6 +257,46 @@ pub async fn debug_set_display_state(
}
}

/// Get current time correction information
pub async fn get_time_correction(
State(state): State<Arc<ServerState>>,
) -> Json<TimeCorrection> {
let correction = state.time_correction.read().await;
Json(correction.clone())
}

/// Request to sync time from browser/client
#[derive(Debug, Deserialize)]
pub struct TimeSyncRequest {
/// Browser timestamp in milliseconds since Unix epoch
pub browser_timestamp_ms: i64,
}

/// Response for time sync operation
#[derive(Debug, Serialize)]
pub struct TimeSyncResponse {
pub success: bool,
pub offset_seconds: i64,
pub message: String,
}

/// Sync time correction from browser timestamp
pub async fn sync_time_from_browser(
State(state): State<Arc<ServerState>>,
Json(request): Json<TimeSyncRequest>,
) -> Json<TimeSyncResponse> {
let mut correction = state.time_correction.write().await;

correction.set_from_browser(request.browser_timestamp_ms);
let offset = correction.offset_seconds();

Json(TimeSyncResponse {
success: true,
offset_seconds: offset,
message: format!("Time synchronized. Offset: {} seconds", offset),
})
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -316,6 +363,7 @@ mod tests {
analysis_sender: analysis_tx,
daemon_restart_token: CancellationToken::new(),
ui_update_sender: None,
time_correction: Arc::new(RwLock::new(TimeCorrection::new())),
})
}

Expand Down Expand Up @@ -351,4 +399,55 @@ mod tests {
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
);
}

#[tokio::test]
async fn test_get_time_correction_default() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let state = create_test_server_state(store_lock);

let result = get_time_correction(State(state)).await;
let correction = result.0;

assert_eq!(correction.offset_seconds, 0);
assert!(correction.last_updated.is_none());
}

#[tokio::test]
async fn test_sync_time_from_browser() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let state = create_test_server_state(store_lock);

// Sync with a timestamp 1 hour in the future
let browser_timestamp_ms = (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp_millis();
let request = TimeSyncRequest { browser_timestamp_ms };

let result = sync_time_from_browser(State(state.clone()), Json(request)).await;
let response = result.0;

assert!(response.success);
// Should be approximately 3600 seconds (1 hour)
assert!(response.offset_seconds >= 3595 && response.offset_seconds <= 3605);

// Verify the offset was persisted
let correction = state.time_correction.read().await;
assert_eq!(correction.offset_seconds(), response.offset_seconds);
assert!(correction.last_updated.is_some());
}

#[tokio::test]
async fn test_sync_time_negative_offset() {
let (_temp_dir, store_lock) = create_test_qmdl_store().await;
let state = create_test_server_state(store_lock);

// Sync with a timestamp 2 hours in the past
let browser_timestamp_ms = (chrono::Utc::now() - chrono::Duration::hours(2)).timestamp_millis();
let request = TimeSyncRequest { browser_timestamp_ms };

let result = sync_time_from_browser(State(state.clone()), Json(request)).await;
let response = result.0;

assert!(response.success);
// Should be approximately -7200 seconds (-2 hours)
assert!(response.offset_seconds > -7205 && response.offset_seconds < -7195);
}
}
Loading