Skip to content

Commit

Permalink
feat: RBAC for distributed system
Browse files Browse the repository at this point in the history
sync the user with its role and password hash from querier to all live ingestors
sync from storage metadata when ingestor starts

sync -
1. user creation (with generated hash)
2. user deletion
3. role updates for a user
4. generate new password for a user
  • Loading branch information
nikhilsinhaparseable committed Aug 8, 2024
1 parent 713ef35 commit 7e43629
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 52 deletions.
220 changes: 217 additions & 3 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::handlers::http::logstream::error::StreamError;
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
Expand All @@ -39,13 +40,15 @@ use itertools::Itertools;
use relative_path::RelativePathBuf;
use serde::de::Error;
use serde_json::error::Error as SerdeError;
use serde_json::Value as JsonValue;
use serde_json::{to_vec, Value as JsonValue};
use url::Url;
type IngestorMetadataArr = Vec<IngestorMetadata>;

use self::utils::StorageStats;

use super::base_path_without_preceding_slash;
use super::rbac::RBACError;
use std::collections::HashSet;
use std::time::Duration;

use super::modal::IngestorMetadata;
Expand Down Expand Up @@ -94,7 +97,7 @@ pub async fn sync_cache_with_ingestors(
Ok(())
}

// forward the request to all ingestors to keep them in sync
// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
req: HttpRequest,
body: Bytes,
Expand Down Expand Up @@ -137,7 +140,218 @@ pub async fn sync_streams_with_ingestors(
log::error!(
"failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res
res.text().await
);
}
}

Ok(())
}

// forward the role update request to all ingestors to keep them in sync
pub async fn sync_users_with_roles_with_ingestors(
username: &String,
role: &HashSet<String>,
) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
let role = to_vec(&role.clone()).map_err(|err| {
log::error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
})?;
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}/role",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.body(role.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the delete user request to all ingestors to keep them in sync
pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.delete(url)
.header(header::AUTHORIZATION, &ingestor.token)
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the create user request to all ingestors to keep them in sync
pub async fn sync_user_creation_with_ingestors(
user: User,
role: &Option<HashSet<String>>,
) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;

let mut user = user.clone();

if let Some(role) = role {
user.roles.clone_from(role);
}
let username = user.username();
let client = reqwest::Client::new();

let user = to_vec(&user).map_err(|err| {
log::error!("Fatal: failed to serialize user: {:?}", err);
RBACError::SerdeError(err)
})?;

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.body(user.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

// forward the password reset request to all ingestors to keep them in sync
pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RBACError::Anyhow(err)
})?;
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/user/{}/generate-new-password",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
);

let res = client
.post(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RBACError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}
Expand Down
44 changes: 43 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
use crate::analytics;
use crate::banner;
use crate::handlers::airplane;
use crate::handlers::http;
use crate::handlers::http::ingest;
use crate::handlers::http::logstream;
use crate::handlers::http::middleware::DisAllowRootUser;
use crate::handlers::http::middleware::RouteExt;
use crate::localcache::LocalCacheManager;
use crate::metrics;
Expand Down Expand Up @@ -147,6 +149,7 @@ impl IngestServer {
.service(Server::get_about_factory())
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Self::get_user_webscope())
.service(Server::get_readiness_factory()),
)
.service(Server::get_ingest_otel_factory());
Expand All @@ -162,7 +165,46 @@ impl IngestServer {
),
)
}

// get the user webscope
fn get_user_webscope() -> Scope {
web::scope("/user")
.service(
web::resource("/{username}")
// PUT /user/{username} => Create a new user
.route(
web::post()
.to(http::rbac::post_user)
.authorize(Action::PutUser),
)
// DELETE /user/{username} => Delete a user
.route(
web::delete()
.to(http::rbac::delete_user)
.authorize(Action::DeleteUser),
)
.wrap(DisAllowRootUser),
)
.service(
web::resource("/{username}/role")
// PUT /user/{username}/roles => Put roles for user
.route(
web::put()
.to(http::rbac::put_role)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
)
.service(
web::resource("/{username}/generate-new-password")
// POST /user/{username}/generate-new-password => reset password for this user
.route(
web::post()
.to(http::rbac::post_gen_password)
.authorize(Action::PutUser)
.wrap(DisAllowRootUser),
),
)
}
fn logstream_api() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}")
Expand Down
Loading

0 comments on commit 7e43629

Please sign in to comment.