Skip to content

Commit

Permalink
general: more response alerts and reports with SSE
Browse files Browse the repository at this point in the history
SSE (server side events) allows us to stream partial results to the
client. This can really help with responsivement on large amounts of
data in SQLite.

For Elastic, we fake it by just sending a single SSE response as Elastic
is responsive enough on large datasets.

Plus a bunch of misc. changes across the board.
  • Loading branch information
jasonish committed Jan 8, 2025
1 parent 1d8ecc6 commit 4986fc0
Show file tree
Hide file tree
Showing 27 changed files with 768 additions and 282 deletions.
175 changes: 90 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ chrono = { version = "0.4.38", default-features = false, features = ["std", "now
log = "0.4.21"
owo-colors = "4.0.0"
indexmap = "2.2.6"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
1 change: 1 addition & 0 deletions src/elastic/eventrepo/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl ElasticEventRepo {
"http.hostname",
"quic.sni",
"tls.sni",
"host",
])
};

Expand Down
4 changes: 2 additions & 2 deletions src/elastic/eventrepo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ impl ElasticEventRepo {
.await
}

pub(crate) async fn get_earliest_timestamp(&self) -> Result<Option<crate::datetime::DateTime>> {
pub(crate) async fn earliest_timestamp(&self) -> Result<Option<crate::datetime::DateTime>> {
#[rustfmt::skip]
let request = json!({
"query": {
Expand Down Expand Up @@ -662,7 +662,7 @@ impl ElasticEventRepo {
let bound_max = datetime::DateTime::now();
let bound_min = if let Some(timestamp) = qs.first_from() {
timestamp
} else if let Some(timestamp) = self.get_earliest_timestamp().await? {
} else if let Some(timestamp) = self.earliest_timestamp().await? {
debug!(
"No time-range provided by client, using earliest from database of {}",
&timestamp
Expand Down
7 changes: 7 additions & 0 deletions src/eventrepo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,11 @@ impl EventRepo {
EventRepo::SQLite(ds) => Ok(ds.agg(field, size, order, query).await?),
}
}

pub(crate) async fn earliest_timestamp(&self) -> Result<Option<DateTime>> {
match self {
EventRepo::Elastic(repo) => repo.earliest_timestamp().await,
EventRepo::SQLite(repo) => repo.earliest_timestamp().await,
}
}
}
120 changes: 114 additions & 6 deletions src/server/api/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,37 @@

use super::util::parse_duration;
use crate::error::AppError;
use crate::eventrepo::EventRepo;
use crate::prelude::*;
use crate::queryparser;
use crate::queryparser::{QueryElement, QueryValue};
use crate::server::{main::SessionExtractor, ServerContext};
use axum::response::sse::Event;
use axum::response::Sse;
use axum::Extension;
use axum::{extract::State, response::IntoResponse, Form, Json};
use futures::Stream;
use serde::Deserialize;
use std::convert::Infallible;
use std::time::Duration;
use std::{ops::Sub, sync::Arc};

#[derive(Debug, Deserialize)]
pub(crate) struct AggParams {
/// Field name to group and return the counts for.
field: String,
pub(crate) field: String,
/// Humanized time range string.
#[serde(default = "default_time_range")]
time_range: String,
pub(crate) time_range: String,
/// Number of results to return.
#[serde(default = "default_size")]
size: usize,
pub(crate) size: usize,
/// Sort order, desc or asc.
#[serde(default = "default_order")]
order: String,
pub(crate) order: String,
/// Optional query string.
q: Option<String>,
tz_offset: Option<String>,
pub(crate) q: Option<String>,
pub(crate) tz_offset: Option<String>,
}

const fn default_size() -> usize {
Expand All @@ -40,6 +48,106 @@ fn default_order() -> String {
"desc".to_string()
}

pub(crate) async fn agg_sse(
_session: SessionExtractor,
Extension(context): Extension<Arc<ServerContext>>,
Form(form): Form<AggParams>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

// First parse the query string.
let default_tz_offset = form.tz_offset.as_deref();
let mut query_string = form
.q
.clone()
.map(|qs| queryparser::parse(&qs, default_tz_offset))
.transpose()
.unwrap()
.unwrap_or_default();

let min_timestamp = parse_duration(&form.time_range)
.map(|d| chrono::Utc::now().sub(d))
.map_err(|err| AppError::BadRequest(format!("time_range: {err}")))
.unwrap();
query_string.push(QueryElement {
negated: false,
value: QueryValue::From(min_timestamp.into()),
});

if let EventRepo::Elastic(ds) = &context.datastore {
// For Elastic we delay the SSE reponse until we have data
// ready so we can return a proper HTTP error if the query
// fails.
let result = ds
.agg(&form.field, form.size, &form.order, query_string.clone())
.await?;
let response = json!({
"rows": result,
"done": true,
});
let event = Event::default()
.json_data(response)
.map_err(|err| AppError::StringError(format!("{:?}", err)))?;
let _ = tx.send(Ok(event));

let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
return Ok(Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(1))
.text("keep-alive-text"),
));
}

tokio::spawn(async move {
match &context.datastore {
EventRepo::Elastic(_) => {
unreachable!();
}
EventRepo::SQLite(ds) => {
let (aggtx, mut aggrx) = tokio::sync::mpsc::unbounded_channel();

let tx0 = tx.clone();
let field = form.field.clone();
tokio::spawn(async move {
while let Some(result) = aggrx.recv().await {
if let Ok(event) = Event::default().json_data(result) {
if tx0.send(Ok(event)).is_err() {
debug!("Client disappeared, terminating SSE agg ({})", field);
return;
}
}
}
});

if let Err(err) = ds
.agg_stream(
&form.field,
form.size,
&form.order,
query_string,
Some(aggtx),
)
.await
{
let event = Event::default().comment(format!("error: {:?}", err));
let _ = tx.send(Ok(event));
}

let event = Event::default().comment("done");
let _ = tx.send(Ok(event));
}
}
});

let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

Ok(Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(1))
.text("keep-alive-text"),
))
}

pub(crate) async fn agg(
_session: SessionExtractor,
State(context): State<Arc<ServerContext>>,
Expand Down
14 changes: 13 additions & 1 deletion src/server/api/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: MIT

use axum::extract::Extension;
use axum::http::header::HeaderMap;
use axum::http::header::SET_COOKIE;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Json;
Expand Down Expand Up @@ -92,8 +94,18 @@ pub(crate) async fn post(
error!("Failed to save session: {:?}", err);
}

let mut headers = HeaderMap::new();
if let Some(session_id) = &session.session_id {
let cookie = format!(
"x-evebox-session-id={}; Path=/; HttpOnly; Secure; SameSite=Strict; Max-Age={}",
session_id,
chrono::Duration::days(365).num_seconds()
);
headers.insert(SET_COOKIE, cookie.parse().unwrap());
}

(
StatusCode::OK,
headers,
Json(serde_json::json!({
"session_id": session.session_id,
})),
Expand Down
6 changes: 4 additions & 2 deletions src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::datetime::DateTime;
use crate::error::AppError;
use crate::eventrepo::EventQueryParams;
use crate::eventrepo::EventRepo;
use crate::prelude::*;
use crate::queryparser;
use crate::queryparser::{QueryElement, QueryValue};
use crate::server::api::genericquery::GenericQuery;
Expand All @@ -23,7 +24,6 @@ use std::collections::HashMap;
use std::ops::Sub;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, info, warn};

use self::genericquery::TimeRange;
use self::util::parse_duration;
Expand Down Expand Up @@ -74,6 +74,7 @@ pub(crate) fn router() -> axum::Router<Arc<ServerContext>> {
.route("/api/find-dns", get(find_dns))
.route("/api/events/count", get(count::count))
.route("/api/events/earliest-timestamp", get(earliest_timestamp))
.route("/sse/agg", get(agg::agg_sse))
.nest("/api/1/stats", stats::router())
}

Expand Down Expand Up @@ -105,6 +106,7 @@ pub(crate) async fn config(

pub(crate) async fn get_user(SessionExtractor(session): SessionExtractor) -> impl IntoResponse {
let user = json!({
"anonymous": session.session_id.is_none(),
"username": session.username,
});
Json(user)
Expand Down Expand Up @@ -141,7 +143,7 @@ pub(crate) async fn dhcp_ack(

#[rustfmt::skip]
let response = json!({
"events": response,
"events": response,
});

Ok(Json(response))
Expand Down
2 changes: 1 addition & 1 deletion src/server/api/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) async fn info(
let max_row_id = sqlite.max_row_id().await?;
let event_count_estimate = max_row_id - min_row_id;

let min_timestamp = sqlite.min_timestamp().await?;
let min_timestamp = sqlite.earliest_timestamp().await?;
let max_timestamp = sqlite.max_timestamp().await?;

let mut response = Response {
Expand Down
12 changes: 2 additions & 10 deletions src/server/api/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,6 @@ pub(crate) async fn earliest_timestamp(
_session: SessionExtractor,
Extension(context): Extension<Arc<ServerContext>>,
) -> Result<impl IntoResponse, AppError> {
match &context.datastore {
EventRepo::Elastic(ds) => {
let ts = ds.get_earliest_timestamp().await?;
Ok(Json(ts).into_response())
}
EventRepo::SQLite(ds) => {
let ts = ds.min_timestamp().await?;
Ok(Json(ts).into_response())
}
}
let ts = context.datastore.earliest_timestamp().await?;
Ok(Json(ts))
}
18 changes: 4 additions & 14 deletions src/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,20 +555,10 @@ where
.unwrap();
let headers = &req.headers;

let get_session_id = || {
let header_session_id = headers
.get("x-evebox-session-id")
.and_then(|h| h.to_str().map(|s| s.to_string()).ok());
if header_session_id.is_some() {
return header_session_id;
}

CookieJar::from_headers(headers)
.get("x-evebox-session-id")
.map(|c| c.value().to_string())
};

let session_id = get_session_id();
let cookies = CookieJar::from_headers(headers);
let session_id = cookies
.get("x-evebox-session-id")
.map(|c| c.value().to_string());

let remote_user = headers
.get("remote_user")
Expand Down
3 changes: 1 addition & 2 deletions src/server/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ impl Session {
}

pub fn anonymous(username: Option<String>) -> Session {
let session_id = generate_session_id();
Session {
username,
session_id: Some(session_id),
session_id: None,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sqlite/eventrepo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl SqliteEventRepo {

/// Return the earliest/minimum timestamp found in the events
/// table.
pub async fn min_timestamp(&self) -> Result<Option<DateTime>> {
pub(crate) async fn earliest_timestamp(&self) -> Result<Option<DateTime>> {
let sql = "SELECT MIN(timestamp) FROM events";

if *LOG_QUERY_PLAN {
Expand Down
Loading

0 comments on commit 4986fc0

Please sign in to comment.