Conversation
Summary of Changes

Hello @ilblackdragon, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the conversation experience by integrating real-time communication capabilities. It enables direct user-to-user chat within shared conversations and provides live updates, such as new messages and typing indicators, through WebSockets. The changes also lay the groundwork for better attribution of messages by tracking response authors and refine public sharing with dedicated tokens.

Highlights
Code Review
This pull request introduces significant new functionality by adding WebSocket support for real-time conversation updates, including a typing indicator and notifications for new messages. The implementation is well-structured, with new modules for WebSocket management. However, I've identified a couple of critical regressions in the database layer where batch operations have been replaced with loops, impacting both performance and data consistency. There are also some medium-severity issues regarding error handling and maintainability that should be addressed.
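As background for the typing-indicator feature discussed below, a server typically debounces typing events so it doesn't rebroadcast on every keystroke. The following is a minimal, self-contained sketch of that idea; the `TypingDebouncer` type and its method names are hypothetical illustrations, not code from this PR:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical debouncer: suppress repeat typing events from the same user
/// that arrive within `window`, so a typing indicator is broadcast at most
/// once per window per user.
struct TypingDebouncer {
    window: Duration,
    last_sent: HashMap<String, Instant>,
}

impl TypingDebouncer {
    fn new(window: Duration) -> Self {
        Self { window, last_sent: HashMap::new() }
    }

    /// Returns true if a typing event for `user_id` should be broadcast now.
    fn should_broadcast(&mut self, user_id: &str, now: Instant) -> bool {
        match self.last_sent.get(user_id) {
            // Seen recently: suppress, and keep the old timestamp so the
            // window is measured from the last *broadcast*, not the last event.
            Some(&t) if now.saturating_duration_since(t) < self.window => false,
            _ => {
                self.last_sent.insert(user_id.to_string(), now);
                true
            }
        }
    }
}
```

A WebSocket handler would call `should_broadcast` before fanning the event out to other participants; events returning `false` are simply dropped.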
```diff
+ // Build a query to find groups where any of the member identifiers match
+ // We use a parameterized query with multiple (member_type, member_value) pairs
+ let mut conditions = Vec::new();
+ let mut param_idx = 1;
+
+ for _ in member_identifiers {
+     conditions.push(format!(
+         "(m.member_type = ${} AND LOWER(m.member_value) = LOWER(${}))",
+         param_idx,
+         param_idx + 1
+     ));
+     param_idx += 2;
+ }
+
+ // Build the params vector with references to the actual values
+ let member_types: Vec<String> = member_identifiers.iter().map(|m| m.kind.as_str().to_string()).collect();
+ let member_values: Vec<String> = member_identifiers.iter().map(|m| m.value.clone()).collect();
+
+ let mut typed_params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = Vec::new();
+ for i in 0..member_identifiers.len() {
+     typed_params.push(&member_types[i]);
+     typed_params.push(&member_values[i]);
+ }
+
+ let query = format!(
+     "SELECT DISTINCT g.id, g.owner_user_id, g.name, g.created_at, g.updated_at
+      FROM conversation_share_groups g
+      JOIN conversation_share_group_members m ON g.id = m.group_id
+      WHERE {}
+      ORDER BY g.name",
+     conditions.join(" OR ")
+ );
+
- // Use parameterized query with UNNEST to safely match (type, value) pairs
- // This avoids dynamic SQL construction while maintaining correct pairing semantics
  let rows = client
-     .query(
-         "SELECT DISTINCT g.id, g.owner_user_id, g.name, g.created_at, g.updated_at
-          FROM conversation_share_groups g
-          JOIN conversation_share_group_members m ON g.id = m.group_id
-          JOIN UNNEST($1::text[], $2::text[]) AS search(member_type, member_value)
-          ON m.member_type = search.member_type
-          AND LOWER(m.member_value) = search.member_value
-          ORDER BY g.name",
-         &[&member_types, &member_values_lower],
-     )
+     .query(&query, &typed_params)
      .await
      .map_err(|e| ConversationError::DatabaseError(e.to_string()))?;
```
The query to find groups for members has been changed from using `UNNEST` to dynamically building a `WHERE ... OR ...` clause. This approach has a few drawbacks:

- Performance: For a large number of `member_identifiers`, this can be significantly slower than using `UNNEST`.
- Scalability: It may hit database limits on query length or the number of parameters.

I recommend reverting to the previous implementation that used `UNNEST`, as it is generally safer and more performant for this type of query in PostgreSQL.
```rust
// Use UNNEST to create pairs of (type, value) from arrays
// This is safer than dynamic SQL construction and maintains correct pair matching
// UNNEST with multiple arrays creates rows where elements at the same position are paired
let member_types: Vec<String> = member_identifiers
    .iter()
    .map(|m| m.kind.as_str().to_string())
    .collect();
let member_values_lower: Vec<String> = member_identifiers
    .iter()
    .map(|m| m.value.to_lowercase())
    .collect();

// Use parameterized query with UNNEST to safely match (type, value) pairs
// This avoids dynamic SQL construction while maintaining correct pairing semantics
let rows = client
    .query(
        "SELECT DISTINCT g.id, g.owner_user_id, g.name, g.created_at, g.updated_at
         FROM conversation_share_groups g
         JOIN conversation_share_group_members m ON g.id = m.group_id
         JOIN UNNEST($1::text[], $2::text[]) AS search(member_type, member_value)
         ON m.member_type = search.member_type
         AND LOWER(m.member_value) = search.member_value
         ORDER BY g.name",
        &[&member_types, &member_values_lower],
    )
    .await
    .map_err(|e| ConversationError::DatabaseError(e.to_string()))?;
```

```diff
  ShareTarget::Direct(recipients) => {
-     // Use batch creation for atomicity - all shares succeed or all fail
-     let share_requests: Vec<NewConversationShare> = recipients
-         .into_iter()
-         .map(Self::normalize_recipient)
-         .map(|recipient| NewConversationShare {
-             conversation_id: conversation_id.to_string(),
-             owner_user_id,
-             share_type: ShareType::Direct,
-             permission,
-             recipient: Some(recipient),
-             group_id: None,
-             org_email_pattern: None,
-         })
-         .collect();
-
-     shares = self
-         .share_repository
-         .create_shares_batch(share_requests)
-         .await?;
+     for recipient in recipients.into_iter().map(Self::normalize_recipient) {
+         let share = self
+             .share_repository
+             .create_share(NewConversationShare {
+                 conversation_id: conversation_id.to_string(),
+                 owner_user_id,
+                 share_type: ShareType::Direct,
+                 permission,
+                 recipient: Some(recipient),
+                 group_id: None,
+                 org_email_pattern: None,
+                 public_token: None,
+             })
+             .await?;
+         shares.push(share);
+     }
  }
```
This logic replaces a batch share creation with a loop that creates shares one by one. This is a critical regression for two reasons:

- Loss of Atomicity: The operation is no longer atomic. If an error occurs midway through the loop (e.g., a database constraint violation on the second recipient), some shares will have been created while others have not, leaving the system in an inconsistent state. A batch operation should occur within a single transaction.
- Performance Degradation: Issuing multiple `INSERT` statements in a loop is much less efficient than a single batch operation, especially when sharing with many recipients.

Please reintroduce a batch creation mechanism for creating multiple shares to ensure data consistency and maintain performance.
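One way to batch is to issue a single multi-row `INSERT` inside one transaction, with `$1..$n` placeholders generated per row. The following sketch shows only the placeholder-building step; `batch_insert_sql` is a hypothetical helper for illustration, not a function from this PR:

```rust
/// Hypothetical helper: build one multi-row INSERT statement with
/// tokio_postgres-style numbered placeholders ($1, $2, ...). Run inside a
/// single transaction, all rows then succeed or fail together.
fn batch_insert_sql(table: &str, columns: &[&str], row_count: usize) -> String {
    let mut placeholders = Vec::with_capacity(row_count);
    let mut idx = 1;
    for _ in 0..row_count {
        // One "(${i}, ${i+1}, ...)" group per row, numbering continuing
        // across rows so every placeholder is unique.
        let row: Vec<String> = (0..columns.len())
            .map(|c| format!("${}", idx + c))
            .collect();
        idx += columns.len();
        placeholders.push(format!("({})", row.join(", ")));
    }
    format!(
        "INSERT INTO {} ({}) VALUES {}",
        table,
        columns.join(", "),
        placeholders.join(", ")
    )
}
```

The parameter values would then be flattened row by row into one `Vec<&(dyn ToSql + Sync)>` and passed to a single `query`/`execute` call on the transaction. Note that PostgreSQL caps the number of bind parameters per statement, so very large recipient lists would still need chunking.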
```rust
let user_profile = state.user_service.get_user_profile(user.user_id).await.ok();
let user_name = user_profile.as_ref().and_then(|p| p.user.name.clone());
```
The use of `.ok()` here silently discards any error that might occur when fetching the user profile. While the user's name is not critical for a typing indicator, logging the error would be valuable for debugging potential issues with the user service or database, and would help identify underlying problems without failing the entire operation.
```diff
- let user_profile = state.user_service.get_user_profile(user.user_id).await.ok();
- let user_name = user_profile.as_ref().and_then(|p| p.user.name.clone());
+ let user_profile = match state.user_service.get_user_profile(user.user_id).await {
+     Ok(profile) => Some(profile),
+     Err(e) => {
+         tracing::warn!("Failed to get user profile for typing indicator: {}", e);
+         None
+     }
+ };
+ let user_name = user_profile.as_ref().and_then(|p| p.user.name.clone());
```
```rust
if !token.starts_with("sess_") || token.len() != 37 {
    return Err(crate::error::ApiError::invalid_token());
}
```
The token prefix `sess_` and length `37` are hardcoded here. This makes it harder to change the token format in the future and can lead to inconsistencies if the format is defined elsewhere. These values should be defined as constants for better maintainability and clarity.
```diff
- if !token.starts_with("sess_") || token.len() != 37 {
-     return Err(crate::error::ApiError::invalid_token());
- }
+ const TOKEN_PREFIX: &str = "sess_";
+ const TOKEN_LEN: usize = 37;
+ if !token.starts_with(TOKEN_PREFIX) || token.len() != TOKEN_LEN {
+     return Err(crate::error::ApiError::invalid_token());
+ }
```
See #139
Enables: