Skip to content

Commit

Permalink
feat: allow stream creation from ingestor in distributed deployments
Browse files Browse the repository at this point in the history
Co-authored-by: Akshat Agarwal <[email protected]>
  • Loading branch information
Anirudhxx and akshatagarwl committed Nov 2, 2024
1 parent 7217dd5 commit 2108a34
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 6 deletions.
59 changes: 59 additions & 0 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::metrics::prom_utils::Metrics;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::get_staging_metadata;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
Expand Down Expand Up @@ -841,3 +842,61 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {

Ok(())
}

/// Forwards a "create stream" request from an ingestor to the querier node.
///
/// Reads the querier endpoint and auth token from staging metadata, verifies
/// the querier is live, then issues `PUT …/logstream/{stream_name}` against it.
///
/// # Errors
/// Returns `StreamError::Anyhow` when staging metadata (or its querier fields)
/// is missing or the querier is unreachable/returns a non-success status, and
/// `StreamError::Network` on transport failures.
pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), StreamError> {
    let client = reqwest::Client::new();

    // Missing metadata is a configuration error, not a bug: surface it to the
    // caller instead of panicking (previously `.unwrap()` on each of these).
    let staging_metadata = get_staging_metadata()
        .map_err(|err| {
            StreamError::Anyhow(anyhow::anyhow!("Failed to read staging metadata: {err}"))
        })?
        .ok_or_else(|| {
            StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata"))
        })?;
    let querier_endpoint = staging_metadata.querier_endpoint.ok_or_else(|| {
        StreamError::Anyhow(anyhow::anyhow!(
            "Querier endpoint is not set in staging metadata"
        ))
    })?;
    let querier_endpoint = to_url_string(querier_endpoint);
    let token = staging_metadata.querier_auth_token.ok_or_else(|| {
        StreamError::Anyhow(anyhow::anyhow!(
            "Querier auth token is not set in staging metadata"
        ))
    })?;

    // Fail fast if the querier does not answer its liveness probe.
    if !check_liveness(&querier_endpoint).await {
        log::warn!("Querier {} is not live", querier_endpoint);
        return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live")));
    }

    let url = format!(
        "{}{}/logstream/{}",
        querier_endpoint,
        base_path_without_preceding_slash(),
        stream_name
    );

    let response = client
        .put(&url)
        .header(header::AUTHORIZATION, &token)
        .send()
        .await
        .map_err(|err| {
            log::error!(
                "Fatal: failed to forward create stream request to querier: {}\n Error: {:?}",
                &url,
                err
            );
            StreamError::Network(err)
        })?;

    let status = response.status();

    if !status.is_success() {
        // Read the body for diagnostics before reporting the failed status.
        let response_text = response.text().await.map_err(|err| {
            log::error!("Failed to read response text from querier: {}", &url);
            StreamError::Network(err)
        })?;

        log::error!(
            "Failed to forward create stream request to querier: {}\nResponse Returned: {:?}",
            &url,
            response_text
        );

        return Err(StreamError::Anyhow(anyhow::anyhow!(
            "Request failed with status: {}",
            status,
        )));
    }

    Ok(())
}
16 changes: 11 additions & 5 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::event::{
error::EventError,
format::{self, EventFormat},
};
use crate::handlers::http::cluster::forward_create_stream_request;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
Expand Down Expand Up @@ -210,11 +211,16 @@ pub async fn create_stream_if_not_exists(
if !streams.contains(&LogStream {
name: stream_name.to_owned(),
}) {
log::error!("Stream {} not found", stream_name);
return Err(PostError::Invalid(anyhow::anyhow!(
"Stream `{}` not found. Please create it using the Query server.",
stream_name
)));
match forward_create_stream_request(stream_name).await {
Ok(()) => log::info!("Stream {} created", stream_name),
Err(e) => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Unable to create stream: {} using query server. Error: {}",
stream_name,
e.to_string(),
)))
}
};
}
metadata::STREAM_INFO
.upsert_stream_info(
Expand Down
4 changes: 4 additions & 0 deletions server/src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use tokio::sync::Mutex;

// Serializes concurrent stream-creation (PUT /logstream) requests reaching the
// querier, e.g. when multiple ingestors forward creation of the same stream.
// NOTE(review): a caller must bind the guard to a named variable
// (`let _guard = CREATE_STREAM_LOCK.lock().await;`). The `let _ = ...` form seen
// at the call site drops the guard immediately, providing no mutual exclusion —
// confirm and fix the call site.
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());

