
refactor: associate commit_schema to the appropriate objects #1225

Open
wants to merge 3 commits into main

Conversation

@de-sh (Contributor) commented Mar 5, 2025

Fixes #XXXX.

Description

Simplify, improve readability


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Introduced improved stream schema merging to enhance data consistency.
  • Refactor
    • Streamlined distributed schema update processes across system components.
  • Bug Fixes
    • Enhanced error reporting with clearer notifications for missing streams and storage-related issues.


coderabbitai bot commented Mar 5, 2025

Walkthrough

This pull request restructures schema management across several components. The changes remove direct schema commit calls in favor of methods on a central PARSEABLE object. The schema commitment logic has been moved into new methods on both the object storage and stream structures, consolidating functionality and updating error handling. Notably, error variants have been enhanced to capture specific conditions such as missing streams and issues with the Arrow library.
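All of these paths bottom out in arrow's Schema::try_merge. A minimal runnable sketch of the merge semantics the refactor relies on (the field names are invented for illustration):

use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), arrow_schema::ArrowError> {
    let existing = Schema::new(vec![Field::new("ts", DataType::Utf8, true)]);
    let incoming = Schema::new(vec![
        Field::new("ts", DataType::Utf8, true),
        Field::new("level", DataType::Utf8, true),
    ]);
    // try_merge unions compatible fields; conflicting definitions for the
    // same field name surface as an ArrowError rather than silent data loss.
    let merged = Schema::try_merge([existing, incoming])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}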

Changes

  • src/handlers/http/query.rs — Refactored update_schema_when_distributed: removed direct calls to commit_schema_to_storage and commit_schema; now uses PARSEABLE's methods to commit schemas.
  • src/storage/object_storage.rs — Removed the standalone commit_schema_to_storage function; introduced a new async instance method commit_schema that handles schema merging and storage updates.
  • src/event/mod.rs — Removed the commit_schema function; process now calls the schema commit directly on streams; error handling enhanced with a new NotFound variant.
  • src/parseable/streams.rs — Added a new public method commit_schema to the Stream struct that acquires a lock, merges the provided schema with existing metadata, and updates it.
  • src/storage/mod.rs — Introduced a new error variant Arrow(#[from] arrow_schema::ArrowError) in ObjectStorageError to handle errors from the Arrow library.

Sequence Diagram(s)

sequenceDiagram
    participant Q as Query Handler
    participant PS as PARSEABLE
    participant OS as Object Storage
    participant S as Stream

    Q->>PS: Invoke schema commit via PARSEABLE.storage.get_object_store().commit_schema()
    OS->>OS: Merge and persist updated schema
    Q->>PS: Retrieve stream via PARSEABLE.get_stream()
    S->>S: Acquire lock, merge new schema with metadata
    S-->>PS: Return commit result
    PS-->>Q: Return overall status


Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

I'm a little rabbit, code in my sight,
Hopping through functions from day into night.
Schemas now merge with a swift little leap,
In PARSEABLE's garden, clean code runs deep.
With each commit and every new byte,
I celebrate changes with a joyful delight!
🐇💻 Hop on and code right!


@de-sh de-sh marked this pull request as ready for review March 6, 2025 05:59

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/mod.rs (1)

24-25: Schema functionality removed here has been relocated to the Parseable struct

The import list has been simplified due to the removal of the fetch_schema function. This is part of the larger refactoring effort to encapsulate schema-related functionality within the PARSEABLE instance.

Consider adding a brief comment explaining that schema functionality has been moved to the Parseable struct, to help future developers understand this architectural decision.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8381e72 and 24ac1bb.

📒 Files selected for processing (7)
  • src/handlers/airplane.rs (2 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/handlers/http/mod.rs (1 hunks)
  • src/handlers/http/query.rs (1 hunks)
  • src/parseable/mod.rs (4 hunks)
  • src/prism/logstream/mod.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
🔇 Additional comments (9)
src/handlers/http/logstream.rs (1)

130-134: Refactor: Changed direct function call to method call on PARSEABLE

The code has been refactored to call update_schema_when_distributed as a method on the PARSEABLE singleton rather than as an imported standalone function. This change maintains the same functionality while improving code organization.

src/prism/logstream/mod.rs (1)

79-82: Consistent schema update approach implementation

The schema update call has been modified to use the PARSEABLE instance method rather than a standalone function, aligning with the pattern used in other handlers.

src/handlers/airplane.rs (2)

37-38: Removed direct import of the update_schema_when_distributed function

The import for update_schema_when_distributed from crate::handlers::http::query has been removed as part of the refactoring to use the method on the PARSEABLE singleton.


159-161: Refactored to call schema update method on PARSEABLE instance

The code now uses PARSEABLE.update_schema_when_distributed instead of the imported standalone function. This pattern improves encapsulation and makes the association between the operation and the object clearer.

src/handlers/http/query.rs (1)

86-86: Refactoring method call to use the PARSEABLE instance method.

The code now uses PARSEABLE.update_schema_when_distributed(&tables) instead of a standalone function call. This change is part of a broader refactoring to improve code organization by moving schema-related functionality to the Parseable struct.

src/storage/object_storage.rs (2)

783-783: Function call updated to use the new instance method.

This line is updated to use the newly added commit_schema method instead of the previous standalone function. This change aligns with the refactoring effort to move schema-related functionality to appropriate object instances.


793-801: New method added to ObjectStorage trait.

The commit_schema method encapsulates the schema commit functionality within the ObjectStorage trait, improving cohesion. This is a good design choice as it localizes the schema update logic with the storage implementation.

The implementation correctly:

  1. Fetches the existing schema
  2. Merges it with the new schema
  3. Stores the result back to persistent storage
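A self-contained sketch of that fetch-merge-store cycle, with the object store replaced by an in-memory map (the types and method names below are stand-ins, not the real ObjectStorage API):

use std::collections::HashMap;
use arrow_schema::{ArrowError, DataType, Field, Schema};

// In-memory stand-in for the object store.
struct ObjectStore {
    schemas: HashMap<String, Schema>,
}

impl ObjectStore {
    fn commit_schema(&mut self, stream: &str, schema: Schema) -> Result<(), ArrowError> {
        // 1. Fetch the existing schema (empty for a brand-new stream).
        let existing = self.schemas.remove(stream).unwrap_or_else(Schema::empty);
        // 2. Merge, propagating conflicts as ArrowError.
        let merged = Schema::try_merge([existing, schema])?;
        // 3. Store the merged result back.
        self.schemas.insert(stream.to_string(), merged);
        Ok(())
    }
}

fn main() -> Result<(), ArrowError> {
    let mut store = ObjectStore { schemas: HashMap::new() };
    store.commit_schema("logs", Schema::new(vec![Field::new("msg", DataType::Utf8, true)]))?;
    store.commit_schema("logs", Schema::new(vec![Field::new("level", DataType::Utf8, true)]))?;
    assert_eq!(store.schemas["logs"].fields().len(), 2);
    Ok(())
}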
src/parseable/mod.rs (2)

780-796: New method to fetch and merge schemas from storage.

This method provides a clean way to fetch all schema files for a stream and merge them. The implementation is clear and well-documented.

Consider adding error logging when schema deserialization fails, since the expect call could cause a panic:

- .map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
+ .filter_map(|byte_obj| {
+     match serde_json::from_slice(byte_obj) {
+         Ok(schema) => Some(schema),
+         Err(err) => {
+             error!("Failed to parse schema: {}", err);
+             None
+         }
+     }
+ })

798-817: New method to update schema in distributed mode.

This well-implemented method replaces the previous standalone function, improving code organization by moving schema-related functionality to the Parseable struct where it logically belongs.

The method:

  1. Correctly checks if running in Query mode
  2. Fetches the schema for each table
  3. Commits the schema to storage
  4. Updates the in-memory schema

The implementation is thread-safe and maintains the same behavior as the previous code while improving modularity.
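A compiling sketch of that flow with the Parseable internals stubbed out (every type and helper name below is an assumption derived from this description, not the actual code):

use std::sync::Arc;
use arrow_schema::{ArrowError, Schema};

#[derive(PartialEq)]
enum Mode { Query, Ingest }

struct Parseable { mode: Mode }

impl Parseable {
    async fn fetch_schemas(&self, _table: &str) -> Result<Vec<Schema>, ArrowError> {
        Ok(vec![]) // stand-in: the real code reads ingestor schema files from storage
    }

    async fn commit_to_storage(&self, _table: &str, _schema: &Schema) -> Result<(), ArrowError> {
        Ok(()) // stand-in: the real code persists the merged schema
    }

    fn commit_in_memory(&self, _table: &str, _schema: Arc<Schema>) {}

    async fn update_schema_when_distributed(&self, tables: &[String]) -> Result<(), ArrowError> {
        // 1. Only the Query node merges ingestor schemas.
        if self.mode != Mode::Query {
            return Ok(());
        }
        for table in tables {
            // 2. Fetch and merge every schema stored for the table.
            let merged = Schema::try_merge(self.fetch_schemas(table).await?)?;
            // 3. Commit to storage, then 4. update the in-memory copy.
            self.commit_to_storage(table, &merged).await?;
            self.commit_in_memory(table, Arc::new(merged));
        }
        Ok(())
    }
}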

coderabbitai bot previously approved these changes Mar 6, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

769-789: Improves code organization by centralizing schema distribution logic.

This new method consolidates schema update functionality that was previously located elsewhere (likely in HTTP handlers), making the code more maintainable and logically organized. The method properly:

  1. Checks for the correct mode (Query)
  2. Fetches schemas from storage
  3. Merges them with existing schemas
  4. Updates both storage and stream schemas

This refactoring improves testability and follows better separation of concerns.

Consider adding a comment explaining why this operation is specifically restricted to Query mode to improve future maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24ac1bb and bd3f0ae.

📒 Files selected for processing (4)
  • src/event/mod.rs (3 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (7)
src/parseable/mod.rs (1)

38-38: Appropriate import change to support new method.

The addition of EventError from the event module aligns with the new method's error handling approach.

src/parseable/streams.rs (1)

550-559: Well-implemented schema merging functionality.

The commit_schema method sensibly manages schema evolution by:

  1. Acquiring a write lock on metadata
  2. Creating a current schema from existing metadata
  3. Properly merging the incoming schema with the current one
  4. Updating the metadata with the merged schema

This approach ensures schemas evolve safely and consistently, preventing data loss when schema changes occur.
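A minimal sketch of that lock-merge-replace sequence (the Metadata layout here is an assumption mirroring the description, not the real Stream internals):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use arrow_schema::{ArrowError, Field, Schema, SchemaRef};

struct Metadata {
    schema: HashMap<String, Arc<Field>>,
}

struct Stream {
    metadata: RwLock<Metadata>,
}

impl Stream {
    fn commit_schema(&self, schema: SchemaRef) -> Result<(), ArrowError> {
        // 1. Acquire a write lock on metadata.
        let mut metadata = self.metadata.write().expect("lock poisoned");
        // 2. Rebuild the current schema from the stored fields.
        let current = Schema::new(metadata.schema.values().cloned().collect::<Vec<_>>());
        // 3. Merge the incoming schema, propagating conflicts.
        let merged = Schema::try_merge([current, schema.as_ref().clone()])?;
        // 4. Replace the stored fields with the merged set.
        metadata.schema = merged
            .fields()
            .iter()
            .map(|f| (f.name().to_string(), f.clone()))
            .collect();
        Ok(())
    }
}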

src/storage/object_storage.rs (3)

783-783: Properly updated to use the new commit_schema method.

The call site has been correctly updated to use the newly implemented method, ensuring consistency with the refactoring.


793-801: Well-designed schema commit method.

The implementation properly:

  1. Fetches the current schema for the stream
  2. Merges it with the provided schema
  3. Persists the combined schema back to storage

This encapsulation improves the code organization by containing schema management logic where it belongs.


803-827: Useful schema retrieval functionality.

The fetch_schemas method provides a clean way to retrieve all schemas associated with a stream, which supports the centralized schema management approach. The implementation correctly:

  1. Uses appropriate path construction
  2. Filters file names to include only schema files
  3. Deserializes each schema correctly
  4. Returns a vector of all schemas

This method is a good addition that completes the schema management refactoring.
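A hedged sketch of the list-filter-deserialize step, with object listing replaced by an in-memory vector; the ".schema" suffix is an assumption, and (de)serializing Schema requires arrow-schema's serde feature:

use arrow_schema::{DataType, Field, Schema};

fn fetch_schemas(objects: &[(String, Vec<u8>)]) -> serde_json::Result<Vec<Schema>> {
    objects
        .iter()
        // Keep only objects whose names mark them as schema files.
        .filter(|(name, _)| name.ends_with(".schema"))
        // Deserialize each one, failing fast on invalid JSON.
        .map(|(_, bytes)| serde_json::from_slice(bytes))
        .collect()
}

fn main() -> serde_json::Result<()> {
    let schema = Schema::new(vec![Field::new("msg", DataType::Utf8, true)]);
    let bytes = serde_json::to_vec(&schema)?;
    let found = fetch_schemas(&[("ingestor-1.schema".into(), bytes)])?;
    assert_eq!(found.len(), 1);
    Ok(())
}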

src/event/mod.rs (2)

62-66: Simplified schema update during event processing.

The code now directly uses the new commit_schema method on the stream object rather than calling an external function. This change:

  1. Maintains the same functionality while reducing indirection
  2. Improves code clarity by making the operation more explicit
  3. Is consistent with the overall refactoring of schema management

This is a good simplification that maintains the same behavior.


114-131: Enhanced error handling with more specific error types.

The addition of:

  • Stream(#[from] StreamNotFound)
  • Arrow(#[from] arrow_schema::ArrowError)

to the EventError enum improves error handling by:

  1. Providing more specific error types that can be matched and handled appropriately
  2. Allowing for better error reporting and debugging
  3. Using appropriate trait derivation to automatically convert from the source errors

This change enhances the robustness of the error handling system.
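A sketch of how those #[from] variants typically look with thiserror (the StreamNotFound definition here is a stand-in for the crate's own type):

use arrow_schema::ArrowError;
use thiserror::Error;

#[derive(Debug, Error)]
#[error("stream {0} not found")]
pub struct StreamNotFound(pub String);

#[derive(Debug, Error)]
pub enum EventError {
    #[error("stream error: {0}")]
    Stream(#[from] StreamNotFound),
    #[error("arrow error: {0}")]
    Arrow(#[from] ArrowError),
}

// #[from] derives From for each source error, so `?` converts automatically.
pub fn merge(a: arrow_schema::Schema, b: arrow_schema::Schema) -> Result<arrow_schema::Schema, EventError> {
    Ok(arrow_schema::Schema::try_merge([a, b])?)
}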

coderabbitai bot previously approved these changes Mar 6, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (4)
src/parseable/streams.rs (1)

541-551: Handle possible schema conflicts more gracefully.

The function uses Schema::try_merge, which can fail if there are incompatible field definitions. Currently, any failure is propagated as a StagingError, but we might consider providing more context or logging for easier debugging. Handling partial merges or reverting to an older schema (if merging fails) could also improve error resilience.
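For reference, a runnable example of the conflict case that makes Schema::try_merge fail (same field name, different types); attaching the stream name when logging such a failure, as below, is the kind of context suggested here:

use arrow_schema::{DataType, Field, Schema};

fn main() {
    let a = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let b = Schema::new(vec![Field::new("id", DataType::Utf8, false)]);
    // Conflicting types for the same field name abort the merge.
    if let Err(err) = Schema::try_merge([a, b]) {
        eprintln!("schema merge failed for stream \"app-logs\": {err}");
    }
}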

src/parseable/mod.rs (1)

832-854: Consider logging or partial rollback for errors in the loop.

When merging schemas and committing them in a loop, if an error occurs mid-iteration (e.g., in the second table), previously updated tables won't revert. Consider logging partial failures or implementing a rollback strategy to handle partial updates in distributed settings.

src/storage/object_storage.rs (2)

863-863: Report schema commit failures more descriptively.

If this call to self.commit_schema fails, it gets propagated but isn't distinguished from other storage errors in the loop. A specific log or error message clarifying which file/stream triggered the failure would ease troubleshooting.


883-907: Validate or log invalid schema files rather than failing early.

Currently, an invalid schema file immediately triggers an error. This may be intentional, but consider logging corrupted files and skipping them to allow processing valid schemas. Depending on your tolerance for partial data, this approach could improve availability in scenarios where only one of many schema files is corrupted.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bd3f0ae and ebfcc97.

📒 Files selected for processing (7)
  • src/handlers/airplane.rs (2 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/handlers/http/query.rs (1 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/prism/logstream/mod.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/prism/logstream/mod.rs
  • src/handlers/airplane.rs
  • src/handlers/http/logstream.rs
  • src/handlers/http/query.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/parseable/mod.rs (1)

44-47: Import changes look good.

No issues spotted. Importing EventError and LogSourceEntry here helps clarify error handling and data structures.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

874-882: ⚠️ Potential issue

Avoid potential runtime panic when merging schemas.

Using .unwrap() after Schema::try_merge(vec![schema, stream_schema]) can panic if a conflict occurs. Prefer handling or propagating the error properly, e.g., using ? to return an ObjectStorageError. This ensures the system remains robust if incompatible schemas appear.

-        let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+        let new_schema = Schema::try_merge(vec![schema, stream_schema])?;
🧹 Nitpick comments (2)
src/parseable/mod.rs (1)

833-855: Well-structured implementation of the distributed schema update method.

The new update_schema_when_distributed method effectively centralizes schema update logic within the Parseable struct. The implementation properly checks preconditions, fetches and merges schemas, and updates both the storage and in-memory representations to maintain consistency.

Documentation comments explaining the method's purpose would be helpful for maintainability.

+    /// Updates schema by merging schemas stored by ingestors when running in Query mode
+    ///
+    /// This method fetches all schemas associated with each specified stream, 
+    /// merges them, and commits the merged schema both to storage and in-memory.
+    /// It only operates when in Query mode.
     pub async fn update_schema_when_distributed(
src/storage/object_storage.rs (1)

884-908: Good implementation of the schema fetching functionality.

The fetch_schemas method properly retrieves and parses all schema files for a given stream, allowing efficient schema management in distributed mode. The implementation follows good patterns with appropriate error handling and object filtering.

Consider adding a comment indicating what specific schema files it's looking for (e.g., ingestor-specific schemas or all schemas) to clarify its purpose.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ebfcc97 and 58b0db2.

📒 Files selected for processing (3)
  • src/parseable/mod.rs (2 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (1)
src/parseable/streams.rs (1)

541-551: Excellent addition of the schema commit method.

This new commit_schema method properly handles the merging of schemas and updates the stream metadata accordingly. The implementation follows good patterns by acquiring a write lock, merging schemas safely with error propagation, and properly replacing the existing schema fields.

coderabbitai bot previously approved these changes Mar 16, 2025
@de-sh de-sh changed the title refactor: maintain sensible associations to ensure readability/testability refactor: associate commit_schema to the appropriate objects Mar 24, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

594-604: Good refactoring: New method centralizes schema commitment logic.

This new method nicely encapsulates the schema merging and commitment logic that was previously scattered across the codebase. The implementation correctly:

  1. Acquires a write lock on metadata
  2. Creates a current schema from existing values
  3. Merges the provided schema with the current one
  4. Updates the metadata with the new merged schema

This refactoring improves code organization and makes schema updates more consistent throughout the application.

Consider a small robustness improvement: create a new HashMap for the schema fields before clearing the existing one, to avoid potential data loss if an error occurs between clearing and repopulating:

-        metadata.schema.clear();
-        metadata
-            .schema
-            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        let new_schema_map: HashMap<String, Arc<Field>> = 
+            schema.fields.iter().map(|f| (f.name().clone(), f.clone())).collect();
+        metadata.schema.clear();
+        metadata.schema.extend(new_schema_map);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e58c928 and 4e26ba5.

📒 Files selected for processing (5)
  • src/event/mod.rs (3 hunks)
  • src/handlers/http/query.rs (1 hunks)
  • src/parseable/streams.rs (2 hunks)
  • src/storage/mod.rs (1 hunks)
  • src/storage/object_storage.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/handlers/http/query.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/event/mod.rs
🔇 Additional comments (1)
src/parseable/streams.rs (1)

31-31: Appropriate import addition.

Adding the SchemaRef import is appropriate as it's needed for the new commit_schema method parameter.
