Skip to content

Commit 7b9f660

Browse files
committed
add round robin
1 parent 4718a0d commit 7b9f660

File tree

3 files changed

+670
-654
lines changed

3 files changed

+670
-654
lines changed

grpc/src/client/load_balancing/child_manager.rs

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub struct ChildManager<T> {
5050
update_sharder: Box<dyn ResolverUpdateSharder<T>>,
5151
pending_work: Arc<Mutex<HashSet<usize>>>,
5252
runtime: Arc<dyn Runtime>,
53+
updated: bool,
5354
}
5455

5556
struct Child<T> {
@@ -94,6 +95,7 @@ impl<T> ChildManager<T> {
9495
children: Default::default(),
9596
pending_work: Default::default(),
9697
runtime,
98+
updated: false,
9799
}
98100
}
99101

@@ -158,8 +160,14 @@ impl<T> ChildManager<T> {
158160
// Update the tracked state if the child produced an update.
159161
if let Some(state) = channel_controller.picker_update {
160162
self.children[child_idx].state = state;
163+
self.updated = true;
161164
};
162165
}
166+
167+
/// Returns true if a child has produced an update and resets flag to false.
168+
pub fn has_updated(&mut self) -> bool {
169+
mem::take(&mut self.updated)
170+
}
163171
}
164172

165173
impl<T: PartialEq + Hash + Eq + Send + Sync + 'static> LbPolicy for ChildManager<T> {
@@ -363,10 +371,9 @@ mod test {
363371
Child, ChildManager, ChildUpdate, ChildWorkScheduler, ResolverUpdateSharder,
364372
};
365373
use crate::client::load_balancing::test_utils::{
366-
self, StubPolicy, StubPolicyFuncs, TestChannelController, TestEvent, TestSubchannel,
367-
TestWorkScheduler,
374+
self, StubPolicy, StubPolicyData, StubPolicyFuncs, TestChannelController, TestEvent,
375+
TestSubchannel, TestWorkScheduler,
368376
};
369-
use crate::client::load_balancing::utils::EndpointSharder;
370377
use crate::client::load_balancing::{
371378
ChannelController, LbPolicy, LbPolicyBuilder, LbPolicyOptions, LbState, ParsedJsonLbConfig,
372379
Pick, PickResult, Picker, QueuingPicker, Subchannel, SubchannelState, GLOBAL_LB_REGISTRY,
@@ -385,6 +392,37 @@ mod test {
385392
use tokio::sync::mpsc;
386393
use tonic::metadata::MetadataMap;
387394

395+
// TODO: This needs to be moved to a common place that can be shared between
396+
// round_robin and this test. This EndpointSharder maps endpoints to
397+
// children policies.
398+
struct EndpointSharder {
399+
builder: Arc<dyn LbPolicyBuilder>,
400+
}
401+
402+
impl ResolverUpdateSharder<Endpoint> for EndpointSharder {
403+
fn shard_update(
404+
&self,
405+
resolver_update: ResolverUpdate,
406+
) -> Result<Box<dyn Iterator<Item = ChildUpdate<Endpoint>>>, Box<dyn Error + Send + Sync>>
407+
{
408+
let mut sharded_endpoints = Vec::new();
409+
for endpoint in resolver_update.endpoints.unwrap().iter() {
410+
let child_update = ChildUpdate {
411+
child_identifier: endpoint.clone(),
412+
child_policy_builder: self.builder.clone(),
413+
child_update: ResolverUpdate {
414+
attributes: resolver_update.attributes.clone(),
415+
endpoints: Ok(vec![endpoint.clone()]),
416+
service_config: resolver_update.service_config.clone(),
417+
resolution_note: resolver_update.resolution_note.clone(),
418+
},
419+
};
420+
sharded_endpoints.push(child_update);
421+
}
422+
Ok(Box::new(sharded_endpoints.into_iter()))
423+
}
424+
}
425+
388426
// Sets up the test environment.
389427
//
390428
// Performs the following:
@@ -414,7 +452,7 @@ mod test {
414452
let (tx_events, rx_events) = mpsc::unbounded_channel::<TestEvent>();
415453
let tcc = Box::new(TestChannelController { tx_events });
416454
let builder: Arc<dyn LbPolicyBuilder> = GLOBAL_LB_REGISTRY.get_policy(test_name).unwrap();
417-
let endpoint_sharder = EndpointSharder { builder: builder };
455+
let endpoint_sharder = EndpointSharder { builder };
418456
let child_manager = ChildManager::new(Box::new(endpoint_sharder), default_runtime());
419457
(rx_events, Box::new(child_manager), tcc)
420458
}
@@ -487,25 +525,30 @@ mod test {
487525
// Defines the functions resolver_update and subchannel_update to test
488526
// aggregate_states.
489527
fn create_verifying_funcs_for_aggregate_tests() -> StubPolicyFuncs {
528+
let data = StubPolicyData::new();
490529
StubPolicyFuncs {
491530
// Closure for resolver_update. resolver_update should only receive
492531
// one endpoint and create one subchannel for the endpoint it
493532
// receives.
494-
resolver_update: Some(move |update: ResolverUpdate, _, controller| {
495-
assert_eq!(update.endpoints.iter().len(), 1);
496-
let endpoint = update.endpoints.unwrap().pop().unwrap();
497-
let subchannel = controller.new_subchannel(&endpoint.addresses[0]);
498-
Ok(())
499-
}),
533+
resolver_update: Some(Arc::new(
534+
move |data, update: ResolverUpdate, _, controller| {
535+
assert_eq!(update.endpoints.iter().len(), 1);
536+
let endpoint = update.endpoints.unwrap().pop().unwrap();
537+
let subchannel = controller.new_subchannel(&endpoint.addresses[0]);
538+
Ok(())
539+
},
540+
)),
500541
// Closure for subchannel_update. Sends a picker of the same state
501542
// that was passed to it.
502-
subchannel_update: Some(move |updated_subchannel, state, controller| {
503-
controller.update_picker(LbState {
504-
connectivity_state: state.connectivity_state,
505-
picker: Arc::new(QueuingPicker {}),
506-
});
507-
}),
508-
..Default::default()
543+
subchannel_update: Some(Arc::new(
544+
move |data, updated_subchannel, state, controller| {
545+
controller.update_picker(LbState {
546+
connectivity_state: state.connectivity_state,
547+
picker: Arc::new(QueuingPicker {}),
548+
});
549+
},
550+
)),
551+
exit_idle: None,
509552
}
510553
}
511554

0 commit comments

Comments
 (0)