@@ -341,84 +341,83 @@ impl IioThread {
341341
342342 // Spawn a high priority thread that updates the atomic values in `thread`.
343343 wtb. spawn_thread ( thread_name, move || {
344- {
345- let adc_setup_res = Self :: adc_setup (
346- adc_name,
347- trigger_name,
348- sample_rate,
349- channel_descs,
350- buffer_len,
351- ) ;
352- let ( thread, channels, mut buf) = match adc_setup_res {
353- Ok ( ( channels, buf) ) => {
354- let thread = Arc :: new ( Self {
355- ref_instant : Instant :: now ( ) ,
356- timestamp : AtomicU64 :: new ( TIMESTAMP_ERROR ) ,
357- values : channels. iter ( ) . map ( |_| AtomicU16 :: new ( 0 ) ) . collect ( ) ,
358- channel_descs,
359- } ) ;
360- ( thread, channels, buf)
361- }
362- Err ( e) => {
363- // Can not fail in practice as the queue is known to be empty
364- // at this point.
365- thread_res_tx. try_send ( Err ( e) ) . unwrap ( ) ;
366- return Ok ( ( ) ) ;
367- }
368- } ;
369-
370- let thread_weak = Arc :: downgrade ( & thread) ;
371- let mut signal_ready = Some ( ( thread, thread_res_tx) ) ;
372-
373- // Stop running as soon as the last reference to this Arc<IioThread>
374- // is dropped (e.g. the weak reference can no longer be upgraded).
375- while let Some ( thread) = thread_weak. upgrade ( ) {
376- if let Err ( e) = buf. refill ( ) {
377- thread. timestamp . store ( TIMESTAMP_ERROR , Ordering :: Relaxed ) ;
378-
379- error ! ( "Failed to refill {} ADC buffer: {}" , adc_name, e) ;
380-
381- // If the ADC has not yet produced any values we still have the
382- // queue at hand that signals readiness to the main thread.
383- // This gives us a chance to return an Err from new().
384- // If the queue was already used just print an error instead.
385- if let Some ( ( _, tx) ) = signal_ready. take ( ) {
386- // Can not fail in practice as the queue is only .take()n
387- // once and thus known to be empty.
388- tx. try_send ( Err ( Error :: new ( e) ) ) . unwrap ( ) ;
389- }
390-
391- break ;
392- }
393-
394- let values = channels. iter ( ) . map ( |ch| {
395- let buf_sum: u32 = buf. channel_iter :: < u16 > ( ch) . map ( |v| v as u32 ) . sum ( ) ;
396- ( buf_sum / ( buf. capacity ( ) as u32 ) ) as u16
344+ let adc_setup_res = Self :: adc_setup (
345+ adc_name,
346+ trigger_name,
347+ sample_rate,
348+ channel_descs,
349+ buffer_len,
350+ ) ;
351+ let ( thread, channels, mut buf) = match adc_setup_res {
352+ Ok ( ( channels, buf) ) => {
353+ let thread = Arc :: new ( Self {
354+ ref_instant : Instant :: now ( ) ,
355+ timestamp : AtomicU64 :: new ( TIMESTAMP_ERROR ) ,
356+ values : channels. iter ( ) . map ( |_| AtomicU16 :: new ( 0 ) ) . collect ( ) ,
357+ channel_descs,
397358 } ) ;
398359
399- for ( d, s) in thread. values . iter ( ) . zip ( values) {
400- d. store ( s, Ordering :: Relaxed )
401- }
360+ ( thread, channels, buf)
361+ }
362+ Err ( e) => {
363+ // Can not fail in practice as the queue is known to be empty
364+ // at this point.
365+ thread_res_tx. try_send ( Err ( e) ) . unwrap ( ) ;
366+ return Ok ( ( ) ) ;
367+ }
368+ } ;
369+
370+ let thread_weak = Arc :: downgrade ( & thread) ;
371+ let mut signal_ready = Some ( ( thread, thread_res_tx) ) ;
402372
403- // These should only fail if
404- // a) The monotonic time started running backward
405- // b) The tacd has been running for more than 2**64ns (584 years).
406- let ts: u64 = Instant :: now ( )
407- . checked_duration_since ( thread. ref_instant )
408- . and_then ( |d| d. as_nanos ( ) . try_into ( ) . ok ( ) )
409- . unwrap_or ( TIMESTAMP_ERROR ) ;
373+ // Stop running as soon as the last reference to this Arc<IioThread>
374+ // is dropped (e.g. the weak reference can no longer be upgraded).
375+ while let Some ( thread) = thread_weak. upgrade ( ) {
376+ if let Err ( e) = buf. refill ( ) {
377+ thread. timestamp . store ( TIMESTAMP_ERROR , Ordering :: Relaxed ) ;
410378
411- thread . timestamp . store ( ts , Ordering :: Release ) ;
379+ error ! ( "Failed to refill {} ADC buffer: {}" , adc_name , e ) ;
412380
413- // Now that we know that the ADC actually works and we have
414- // initial values: return a handle to it.
415- if let Some ( ( content, tx) ) = signal_ready. take ( ) {
381+ // If the ADC has not yet produced any values we still have the
382+ // queue at hand that signals readiness to the main thread.
383+ // This gives us a chance to return an Err from new().
384+ // If the queue was already used just print an error instead.
385+ if let Some ( ( _, tx) ) = signal_ready. take ( ) {
416386 // Can not fail in practice as the queue is only .take()n
417387 // once and thus known to be empty.
418- tx. try_send ( Ok ( content ) ) . unwrap ( ) ;
388+ tx. try_send ( Err ( Error :: new ( e ) ) ) . unwrap ( ) ;
419389 }
390+
391+ break ;
420392 }
421- } ;
393+
394+ let values = channels. iter ( ) . map ( |ch| {
395+ let buf_sum: u32 = buf. channel_iter :: < u16 > ( ch) . map ( |v| v as u32 ) . sum ( ) ;
396+ ( buf_sum / ( buf. capacity ( ) as u32 ) ) as u16
397+ } ) ;
398+
399+ for ( d, s) in thread. values . iter ( ) . zip ( values) {
400+ d. store ( s, Ordering :: Relaxed )
401+ }
402+
403+ // These should only fail if
404+ // a) The monotonic time started running backward
405+ // b) The tacd has been running for more than 2**64ns (584 years).
406+ let ts: u64 = Instant :: now ( )
407+ . checked_duration_since ( thread. ref_instant )
408+ . and_then ( |d| d. as_nanos ( ) . try_into ( ) . ok ( ) )
409+ . unwrap_or ( TIMESTAMP_ERROR ) ;
410+
411+ thread. timestamp . store ( ts, Ordering :: Release ) ;
412+
413+ // Now that we know that the ADC actually works and we have
414+ // initial values: return a handle to it.
415+ if let Some ( ( content, tx) ) = signal_ready. take ( ) {
416+ // Can not fail in practice as the queue is only .take()n
417+ // once and thus known to be empty.
418+ tx. try_send ( Ok ( content) ) . unwrap ( ) ;
419+ }
420+ }
422421
423422 Ok ( ( ) )
424423 } ) ?;
0 commit comments