Feat/connection draining #104
base: main
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Summary of Changes

Hello @Eeshu-Yadav, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust connection draining and multi-version listener management system. The primary goal is to enable graceful handling of listener updates and shutdowns, preventing abrupt connection terminations. By allowing multiple listener configurations to coexist and transition smoothly, the system can maintain service availability during dynamic configuration changes, ensuring a more resilient and user-friendly experience.

Highlights
Code Review
This pull request introduces a significant feature for connection draining. The implementation is extensive, touching configuration, listener management, and HTTP connection handling. While the feature is valuable, the current implementation has several critical and high-severity issues that need to be addressed before merging. These include flawed logic in listener updates, incorrect handling of draining listeners, and broken tests. I've also identified several areas with code duplication and opportunities for refactoring to improve maintainability.
@@ -0,0 +1,213 @@
// Copyright 2025 The kmesh Authors
This new file appears to contain critically flawed logic for listener updates and draining.

- In `handle_lds_update`, `listeners.insert` will overwrite the existing listener entry, immediately dropping its `JoinHandle` and aborting its task. This prevents any graceful draining of the old listener. A `MultiMap` would be needed to manage multiple versions of a listener, similar to what's done in `listeners_manager.rs`.
- In `remove_listener`, `listeners.remove` is called before starting the drain timeout. This means the timeout task will never find the listener to abort it after the drain period.

This implementation seems to contradict the more robust versioning and draining logic being introduced in `listeners_manager.rs`. If this file is intended for use, it needs a major rework. If it's experimental or dead code, it should be removed to avoid confusion.
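For reference, a minimal sketch of the MultiMap-based versioning this comment describes, assuming a simplified ListenerInfo rather than the PR's actual types:

use multimap::MultiMap;
use tokio::task::JoinHandle;

struct ListenerInfo {
    version: u64,
    handle: JoinHandle<()>,
}

struct LdsState {
    listeners: MultiMap<String, ListenerInfo>,
}

impl LdsState {
    fn handle_lds_update(&mut self, name: String, new_version: ListenerInfo) {
        // MultiMap keeps the previous entry alongside the new one, so the old
        // listener's JoinHandle remains available for a later drain-and-abort;
        // a HashMap::insert would replace and drop the old entry, losing the
        // handle needed to manage its drain.
        self.listeners.insert(name, new_version);
    }

    fn finish_drain(&mut self, name: &str, drained_version: u64) {
        // After the drain timeout, abort and remove only the drained version.
        if let Some(versions) = self.listeners.get_vec_mut(name) {
            for info in versions.iter().filter(|i| i.version == drained_version) {
                info.handle.abort();
            }
            versions.retain(|i| i.version != drained_version);
        }
    }
}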
result.map(|mut response| {
    // Apply drain signaling to response if draining is active
    manager.drain_signaling.apply_http1_drain_signal_sync(&mut response);
This call to `apply_http1_drain_signal_sync` is both redundant and incorrect. It's redundant because drain signaling is already handled within the `to_response` implementation, which is called just before this. It's incorrect because `apply_http1_drain_signal_sync` unconditionally adds the `Connection: close` header without checking if draining is active, which can lead to prematurely closed connections. This line should be removed.
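For illustration, a guarded variant could look like the sketch below; is_draining() is a hypothetical helper, not the PR's actual API:

fn apply_http1_drain_signal<B>(drain: &DrainSignalingManager, response: &mut hyper::Response<B>) {
    // Only signal close while a drain is actually in progress.
    if drain.is_draining() {
        use hyper::header::{HeaderValue, CONNECTION};
        response.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
    }
}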
warn!(
    "Gracefully draining old listener {} version {} - monitored by background task",
    listener_name, version
);
}
In the case of a `Gradual` drain strategy, the listener task is not being aborted. This is a bug, as the listener will continue to accept new connections while it's supposed to be stopping. You should abort the listener's `JoinHandle` here, just as you do for the `Immediate` strategy, to prevent new connections from being accepted.
warn!(
"Gracefully draining old listener {} version {} - monitored by background task",
listener_name, version
);
listener_info.handle.abort();
if let Some(drain_handle) = listener_info.drain_manager_handle.as_ref() {
drain_handle.abort();
}
version_info: None,
};
- man.start_listener(l1, l1_info.clone()).unwrap();
+ man.start_listener(l1, l1_info.clone()).await.unwrap();
The function `start_listener` is not `async`, but it's being called with `.await`. This will cause a compilation error. The same issue exists in `test_config_change_drains_old_listener` (line 1537) and `test_address_conflict_resolution_graceful` (line 1826). Please remove the `.await` from these calls.
man.start_listener(l1, l1_info.clone()).unwrap();
fn should_drain(scenario: DrainScenario, drain_type: ConfigDrainType) -> bool {
    use ConfigDrainType::*;
    use DrainScenario::*;

    match (scenario, drain_type) {
        (_, Default) => true,
        (HealthCheckFail, ModifyOnly) => false,
        (ListenerUpdate, ModifyOnly) => true,
        (HotRestart, ModifyOnly) => true,
    }
match &drain_state.protocol_behavior {
    super::listeners_manager::ProtocolDrainBehavior::Http1 { connection_close: true } => {
        use hyper::header::{HeaderValue, CONNECTION};
        response.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
        debug!("Applied 'Connection: close' header for HTTP/1.1 drain signaling");
    },
    super::listeners_manager::ProtocolDrainBehavior::Auto => {
        use hyper::header::{HeaderValue, CONNECTION};
        response.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
        debug!("Applied 'Connection: close' header for HTTP/1.1 drain signaling");
    },
    _ => {
        debug!("Skipping Connection: close header for non-HTTP/1.1 protocol");
    },
}
The match arms for `ProtocolDrainBehavior::Http1` and `ProtocolDrainBehavior::Auto` contain identical code. You can combine them using a `|` pattern to reduce code duplication and improve readability.
match &drain_state.protocol_behavior {
super::listeners_manager::ProtocolDrainBehavior::Http1 { connection_close: true } |
super::listeners_manager::ProtocolDrainBehavior::Auto => {
use hyper::header::{HeaderValue, CONNECTION};
response.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
debug!("Applied 'Connection: close' header for HTTP/1.1 drain signaling");
},
_ => {
debug!("Skipping Connection: close header for non-HTTP/1.1 protocol");
},
}
    Ok(context)
}

async fn monitor_drain_progress(&self, context: Arc<ListenerDrainContext>, listener_id: String) {
The `monitor_drain_progress` function takes `listener_id: String` as a separate argument, but this information is already available within the `context: Arc<ListenerDrainContext>`. To simplify the function signature and reduce redundancy, consider removing the `listener_id` parameter and accessing it directly from the context (e.g., `context.listener_id.clone()`). You will need to update the call site in `initiate_listener_drain` as well.
async fn monitor_drain_progress(&self, context: Arc<ListenerDrainContext>) {
orion-lib/src/listeners/listener.rs
Outdated
    drain_handler: Option<Arc<DefaultConnectionHandler>>,
) -> Result<()> {
    let shard_id = std::thread::current().id();
    let connection_id = format!("{}:{}:{}", local_address, peer_addr, start_instant.elapsed().as_nanos());
Using `start_instant.elapsed().as_nanos()` to generate a unique `connection_id` is not robust. Under high load or on systems with low-resolution timers, this could lead to collisions. Consider using a more reliable method for generating unique IDs, such as an atomic counter or a UUID. Since `uuid` is already a dependency, you could use `uuid::Uuid::new_v4()` after adding `use uuid;`.
- let connection_id = format!("{}:{}:{}", local_address, peer_addr, start_instant.elapsed().as_nanos());
+ let connection_id = format!("{}:{}:{}", local_address, peer_addr, uuid::Uuid::new_v4());
Force-pushed from 61acff6 to db23221
}

pub fn stop_listener(&mut self, listener_name: &str) -> Result<()> {
    if let Some(mut listeners) = self.listener_handles.remove(listener_name) {
@dawid-nowak the logic here has been updated as per the suggestion you referred to in #99. Since this PR will be merged after that one, should I improve the logic in that branch instead? The feedback about removing the draining functionality was also correct; that change ended up misplaced here by mistake. Kindly let me know if you want the same logic there; otherwise this will be corrected once that PR is merged.
Just reviewed a small part, and I don't see that it handles a listener with multiple protocols well (i.e., one whose filter chains contain both `envoy.filters.network.http_connection_manager` and `envoy.filters.network.tcp_proxy` filters), and I don't see the drain timeout passed from `HttpConnectionManager.DrainTimeout`.
Tcp { global_timeout: Duration },
Http { global_timeout: Duration, drain_timeout: Duration },
Immediate,
Gradual,
What does Gradual mean? Does Envoy have such a strategy?
The Gradual strategy has been removed from the current implementation. Envoy uses protocol-specific draining strategies (HTTP with drain_timeout, TCP with a global timeout) rather than a generic "Gradual" approach.
    }
}

pub fn with_timeouts(global_drain_timeout: Duration, default_http_drain_timeout: Duration) -> Self {
I wonder, if a listener handles both the HTTP and TCP protocols, how do you support them?
This is now supported through the Mixed drain strategy.
pub enum DrainStrategy {
// ... other variants
Mixed {
global_timeout: Duration,
http_drain_timeout: Duration,
tcp_connections: bool,
http_connections: bool,
},
}
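As an illustrative sketch of how such a variant might be consumed (the selection logic below is an assumption, not the PR's actual code):

use std::time::Duration;

fn effective_drain_timeout(strategy: &DrainStrategy, is_http_connection: bool) -> Duration {
    match strategy {
        DrainStrategy::Mixed { global_timeout, http_drain_timeout, http_connections, .. } => {
            // HTTP connections get the protocol-aware drain timeout;
            // everything else falls back to the global timeout.
            if is_http_connection && *http_connections {
                *http_drain_timeout
            } else {
                *global_timeout
            }
        },
        // Other variants elided in this sketch.
        _ => Duration::ZERO,
    }
}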
    }
}

pub async fn start_listener_draining(&self, drain_state: ListenerDrainState) {
I'm not seeing these methods called. Is this supported in a following PR?
The methods are now integrated. The DrainSignalingManager provides `initiate_listener_drain_from_filter_analysis()`, `start_listener_draining()`, and `stop_listener_draining()`.
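A purely hypothetical call sequence to show how these might fit together; apart from start_listener_draining's signature quoted earlier in this thread, the argument and return shapes below are guesses:

// Hypothetical: derive a drain state from the listener's filter chain...
let drain_state = drain_signaling.initiate_listener_drain_from_filter_analysis(&filter_chain)?;
// ...begin draining (async, per the signature shown above)...
drain_signaling.start_listener_draining(drain_state).await;
// ...and stop once connections have drained or the timeout fires.
drain_signaling.stop_listener_draining("listener_0");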
}

#[derive(Debug, Clone)]
pub struct ListenerDrainState {
I don't quite understand what ListenerDrainState means.
ListenerDrainState represents the current state of a listener's draining process. It tracks:
- started_at: when draining began
- strategy: which drain strategy to use (HTTP/TCP/Mixed)
- protocol_behavior: how to handle different protocols during draining
- drain_scenario: what triggered the drain (health check failure, listener update, etc.)
- drain_type: the configured drain behavior
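Putting those fields together, a sketch of the struct (field types are inferred from this discussion, not copied from the PR):

use std::time::Instant;

#[derive(Debug, Clone)]
pub struct ListenerDrainState {
    pub started_at: Instant,                      // when draining began
    pub strategy: DrainStrategy,                  // HTTP / TCP / Mixed
    pub protocol_behavior: ProtocolDrainBehavior, // per-protocol handling
    pub drain_scenario: DrainScenario,            // what triggered the drain
    pub drain_type: ConfigDrainType,              // configured drain behavior
}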
Let's get #102 merged and then we can start reviewing this one.
Pull Request Overview
This PR implements connection draining functionality for the Orion proxy to handle graceful listener updates and shutdowns. The implementation addresses issue #102 by adding comprehensive connection tracking and draining mechanisms.
- Implements graceful connection draining for listener updates and shutdowns
- Adds comprehensive connection state management with drain timeouts and strategies
- Introduces multi-version listener support with automatic cleanup policies
Reviewed Changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 6 comments.
Summary per file:
File | Description |
---|---|
orion-proxy/src/admin/config_dump.rs | Updates test fixtures to include new drain_type and drain_timeout fields |
orion-lib/src/listeners/mod.rs | Adds new drain_signaling and lds_update modules to module exports |
orion-lib/src/listeners/listeners_manager.rs | Major refactor implementing connection tracking, drain management, and multi-version listener support |
orion-lib/src/listeners/listener.rs | Integrates connection tracking and drain handling into listener processing |
orion-lib/src/listeners/lds_update.rs | New module for managing LDS updates with graceful draining |
orion-lib/src/listeners/http_connection_manager.rs | Extends HTTP connection manager with drain signaling and connection lifecycle management |
orion-lib/src/listeners/drain_signaling.rs | New core module implementing drain strategies and connection state management |
orion-lib/Cargo.toml | Adds dependencies for connection tracking (dashmap, multimap) |
orion-configuration/src/config/network_filters/http_connection_manager.rs | Adds drain_timeout configuration field |
orion-configuration/src/config/listener.rs | Adds drain_type and version_info fields for listener configuration |
let drain_rate = drained_count as f64 / elapsed.as_secs_f64();

if drain_rate > 0.0 {
    let estimated_remaining_time = current_connections as f64 / drain_rate;
    return Some(Instant::now() + Duration::from_secs_f64(estimated_remaining_time));
Division by zero vulnerability if `elapsed.as_secs_f64()` returns 0.0, which can occur when the elapsed time is effectively zero. Add a check to ensure the elapsed time is non-zero before division.
- let drain_rate = drained_count as f64 / elapsed.as_secs_f64();
- if drain_rate > 0.0 {
-     let estimated_remaining_time = current_connections as f64 / drain_rate;
-     return Some(Instant::now() + Duration::from_secs_f64(estimated_remaining_time));
+ let elapsed_secs = elapsed.as_secs_f64();
+ if elapsed_secs > 0.0 {
+     let drain_rate = drained_count as f64 / elapsed_secs;
+     if drain_rate > 0.0 {
+         let estimated_remaining_time = current_connections as f64 / drain_rate;
+         return Some(Instant::now() + Duration::from_secs_f64(estimated_remaining_time));
+     }
assert!(third_version > 0);
assert!(third_version > 0);
Redundant assertions: the same condition `third_version > 0` is checked three times. This appears to be a copy-paste error and should be reduced to a single assertion.
- assert!(third_version > 0);
- assert!(third_version > 0);
orion-lib/src/listeners/listener.rs
Outdated
    sync::broadcast::{self},
};
use tracing::{debug, info, warn};
use uuid;
[nitpick] The uuid import should be more specific, e.g., `use uuid::Uuid` instead of importing the entire crate. This makes dependencies clearer and can improve compile times.

- use uuid;
+ use uuid::Uuid;
orion-lib/src/listeners/listener.rs
Outdated
    drain_handler: Option<Arc<DefaultConnectionHandler>>,
) -> Result<()> {
    let shard_id = std::thread::current().id();
    let connection_id = format!("{}:{}:{}", local_address, peer_addr, uuid::Uuid::new_v4());
[nitpick] Creating a UUID for every connection can be expensive under high load. Consider using a simpler connection ID scheme such as an atomic counter combined with the peer address, which would be more performant while still ensuring uniqueness.
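For example, a counter-based scheme along the suggested lines might look like this sketch:

use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide, monotonically increasing sequence number.
static NEXT_CONNECTION_SEQ: AtomicU64 = AtomicU64::new(0);

fn connection_id(local_address: SocketAddr, peer_addr: SocketAddr) -> String {
    // fetch_add is a single atomic operation, far cheaper than generating
    // and formatting a random 128-bit UUID for every accepted connection.
    let seq = NEXT_CONNECTION_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{}:{}:{}", local_address, peer_addr, seq)
}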
let mut to_remove = Vec::new();
for (i, listener_info) in versions.iter().enumerate() {
    if listener_info.is_draining() {
        to_remove.push(i);
    }
}

for &index in to_remove.iter().rev() {
    if let Some(listener_info) = versions.get_mut(index) {
        listener_info.handle.abort();
        info!("LDS: Draining version of listener '{}' forcibly closed after timeout", name);
    }
}
for &index in to_remove.iter().rev() {
    versions.remove(index);
}
[nitpick] Removing elements by index from a Vec is an O(n) operation. Since multiple indices are being removed in reverse order, this could be inefficient. Consider using `Vec::retain` with a predicate to remove multiple elements in a single pass.
- let mut to_remove = Vec::new();
- for (i, listener_info) in versions.iter().enumerate() {
-     if listener_info.is_draining() {
-         to_remove.push(i);
-     }
- }
- for &index in to_remove.iter().rev() {
-     if let Some(listener_info) = versions.get_mut(index) {
-         listener_info.handle.abort();
-         info!("LDS: Draining version of listener '{}' forcibly closed after timeout", name);
-     }
- }
- for &index in to_remove.iter().rev() {
-     versions.remove(index);
- }
+ // Abort and log for draining listeners, then remove them in a single pass
+ versions.iter_mut()
+     .filter(|listener_info| listener_info.is_draining())
+     .for_each(|listener_info| {
+         listener_info.handle.abort();
+         info!("LDS: Draining version of listener '{}' forcibly closed after timeout", name);
+     });
+ versions.retain(|listener_info| !listener_info.is_draining());
}

pub fn remove_route(&self) {
    self.http_filters_per_route.swap(Arc::new(HashMap::new()));
[nitpick] The empty HashMap creation could be extracted to a static or const to avoid repeated allocations. Consider creating a shared empty HashMap instance.
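One way to realize this suggestion, sketched assuming http_filters_per_route is an arc_swap::ArcSwap and using a placeholder value type:

use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

type RouteFilters = HashMap<String, ()>; // placeholder for the real per-route filter type

// Built once; remove_route() then clones the Arc instead of allocating anew.
static EMPTY_ROUTE_FILTERS: LazyLock<Arc<RouteFilters>> =
    LazyLock::new(|| Arc::new(HashMap::new()));

fn remove_route(http_filters_per_route: &arc_swap::ArcSwap<RouteFilters>) {
    http_filters_per_route.swap(EMPTY_ROUTE_FILTERS.clone());
}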
Now implemented. The code extracts the drain_timeout from the HttpConnectionManager configuration:

MainFilter::Http(http_config) => {
    has_http = true;
    http_drain_timeout = http_config.drain_timeout;
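Sketching the surrounding analysis that snippet implies; the Tcp variant, helper name, and Option typing below are assumptions for illustration:

use std::time::Duration;

fn analyze_listener_filters(filters: &[MainFilter]) -> (bool, bool, Option<Duration>) {
    let mut has_http = false;
    let mut has_tcp = false;
    let mut http_drain_timeout = None;
    for filter in filters {
        match filter {
            MainFilter::Http(http_config) => {
                has_http = true;
                http_drain_timeout = Some(http_config.drain_timeout);
            },
            MainFilter::Tcp(_) => has_tcp = true,
        }
    }
    // (has_http, has_tcp) can then select the Http, Tcp, or Mixed drain strategy.
    (has_http, has_tcp, http_drain_timeout)
}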
- Optimize start_listener to avoid a second hashmap lookup
- Simplify stop_listener using remove() directly instead of get_mut + drain + remove
- Extract duplicated test code into a create_test_listener_config helper function
- Improve code readability and performance

Signed-off-by: Eeshu-Yadav <[email protected]>
… versions

Enhance the existing multiple listener versions approach with configurable cleanup policies and resource management, following Envoy's patterns for graceful listener updates.

Key improvements:
- Add ListenerManagerConfig with configurable cleanup policies
- Implement automatic cleanup of old listener versions
- Support CountBasedOnly, TimeBasedOnly, and Hybrid cleanup strategies
- Prevent resource leaks while maintaining connection continuity
- Add comprehensive test coverage for cleanup behaviors

Benefits:
- Graceful LDS-style updates without connection interruption
- Long-running connections don't block listener deployments
- Configurable behavior for different production environments
- Automatic resource management prevents memory/handle leaks

Configuration options:
- max_versions_per_listener: limit concurrent versions (default: 2)
- cleanup_policy: define cleanup strategy (default: CountBasedOnly(2))
- cleanup_interval: background cleanup frequency (default: 60s)

Signed-off-by: Eeshu-Yadav <[email protected]>
Implement comprehensive improvements:
- Replace HashMap<String, Vec<ListenerInfo>> with MultiMap for a cleaner API
- Move default config assignment inside ListenerManager::new() constructor
- Change listener exit logging from warn to info level
- Merge redundant version logging into a single informative message
- Fix unnecessary double cloning of listener_name
- Use Vec::drain for efficient batch removal of old versions
- Remove unimplemented TimeBasedOnly cleanup policy to avoid incorrect behavior
- Remove redundant Hybrid cleanup policy, keeping only CountBasedOnly

Signed-off-by: Eeshu-Yadav <[email protected]>
- Add drain_timeout field with 5000ms default (Envoy compliant)
- Integrate with drain signaling and timeout enforcement
- Support protocol-specific draining (HTTP/1.1, HTTP/2, TCP)
- Add comprehensive test coverage for drain behaviors

Signed-off-by: Eeshu-Yadav <[email protected]>
Force-pushed from 3250c02 to 99c2e88
…l support

- Add ListenerProtocolConfig enum to handle HTTP, TCP, and mixed protocol listeners
- Respect HttpConnectionManager.drain_timeout field from configuration
- Support listeners with both http_connection_manager and tcp_proxy filters
- Remove ambiguous 'Gradual' strategy, align with Envoy's draining behavior
- Add initiate_listener_drain_from_filter_analysis() for proper integration

Signed-off-by: Eeshu-Yadav <[email protected]>
Force-pushed from 99c2e88 to 2da8a93
@dawid-nowak kindly review this one.
Same here, we need a design proposal. FYI: https://github.com/kubernetes/enhancements/
I don't understand why the link given for this functionality points to the kubernetes repo. The link in the other PR points to the main kmesh repo. Are you referring to the kmesh one or the kubernetes one? Kindly clarify.
I just gave you a reference; we can add the proposal in orion.
okk
Fixes: #102