Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ parquet = "54.0.0"
# Web server and HTTP-related
actix-cors = "0.7.0"
actix-web = { version = "4.9.0", features = ["rustls-0_22"] }
tikv-jemalloc-ctl = "0.6.0"
tikv-jemallocator = "0.6.0"
tikv-jemalloc-sys = "0.6.1"
actix-web-httpauth = "0.8"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
Expand Down
44 changes: 32 additions & 12 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use futures::stream::once;
use futures::{Stream, StreamExt, future};
use futures_util::Future;
use http::StatusCode;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
Expand All @@ -52,7 +51,7 @@ use crate::query::error::ExecuteError;
use crate::query::{CountsRequest, Query as LogicalQuery, execute};
use crate::query::{QUERY_SESSION, resolve_stream_names};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::response::{QueryResponse, force_memory_release};
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
Expand Down Expand Up @@ -241,9 +240,15 @@ async fn handle_non_streaming_query(
with_fields: query_request.fields,
}
.to_json()?;
Ok(HttpResponse::Ok()

let http_response = HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
.json(response);

// Force memory release after HTTP response is fully created
force_memory_release();

Ok(http_response)
}

/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
Expand Down Expand Up @@ -324,18 +329,26 @@ fn create_batch_processor(
) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
// Create response and immediately process to reduce memory retention
let query_response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.map_err(|e| {
};

let response = query_response.to_json().map_err(|e| {
error!("Failed to parse record batch into JSON: {}", e);
actix_web::error::ErrorInternalServerError(e)
})?;
Ok(Bytes::from(format!("{response}\n")))

// Convert to bytes and explicitly drop the response object
let bytes_result = Bytes::from(format!("{response}\n"));
drop(response); // Explicit cleanup

force_memory_release();

Ok(bytes_result)
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
Expand Down Expand Up @@ -380,12 +393,19 @@ pub async fn get_counts(
let (records, _) = get_records_and_fields(&query_request, &creds).await?;

if let Some(records) = records {
let json_records = record_batches_to_json(&records)?;
let records = json_records.into_iter().map(Value::Object).collect_vec();
// Use optimized JSON conversion with explicit memory management
let json_records = {
let converted = record_batches_to_json(&records)?;
drop(records); // Explicitly drop the original records early
converted
};

let processed_records: Vec<Value> =
json_records.into_iter().map(Value::Object).collect();

let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
"records": processed_records,
});

return Ok(web::Json(res));
Expand Down
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Registry, fmt};

// Use jemalloc as the global allocator. This is required for
// `force_memory_release` (src/response.rs), which calls jemalloc's mallctl
// API to return freed pages to the OS — those calls are no-ops (or errors)
// under the default system allocator.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
Expand Down
5 changes: 1 addition & 4 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
/// Delete an overview
async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
let path = RelativePathBuf::from_iter([stream, "overview"]);
Ok(self
.storage
.delete_object(&path)
.await?)
Ok(self.storage.delete_object(&path).await?)
}

/// This function fetches all the keystones from the underlying object store
Expand Down
81 changes: 70 additions & 11 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*
*/

use std::ffi::CString;
use std::sync::Mutex;
use std::time::{Duration, Instant};

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{Value, json};
use tracing::info;
use tracing::{debug, info, warn};

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
Expand All @@ -32,28 +35,84 @@ pub struct QueryResponse {
impl QueryResponse {
pub fn to_json(&self) -> Result<Value, QueryError> {
info!("{}", "Returning query results");
let mut json_records = record_batches_to_json(&self.records)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
// Process in batches to avoid massive allocations
const BATCH_SIZE: usize = 100; // Process 100 record batches at a time
let mut all_values = Vec::new();

for chunk in self.records.chunks(BATCH_SIZE) {
let mut json_records = record_batches_to_json(chunk)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
}
}
}
}

// Convert this batch to values and add to collection
let batch_values: Vec<Value> = json_records.into_iter().map(Value::Object).collect();
all_values.extend(batch_values);
}
let values = json_records.into_iter().map(Value::Object).collect_vec();

let response = if self.with_fields {
json!({
"fields": self.fields,
"records": values,
"records": all_values,
})
} else {
Value::Array(values)
Value::Array(all_values)
};

Ok(response)
}
}

