Skip to content

Commit

Permalink
wip: sse
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonish committed Dec 30, 2024
1 parent 39241d2 commit 210bd62
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ chrono = { version = "0.4.38", default-features = false, features = ["std", "now
log = "0.4.21"
owo-colors = "4.0.0"
indexmap = "2.2.6"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
43 changes: 43 additions & 0 deletions src/server/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ use crate::server::ServerContext;
use crate::{elastic, queryparser};
use axum::extract::{Extension, Form, Path, State};
use axum::http::StatusCode;
use axum::response::sse::Event;
use axum::response::IntoResponse;
use axum::response::Response;
use axum::response::Sse;
use axum::routing::{get, post};
use axum::Json;
use futures::Stream;
use serde::Deserialize;
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::ops::Sub;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};

use self::genericquery::TimeRange;
Expand Down Expand Up @@ -69,6 +74,7 @@ pub(crate) fn router() -> axum::Router<Arc<ServerContext>> {
.route("/api/ja4db/:fingerprint", get(ja4db))
.route("/api/admin/update/ja4db", post(admin::update_ja4db))
.route("/api/find-dns", get(find_dns))
.route("/api/sse", get(sse_handler))
.nest("/api/1/stats", stats::router())
}

Expand Down Expand Up @@ -467,6 +473,43 @@ pub(crate) async fn events(
Ok(Json(results).into_response())
}

async fn sse_handler(
_session: SessionExtractor,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

tokio::spawn(async move {
let msg = json!({
"foo": "bar",
});
let event = Event::default().json_data(&msg);
if let Err(err) = tx.send(Ok(event.unwrap())) {
dbg!(err);
return;
}

tokio::time::sleep(Duration::from_secs(5)).await;

let event = Event::default().json_data(msg);
if let Err(err) = tx.send(Ok(event.unwrap())) {
dbg!(err);
return;
}


println!("End of SSE");

});

let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(1))
.text("keep-alive-text"),
)
}

async fn ja4db(
_session: SessionExtractor,
Extension(context): Extension<Arc<ServerContext>>,
Expand Down

0 comments on commit 210bd62

Please sign in to comment.