Skip to content

Commit e5f8520

Browse files
committed
Merge branch 'gracefulswitch' into child_manager_work_scheduler
2 parents 33cd311 + 075f2c0 commit e5f8520

File tree

4 files changed

+92
-77
lines changed

4 files changed

+92
-77
lines changed

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use super::{Subchannel, SubchannelState};
4747
// An LbPolicy implementation that manages multiple children.
4848
#[derive(Debug)]
4949
pub(crate) struct ChildManager<T: Debug, S: ResolverUpdateSharder<T>> {
50-
subchannel_child_map: HashMap<WeakSubchannel, usize>,
50+
subchannel_to_child_idx: HashMap<WeakSubchannel, usize>,
5151
children: Vec<Child<T>>,
5252
update_sharder: S,
5353
pending_work: Arc<Mutex<HashSet<usize>>>,
@@ -60,10 +60,9 @@ pub(crate) struct ChildManager<T: Debug, S: ResolverUpdateSharder<T>> {
6060
#[derive(Debug)]
6161
pub(crate) struct Child<T> {
6262
pub identifier: T,
63-
pub policy: Box<dyn LbPolicy>,
6463
pub builder: Arc<dyn LbPolicyBuilder>,
6564
pub state: LbState,
66-
pub updated: bool, // Set when the child updates its picker; cleared in child_states is called.
65+
policy: Box<dyn LbPolicy>,
6766
work_scheduler: Arc<ChildWorkScheduler>,
6867
}
6968

@@ -107,7 +106,7 @@ where
107106
) -> Self {
108107
Self {
109108
update_sharder,
110-
subchannel_child_map: Default::default(),
109+
subchannel_to_child_idx: Default::default(),
111110
children: Default::default(),
112111
pending_work: Default::default(),
113112
runtime,
@@ -170,12 +169,11 @@ where
170169
) {
171170
// Add all created subchannels into the subchannel_to_child_idx map.
172171
for csc in channel_controller.created_subchannels {
173-
self.subchannel_child_map.insert(csc.into(), child_idx);
172+
self.subchannel_to_child_idx.insert(csc.into(), child_idx);
174173
}
175174
// Update the tracked state if the child produced an update.
176175
if let Some(state) = channel_controller.picker_update {
177176
self.children[child_idx].state = state;
178-
self.children[child_idx].updated = true;
179177
self.updated = true;
180178
};
181179
}
@@ -219,26 +217,35 @@ where
219217
let old_children = mem::take(&mut self.children);
220218

221219
// Replace the subchannel map with an empty map.
222-
let old_subchannel_child_map = mem::take(&mut self.subchannel_child_map);
220+
let old_subchannel_child_map = mem::take(&mut self.subchannel_to_child_idx);
223221

224-
// Reverse the old subchannel map.
225-
let mut old_child_subchannels_map: HashMap<usize, Vec<WeakSubchannel>> = HashMap::new();
222+
// Reverse the old subchannel map into a vector indexed by the old child ID.
223+
let mut old_child_subchannels: Vec<Vec<WeakSubchannel>> = Vec::new();
224+
old_child_subchannels.resize_with(old_children.len(), Vec::new);
226225

227-
for (subchannel, child_idx) in old_subchannel_child_map {
228-
old_child_subchannels_map
229-
.entry(child_idx)
230-
.or_default()
231-
.push(subchannel);
226+
for (subchannel, old_idx) in old_subchannel_child_map {
227+
old_child_subchannels[old_idx].push(subchannel);
232228
}
233229

234230
// Build a map of the old children from their IDs for efficient lookups.
235-
let old_children = old_children.into_iter().enumerate().map(|(old_idx, e)| {
236-
(
237-
(e.builder.name(), e.identifier),
238-
(e.policy, e.state, old_idx, e.work_scheduler, e.updated),
239-
)
240-
});
241-
let mut old_children: HashMap<(&'static str, T), _> = old_children.collect();
231+
// This leverages a Child<usize> to hold all the entries where the
232+
// identifier becomes the index within the old self.children vector.
233+
let mut old_children: HashMap<(&'static str, T), _> = old_children
234+
.into_iter()
235+
.enumerate()
236+
.map(|(old_idx, e)| {
237+
(
238+
(e.builder.name(), e.identifier),
239+
Child {
240+
identifier: old_idx,
241+
policy: e.policy,
242+
builder: e.builder,
243+
state: e.state,
244+
work_scheduler: e.work_scheduler,
245+
},
246+
)
247+
})
248+
.collect();
242249

243250
// Split the child updates into the IDs and builders, and the
244251
// ResolverUpdates/LbConfigs.
@@ -251,26 +258,21 @@ where
251258
// subchannel map.
252259
for (new_idx, (identifier, builder)) in ids_builders.into_iter().enumerate() {
253260
let k = (builder.name(), identifier);
254-
if let Some((policy, state, old_idx, work_scheduler, updated)) = old_children.remove(&k)
255-
{
256-
for subchannel in old_child_subchannels_map
257-
.remove(&old_idx)
258-
.into_iter()
259-
.flatten()
260-
{
261-
self.subchannel_child_map.insert(subchannel, new_idx);
261+
if let Some(old_child) = old_children.remove(&k) {
262+
let old_idx = old_child.identifier;
263+
for subchannel in mem::take(&mut old_child_subchannels[old_idx]) {
264+
self.subchannel_to_child_idx.insert(subchannel, new_idx);
262265
}
263266
if old_pending_work.contains(&old_idx) {
264267
pending_work.insert(new_idx);
265268
}
266-
*work_scheduler.idx.lock().unwrap() = Some(new_idx);
269+
*old_child.work_scheduler.idx.lock().unwrap() = Some(new_idx);
267270
self.children.push(Child {
268271
builder,
269272
identifier: k.1,
270-
state,
271-
policy,
272-
work_scheduler,
273-
updated,
273+
state: old_child.state,
274+
policy: old_child.policy,
275+
work_scheduler: old_child.work_scheduler,
274276
});
275277
} else {
276278
let work_scheduler = Arc::new(ChildWorkScheduler {
@@ -288,14 +290,13 @@ where
288290
state: LbState::initial(),
289291
policy,
290292
work_scheduler,
291-
updated: false,
292293
});
293294
};
294295
}
295296

296297
// Invalidate all deleted children's work_schedulers.
297-
for (_, (_, _, _, work_scheduler, _)) in old_children {
298-
*work_scheduler.idx.lock().unwrap() = None;
298+
for (_, old_child) in old_children {
299+
*old_child.work_scheduler.idx.lock().unwrap() = None;
299300
}
300301

301302
// Release the pending_work mutex before calling into the children to
@@ -333,7 +334,7 @@ where
333334
) {
334335
// Determine which child created this subchannel.
335336
let child_idx = *self
336-
.subchannel_child_map
337+
.subchannel_to_child_idx
337338
.get(&WeakSubchannel::new(&subchannel))
338339
.unwrap();
339340
let policy = &mut self.children[child_idx].policy;

grpc/src/client/load_balancing/graceful_switch.rs

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::client::load_balancing::child_manager::{
22
self, ChildManager, ChildUpdate, ResolverUpdateSharder,
33
};
44
use crate::client::load_balancing::{
5-
ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, ParsedJsonLbConfig, Subchannel,
6-
SubchannelState, WorkScheduler, GLOBAL_LB_REGISTRY,
5+
ChannelController, LbConfig, LbPolicy, LbPolicyBuilder, LbState, ParsedJsonLbConfig,
6+
Subchannel, SubchannelState, WorkScheduler, GLOBAL_LB_REGISTRY,
77
};
88
use crate::client::name_resolution::ResolverUpdate;
99
use crate::client::ConnectivityState;
@@ -103,6 +103,7 @@ impl UpdateSharder {
103103
#[derive(Debug)]
104104
pub(crate) struct GracefulSwitchPolicy {
105105
child_manager: ChildManager<(), UpdateSharder>, // Child ID is the name of the child policy.
106+
last_update: LbState, // Saves the last output LbState to determine if an update is needed.
106107
}
107108

108109
impl LbPolicy for GracefulSwitchPolicy {
@@ -115,7 +116,7 @@ impl LbPolicy for GracefulSwitchPolicy {
115116
let res = self
116117
.child_manager
117118
.resolver_update(update, config, channel_controller)?;
118-
self.maybe_swap(channel_controller);
119+
self.update_picker(channel_controller);
119120
Ok(())
120121
}
121122

@@ -127,17 +128,17 @@ impl LbPolicy for GracefulSwitchPolicy {
127128
) {
128129
self.child_manager
129130
.subchannel_update(subchannel, state, channel_controller);
130-
self.maybe_swap(channel_controller);
131+
self.update_picker(channel_controller);
131132
}
132133

133134
fn work(&mut self, channel_controller: &mut dyn ChannelController) {
134135
self.child_manager.work(channel_controller);
135-
self.maybe_swap(channel_controller);
136+
self.update_picker(channel_controller);
136137
}
137138

138139
fn exit_idle(&mut self, channel_controller: &mut dyn ChannelController) {
139140
self.child_manager.exit_idle(channel_controller);
140-
self.maybe_swap(channel_controller);
141+
self.update_picker(channel_controller);
141142
}
142143
}
143144

@@ -152,6 +153,7 @@ impl GracefulSwitchPolicy {
152153
pub fn new(runtime: Arc<dyn Runtime>, work_scheduler: Arc<dyn WorkScheduler>) -> Self {
153154
GracefulSwitchPolicy {
154155
child_manager: ChildManager::new(UpdateSharder::new(), runtime, work_scheduler),
156+
last_update: LbState::initial(),
155157
}
156158
}
157159

@@ -191,10 +193,27 @@ impl GracefulSwitchPolicy {
191193
Err("no supported policies found in config".into())
192194
}
193195

194-
fn maybe_swap(&mut self, channel_controller: &mut dyn ChannelController) {
195-
if !self.child_manager.child_updated() {
196+
fn update_picker(&mut self, channel_controller: &mut dyn ChannelController) {
197+
let Some(update) = self.maybe_swap(channel_controller) else {
198+
return;
199+
};
200+
if self.last_update.connectivity_state == update.connectivity_state
201+
&& std::ptr::addr_eq(
202+
Arc::as_ptr(&self.last_update.picker),
203+
Arc::as_ptr(&update.picker),
204+
)
205+
{
196206
return;
197207
}
208+
channel_controller.update_picker(update.clone());
209+
self.last_update = update;
210+
}
211+
212+
// Determines the state to output, swapping to the pending child when appropriate; returns None if no child has updated.
213+
fn maybe_swap(&mut self, channel_controller: &mut dyn ChannelController) -> Option<LbState> {
214+
if !self.child_manager.child_updated() {
215+
return None;
216+
}
198217

199218
let active_name = self
200219
.child_manager
@@ -203,36 +222,33 @@ impl GracefulSwitchPolicy {
203222
.as_ref()
204223
.unwrap()
205224
.name();
206-
let mut could_switch = false;
207-
let mut active_state_if_updated = None;
208-
let mut pending_builder = None;
209-
let mut pending_state = None;
225+
226+
let mut active_child = None;
227+
let mut pending_child = None;
210228
for child in self.child_manager.children() {
211229
if child.builder.name() == active_name {
212-
could_switch |= child.state.connectivity_state != ConnectivityState::Ready;
213-
if child.updated {
214-
active_state_if_updated = Some(child.state.clone());
215-
}
230+
active_child = Some(child);
216231
} else {
217-
pending_builder = Some(child.builder.clone());
218-
pending_state = Some(child.state.clone());
219-
could_switch |= child.state.connectivity_state != ConnectivityState::Connecting;
232+
pending_child = Some(child);
220233
}
221234
}
222-
if could_switch && pending_builder.is_some() {
223-
self.child_manager
224-
.resolver_update(
225-
ResolverUpdate::default(),
226-
Some(&LbConfig::new(GracefulSwitchLbConfig::Swap(
227-
pending_builder.unwrap(),
228-
))),
229-
channel_controller,
230-
)
231-
.expect("resolver_update with an empty update should not fail");
232-
channel_controller.update_picker(pending_state.unwrap().clone());
233-
} else if active_state_if_updated.is_some() {
234-
channel_controller.update_picker(active_state_if_updated.unwrap());
235+
let active_child = active_child.expect("There should always be an active child policy");
236+
let Some(pending_child) = pending_child else {
237+
return Some(active_child.state.clone());
238+
};
239+
240+
if active_child.state.connectivity_state == ConnectivityState::Ready
241+
&& pending_child.state.connectivity_state == ConnectivityState::Connecting
242+
{
243+
return Some(active_child.state.clone());
235244
}
245+
246+
let config = &LbConfig::new(GracefulSwitchLbConfig::Swap(pending_child.builder.clone()));
247+
let state = pending_child.state.clone();
248+
self.child_manager
249+
.resolver_update(ResolverUpdate::default(), Some(config), channel_controller)
250+
.expect("resolver_update with an empty update should not fail");
251+
return Some(state);
236252
}
237253
}
238254

@@ -809,11 +825,8 @@ mod test {
809825
tcc.as_mut(),
810826
ConnectivityState::Connecting,
811827
);
812-
verify_correct_picker_from_policy(
813-
&mut rx_events,
814-
"stub-gracefulswitch_current_leaving_ready-one",
815-
)
816-
.await;
828+
// This should not produce an update.
829+
assert_channel_empty(&mut rx_events).await;
817830
move_subchannel_to_state(
818831
&mut *graceful_switch,
819832
current_subchannel,

grpc/src/client/load_balancing/registry.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ impl Default for LbPolicyRegistry {
3737

3838
/// The registry used if a local registry is not provided to a channel or if it
3939
/// does not exist in the local registry.
40-
pub(crate) static GLOBAL_LB_REGISTRY: LazyLock<LbPolicyRegistry> = LazyLock::new(LbPolicyRegistry::new);
40+
pub(crate) static GLOBAL_LB_REGISTRY: LazyLock<LbPolicyRegistry> =
41+
LazyLock::new(LbPolicyRegistry::new);

grpc/src/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) mod transport;
4545
///
4646
/// Channels may re-enter the Idle state if they are unused for longer than
4747
/// their configured idleness timeout.
48-
#[derive(Copy, Clone, PartialEq, Debug)]
48+
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
4949
pub enum ConnectivityState {
5050
Idle,
5151
Connecting,

0 commit comments

Comments
 (0)