diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 24e045ce6..108252af0 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError; use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; +use crate::rbac::user::User; use crate::stats::Stats; use crate::storage::object_storage::ingestor_metadata_path; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; @@ -39,13 +40,15 @@ use itertools::Itertools; use relative_path::RelativePathBuf; use serde::de::Error; use serde_json::error::Error as SerdeError; -use serde_json::Value as JsonValue; +use serde_json::{to_vec, Value as JsonValue}; use url::Url; type IngestorMetadataArr = Vec; use self::utils::StorageStats; use super::base_path_without_preceding_slash; +use super::rbac::RBACError; +use std::collections::HashSet; use std::time::Duration; use super::modal::IngestorMetadata; @@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors( Ok(()) } -// forward the request to all ingestors to keep them in sync +// forward the create/update stream request to all ingestors to keep them in sync pub async fn sync_streams_with_ingestors( headers: HeaderMap, body: Bytes, @@ -142,7 +145,218 @@ pub async fn sync_streams_with_ingestors( log::error!( "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, - res + res.text().await + ); + } + } + + Ok(()) +} + +// forward the role update request to all ingestors to keep them in sync +pub async fn sync_users_with_roles_with_ingestors( + username: &String, + role: &HashSet, +) -> Result<(), RBACError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + RBACError::Anyhow(err) + })?; + + let client = reqwest::Client::new(); + let role = to_vec(&role.clone()).map_err(|err| { + log::error!("Fatal: failed to serialize role: {:?}", err); + RBACError::SerdeError(err) + })?; + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/user/{}/role", + ingestor.domain_name, + base_path_without_preceding_slash(), + username + ); + + let res = client + .put(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .body(role.clone()) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + RBACError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await + ); + } + } + + Ok(()) +} + +// forward the delete user request to all ingestors to keep them in sync +pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + RBACError::Anyhow(err) + })?; + + let client = reqwest::Client::new(); + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/user/{}", + ingestor.domain_name, + base_path_without_preceding_slash(), + username + ); + + let res = client + .delete(url) + .header(header::AUTHORIZATION, &ingestor.token) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + RBACError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await + ); + } + } + + Ok(()) +} + +// forward the create user request to all ingestors to keep them in sync +pub async fn sync_user_creation_with_ingestors( + user: User, + role: &Option>, +) -> Result<(), RBACError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + RBACError::Anyhow(err) + })?; + + let mut user = user.clone(); + + if let Some(role) = role { + user.roles.clone_from(role); + } + let username = user.username(); + let client = reqwest::Client::new(); + + let user = to_vec(&user).map_err(|err| { + log::error!("Fatal: failed to serialize user: {:?}", err); + RBACError::SerdeError(err) + })?; + + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/user/{}", + ingestor.domain_name, + base_path_without_preceding_slash(), + username + ); + + let res = client + .post(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .body(user.clone()) + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + RBACError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await + ); + } + } + + Ok(()) +} + +// forward the password reset request to all ingestors to keep them in sync +pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> { + let ingestor_infos = get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + RBACError::Anyhow(err) + })?; + let client = reqwest::Client::new(); + + for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } + let url = format!( + "{}{}/user/{}/generate-new-password", + ingestor.domain_name, + base_path_without_preceding_slash(), + username + ); + + let res = client + .post(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .map_err(|err| { + log::error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, + err + ); + RBACError::Network(err) + })?; + + if !res.status().is_success() { + log::error!( + "failed to forward request to ingestor: {}\nResponse Returned: {:?}", + ingestor.domain_name, + res.text().await ); } } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 6789819fc..aec5bd612 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -18,9 +18,11 @@ use crate::analytics; use crate::banner; use crate::handlers::airplane; +use crate::handlers::http; use crate::handlers::http::health_check; use crate::handlers::http::ingest; use crate::handlers::http::logstream; +use crate::handlers::http::middleware::DisAllowRootUser; use crate::handlers::http::middleware::RouteExt; use crate::localcache::LocalCacheManager; use crate::metrics; @@ -182,6 +184,7 @@ impl IngestServer { .service(Server::get_about_factory()) .service(Self::analytics_factory()) .service(Server::get_liveness_factory()) + .service(Self::get_user_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) @@ -198,7 +201,46 @@ impl IngestServer { ), ) } - + // get the user webscope + fn get_user_webscope() -> Scope { + web::scope("/user") + .service( + web::resource("/{username}") + // PUT /user/{username} => Create a new user + .route( + web::post() + .to(http::rbac::post_user) + .authorize(Action::PutUser), + ) + // DELETE /user/{username} => Delete a user + .route( + web::delete() + .to(http::rbac::delete_user) + .authorize(Action::DeleteUser), + ) + .wrap(DisAllowRootUser), + ) + .service( + web::resource("/{username}/role") + // PUT /user/{username}/roles => Put roles for user + .route( + web::put() + .to(http::rbac::put_role) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ), + ) + .service( + web::resource("/{username}/generate-new-password") + // POST /user/{username}/generate-new-password => reset password for this user + .route( + web::post() + .to(http::rbac::post_gen_password) + .authorize(Action::PutUser) + .wrap(DisAllowRootUser), + ), + ) + } fn logstream_api() -> Scope { web::scope("/logstream").service( web::scope("/{logstream}") diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index db435b234..3c73b42c1 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -19,8 +19,14 @@ use std::collections::{HashMap, HashSet}; use crate::{ - option::CONFIG, - rbac::{map::roles, role::model::DefaultPrivilege, user, Users}, + handlers::http::cluster::sync_users_with_roles_with_ingestors, + option::{Mode, CONFIG}, + rbac::{ + map::roles, + role::model::DefaultPrivilege, + user::{self, User as ParseableUser}, + Users, + }, storage::{self, ObjectStorageError, StorageMetadata}, validator::{self, error::UsernameValidationError}, }; @@ -28,6 +34,11 @@ use actix_web::{http::header::ContentType, web, Responder}; use http::StatusCode; use tokio::sync::Mutex; +use super::cluster::{ + sync_password_reset_with_ingestors, sync_user_creation_with_ingestors, + sync_user_deletion_with_ingestors, +}; + // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); @@ -64,62 +75,109 @@ pub async fn post_user( body: Option>, ) -> Result { let username = username.into_inner(); - let roles: Option> = body - .map(|body| serde_json::from_value(body.into_inner())) - .transpose()?; - if roles.is_none() || roles.as_ref().unwrap().is_empty() { - return Err(RBACError::RoleValidationError); - } - validator::user_name(&username)?; - let _ = UPDATE_LOCK.lock().await; - if Users.contains(&username) { - return Err(RBACError::UserExists); - } + + let mut generated_password = String::default(); let mut metadata = get_metadata().await?; - if metadata - .users - .iter() - .any(|user| user.username() == username) - { - // should be unreachable given state is always consistent - return Err(RBACError::UserExists); - } - let (user, password) = user::User::new_basic(username.clone()); - metadata.users.push(user.clone()); - put_metadata(&metadata).await?; - // set this user to user map - Users.put_user(user); - - if let Some(roles) = roles { - put_role(web::Path::::from(username), web::Json(roles)).await?; + if CONFIG.parseable.mode == Mode::Ingest { + if let Some(body) = body { + let user: ParseableUser = serde_json::from_value(body.into_inner())?; + let _ = storage::put_staging_metadata(&metadata); + let created_role = user.roles.clone(); + Users.put_user(user.clone()); + Users.put_role(&username, created_role.clone()); + } + } else { + validator::user_name(&username)?; + let roles: HashSet = if let Some(body) = body { + serde_json::from_value(body.into_inner())? + } else { + return Err(RBACError::RoleValidationError); + }; + + if roles.is_empty() { + return Err(RBACError::RoleValidationError); + } + let _ = UPDATE_LOCK.lock().await; + if Users.contains(&username) + || metadata + .users + .iter() + .any(|user| user.username() == username) + { + return Err(RBACError::UserExists); + } + + let (user, password) = user::User::new_basic(username.clone()); + + generated_password = password; + metadata.users.push(user.clone()); + + put_metadata(&metadata).await?; + let created_role = roles.clone(); + Users.put_user(user.clone()); + + if CONFIG.parseable.mode == Mode::Query { + sync_user_creation_with_ingestors(user, &Some(roles)).await?; + } + put_role( + web::Path::::from(username.clone()), + web::Json(created_role), + ) + .await?; } - Ok(password) + Ok(generated_password) } // Handler for POST /api/v1/user/{username}/generate-new-password // Resets password for the user to a newly generated one and returns it pub async fn post_gen_password(username: web::Path) -> Result { let username = username.into_inner(); - let _ = UPDATE_LOCK.lock().await; - let user::PassCode { password, hash } = user::Basic::gen_new_password(); + let mut new_password = String::default(); + let mut new_hash = String::default(); let mut metadata = get_metadata().await?; - if let Some(user) = metadata - .users - .iter_mut() - .filter_map(|user| match user.ty { - user::UserType::Native(ref mut user) => Some(user), - _ => None, - }) - .find(|user| user.username == username) - { - user.password_hash.clone_from(&hash); + if CONFIG.parseable.mode == Mode::Ingest { + let _ = storage::put_staging_metadata(&metadata); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + new_hash.clone_from(&user.password_hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + Users.change_password_hash(&username, &new_hash); } else { - return Err(RBACError::UserDoesNotExist); + let _ = UPDATE_LOCK.lock().await; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + new_password.clone_from(&password); + new_hash.clone_from(&hash); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata).await?; + Users.change_password_hash(&username, &new_hash); + if CONFIG.parseable.mode == Mode::Query { + sync_password_reset_with_ingestors(&username).await?; + } } - put_metadata(&metadata).await?; - Users.change_password_hash(&username, &hash); - Ok(password) + + Ok(new_password) } // Handler for GET /api/v1/user/{username}/role @@ -152,7 +210,16 @@ pub async fn delete_user(username: web::Path) -> Result StatusCode::BAD_REQUEST, Self::ValidationError(_) => StatusCode::BAD_REQUEST, Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Network(_) => StatusCode::BAD_GATEWAY, + Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::RoleValidationError => StatusCode::BAD_REQUEST, } } diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index e6393b27b..acdbeb8e4 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -163,7 +163,7 @@ pub async fn resolve_parseable_metadata( // update the server mode for local metadata metadata.server_mode = CONFIG.parseable.mode.to_string(); metadata.staging = CONFIG.staging_dir().to_path_buf(); - }, + }, } Ok(metadata) } @@ -272,12 +272,15 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec } pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { + let mut staging_metadata = meta.clone(); + staging_metadata.server_mode = CONFIG.parseable.mode.to_string(); + staging_metadata.staging = CONFIG.staging_dir().to_path_buf(); let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(path)?; - serde_json::to_writer(&mut file, meta)?; + serde_json::to_writer(&mut file, &staging_metadata)?; Ok(()) }