-
Notifications
You must be signed in to change notification settings - Fork 436
refactor(ws, ws-utils): implement 7-phase refactoring plan #2112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Phase 1: Enhanced error types with context (Timeout, DataSend, ParseError) Phase 2.1: Extract ConnectionConfig, RetryConfig, KeepAliveConfig types Phase 3: Create SampleBuffer, SampleSource trait, AudioSamples enum Phase 4.1: Return SendTask handle from from_audio for proper shutdown Phase 4.3: Improve ConnectionManager with RwLock and async methods Phase 5.1: Fix unwrap() and add InvalidRequest error variant Phase 5.2: Simplify WebSocketIO trait with DecodeError Phase 6: Update tests for API changes Phase 7: Add module-level documentation Breaking changes: - WebSocketIO trait: from_message -> decode with Result<T, DecodeError> - from_audio return type now includes SendTask handle - ConnectionManager::acquire_connection is now async Co-Authored-By: yujonglee <[email protected]>
- Fix is_empty for Stereo to check both channels - Fix potential busy-wait when source returns empty vectors Co-Authored-By: yujonglee <[email protected]>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
✅ Deploy Preview for hyprnote-storybook ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
✅ Deploy Preview for hyprnote ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
📝 WalkthroughWalkthroughAdds audio conversion utilities and a buffered f32 sample stream to ws-utils; makes ConnectionManager async with an RwLock; introduces a client-side config module, DecodeError and SendTask, richer ws error variants; updates call sites and tests to new APIs and async acquisition. Changes
Sequence Diagram(s)sequenceDiagram
participant Consumer as Stream Consumer
participant Stream as BufferedAudioStream
participant Buffer as SampleBuffer
participant Source as SampleSource
Consumer->>Stream: poll_next(cx)
alt buffer has samples
Stream->>Buffer: next_sample()
Buffer-->>Stream: Some(f32)
Stream-->>Consumer: Ready(Some(f32))
else buffer empty
Stream->>Source: poll_samples(cx)
alt Ready(Some(samples))
Stream->>Buffer: push_samples(samples)
Stream->>Buffer: next_sample()
Buffer-->>Stream: Some(f32)
Stream-->>Consumer: Ready(Some(f32))
else Ready(None)
Stream-->>Consumer: Ready(None)
else Pending
Stream-->>Consumer: Pending
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
crates/ws-utils/src/audio.rs (1)
24-39: Consider handling odd-length input explicitly.The function silently discards trailing bytes if
data.len()is not a multiple of 2, and silently discards a trailing sample if the total sample count is odd. This may be intentional for audio data, but consider documenting this behavior or returning an error/warning for malformed input.crates/ws/src/client.rs (1)
206-211: Grace period sleep occurs unconditionally before close.The
close_grace_periodsleep happens even if the loop exited due to an error. For error cases, an immediate close might be more appropriate. Consider making the sleep conditional on successful completion.- tracing::debug!("draining remaining messages before close"); - tokio::time::sleep(close_grace_period).await; + // Only wait for grace period on clean exit (not errors) + if error_tx.is_empty() { // or track exit reason + tracing::debug!("draining remaining messages before close"); + tokio::time::sleep(close_grace_period).await; + } if let Err(e) = ws_sender.close().await { tracing::debug!("ws_close_failed: {:?}", e); }Note: The exact condition depends on how you want to track whether the exit was clean. This is a minor optimization.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
crates/ws-utils/src/audio.rs(1 hunks)crates/ws-utils/src/buffered_stream.rs(1 hunks)crates/ws-utils/src/lib.rs(1 hunks)crates/ws-utils/src/manager.rs(2 hunks)crates/ws/Cargo.toml(1 hunks)crates/ws/src/client.rs(9 hunks)crates/ws/src/config.rs(1 hunks)crates/ws/src/error.rs(1 hunks)crates/ws/src/lib.rs(1 hunks)crates/ws/tests/client_tests.rs(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
crates/ws/src/lib.rs (1)
crates/ws/tests/client_tests.rs (4)
client(103-103)client(120-120)client(174-174)client(221-221)
crates/ws/src/config.rs (2)
crates/ws-utils/src/buffered_stream.rs (1)
default(45-47)crates/ws-utils/src/manager.rs (1)
default(11-15)
crates/ws/src/error.rs (1)
crates/ws/src/client.rs (1)
tokio(138-138)
crates/ws-utils/src/lib.rs (1)
plugins/local-stt/src/ext.rs (1)
manager(449-449)
crates/ws-utils/src/audio.rs (1)
crates/audio-utils/src/lib.rs (2)
bytes_to_f32_samples(78-85)mix_audio_f32(91-100)
crates/ws/tests/client_tests.rs (1)
crates/ws/src/client.rs (1)
decode(70-70)
crates/ws/src/client.rs (3)
crates/ws/tests/client_tests.rs (3)
to_input(24-26)to_message(28-30)decode(32-39)crates/ws/src/config.rs (2)
default(12-18)default(28-33)crates/ws/src/error.rs (1)
timeout(32-37)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Redirect rules - hyprnote
- GitHub Check: Header rules - hyprnote
- GitHub Check: Pages changed - hyprnote
- GitHub Check: Devin
- GitHub Check: desktop_ci (linux, depot-ubuntu-24.04-8)
- GitHub Check: desktop_ci (macos, depot-macos-14)
- GitHub Check: desktop_ci (linux, depot-ubuntu-22.04-8)
- GitHub Check: fmt
🔇 Additional comments (25)
crates/ws/Cargo.toml (1)
14-14: LGTM!Moving
serde_jsonfrom dev-dependencies to regular dependencies is appropriate since it's now used in the main crate code forDecodeError::DeserializationError.crates/ws/src/lib.rs (1)
1-11: LGTM!Clear crate-level documentation and proper feature-gating for the new
configmodule alongside the existingclientmodule.crates/ws-utils/src/lib.rs (1)
1-12: LGTM!Clear crate documentation and well-organized module structure with appropriate re-exports for the new audio and buffered_stream modules.
crates/ws/tests/client_tests.rs (2)
32-39: LGTM!The
decodeimplementation correctly handlesTextmessages with proper error mapping viaDecodeError::DeserializationError, and returnsDecodeError::UnsupportedTypefor non-text message types.
103-103: LGTM!The tests correctly capture the new triple return
(output, handle, send_task)fromfrom_audio. The_send_taskis appropriately prefixed to indicate intentional non-use in these test scenarios.crates/ws-utils/src/audio.rs (2)
3-6: LGTM!Clean enum design with clear separation between mono and stereo audio representations.
8-22: LGTM!The
to_monomethod correctly leveragesmix_audio_f32for stereo-to-mono conversion, andis_emptyproperly checks both channels for stereo data.crates/ws/src/config.rs (3)
4-19: LGTM!Well-structured configuration with sensible defaults. The 8-second connect timeout and 5-second close grace period are reasonable for production WebSocket connections.
21-34: LGTM!The retry configuration with 5 attempts and 500ms delay provides a good balance between responsiveness and avoiding connection storms.
36-40: Intentionally noDefaultforKeepAliveConfig.This is appropriate since the
messagefield has no sensible default value and must be explicitly configured by the caller.crates/ws/src/error.rs (2)
3-29: LGTM!Well-designed error enum with clear, actionable messages. The
#[source]attribute onTimeoutproperly preserves the error chain, and structured variants likeDataSendprovide useful context for debugging.
31-50: LGTM!Clean helper constructors that reduce boilerplate at call sites while maintaining type safety with
impl Into<String>for flexible input.crates/ws-utils/src/manager.rs (4)
1-8: LGTM! Clean struct definition with appropriate synchronization primitive.Using
tokio::sync::RwLockis correct for async contexts. TheArc<RwLock<Option<CancellationToken>>>pattern allows shared ownership with async-safe interior mutability.
19-30: Proper connection acquisition with automatic cancellation of previous tokens.The logic correctly takes exclusive write access, cancels any existing token, and atomically replaces it with a new one. This ensures only one active connection at a time.
32-37: LGTM! Explicit cancellation method added.The
cancel_allmethod provides a clean way to cancel the current token without acquiring a new connection.
45-55: LGTM! Useful accessors added to ConnectionGuard.The
is_cancelled()andchild_token()methods extend the API surface appropriately, enabling synchronous cancellation checks and hierarchical token management.crates/ws-utils/src/buffered_stream.rs (3)
6-9: LGTM! Simple and effective buffer structure.The
SampleBufferstruct with position-based iteration is a clean approach for buffering audio samples.
33-36: Confirm that replacing the buffer (not appending) is intentional.
push_samplesreplaces the entire buffer contents rather than appending. This is appropriate for a streaming model where each poll returns a new chunk, but verify this matches the expected usage pattern ofSampleSourceimplementations.
97-105: Implementation is correct and trait-compatible.
BufferedAudioStreamproperly implementsStream<Item = f32>, so returningselffromas_stream()is valid. This follows the same pattern as otherkalosm_sound::AsyncSourceimplementations in the codebase (WebSocketAudioSource,ChannelAudioSource).crates/ws/src/client.rs (6)
35-52: LGTM! Clean SendTask wrapper with proper panic propagation.The
wait()method correctly handles:
- Normal completion
- Panic propagation via
resume_unwind- Cancellation mapping to
UnexpectedClose
54-61: LGTM! Well-structured DecodeError enum.The
DecodeErrorenum provides clear distinction between unsupported message types and deserialization failures, with properthiserrorderivation.
88-96: LGTM! Builder pattern extensions for configuration.The
with_configandwith_keep_alivemethods provide clean configuration APIs following the builder pattern.
141-212: Consider returning early on initial message failure instead of continuing to the drain phase.When the initial message fails (lines 143-151), the task returns an
Errbut still proceeds to the drain/close logic at lines 206-211 in the happy path. Since this is inside the spawned task, the early return skips the drain. This is actually correct behavior—verify this is intentional.The error handling pattern (check
error_tx.send().is_err()) is consistent throughout.
224-232: Decoding errors silently drop messages without yielding errors.When
T::decode(msg)returnsUnsupportedTypeorDeserializationError, the message is logged but not propagated to the output stream. If consumers need to know about decode failures, consider yielding them as errors.Verify this behavior is intentional—some use cases may want to surface decode errors to callers rather than silently dropping them.
270-280: LGTM! Robust connection handling with timeout and error mapping.The
try_connectmethod properly:
- Validates the request with clear error mapping
- Applies configurable connection timeout
- Maps timeout and connection errors to domain-specific error types
- Update ListenClientIO and ListenClientDualIO to implement decode instead of from_message - Update from_audio calls to handle new SendTask return value - Update acquire_connection call to be async Co-Authored-By: yujonglee <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
owhisper/owhisper-client/src/live.rs (1)
197-199: Clarify or retainSendTaskto make audio send lifecycle explicitThe new
from_audiotriple is destructured but theSendTaskparts (_send_task,_mic_send_task,_spk_send_task) are immediately dropped. IfSendTaskis a detached/background task that continues running after drop, this is fine but non-obvious; if drop cancels or aborts the send loop, this would prematurely stop audio transmission.I’d recommend either:
- Adding a short comment at these sites explaining that the task is intentionally dropped because it runs detached, or
- Storing the
SendTaskinsideSingleHandle/DualHandleso its lifetime clearly matches the streaming session.Please double‑check the ws client’s
SendTasksemantics and pick the appropriate option.Also applies to: 239-241, 278-279
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
crates/transcribe-whisper-local/src/service/streaming.rs(1 hunks)owhisper/owhisper-client/src/live.rs(5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
owhisper/owhisper-client/src/live.rs (2)
crates/ws/tests/client_tests.rs (5)
decode(32-39)client(103-103)client(120-120)client(174-174)client(221-221)crates/ws/src/client.rs (1)
decode(70-70)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Redirect rules - hyprnote
- GitHub Check: Header rules - hyprnote
- GitHub Check: Pages changed - hyprnote
- GitHub Check: fmt
- GitHub Check: desktop_ci (linux, depot-ubuntu-24.04-8)
- GitHub Check: desktop_ci (linux, depot-ubuntu-22.04-8)
- GitHub Check: desktop_ci (macos, depot-macos-14)
- GitHub Check: Devin
🔇 Additional comments (2)
crates/transcribe-whisper-local/src/service/streaming.rs (1)
116-116: Asyncacquire_connection().awaitusage looks correctCloning the
ConnectionManagerand awaitingacquire_connection()before wiringon_upgradeis consistent with the new async API and keeps the guard lifetime aligned with the websocket session; no additional changes needed here.owhisper/owhisper-client/src/live.rs (1)
136-141:WebSocketIO::decodeimplementations align with new DecodeError contractMapping
Message::TexttoStringand treating all other variants asDecodeError::UnsupportedTypeis consistent with the updatedWebSocketIO::decodesemantics and the example usage in the ws client tests; this looks good.Also applies to: 170-175
Returning Poll::Pending without registering a waker violates the async contract and can cause the stream to never wake up. Reverting to the original 'continue' behavior which loops and polls again immediately. Co-Authored-By: yujonglee <[email protected]>
- Document the contract that implementations must return non-empty samples - Add debug_assert to catch contract violations during development - Explain why empty samples are problematic (busy-loop risk) Co-Authored-By: yujonglee <[email protected]>
Remove AudioSamples, SampleBuffer, SampleSource, and BufferedAudioStream as they were not integrated with existing code. The deduplication of WebSocketAudioSource and ChannelAudioSource can be done in a follow-up PR. Co-Authored-By: yujonglee <[email protected]>
refactor(ws, ws-utils): implement 7-phase refactoring plan
Summary
This PR implements a comprehensive refactoring of the
wsandws-utilscrates based on a Claude Code analysis. Key changes include:Error Handling: Replaced generic
Unknownerror variant with structured types (Timeoutwith duration context,DataSendwith context,InvalidRequest,ParseError). Silentlet _ =error ignoring replaced with proper logging.Configuration: Extracted hardcoded values (5 retries, 500ms delay, 8s timeout, 5s grace period) into
ConnectionConfig,RetryConfig, andKeepAliveConfigtypes.API Changes (Breaking):
WebSocketIO::from_message(msg) -> Option<T>changed todecode(msg) -> Result<T, DecodeError>from_audionow returns(stream, handle, SendTask)instead of(stream, handle)for proper shutdown handlingConnectionManager::acquire_connectionis nowasync(usesRwLockinstead ofMutex)Updates since last revision
owhisper-clientto implementdecodemethod instead offrom_messageforListenClientIOandListenClientDualIOfrom_audiocall sites inowhisper-clientto handle the new 3-tuple return value (stream, handle, SendTask)transcribe-whisper-localto await the now-asyncacquire_connection()callAudioSamples,SampleBuffer,SampleSource,BufferedAudioStream) - these were scaffolding for future deduplication work but weren't integrated; will be done in a follow-up PRReview & Testing Checklist for Human
_send_taskhandling is intentional: TheSendTaskreturn values are being discarded with_prefix inowhisper-client. Confirm this is acceptable or if proper shutdown handling viasend_task.wait()should be added.Mutexto asyncRwLockshould be tested in real usage scenarios to ensure no deadlocks or behavior changes.from_audio,from_message, oracquire_connectionthat may need updating.Recommended test plan: Run
cargo test -p ws -p ws-utils, then manually test code paths that useConnectionManager::acquire_connectionor implementWebSocketIO(e.g., the STT streaming functionality).Notes
serde_jsonmoved from dev-dependencies to regular dependencies (required forDecodeError)Link to Devin run: https://app.devin.ai/sessions/151775d46b90483bb19252faa1e936fd
Requested by: yujonglee (@yujonglee)