use crate::{
event,
Expand Down Expand Up @@ -77,6 +80,7 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

let _ = CREATE_STREAM_LOCK.lock().await;
let headers = create_update_stream(&req, &body, &stream_name).await?;
sync_streams_with_ingestors(headers, body, &stream_name).await?;

Expand Down
4 changes: 4 additions & 0 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ pub async fn run_metadata_migration(
let metadata = metadata_migration::v3_v4(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
Some("v4") => {
let metadata = metadata_migration::v4_v5(storage_metadata);
put_remote_metadata(&*object_store, &metadata).await?;
}
_ => (),
}
}
Expand Down
50 changes: 50 additions & 0 deletions server/src/migration/metadata_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
*/

use base64::Engine;
use rand::distributions::DistString;
use serde_json::{Map, Value as JsonValue};

Expand Down Expand Up @@ -148,6 +149,55 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue {
storage_metadata
}

// maybe rename
/// Migrates storage metadata from the v4 to the v5 format.
///
/// Bumps `version` to `"v5"`, backfills `server_mode` from the running config
/// when absent, records `querier_endpoint` for servers in `Query`/`All` mode,
/// and stores a Basic auth token built from the configured credentials so
/// ingestors can authenticate forwarded requests.
///
/// # Panics
/// Panics if `storage_metadata` is not a JSON object (an invariant of every
/// stored metadata document).
pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue {
    let metadata = storage_metadata.as_object_mut().unwrap();
    // `insert` replaces any existing value, so a separate remove is unnecessary.
    metadata.insert("version".to_string(), JsonValue::String("v5".to_string()));

    match metadata.get("server_mode") {
        // Pre-v5 documents may lack the field entirely; derive it from config.
        None => {
            metadata.insert(
                "server_mode".to_string(),
                JsonValue::String(CONFIG.parseable.mode.to_string()),
            );
        }
        Some(JsonValue::String(mode)) => match mode.as_str() {
            "Query" => {
                metadata.insert(
                    "querier_endpoint".to_string(),
                    JsonValue::String(CONFIG.parseable.address.clone()),
                );
            }
            // "All" (standalone) servers also act as the querier; refresh the
            // mode from config and record the endpoint.
            "All" => {
                metadata.insert(
                    "server_mode".to_string(),
                    JsonValue::String(CONFIG.parseable.mode.to_string()),
                );
                metadata.insert(
                    "querier_endpoint".to_string(),
                    JsonValue::String(CONFIG.parseable.address.clone()),
                );
            }
            // Other modes (e.g. ingest-only) carry no querier endpoint.
            _ => (),
        },
        _ => (),
    }

    // Basic auth token the ingestor presents when forwarding requests to the
    // querier, derived from the locally configured credentials.
    metadata.insert(
        "querier_auth_token".to_string(),
        JsonValue::String(format!(
            "Basic {}",
            base64::prelude::BASE64_STANDARD.encode(format!(
                "{}:{}",
                CONFIG.parseable.username, CONFIG.parseable.password
            ))
        )),
    );

    storage_metadata
}

pub async fn migrate_ingester_metadata() -> anyhow::Result<Option<IngestorMetadata>> {
let imp = ingestor_metadata_path(None);
let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await {
Expand Down
3 changes: 2 additions & 1 deletion server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub use localfs::FSConfig;
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
pub use s3::S3Config;
pub use store_metadata::{
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
get_staging_metadata, put_remote_metadata, put_staging_metadata, resolve_parseable_metadata,
StorageMetadata,
};

// metadata file names in a Stream prefix
Expand Down
4 changes: 4 additions & 0 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub struct StorageMetadata {
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
#[serde(default)]
pub default_role: Option<String>,
pub querier_endpoint: Option<String>,
pub querier_auth_token: Option<String>,
}

impl StorageMetadata {
Expand All @@ -78,6 +80,8 @@ impl StorageMetadata {
streams: Vec::new(),
roles: HashMap::default(),
default_role: None,
querier_endpoint: None,
querier_auth_token: None,
}
}

Expand Down

0 comments on commit 2108a34

Please sign in to comment.