@@ -50,6 +50,7 @@ pub struct ChildManager<T> {
5050 update_sharder : Box < dyn ResolverUpdateSharder < T > > ,
5151 pending_work : Arc < Mutex < HashSet < usize > > > ,
5252 runtime : Arc < dyn Runtime > ,
53+ updated : bool ,
5354}
5455
5556struct Child < T > {
@@ -94,6 +95,7 @@ impl<T> ChildManager<T> {
9495 children : Default :: default ( ) ,
9596 pending_work : Default :: default ( ) ,
9697 runtime,
98+ updated : false ,
9799 }
98100 }
99101
@@ -158,8 +160,14 @@ impl<T> ChildManager<T> {
158160 // Update the tracked state if the child produced an update.
159161 if let Some ( state) = channel_controller. picker_update {
160162 self . children [ child_idx] . state = state;
163+ self . updated = true ;
161164 } ;
162165 }
166+
167+ /// Returns true if a child has produced an update and resets flag to false.
168+ pub fn has_updated ( & mut self ) -> bool {
169+ mem:: take ( & mut self . updated )
170+ }
163171}
164172
165173impl < T : PartialEq + Hash + Eq + Send + Sync + ' static > LbPolicy for ChildManager < T > {
@@ -363,8 +371,8 @@ mod test {
363371 Child , ChildManager , ChildUpdate , ChildWorkScheduler , ResolverUpdateSharder ,
364372 } ;
365373 use crate :: client:: load_balancing:: test_utils:: {
366- self , StubPolicy , StubPolicyFuncs , TestChannelController , TestEvent , TestSubchannel ,
367- TestWorkScheduler ,
374+ self , StubPolicy , StubPolicyData , StubPolicyFuncs , TestChannelController , TestEvent ,
375+ TestSubchannel , TestWorkScheduler ,
368376 } ;
369377 use crate :: client:: load_balancing:: {
370378 ChannelController , LbPolicy , LbPolicyBuilder , LbPolicyOptions , LbState , ParsedJsonLbConfig ,
@@ -444,7 +452,7 @@ mod test {
444452 let ( tx_events, rx_events) = mpsc:: unbounded_channel :: < TestEvent > ( ) ;
445453 let tcc = Box :: new ( TestChannelController { tx_events } ) ;
446454 let builder: Arc < dyn LbPolicyBuilder > = GLOBAL_LB_REGISTRY . get_policy ( test_name) . unwrap ( ) ;
447- let endpoint_sharder = EndpointSharder { builder : builder } ;
455+ let endpoint_sharder = EndpointSharder { builder } ;
448456 let child_manager = ChildManager :: new ( Box :: new ( endpoint_sharder) , default_runtime ( ) ) ;
449457 ( rx_events, Box :: new ( child_manager) , tcc)
450458 }
@@ -517,25 +525,29 @@ mod test {
517525 // Defines the functions resolver_update and subchannel_update to test
518526 // aggregate_states.
519527 fn create_verifying_funcs_for_aggregate_tests ( ) -> StubPolicyFuncs {
528+ let data = StubPolicyData :: new ( ) ;
520529 StubPolicyFuncs {
521530 // Closure for resolver_update. resolver_update should only receive
522531 // one endpoint and create one subchannel for the endpoint it
523532 // receives.
524- resolver_update : Some ( move |update : ResolverUpdate , _, controller| {
525- assert_eq ! ( update. endpoints. iter( ) . len( ) , 1 ) ;
526- let endpoint = update. endpoints . unwrap ( ) . pop ( ) . unwrap ( ) ;
527- let subchannel = controller. new_subchannel ( & endpoint. addresses [ 0 ] ) ;
528- Ok ( ( ) )
529- } ) ,
533+ resolver_update : Some ( Arc :: new (
534+ move |data, update : ResolverUpdate , _, controller| {
535+ assert_eq ! ( update. endpoints. iter( ) . len( ) , 1 ) ;
536+ let endpoint = update. endpoints . unwrap ( ) . pop ( ) . unwrap ( ) ;
537+ let subchannel = controller. new_subchannel ( & endpoint. addresses [ 0 ] ) ;
538+ Ok ( ( ) )
539+ } ,
540+ ) ) ,
530541 // Closure for subchannel_update. Sends a picker of the same state
531542 // that was passed to it.
532- subchannel_update : Some ( move |updated_subchannel, state, controller| {
533- controller. update_picker ( LbState {
534- connectivity_state : state. connectivity_state ,
535- picker : Arc :: new ( QueuingPicker { } ) ,
536- } ) ;
537- } ) ,
538- ..Default :: default ( )
543+ subchannel_update : Some ( Arc :: new (
544+ move |data, updated_subchannel, state, controller| {
545+ controller. update_picker ( LbState {
546+ connectivity_state : state. connectivity_state ,
547+ picker : Arc :: new ( QueuingPicker { } ) ,
548+ } ) ;
549+ } ,
550+ ) ) ,
539551 }
540552 }
541553
0 commit comments