/// Requests a jemalloc purge whenever a `QueryResponse` is dropped, so the
/// memory backing large result sets can be returned to the OS soon after the
/// response is no longer needed. `force_memory_release` is rate-limited, so
/// this is a cheap no-op for most drops (e.g. the per-batch responses created
/// on the streaming path).
impl Drop for QueryResponse {
    fn drop(&mut self) {
        force_memory_release();
    }
}

// Rate-limited memory release with proper error handling.
//
// jemalloc retains freed pages in per-arena caches; this module asks it to
// return them to the OS, at most once per `PURGE_INTERVAL`, so that hot
// paths calling `force_memory_release` are not slowed by repeated purges.
static LAST_PURGE: Mutex<Option<Instant>> = Mutex::new(None);
const PURGE_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour

/// Asks jemalloc to release cached memory back to the operating system.
///
/// Rate-limited: at most one purge is performed per [`PURGE_INTERVAL`];
/// calls inside the interval return immediately. Failures are logged, never
/// propagated — this is strictly best-effort.
pub fn force_memory_release() {
    // Rate limit: bail out unless the interval has elapsed. The lock is
    // released at the end of this scope, before any mallctl work.
    {
        let mut last_purge = LAST_PURGE.lock().unwrap();
        if let Some(last) = *last_purge {
            if last.elapsed() < PURGE_INTERVAL {
                return;
            }
        }
        *last_purge = Some(Instant::now());
    }

    // Advance the epoch so jemalloc refreshes internal statistics and can
    // merge deferred frees before we purge.
    if let Err(e) = tikv_jemalloc_ctl::epoch::mib().and_then(|mib| mib.advance()) {
        warn!("Failed to advance jemalloc epoch: {:?}", e);
    }

    // Flush this thread's tcache first so cached objects are returned to
    // their arenas and become eligible for purging.
    if let Ok(tcache_flush) = CString::new("thread.tcache.flush") {
        // SAFETY: "thread.tcache.flush" is a valid NUL-terminated mallctl
        // name for a void operation; null old/new pointers with newlen 0 are
        // the documented way to invoke it.
        unsafe {
            let _ = tikv_jemalloc_sys::mallctl(
                tcache_flush.as_ptr(),
                std::ptr::null_mut(),
                std::ptr::null_mut(),
                std::ptr::null_mut(),
                0,
            );
        }
    }

    // Purge each initialized arena. Iterating `arenas.narenas` is portable
    // across jemalloc builds — unlike hardcoding the numeric value of
    // MALLCTL_ARENAS_ALL (4096) into the mallctl name string, which is not
    // guaranteed to resolve and can fail with ENOENT, silently skipping the
    // purge.
    match tikv_jemalloc_ctl::arenas::narenas::read() {
        Ok(narenas) => {
            for i in 0..narenas {
                let Ok(name) = CString::new(format!("arena.{i}.purge")) else {
                    continue; // cannot contain an interior NUL; defensive only
                };
                // SAFETY: "arena.<i>.purge" is a valid void mallctl
                // operation per the jemalloc MALLCTL NAMESPACE docs; null
                // pointers and newlen 0 request the void form.
                unsafe {
                    let ret = tikv_jemalloc_sys::mallctl(
                        name.as_ptr(),
                        std::ptr::null_mut(),
                        std::ptr::null_mut(),
                        std::ptr::null_mut(),
                        0,
                    );
                    if ret != 0 {
                        warn!("Arena purge failed for index {i} with code: {ret}");
                    }
                }
            }
            debug!("Requested purge for {narenas} arenas");
        }
        Err(e) => warn!("Failed to read jemalloc arenas.narenas: {:?}", e),
    }
}
14 changes: 11 additions & 3 deletions src/utils/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ use crate::event::DEFAULT_TIMESTAMP_KEY;
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
// Early return for empty records to avoid unnecessary allocations
if records.is_empty() {
return Ok(Vec::new());
}

let buf = Vec::with_capacity(records.len() * 1024); // Pre-allocate with reasonable capacity
let mut writer = arrow_json::ArrayWriter::new(buf);
for record in records {
writer.write(record)?;
Expand All @@ -57,8 +62,11 @@ pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String,

let buf = writer.into_inner();

let json_rows: Vec<Map<String, Value>> =
serde_json::from_reader(buf.as_slice()).unwrap_or_default();
// Use a cursor to avoid extra allocations during parsing
let json_rows: Vec<Map<String, Value>> = {
let cursor = std::io::Cursor::new(buf);
serde_json::from_reader(cursor).unwrap_or_else(|_| Vec::with_capacity(0))
};

Ok(json_rows)
}
Expand Down
Loading