-
Notifications
You must be signed in to change notification settings - Fork 16
feat: allow multiple versions of the same listener #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: allow multiple versions of the same listener #99
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Eeshu-Yadav, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant enhancement to the listener management system, allowing the application to run multiple versions of the same listener concurrently. This change is crucial for enabling graceful updates and configuration changes without interrupting ongoing connections, improving the overall stability and reliability of the system. The core modification involves restructuring how listeners are stored and managed, incorporating version tracking, and updating the start and stop mechanisms accordingly.
Highlights
- Data Structure Change: The internal storage for listener handles in
ListenersManager
has been changed from aBTreeMap
to aHashMap<String, Vec<ListenerInfo>>
. This allows multipleListenerInfo
entries to be associated with a single listener name. - Version Tracking: A
version
field has been added to theListenerInfo
struct, and aversion_counter
toListenersManager
, enabling the tracking of different active versions of the same listener. - Graceful Listener Updates: The
start_listener
function now creates a new version of a listener instead of stopping and replacing an existing one. This prevents breaking active connections during listener configuration updates. - Comprehensive Listener Shutdown: The
stop_listener
function has been updated to correctly handle and abort all active versions of a specified listener, ensuring a complete shutdown. - Enhanced Test Coverage: Existing tests have been updated, and a new test case (
start_multiple_listener_versions
) has been added to thoroughly verify the new multi-version listener functionality.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request correctly implements support for multiple versions of the same listener, which is a great feature for enabling zero-downtime updates. The changes to the data structures and logic in ListenersManager
are well-implemented, and the tests are updated accordingly. I have a few suggestions to improve code efficiency and maintainability in start_listener
, stop_listener
, and one of the new tests. Overall, this is a solid contribution.
self.listener_handles.entry(listener_name.clone()).or_insert_with(Vec::new).push(listener_info); | ||
|
||
info!("Listener {} now has {} active version(s)", listener_name, self.listener_handles.get(&listener_name).unwrap().len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be made more efficient and safer by avoiding a second hashmap lookup and an unwrap()
. You can store the mutable reference returned by entry()
and reuse it to get the length of the vector.
self.listener_handles.entry(listener_name.clone()).or_insert_with(Vec::new).push(listener_info); | |
info!("Listener {} now has {} active version(s)", listener_name, self.listener_handles.get(&listener_name).unwrap().len()); | |
let versions = self.listener_handles.entry(listener_name.clone()).or_default(); | |
versions.push(listener_info); | |
info!("Listener {} now has {} active version(s)", listener_name, versions.len()); |
if let Some(listeners) = self.listener_handles.get_mut(listener_name) { | ||
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name); | ||
for listener_info in listeners.drain(..) { | ||
info!("Stopping listener {} version {}", listener_name, listener_info.version); | ||
listener_info.handle.abort(); | ||
} | ||
self.listener_handles.remove(listener_name); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation of stop_listener
can be simplified and made more idiomatic. Instead of using get_mut
, drain
, and then remove
, you can use self.listener_handles.remove(listener_name)
directly. This atomically removes the entry and returns the Vec
of listeners, which you can then iterate over.
if let Some(listeners) = self.listener_handles.get_mut(listener_name) { | |
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name); | |
for listener_info in listeners.drain(..) { | |
info!("Stopping listener {} version {}", listener_name, listener_info.version); | |
listener_info.handle.abort(); | |
} | |
self.listener_handles.remove(listener_name); | |
} else { | |
if let Some(listeners) = self.listener_handles.remove(listener_name) { | |
info!("Stopping all {} version(s) of listener {}", listeners.len(), listener_name); | |
for listener_info in listeners { | |
info!("Stopping listener {} version {}", listener_name, listener_info.version); | |
listener_info.handle.abort(); | |
} | |
} else { |
async fn start_multiple_listener_versions() { | ||
let chan = 10; | ||
let name = "multi-version-listener"; | ||
|
||
let (_conf_tx, conf_rx) = mpsc::channel(chan); | ||
let (_route_tx, route_rx) = mpsc::channel(chan); | ||
let mut man = ListenersManager::new(conf_rx, route_rx); | ||
|
||
// Start first version | ||
let (routeb_tx1, routeb_rx) = broadcast::channel(chan); | ||
let (_secb_tx1, secb_rx) = broadcast::channel(chan); | ||
let l1 = Listener::test_listener(name, routeb_rx, secb_rx); | ||
let l1_info = ListenerConfig { | ||
name: name.into(), | ||
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234), | ||
filter_chains: HashMap::default(), | ||
bind_device: None, | ||
with_tls_inspector: false, | ||
proxy_protocol_config: None, | ||
with_tlv_listener_filter: false, | ||
tlv_listener_filter_config: None, | ||
}; | ||
man.start_listener(l1, l1_info).unwrap(); | ||
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
tokio::task::yield_now().await; | ||
|
||
// Start second version | ||
let (routeb_tx2, routeb_rx) = broadcast::channel(chan); | ||
let (_secb_tx2, secb_rx) = broadcast::channel(chan); | ||
let l2 = Listener::test_listener(name, routeb_rx, secb_rx); | ||
let l2_info = ListenerConfig { | ||
name: name.into(), | ||
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1235), // Different port | ||
filter_chains: HashMap::default(), | ||
bind_device: None, | ||
with_tls_inspector: false, | ||
proxy_protocol_config: None, | ||
with_tlv_listener_filter: false, | ||
tlv_listener_filter_config: None, | ||
}; | ||
man.start_listener(l2, l2_info).unwrap(); | ||
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
tokio::task::yield_now().await; | ||
|
||
// Start third version | ||
let (routeb_tx3, routeb_rx) = broadcast::channel(chan); | ||
let (_secb_tx3, secb_rx) = broadcast::channel(chan); | ||
let l3 = Listener::test_listener(name, routeb_rx, secb_rx); | ||
let l3_info = ListenerConfig { | ||
name: name.into(), | ||
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1236), // Different port | ||
filter_chains: HashMap::default(), | ||
bind_device: None, | ||
with_tls_inspector: false, | ||
proxy_protocol_config: None, | ||
with_tlv_listener_filter: false, | ||
tlv_listener_filter_config: None, | ||
}; | ||
man.start_listener(l3, l3_info).unwrap(); | ||
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
tokio::task::yield_now().await; | ||
|
||
// Verify all three versions are active | ||
assert!(routeb_tx1.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
assert!(routeb_tx2.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
assert!(routeb_tx3.send(RouteConfigurationChange::Removed("n/a".into())).is_ok()); | ||
|
||
assert_eq!(man.listener_handles.get(name).unwrap().len(), 3); | ||
|
||
// Stop all versions | ||
man.stop_listener(name).unwrap(); | ||
|
||
// Verify no listeners remain in the manager | ||
assert!(man.listener_handles.get(name).is_none()); | ||
|
||
tokio::task::yield_now().await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test has a lot of duplicated code for creating ListenerConfig
instances. To improve readability and maintainability, consider extracting this logic into a helper function within the test. For example:
fn create_test_listener_config(name: &str, port: u16) -> ListenerConfig {
ListenerConfig {
name: name.into(),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
filter_chains: HashMap::default(),
bind_device: None,
with_tls_inspector: false,
proxy_protocol_config: None,
with_tlv_listener_filter: false,
tlv_listener_filter_config: None,
}
}
You can then call this helper to create l1_info
, l2_info
, and l3_info
.
439ab99
to
ae980ef
Compare
- Change storage from BTreeMap to HashMap<String, Vec<ListenerInfo>> - Add version tracking to prevent breaking existing connections - Update start_listener to create new versions instead of stopping existing ones - Modify stop_listener to handle multiple versions - Update tests to verify multiple version functionality Signed-off-by: Eeshu-Yadav <[email protected]>
ae980ef
to
ea6cd88
Compare
8666898
to
e6d7bd7
Compare
e6d7bd7
to
c62fe60
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some changes requested
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>, | ||
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>, | ||
listener_handles: BTreeMap<&'static str, ListenerInfo>, | ||
listener_handles: HashMap<String, Vec<ListenerInfo>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this to use multimap ?
https://docs.rs/multimap/latest/multimap/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced HashMap<String, Vec<ListenerInfo>>
with MultiMap<String, ListenerInfo>
orion-lib/src/lib.rs
Outdated
let mgr = ListenersManager::new( | ||
listener_configuration_receiver, | ||
route_configuration_receiver, | ||
listeners::listeners_manager::ListenerManagerConfig::default(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could be assigned insise of ListenerManager::new ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed ListenerManagerConfig::default()
from lib.rs
and moved the default config assignment inside ListenerManager::new(). Added with_config()
method for tests requiring custom configuration.
let join_handle = tokio::spawn(async move { | ||
let error = listener.start().await; | ||
warn!("Listener {listener_name} exited: {error}"); | ||
warn!("Listener {} version {} exited: {}", listener_name_clone, version, error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change logging level to info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed listener exit logging from warn!
to info!
level
self.version_counter += 1; | ||
let version = self.version_counter; | ||
|
||
info!("Starting new version {} of listener {}", version, listener_name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably unnecessary. I would merge it with the log at line 160
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed duplicate "Starting new version" log and merged into single informative message: "Started version {} of listener {} ({} total active version(s))"
.
|
||
info!("Starting new version {} of listener {}", version, listener_name); | ||
|
||
let listener_name_clone = listener_name.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need to be cloned twice ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eliminated unnecessary double clone of listener_name
. Changed from listener_name_clone to listener_name_for_async
(single clone).
CleanupPolicy::CountBasedOnly(max_count) => { | ||
if versions.len() > *max_count { | ||
let to_remove = versions.len() - max_count; | ||
for _ in 0..to_remove { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace with Vec::drain to remove the whole batch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vec::drain(0..to_remove)
for efficient batch removal. Changed from individual versions.remove(0)
calls to let removed = versions.drain(0..to_remove).collect::<Vec<_>>()
} | ||
} | ||
}, | ||
CleanupPolicy::TimeBasedOnly(_timeout) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep it simple and remove if it is not implemented properly. Don't like that we will introduce potentially incorrect behaviour and rely on comments/TODOs to keep track of it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimeBasedOnly(Duration)
from CleanupPolicy enum. Eliminated unimplemented TODO behavior and all related match arms to avoid incorrect behavior.
} | ||
} | ||
}, | ||
CleanupPolicy::Hybrid { max_count, .. } => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how this is different from CountBasedOnly policy. I would remove it as at the moment this functionality is not necessary and it is unlikely that it is going to be used in the nearest future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completely removed Hybrid { timeout: Duration, max_count: usize } policy. Simplified to only CountBasedOnly(usize)
policy, eliminating unnecessary complexity.
|
||
#[derive(Debug, Clone)] | ||
pub struct ListenerManagerConfig { | ||
pub max_versions_per_listener: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does envoy has this limit? What if the old version listener still has connections?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Eeshu-Yadav , This looks much better, thank you. I have added one small comment/ask.
} | ||
|
||
// Re-insert the remaining versions | ||
for version in versions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use multimap::insert_many ?
} | ||
|
||
fn cleanup_old_versions(&mut self, listener_name: &str) { | ||
if let Some(mut versions) = self.listener_handles.remove(listener_name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you could use self.listener_handles.get_vec_mut
instead of remove
so then you won't need to re-add listeners in 217 and you won't need to make a call at 22.
Because of line 206, there is no risk of draining all listeners.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I donot see where you shutdown the old listener. I think it may leaked forever if there are long running tcp connections
Self::with_config(listener_configuration_channel, route_configuration_channel, ListenerManagerConfig::default()) | ||
} | ||
|
||
pub fn with_config( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub fn with_config( | |
pub fn new_with_config( |
info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); | ||
// spawn the task for this listener address, this will spawn additional task per connection | ||
|
||
self.version_counter += 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should use the version from xds configuration
let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0); | ||
info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count); | ||
|
||
self.cleanup_old_versions(&listener_name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
self.cleanup_old_versions(&listener_name); | |
self.drain_listener(&listener_name); |
7ba2498
to
360c688
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we maybe not keep on adding extra functionality with every commit ? This just makes this processes unsustainable..
Let's focus on completing this as it was and we can add drain functionality in a different PR.
I agree making each pr as simple and light-weight as possible. The prerequisites is to make sure it does not diverge with the final solution. |
6c5d3d4
to
e0c5d2a
Compare
- Replace HashMap<String, Vec<ListenerInfo>> with MultiMap<String, ListenerInfo> - Change listener exit logging from warn to info level - Merge duplicate logging into single informative message - Eliminate unnecessary double clone of listener_name - Use get_vec_mut instead of remove/re-insert pattern for efficiency Signed-off-by: Eeshu-Yadav <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small thing.. @Eeshu-Yadav and we really need to stop making so many changes.
This is again different from what has been reviewed the last time. It seems like the removing listeners by version logic has been removed.
So this all look very chaotic.. It is great that you want to contribute but making so many random changes will result in much longer reviews.
So let just fix the nit and get it ready without adding any more functionality to this PR.
@dawid-nowak first we have to merge this one , then we will finish the work on the #104 |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: dawid-nowak The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
/lgtm |
fixes #98