@@ -20,9 +20,6 @@ use std::fs::create_dir;
2020use std:: io:: Read ;
2121use std:: path:: Path ;
2222use std:: sync:: atomic:: { AtomicU16 , AtomicU64 , Ordering } ;
23- use std:: sync:: Mutex ;
24- use std:: thread;
25- use std:: thread:: JoinHandle ;
2623use std:: time:: { Duration , Instant } ;
2724
2825use anyhow:: { anyhow, Context , Error , Result } ;
@@ -35,6 +32,7 @@ use log::{debug, error, warn};
3532use thread_priority:: * ;
3633
3734use crate :: measurement:: { Measurement , Timestamp } ;
35+ use crate :: watched_tasks:: WatchedTasksBuilder ;
3836
3937struct ChannelDesc {
4038 kernel_name : & ' static str ,
@@ -255,7 +253,6 @@ pub struct IioThread {
255253 ref_instant : Instant ,
256254 timestamp : AtomicU64 ,
257255 values : Vec < AtomicU16 > ,
258- join : Mutex < Option < JoinHandle < ( ) > > > ,
259256 channel_descs : & ' static [ ChannelDesc ] ,
260257}
261258
@@ -325,7 +322,8 @@ impl IioThread {
325322 }
326323
327324 async fn new (
328- thread_name : & str ,
325+ wtb : & mut WatchedTasksBuilder ,
326+ thread_name : & ' static str ,
329327 adc_name : & ' static str ,
330328 trigger_name : & ' static str ,
331329 sample_rate : i64 ,
@@ -342,102 +340,101 @@ impl IioThread {
342340 let ( thread_res_tx, thread_res_rx) = bounded ( 1 ) ;
343341
344342 // Spawn a high priority thread that updates the atomic values in `thread`.
345- let join = thread:: Builder :: new ( )
346- . name ( format ! ( "tacd {thread_name} iio" ) )
347- . spawn ( move || {
348- let adc_setup_res = Self :: adc_setup (
349- adc_name,
350- trigger_name,
351- sample_rate,
352- channel_descs,
353- buffer_len,
354- ) ;
355- let ( thread, channels, mut buf) = match adc_setup_res {
356- Ok ( ( channels, buf) ) => {
357- let thread = Arc :: new ( Self {
358- ref_instant : Instant :: now ( ) ,
359- timestamp : AtomicU64 :: new ( TIMESTAMP_ERROR ) ,
360- values : channels. iter ( ) . map ( |_| AtomicU16 :: new ( 0 ) ) . collect ( ) ,
361- join : Mutex :: new ( None ) ,
362- channel_descs,
363- } ) ;
364-
365- ( thread, channels, buf)
366- }
367- Err ( e) => {
368- // Can not fail in practice as the queue is known to be empty
369- // at this point.
370- thread_res_tx. try_send ( Err ( e) ) . unwrap ( ) ;
371- return ;
372- }
373- } ;
374-
375- let thread_weak = Arc :: downgrade ( & thread) ;
376- let mut signal_ready = Some ( ( thread, thread_res_tx) ) ;
377-
378- // Stop running as soon as the last reference to this Arc<IioThread>
379- // is dropped (e.g. the weak reference can no longer be upgraded).
380- while let Some ( thread) = thread_weak. upgrade ( ) {
381- if let Err ( e) = buf. refill ( ) {
382- thread. timestamp . store ( TIMESTAMP_ERROR , Ordering :: Relaxed ) ;
383-
384- error ! ( "Failed to refill {} ADC buffer: {}" , adc_name, e) ;
385-
386- // If the ADC has not yet produced any values we still have the
387- // queue at hand that signals readiness to the main thread.
388- // This gives us a chance to return an Err from new().
389- // If the queue was already used just print an error instead.
390- if let Some ( ( _, tx) ) = signal_ready. take ( ) {
391- // Can not fail in practice as the queue is only .take()n
392- // once and thus known to be empty.
393- tx. try_send ( Err ( Error :: new ( e) ) ) . unwrap ( ) ;
394- }
395-
396- break ;
397- }
398-
399- let values = channels. iter ( ) . map ( |ch| {
400- let buf_sum: u32 = buf. channel_iter :: < u16 > ( ch) . map ( |v| v as u32 ) . sum ( ) ;
401- ( buf_sum / ( buf. capacity ( ) as u32 ) ) as u16
343+ wtb. spawn_thread ( thread_name, move || {
344+ let adc_setup_res = Self :: adc_setup (
345+ adc_name,
346+ trigger_name,
347+ sample_rate,
348+ channel_descs,
349+ buffer_len,
350+ ) ;
351+ let ( thread, channels, mut buf) = match adc_setup_res {
352+ Ok ( ( channels, buf) ) => {
353+ let thread = Arc :: new ( Self {
354+ ref_instant : Instant :: now ( ) ,
355+ timestamp : AtomicU64 :: new ( TIMESTAMP_ERROR ) ,
356+ values : channels. iter ( ) . map ( |_| AtomicU16 :: new ( 0 ) ) . collect ( ) ,
357+ channel_descs,
402358 } ) ;
403359
404- for ( d, s) in thread. values . iter ( ) . zip ( values) {
405- d. store ( s, Ordering :: Relaxed )
406- }
360+ ( thread, channels, buf)
361+ }
362+ Err ( e) => {
363+ // Can not fail in practice as the queue is known to be empty
364+ // at this point.
365+ thread_res_tx
366+ . try_send ( Err ( e) )
367+ . expect ( "Failed to signal ADC setup error due to full queue" ) ;
368+ return Ok ( ( ) ) ;
369+ }
370+ } ;
371+
372+ let thread_weak = Arc :: downgrade ( & thread) ;
373+ let mut signal_ready = Some ( ( thread, thread_res_tx) ) ;
407374
408- // These should only fail if
409- // a) The monotonic time started running backward
410- // b) The tacd has been running for more than 2**64ns (584 years).
411- let ts: u64 = Instant :: now ( )
412- . checked_duration_since ( thread. ref_instant )
413- . and_then ( |d| d. as_nanos ( ) . try_into ( ) . ok ( ) )
414- . unwrap_or ( TIMESTAMP_ERROR ) ;
375+ // Stop running as soon as the last reference to this Arc<IioThread>
376+ // is dropped (e.g. the weak reference can no longer be upgraded).
377+ while let Some ( thread) = thread_weak. upgrade ( ) {
378+ if let Err ( e) = buf. refill ( ) {
379+ thread. timestamp . store ( TIMESTAMP_ERROR , Ordering :: Relaxed ) ;
415380
416- thread . timestamp . store ( ts , Ordering :: Release ) ;
381+ error ! ( "Failed to refill {} ADC buffer: {}" , adc_name , e ) ;
417382
418- // Now that we know that the ADC actually works and we have
419- // initial values: return a handle to it.
420- if let Some ( ( content, tx) ) = signal_ready. take ( ) {
383+ // If the ADC has not yet produced any values we still have the
384+ // queue at hand that signals readiness to the main thread.
385+ // This gives us a chance to return an Err from new().
386+ // If the queue was already used just print an error instead.
387+ if let Some ( ( _, tx) ) = signal_ready. take ( ) {
421388 // Can not fail in practice as the queue is only .take()n
422389 // once and thus known to be empty.
423- tx. try_send ( Ok ( content) ) . unwrap ( ) ;
390+ tx. try_send ( Err ( Error :: new ( e) ) )
391+ . expect ( "Failed to signal ADC setup error due to full queue" ) ;
424392 }
393+
394+ break ;
425395 }
426- } ) ?;
427396
428- let thread = thread_res_rx. recv ( ) . await ??;
397+ let values = channels. iter ( ) . map ( |ch| {
398+ let buf_sum: u32 = buf. channel_iter :: < u16 > ( ch) . map ( |v| v as u32 ) . sum ( ) ;
399+ ( buf_sum / ( buf. capacity ( ) as u32 ) ) as u16
400+ } ) ;
429401
430- // Locking the Mutex could only fail if the Mutex was poisoned by
431- // a thread that held the lock and panicked.
432- // At this point the Mutex has not yet been locked in another thread.
433- * thread. join . lock ( ) . unwrap ( ) = Some ( join) ;
402+ for ( d, s) in thread. values . iter ( ) . zip ( values) {
403+ d. store ( s, Ordering :: Relaxed )
404+ }
405+
406+ // These should only fail if
407+ // a) The monotonic time started running backward
408+ // b) The tacd has been running for more than 2**64ns (584 years).
409+ let ts: u64 = Instant :: now ( )
410+ . checked_duration_since ( thread. ref_instant )
411+ . and_then ( |d| d. as_nanos ( ) . try_into ( ) . ok ( ) )
412+ . unwrap_or ( TIMESTAMP_ERROR ) ;
413+
414+ thread. timestamp . store ( ts, Ordering :: Release ) ;
415+
416+ // Now that we know that the ADC actually works and we have
417+ // initial values: return a handle to it.
418+ if let Some ( ( content, tx) ) = signal_ready. take ( ) {
419+ // Can not fail in practice as the queue is only .take()n
420+ // once and thus known to be empty.
421+ tx. try_send ( Ok ( content) )
422+ . expect ( "Failed to signal ADC setup completion due to full queue" ) ;
423+ }
424+ }
425+
426+ Ok ( ( ) )
427+ } ) ?;
428+
429+ let thread = thread_res_rx. recv ( ) . await ??;
434430
435431 Ok ( thread)
436432 }
437433
438- pub async fn new_stm32 ( ) -> Result < Arc < Self > > {
434+ pub async fn new_stm32 ( wtb : & mut WatchedTasksBuilder ) -> Result < Arc < Self > > {
439435 Self :: new (
440- "stm32" ,
436+ wtb,
437+ "adc-stm32" ,
441438 "48003000.adc:adc@0" ,
442439 "tim4_trgo" ,
443440 80 ,
@@ -447,14 +444,23 @@ impl IioThread {
447444 . await
448445 }
449446
450- pub async fn new_powerboard ( ) -> Result < Arc < Self > > {
447+ pub async fn new_powerboard ( wtb : & mut WatchedTasksBuilder ) -> Result < Arc < Self > > {
451448 let hr_trigger_path = Path :: new ( TRIGGER_HR_PWR_DIR ) ;
452449
453450 if !hr_trigger_path. is_dir ( ) {
454451 create_dir ( hr_trigger_path) ?;
455452 }
456453
457- Self :: new ( "powerboard" , "lmp92064" , "tacd-pwr" , 20 , CHANNELS_PWR , 1 ) . await
454+ Self :: new (
455+ wtb,
456+ "adc-powerboard" ,
457+ "lmp92064" ,
458+ "tacd-pwr" ,
459+ 20 ,
460+ CHANNELS_PWR ,
461+ 1 ,
462+ )
463+ . await
458464 }
459465
460466 /// Use the channel names defined at the top of the file to get a reference
0 commit comments