Skip to content

Commit

Permalink
updated logic for user sync from querier to ingestors
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhilsinhaparseable committed Sep 12, 2024
1 parent 185d8ff commit 11d6875
Showing 1 changed file with 31 additions and 24 deletions.
55 changes: 31 additions & 24 deletions server/src/handlers/http/rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,24 @@ pub async fn post_user(
let username = username.into_inner();

let mut generated_password = String::default();
let mut created_role: HashSet<String> = HashSet::new();
let mut metadata = get_metadata().await?;
if CONFIG.parseable.mode == Mode::Ingest {
let user: Option<ParseableUser> = body
.map(|body| serde_json::from_value(body.into_inner()))
.transpose()?;
let _ = storage::put_staging_metadata(&metadata);
created_role.clone_from(&user.unwrap().roles);
if let Some(body) = body {
let user: ParseableUser = serde_json::from_value(body.into_inner())?;
let _ = storage::put_staging_metadata(&metadata);
let created_role = user.roles.clone();
Users.put_user(user.clone());
Users.put_role(&username, created_role.clone());
}
} else {
validator::user_name(&username)?;
let roles: Option<HashSet<String>> = body
.map(|body| serde_json::from_value(body.into_inner()))
.transpose()?;
if roles.is_none() || roles.as_ref().unwrap().is_empty() {
let roles: HashSet<String> = if let Some(body) = body {
serde_json::from_value(body.into_inner())?
} else {
return Err(RBACError::RoleValidationError);
};

if roles.is_empty() {
return Err(RBACError::RoleValidationError);
}
let _ = UPDATE_LOCK.lock().await;
Expand All @@ -109,20 +113,19 @@ pub async fn post_user(
metadata.users.push(user.clone());

put_metadata(&metadata).await?;
let created_role = roles.clone();
Users.put_user(user.clone());

if CONFIG.parseable.mode == Mode::Query {
sync_user_creation_with_ingestors(user, &roles).await?;
}
if let Some(roles) = roles {
created_role.clone_from(&roles);
sync_user_creation_with_ingestors(user, &Some(roles)).await?;
}
put_role(
web::Path::<String>::from(username.clone()),
web::Json(created_role),
)
.await?;
}
let created_user = metadata
.users
.iter()
.find(|user| user.username() == username)
.unwrap();
Users.put_user(created_user.clone());
put_role(web::Path::<String>::from(username), web::Json(created_role)).await?;

Ok(generated_password)
}

Expand All @@ -148,6 +151,7 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
} else {
return Err(RBACError::UserDoesNotExist);
}
Users.change_password_hash(&username, &new_hash);
} else {
let _ = UPDATE_LOCK.lock().await;
let user::PassCode { password, hash } = user::Basic::gen_new_password();
Expand All @@ -167,11 +171,12 @@ pub async fn post_gen_password(username: web::Path<String>) -> Result<impl Respo
return Err(RBACError::UserDoesNotExist);
}
put_metadata(&metadata).await?;
Users.change_password_hash(&username, &new_hash);
if CONFIG.parseable.mode == Mode::Query {
sync_password_reset_with_ingestors(&username).await?;
}
}
Users.change_password_hash(&username, &new_hash);

Ok(new_password)
}

Expand Down Expand Up @@ -247,15 +252,17 @@ pub async fn put_role(

if CONFIG.parseable.mode == Mode::Ingest {
let _ = storage::put_staging_metadata(&metadata);
// update in mem table
Users.put_role(&username.clone(), role.clone());
} else {
put_metadata(&metadata).await?;
// update in mem table
Users.put_role(&username.clone(), role.clone());
if CONFIG.parseable.mode == Mode::Query {
sync_users_with_roles_with_ingestors(&username, &role).await?;
}
}

// update in mem table
Users.put_role(&username.clone(), role.clone());
Ok(format!("Roles updated successfully for {username}"))
}

Expand Down

0 comments on commit 11d6875

Please sign in to comment.