diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..44045cd --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,26 @@ +# AGENTS.md - Hub for fsdr-blocks + +Building blocks for FutureSDR signal processing library for SDR and real-time DSP. + +## Tech Stack +- **Language:** Rust (Edition 2024, **Nightly channel**) +- **Core Library:** [FutureSDR](https://www.futuresdr.org) +- **Acceleration:** Explicit SIMD via `std::simd` (portable SIMD) +- **Serialization:** Serde, custom PMT (Polymorphic Types) +- **Testing:** Cargo test, QuickCheck, Criterion (benchmarks) + +## Critical Commands +- **Install:** `cargo build` +- **Lint:** `./check.sh` (Runs fmt, clippy, and tests with all features) +- **Test:** `cargo test --all-features` +- **Bench:** `cargo bench --all-features` + +## Documentation Index +- [Architecture](agent_docs/architecture.md): **Trigger:** Designing new blocks or understanding flowgraph connectivity. +- [Conventions](agent_docs/conventions.md): **Trigger:** Before writing any code to ensure alignment with Rust 2024 and FutureSDR idioms. +- [SDR & DSP](agent_docs/sdr_dsp.md): **Trigger:** Modifying signal processing logic, gain control, or frequency shifts. +- [SigMF](agent_docs/sigmf.md): **Trigger:** Working with Signal Metadata Format (SigMF) recordings or collections. + +## Verification Loop +You MUST run `./check.sh` and ensure all tests pass before declaring a task "done." +Always verify that your changes didn't break conditional feature flags (`crossbeam`, `async-channel`, `cw`). diff --git a/Cargo.toml b/Cargo.toml index 22ded0a..150f514 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ serde_json = "1" [features] default = [] +simd = [] crossbeam = ["dep:crossbeam-channel"] async-channel = ["dep:async-channel"] cw = ["dep:bimap"] @@ -70,3 +71,18 @@ name = "shared" path = "benches/cw/shared.rs" harness = false required-features = ["cw"] + +[[bench]] +name = "deinterleave" +path = "benches/stream/deinterleave.rs" +harness = false + +[[bench]] +name = "type_converters" +path = "benches/type_converters.rs" +harness = false + +[[bench]] +name = "freq_shift" +path = "benches/math/freq_shift.rs" +harness = false diff --git a/SIMD.md b/SIMD.md new file mode 100644 index 0000000..70cf1d4 --- /dev/null +++ b/SIMD.md @@ -0,0 +1,41 @@ +# SIMD Optimization Roadmap - fsdr-blocks + +This document outlines the prioritized blocks for SIMD refactoring based on their impact in typical SDR pipelines (e.g., `csdr` style flows). + +## Prioritization Matrix + +| Priority | Block | File | Rationale | +| :--- | :--- | :--- | :--- | +| **High** | `convert_u8_f` | `src/type_converters.rs` | Entry point for raw samples; high throughput, simple math. | +| **High** | `FreqShift` | `src/math/freq_shift.rs` | Per-sample complex rotation; solves loop-carried dependencies. | +| **Medium** | `FmDemod` | TBD | Quadrature math (cross-products) is "heavy" and benefits from vectorization. | +| **Medium** | `BinarySlicer` | TBD | Comparisons are perfect for SIMD masks and bit-packing. | +| **Low** | `Deinterleave` | `src/stream/deinterleave.rs` | **COMPLETED.** Memory-bound but established the pattern. | + +## Implementation Pattern: SIMD + Specialization + +To maintain compatibility and maximize performance, we use the following pattern: + +1. **Nightly Features:** Enable `#![feature(portable_simd)]` and `#![feature(specialization)]`. +2. **Specialization Trait:** Define a `*Supported` trait (e.g., `TypeConvertSupported`). +3. **Default Logic:** Provide a `default` scalar implementation for all types. +4. **SIMD Specialization:** Implement specialized SIMD versions for `f32`, `u8`, `i16`, etc. +5. **Generic Helpers:** Use internal `_scalar_logic` and `_simd_logic` functions to minimize duplication. +6. **Macros:** Employ `macro_rules!` to apply the same SIMD logic across multiple types (e.g., `u8`, `i8`). + +## Completed Optimizations + +### 1. Deinterleave (`src/stream/deinterleave.rs`) +- **Status:** Done. +- **Impact:** ~6% gain on `f32` (memory-bound). +- **Pattern:** `DeinterleaveSupported` trait with macro-based SIMD implementations. + +### 2. TypeConverter (`src/type_converters.rs`) +- **Status:** Done. +- **Impact:** ~305% gain (3.05x speedup) on `u8` -> `f32` scaled conversion. +- **Pattern:** `TypeConvertSupported` trait with specialized SIMD path for `u8` -> `f32`. + +### 3. FreqShift (`src/math/freq_shift.rs`) +- **Status:** Done. +- **Impact:** ~4.4x speedup on `Complex32` rotation compared to naive scalar loop. +- **Pattern:** `FreqShiftSupported` trait with SIMD complex multiplication and periodic NCO re-sync for precision. diff --git a/agent_docs/architecture.md b/agent_docs/architecture.md new file mode 100644 index 0000000..c7177c0 --- /dev/null +++ b/agent_docs/architecture.md @@ -0,0 +1,20 @@ +# Architecture - fsdr-blocks + +This project follows the **Flowgraph/Block** paradigm provided by FutureSDR. + +## Core Concepts +- **Blocks:** The fundamental units of processing. Defined using `#[derive(Block)]`. +- **Flowgraph:** A directed acyclic graph (usually) where blocks are nodes and streams/messages are edges. +- **Kernels:** The execution logic of a block. Most blocks in this repository are CPU-based and implement the `Kernel` trait. + +## Data Movement +- **Streams:** High-throughput data (e.g., IQ samples) passed via `CpuBufferReader`/`CpuBufferWriter`. +- **Messages:** Asynchronous control signals or metadata passed as `Pmt` (Polymorphic Types). + +## Dependency on FutureSDR +This project is tightly coupled with `futuresdr`. It uses a local path dependency in `Cargo.toml` by default (`../FutureSDR`), which indicates it is often developed alongside the core library. + +## Block Types +- **Sources/Sinks:** Handle I/O (e.g., `StdinSink`, `SigmfSource`). +- **Processing Blocks:** Transform data (e.g., `Agc`, `FreqShift`). +- **Adapters:** Connect different async runtimes or channel types (e.g., `CrossbeamSink`). diff --git a/agent_docs/conventions.md b/agent_docs/conventions.md new file mode 100644 index 0000000..304d4e8 --- /dev/null +++ b/agent_docs/conventions.md @@ -0,0 +1,32 @@ +# Conventions - fsdr-blocks + +## Rust Standards +- **Edition:** 2024. Use modern idioms (e.g., `async fn` in traits, let-else). +- **Formatting:** Strictly adhere to `cargo fmt`. + +## Block Implementation Boilerplate +Every block should typically include: +1. **Struct Definition:** Use `#[derive(Block)]` and specify `#[message_inputs(...)]` if applicable. +2. **Implementation:** A `new` method and message handler methods (returning `Result`). +3. **Kernel Trait:** Implement `async fn work` to handle the data processing loop. +4. **Builder Pattern:** Use a `BlockBuilder` struct for complex configuration (see `src/agc.rs`). + +## Performance Acceleration +For blocks in hot loops (AGC, Frequency Shift, Deinterleave), prefer explicit SIMD over compiler-dependent autovectorization: +1. **Feature Flags:** Ensure `portable_simd` and `specialization` are enabled in `src/lib.rs`. +2. **Specialization Pattern:** Define a `*Supported` trait (e.g., `DeinterleaveSupported`) with a `default` scalar implementation and specialized SIMD implementations for `f32`, `u8`, `i8`, `i16`. +3. **Macro Reuse:** Use macros to implement SIMD logic across different types to avoid code duplication. +4. **Benchmarking:** Every accelerated block MUST have a corresponding `Criterion` benchmark in `benches/`. + +## Error Handling +- Use `futuresdr::anyhow::Result` for block operations. +- Prefer `Context` from `anyhow` for descriptive error messages in I/O operations. + +## Feature Management +- Use `#[cfg(feature = "...")]` for blocks that depend on optional crates like `crossbeam-channel` or `async-channel`. +- Always test with `--all-features` to ensure no regressions in optional components. + +## Testing +- **Unit Tests:** Located in `tests/`. +- **Property-Based Testing:** Use `quickcheck` for robust validation of DSP algorithms. +- **Benchmarks:** Located in `benches/`, using `Criterion`. diff --git a/agent_docs/sdr_dsp.md b/agent_docs/sdr_dsp.md new file mode 100644 index 0000000..05a1599 --- /dev/null +++ b/agent_docs/sdr_dsp.md @@ -0,0 +1,16 @@ +# SDR & DSP - fsdr-blocks + +## Signal Processing Idioms +- **Complex Numbers:** Use `num_complex::Complex32` (or generic `T: ComplexFloat`). +- **Buffers:** Always use `input.slice()` and `output.slice()` in the `work` function. +- **Consumption/Production:** Explicitly call `input.consume(n)` and `output.produce(n)` after processing. + +## Key Blocks +- **AGC (Automatic Gain Control):** Implements a feedback loop to maintain target power. See `src/agc.rs`. +- **FreqShift:** Performs digital down-conversion/up-conversion. See `src/math/freq_shift.rs`. +- **Type Converters:** Crucial for translating between raw bytes and SDR-specific types. + +## Math Operations +- **Prefer Explicit SIMD:** Use `std::simd` and specialization (see `DeinterleaveSupported`) for performance-critical blocks in hot loops. This avoids dependency on brittle compiler autovectorization. +- **Precision:** Be mindful of floating-point precision and squelch thresholds. +- **Error Accumulation:** Periodically re-calculate phase in recurrence relations to prevent drift (e.g., in `FreqShift`). diff --git a/agent_docs/sigmf.md b/agent_docs/sigmf.md new file mode 100644 index 0000000..71f9d4c --- /dev/null +++ b/agent_docs/sigmf.md @@ -0,0 +1,16 @@ +# SigMF - fsdr-blocks + +## Overview +The `sigmf` crate (in `crates/sigmf`) provides a Rust implementation of the [Signal Metadata Format](https://github.com/sigmf/SigMF). + +## Structure +- **Global:** Top-level metadata about the recording. +- **Captures:** Segment-specific metadata (sample rate, frequency). +- **Annotations:** Time/frequency-indexed labels. + +## Usage in fsdr-blocks +- `SigmfSource`: Reads `.sigmf-meta` and `.sigmf-data` files into a FutureSDR flowgraph. +- `SigmfSink`: Records flowgraph data into SigMF-compliant files. + +## Extensions +Supports SigMF extensions (e.g., `AntennaExtension`). New extensions should be added as modules in `crates/sigmf/src/`. diff --git a/benches/math/freq_shift.rs b/benches/math/freq_shift.rs new file mode 100644 index 0000000..0bf20f2 --- /dev/null +++ b/benches/math/freq_shift.rs @@ -0,0 +1,31 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use fsdr_blocks::math::FrequencyShifter; +use futuresdr::num_complex::Complex32; +use futuresdr::runtime::mocker::{Mocker, Reader, Writer}; +use rand::RngExt; + +pub fn freq_shift_c32(c: &mut Criterion) { + let n_samp = 8192; + let mut rng = rand::rng(); + let input: Vec = (0..n_samp) + .map(|_| Complex32::new(rng.random(), rng.random())) + .collect(); + + let mut group = c.benchmark_group("math"); + group.throughput(Throughput::Elements(n_samp as u64)); + + group.bench_function("freq_shift_c32", |b| { + b.iter(|| { + let block: FrequencyShifter, Writer> = + FrequencyShifter::new(2000.0, 48000.0); + let mut mocker = Mocker::new(block); + mocker.input().set(input.clone()); + mocker.run(); + }); + }); + + group.finish(); +} + +criterion_group!(benches, freq_shift_c32); +criterion_main!(benches); diff --git a/benches/stream/deinterleave.rs b/benches/stream/deinterleave.rs new file mode 100644 index 0000000..b2e12b9 --- /dev/null +++ b/benches/stream/deinterleave.rs @@ -0,0 +1,28 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use fsdr_blocks::stream::Deinterleave; +use futuresdr::runtime::mocker::{Mocker, Reader, Writer}; +use rand::RngExt; + +pub fn deinterleave_f32(c: &mut Criterion) { + let n_samp = 8192; + let mut rng = rand::rng(); + let input: Vec = (0..n_samp).map(|_| rng.random()).collect(); + + let mut group = c.benchmark_group("deinterleave"); + group.throughput(Throughput::Elements(n_samp as u64)); + + group.bench_function("deinterleave_f32", |b| { + b.iter(|| { + let block: Deinterleave, Writer, Writer> = + Deinterleave::new(); + let mut mocker = Mocker::new(block); + mocker.input().set(input.clone()); + mocker.run(); + }); + }); + + group.finish(); +} + +criterion_group!(benches, deinterleave_f32); +criterion_main!(benches); diff --git a/benches/type_converters.rs b/benches/type_converters.rs new file mode 100644 index 0000000..cfd214f --- /dev/null +++ b/benches/type_converters.rs @@ -0,0 +1,27 @@ +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; +use fsdr_blocks::type_converters::TypeConverter; +use futuresdr::runtime::mocker::{Mocker, Reader, Writer}; +use rand::RngExt; + +pub fn scale_convert_u8_f32(c: &mut Criterion) { + let n_samp = 8192; + let mut rng = rand::rng(); + let input: Vec = (0..n_samp).map(|_| rng.random()).collect(); + + let mut group = c.benchmark_group("type_converters"); + group.throughput(Throughput::Elements(n_samp as u64)); + + group.bench_function("scale_convert_u8_f32", |b| { + b.iter(|| { + let block: TypeConverter, Writer> = TypeConverter::new(true); + let mut mocker = Mocker::new(block); + mocker.input().set(input.clone()); + mocker.run(); + }); + }); + + group.finish(); +} + +criterion_group!(benches, scale_convert_u8_f32); +criterion_main!(benches); diff --git a/src/lib.rs b/src/lib.rs index 5f08f41..7505912 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ -//! This library acts as a toolbox on top of [FutureSDR][`futuresdr`] to easily build your own flowgraph. +#![cfg_attr(feature = "simd", feature(portable_simd))] +#![cfg_attr(feature = "simd", feature(min_specialization))] +//! This library acts as a toolbox on top of [`futuresdr`] to easily build your own flowgraph. //! It is made by the community for the community. // #![feature(async_fn_in_trait)] diff --git a/src/math/freq_shift.rs b/src/math/freq_shift.rs index ea31957..616ff5c 100644 --- a/src/math/freq_shift.rs +++ b/src/math/freq_shift.rs @@ -3,6 +3,9 @@ use futuresdr::blocks::signal_source::NCO; use futuresdr::num_complex::Complex32; use futuresdr::prelude::*; +#[cfg(feature = "simd")] +use core::simd::prelude::*; + /// This blocks shift the signal in the frequency domain based on the [`NCO`] implementation. /// Currently implemented only for float and [`Complex32`] /// @@ -48,50 +51,113 @@ where } } -#[doc(hidden)] -impl Kernel for FrequencyShifter -where - I: CpuBufferReader, - O: CpuBufferWriter, -{ - async fn work( - &mut self, - io: &mut WorkIo, - _mio: &mut MessageOutputs, - _meta: &mut BlockMeta, - ) -> Result<()> { - let m = { - let i = self.input.slice(); - let o = self.output.slice(); +pub trait FreqShiftSupported: Copy { + fn freq_shift(nco: &mut NCO, phase_inc: FixedPointPhase, input: &[Self], output: &mut [Self]); +} - let m = std::cmp::min(i.len(), o.len()); - if m > 0 { - for (v, r) in i[..m].iter().zip(o[..m].iter_mut()) { - *r = (*v) * self.nco.phase.cos(); - self.nco.step(); +impl FreqShiftSupported for f32 { + fn freq_shift(nco: &mut NCO, _phase_inc: FixedPointPhase, input: &[Self], output: &mut [Self]) { + let n = input.len().min(output.len()); + for (v, r) in input[..n].iter().zip(output[..n].iter_mut()) { + *r = (*v) * nco.phase.cos(); + nco.step(); + } + } +} + +impl FreqShiftSupported for Complex32 { + fn freq_shift(nco: &mut NCO, phase_inc: FixedPointPhase, input: &[Self], output: &mut [Self]) { + let n = input.len().min(output.len()); + if n == 0 { + return; + } + + #[cfg(feature = "simd")] + { + const LANES: usize = 8; + let n_simd = n / LANES; + + if n_simd > 0 { + let block_rotation_angle = f32::from(&phase_inc) * LANES as f32; + let v_br_re = f32x8::splat(block_rotation_angle.cos()); + let v_br_im = f32x8::splat(block_rotation_angle.sin()); + + let mut block_phasor_cos = f32x8::splat(0.0); + let mut block_phasor_sin = f32x8::splat(0.0); + + let i_f32 = + unsafe { core::slice::from_raw_parts(input.as_ptr() as *const f32, n * 2) }; + let o_f32 = unsafe { + core::slice::from_raw_parts_mut(output.as_mut_ptr() as *mut f32, n * 2) + }; + + for i in 0..n_simd { + if i % 128 == 0 { + let mut temp_nco = *nco; + let mut cos_arr = [0.0f32; LANES]; + let mut sin_arr = [0.0f32; LANES]; + for j in 0..LANES { + cos_arr[j] = temp_nco.phase.cos(); + sin_arr[j] = temp_nco.phase.sin(); + temp_nco.step(); + } + block_phasor_cos = f32x8::from_array(cos_arr); + block_phasor_sin = f32x8::from_array(sin_arr); + } + + let v0 = f32x8::from_slice(&i_f32[i * LANES * 2..i * LANES * 2 + 8]); + let v1 = f32x8::from_slice(&i_f32[i * LANES * 2 + 8..i * LANES * 2 + 16]); + let (v_re, v_im) = v0.deinterleave(v1); + + let res_re = v_re * block_phasor_cos - v_im * block_phasor_sin; + let res_im = v_re * block_phasor_sin + v_im * block_phasor_cos; + + let (o0, o1) = res_re.interleave(res_im); + o0.copy_to_slice(&mut o_f32[i * LANES * 2..i * LANES * 2 + 8]); + o1.copy_to_slice(&mut o_f32[i * LANES * 2 + 8..i * LANES * 2 + 16]); + + let next_cos = block_phasor_cos * v_br_re - block_phasor_sin * v_br_im; + let next_sin = block_phasor_cos * v_br_im + block_phasor_sin * v_br_re; + block_phasor_cos = next_cos; + block_phasor_sin = next_sin; + nco.steps(LANES as i32); } } - m - }; - if m > 0 { - self.input.consume(m); - self.output.produce(m); + let tail_start = n_simd * LANES; + if tail_start < n { + let rotation = Complex32::new(phase_inc.cos(), phase_inc.sin()); + let mut current_phasor = Complex32::new(nco.phase.cos(), nco.phase.sin()); + for (v, r) in input[tail_start..n] + .iter() + .zip(output[tail_start..n].iter_mut()) + { + *r = (*v) * current_phasor; + current_phasor *= rotation; + } + nco.steps((n - tail_start) as i32); + } } - if self.input.finished() && self.input.slice().is_empty() { - io.finished = true; + #[cfg(not(feature = "simd"))] + { + let rotation = Complex32::new(phase_inc.cos(), phase_inc.sin()); + let mut current_phasor = Complex32::new(nco.phase.cos(), nco.phase.sin()); + for (v, r) in input[..n].iter().zip(output[..n].iter_mut()) { + *r = (*v) * current_phasor; + current_phasor *= rotation; + } + nco.steps(n as i32); } - - Ok(()) } } #[doc(hidden)] -impl Kernel for FrequencyShifter +impl Kernel for FrequencyShifter where - I: CpuBufferReader, - O: CpuBufferWriter, + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy + FreqShiftSupported, + I: CpuBufferReader, + O: CpuBufferWriter, { async fn work( &mut self, @@ -105,13 +171,7 @@ where let m = std::cmp::min(i.len(), o.len()); if m > 0 { - let rotation = Complex32::new(self.phase_inc.cos(), self.phase_inc.sin()); - let mut current_phasor = Complex32::new(self.nco.phase.cos(), self.nco.phase.sin()); - for (v, r) in i[..m].iter().zip(o[..m].iter_mut()) { - *r = (*v) * current_phasor; - current_phasor *= rotation; - } - self.nco.steps(m as i32); + A::freq_shift(&mut self.nco, self.phase_inc, &i[..m], &mut o[..m]); } m }; diff --git a/src/sigmf/mod.rs b/src/sigmf/mod.rs index 5482982..3580d3d 100644 --- a/src/sigmf/mod.rs +++ b/src/sigmf/mod.rs @@ -5,7 +5,7 @@ mod sigmf_sink; pub use sigmf::*; pub use sigmf_sink::{SigMFSink, SigMFSinkBuilder}; -use crate::type_converters::ScaledConverterBuilder; +use crate::type_converters::TypeConvertSupported; pub trait BytesConveter where @@ -26,44 +26,26 @@ impl BytesConveter for DatasetFormat { Rf32Be => f32::from_be_bytes(bytes[0..4].try_into().unwrap()), // Cf32Le => write!(f, "cf32_le"), // Cf32Be => write!(f, "cf32_be"), - Ri32Le => ScaledConverterBuilder::::convert(&i32::from_le_bytes( - bytes[0..4].try_into().unwrap(), - )), - Ri32Be => ScaledConverterBuilder::::convert(&i32::from_be_bytes( - bytes[0..4].try_into().unwrap(), - )), + Ri32Le => i32::convert_item(true, &i32::from_le_bytes(bytes[0..4].try_into().unwrap())), + Ri32Be => i32::convert_item(true, &i32::from_be_bytes(bytes[0..4].try_into().unwrap())), // Ci32Le => write!(f, "ci32_le"), // Ci32Be => write!(f, "ci32_be"), - Ri16Le => ScaledConverterBuilder::::convert(&i16::from_le_bytes( - bytes[0..2].try_into().unwrap(), - )), - Ri16Be => ScaledConverterBuilder::::convert(&i16::from_be_bytes( - bytes[0..2].try_into().unwrap(), - )), + Ri16Le => i16::convert_item(true, &i16::from_le_bytes(bytes[0..2].try_into().unwrap())), + Ri16Be => i16::convert_item(true, &i16::from_be_bytes(bytes[0..2].try_into().unwrap())), // Ci16Le => write!(f, "ci16_le"), // Ci16Be => write!(f, "ci16_be"), - Ru32Le => ScaledConverterBuilder::::convert(&u32::from_le_bytes( - bytes[0..4].try_into().unwrap(), - )), - Ru32Be => ScaledConverterBuilder::::convert(&u32::from_be_bytes( - bytes[0..4].try_into().unwrap(), - )), + Ru32Le => u32::convert_item(true, &u32::from_le_bytes(bytes[0..4].try_into().unwrap())), + Ru32Be => u32::convert_item(true, &u32::from_be_bytes(bytes[0..4].try_into().unwrap())), // Cu32Le => write!(f, "cu32_le"), // Cu32Be => write!(f, "cu32_be"), - Ru16Le => ScaledConverterBuilder::::convert(&u16::from_le_bytes( - bytes[0..2].try_into().unwrap(), - )), - Ru16Be => ScaledConverterBuilder::::convert(&u16::from_be_bytes( - bytes[0..2].try_into().unwrap(), - )), + Ru16Le => u16::convert_item(true, &u16::from_le_bytes(bytes[0..2].try_into().unwrap())), + Ru16Be => u16::convert_item(true, &u16::from_be_bytes(bytes[0..2].try_into().unwrap())), // Cu16Le => write!(f, "cu16_le"), // Cu16Be => write!(f, "cu16_be"), // CI8 => write!(f, "ci8"), // CU8 => write!(f, "cu8"), - RI8 => ScaledConverterBuilder::::convert(&i8::from_ne_bytes( - bytes[0..1].try_into().unwrap(), - )), - RU8 => ScaledConverterBuilder::::convert(&(bytes[0])), + RI8 => i8::convert_item(true, &i8::from_ne_bytes(bytes[0..1].try_into().unwrap())), + RU8 => u8::convert_item(true, &(bytes[0])), _ => todo!("not yet implemented"), } } diff --git a/src/stream/deinterleave.rs b/src/stream/deinterleave.rs index 389c3d6..567028f 100644 --- a/src/stream/deinterleave.rs +++ b/src/stream/deinterleave.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "simd")] +use core::simd::Simd; use futuresdr::prelude::*; /// This blocks deinterleave a unique stream into two separate stream. @@ -41,6 +43,130 @@ where } } +pub trait DeinterleaveSupported: Copy { + fn deinterleave( + first: &mut bool, + input: &[Self], + out0: &mut [Self], + out1: &mut [Self], + ) -> (usize, usize, usize); +} + +fn deinterleave_scalar_logic( + first: &mut bool, + input: &[A], + out0: &mut [A], + out1: &mut [A], +) -> (usize, usize, usize) { + let mut n_in = input.len(); + let n_o0 = out0.len(); + let n_o1 = out1.len(); + + let mut i_ptr = 0; + let mut o0_ptr = 0; + let mut o1_ptr = 0; + + if !*first && n_in > 0 && n_o1 > 0 { + out1[o1_ptr] = input[i_ptr]; + i_ptr += 1; + o1_ptr += 1; + n_in -= 1; + *first = true; + } + + let n = (n_in / 2).min(n_o0 - o0_ptr).min(n_o1 - o1_ptr); + for j in 0..n { + out0[o0_ptr + j] = input[i_ptr + 2 * j]; + out1[o1_ptr + j] = input[i_ptr + 2 * j + 1]; + } + + i_ptr += 2 * n; + o0_ptr += n; + o1_ptr += n; + n_in -= 2 * n; + + if *first && n_in > 0 && out0.len() > o0_ptr { + out0[o0_ptr] = input[i_ptr]; + i_ptr += 1; + o0_ptr += 1; + *first = false; + } + (i_ptr, o0_ptr, o1_ptr) +} + +#[cfg(feature = "simd")] +impl DeinterleaveSupported for A { + default fn deinterleave( + first: &mut bool, + input: &[Self], + out0: &mut [Self], + out1: &mut [Self], + ) -> (usize, usize, usize) { + deinterleave_scalar_logic(first, input, out0, out1) + } +} + +#[cfg(not(feature = "simd"))] +impl DeinterleaveSupported for A { + fn deinterleave( + first: &mut bool, + input: &[Self], + out0: &mut [Self], + out1: &mut [Self], + ) -> (usize, usize, usize) { + deinterleave_scalar_logic(first, input, out0, out1) + } +} + +#[cfg(feature = "simd")] +macro_rules! impl_deinterleave_simd { + ($($t:ty),*) => { + $( + impl DeinterleaveSupported for $t { + fn deinterleave(first: &mut bool, input: &[Self], out0: &mut [Self], out1: &mut [Self]) -> (usize, usize, usize) { + let mut n_in = input.len(); + let n_o0 = out0.len(); + let n_o1 = out1.len(); + + let mut i_ptr = 0; + let mut o0_ptr = 0; + let mut o1_ptr = 0; + + if !*first && n_in > 0 && n_o1 > 0 { + out1[o1_ptr] = input[i_ptr]; + i_ptr += 1; + o1_ptr += 1; + n_in -= 1; + *first = true; + } + + const LANES: usize = 8; + let n_simd = (n_in / (2 * LANES)).min((n_o0 - o0_ptr) / LANES).min((n_o1 - o1_ptr) / LANES); + + for _ in 0..n_simd { + let v0 = Simd::<$t, LANES>::from_slice(&input[i_ptr..i_ptr + LANES]); + let v1 = Simd::<$t, LANES>::from_slice(&input[i_ptr + LANES..i_ptr + 2 * LANES]); + + let (even, odd) = v0.deinterleave(v1); + even.copy_to_slice(&mut out0[o0_ptr..o0_ptr + LANES]); + odd.copy_to_slice(&mut out1[o1_ptr..o1_ptr + LANES]); + + i_ptr += 2 * LANES; + o0_ptr += LANES; + o1_ptr += LANES; + } + + let (i_rem, o0_rem, o1_rem) = deinterleave_scalar_logic(first, &input[i_ptr..], &mut out0[o0_ptr..], &mut out1[o1_ptr..]); + (i_ptr + i_rem, o0_ptr + o0_rem, o1_ptr + o1_rem) + } + } + )* + }; +} + +#[cfg(feature = "simd")] +impl_deinterleave_simd!(f32, u8, i8, i16); + impl Default for Deinterleave where A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, @@ -56,7 +182,7 @@ where #[doc(hidden)] impl Kernel for Deinterleave where - A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy + DeinterleaveSupported, I: CpuBufferReader, O0: CpuBufferWriter, O1: CpuBufferWriter, @@ -68,35 +194,11 @@ where _meta: &mut BlockMeta, ) -> Result<()> { let (m, m0, m1) = { - let i0 = self.input.slice(); + let i = self.input.slice(); let o0 = self.out0.slice(); let o1 = self.out1.slice(); - let mut m0 = 0; - let mut m1 = 0; - - let mut it0 = o0.iter_mut(); - let mut it1 = o1.iter_mut(); - - for x in i0.iter() { - if self.first { - if let Some(d) = it0.next() { - *d = *x; - m0 += 1; - } else { - break; - } - } else { - if let Some(d) = it1.next() { - *d = *x; - m1 += 1; - } else { - break; - } - } - self.first = !self.first; - } - (m0 + m1, m0, m1) + A::deinterleave(&mut self.first, i, o0, o1) }; self.input.consume(m); diff --git a/src/type_converters.rs b/src/type_converters.rs index bdb08d6..e831e15 100644 --- a/src/type_converters.rs +++ b/src/type_converters.rs @@ -18,19 +18,11 @@ //! # use fsdr_blocks::type_converters::TypeConvertersBuilder; //! let blk = TypeConvertersBuilder::convert::().build(); //! ``` -//! -//! Some other conversions are lossy because there is no natural conversion of all possible inputs. -//! Conversion of `f32` into `i16` is an example because `16.3` has no direct conversion, yet `16` is a good candidate. -//! But `f32` can also represent positive or negative infinity, and NaN (not a number) that are not convertible. -//! -//! ``` -//! # use fsdr_blocks::type_converters::TypeConvertersBuilder; -//! let blk = TypeConvertersBuilder::lossy_scale_convert_f32_i16().build(); -//! ``` use core::marker::PhantomData; - -use futuresdr::blocks::Apply; +#[cfg(feature = "simd")] +use core::simd::prelude::*; +use futuresdr::prelude::*; /// Main builder for type conversion blocks pub struct TypeConvertersBuilder {} @@ -38,11 +30,7 @@ pub struct TypeConvertersBuilder {} pub struct ConverterBuilder { marker_input: PhantomData, marker_output: PhantomData, -} - -pub struct ScaledConverterBuilder { - marker_input: PhantomData, - marker_output: PhantomData, + scaled: bool, } impl TypeConvertersBuilder { @@ -55,140 +43,327 @@ impl TypeConvertersBuilder { ConverterBuilder:: { marker_input: PhantomData, marker_output: PhantomData, + scaled: false, } } /// Full range conversion /// for example u8 [0..255] will be converted into f32 as [-1.0..1.0] - pub fn scale_convert() -> ScaledConverterBuilder - where - A: Copy + Send, - B: Copy + Send + From, - { - ScaledConverterBuilder:: { + pub fn scale_convert() -> ConverterBuilder { + ConverterBuilder:: { marker_input: PhantomData, marker_output: PhantomData, + scaled: true, } } - pub fn lossy_scale_convert_f32_u8() -> ScaledConverterBuilder { - ScaledConverterBuilder:: { + pub fn lossy_scale_convert_f32_u8() -> ConverterBuilder { + ConverterBuilder:: { marker_input: PhantomData, marker_output: PhantomData, + scaled: true, } } - pub fn lossy_scale_convert_f32_i8() -> ScaledConverterBuilder { - ScaledConverterBuilder:: { + pub fn lossy_scale_convert_f32_i8() -> ConverterBuilder { + ConverterBuilder:: { marker_input: PhantomData, marker_output: PhantomData, + scaled: true, } } - pub fn lossy_scale_convert_f32_i16() -> ScaledConverterBuilder { - ScaledConverterBuilder:: { + pub fn lossy_scale_convert_f32_i16() -> ConverterBuilder { + ConverterBuilder:: { marker_input: PhantomData, marker_output: PhantomData, + scaled: true, } } } -impl ConverterBuilder +#[derive(Block)] +pub struct TypeConverter< + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + B: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + I: CpuBufferReader = DefaultCpuReader, + O: CpuBufferWriter = DefaultCpuWriter, +> { + #[input] + input: I, + #[output] + output: O, + scaled: bool, +} + +impl TypeConverter where - A: Copy + Send + Sync + Default + std::fmt::Debug + 'static, - B: Copy + Send + Sync + Default + std::fmt::Debug + From + 'static, + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + B: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + I: CpuBufferReader, + O: CpuBufferWriter, { - pub fn build(self) -> Apply B + Send + 'static, A, B> { - Apply::new(|i: &A| -> B { (*i).into() }) + pub fn new(scaled: bool) -> Self { + Self { + input: I::default(), + output: O::default(), + scaled, + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, u8, f32> { - Apply::new(|i: &u8| -> f32 { ScaledConverterBuilder::::convert(i) }) - } +pub trait TypeConvertSupported: Copy { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [B]); + fn convert_item(scaled: bool, item: &Self) -> B; +} - pub fn convert(i: &u8) -> f32 { - (*i as f32) / ((u8::MAX as f32) / 2.0) - 1.0 - } +macro_rules! impl_type_convert_plain { + ($($a:ty => $b:ty),*) => { + $( + impl TypeConvertSupported<$b> for $a { + fn convert_slice(_scaled: bool, input: &[Self], output: &mut [$b]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = <$b>::from(input[i]); + } + } + + fn convert_item(_scaled: bool, item: &Self) -> $b { + <$b>::from(*item) + } + } + )* + }; } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, u16, f32> { - Apply::new(|i: &u16| -> f32 { ScaledConverterBuilder::::convert(i) }) - } +impl_type_convert_plain!( + u8 => u16, u8 => u32, u8 => u64, + u16 => u32, u16 => u64, + u32 => u64, + i8 => i16, i8 => i32, i8 => i64, + i16 => i32, i16 => i64, + i32 => i64, + f32 => f64, + u8 => f64, + u16 => f64, + u32 => f64, + i8 => f64, + i16 => f64, + i32 => f64 +); - pub fn convert(i: &u16) -> f32 { - (*i as f32) / ((u16::MAX as f32) / 2.0) - 1.0 +// Special case for f32 which don't have From +impl TypeConvertSupported for u32 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } + } + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) / ((u32::MAX as f32) / 2.0) - 1.0 + } else { + *item as f32 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, u32, f32> { - Apply::new(|i: &u32| -> f32 { ScaledConverterBuilder::::convert(i) }) +impl TypeConvertSupported for i32 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } } - - pub fn convert(i: &u32) -> f32 { - (*i as f32) / ((u32::MAX as f32) / 2.0) - 1.0 + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) / ((i32::MAX as f32) / 2.0) - 1.0 + } else { + *item as f32 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, i8, f32> { - Apply::new(|i: &i8| -> f32 { ScaledConverterBuilder::::convert(i) }) - } +// Types supported by scaled convert to f32 +impl TypeConvertSupported for u8 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + if scaled { + #[cfg(feature = "simd")] + { + const LANES: usize = 8; + let n_simd = n / LANES; + let offset = f32x8::splat(1.0); + let scale = f32x8::splat(2.0 / 255.0); - pub fn convert(i: &i8) -> f32 { - (*i as f32) / ((i8::MAX as f32) / 2.0) - 1.0 + for i in 0..n_simd { + let v = u8x8::from_slice(&input[i * LANES..(i + 1) * LANES]); + let v_f32 = v.cast::(); + let res = v_f32 * scale - offset; + res.copy_to_slice(&mut output[i * LANES..(i + 1) * LANES]); + } + + for i in (n_simd * LANES)..n { + output[i] = Self::convert_item(true, &input[i]); + } + } + #[cfg(not(feature = "simd"))] + { + for i in 0..n { + output[i] = Self::convert_item(true, &input[i]); + } + } + } else { + for i in 0..n { + output[i] = input[i] as f32; + } + } + } + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) * (2.0 / 255.0) - 1.0 + } else { + *item as f32 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, i16, f32> { - Apply::new(|i: &i16| -> f32 { ScaledConverterBuilder::::convert(i) }) +impl TypeConvertSupported for u16 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } } - - pub fn convert(i: &i16) -> f32 { - (*i as f32) / ((i16::MAX as f32) / 2.0) - 1.0 + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) / ((u16::MAX as f32) / 2.0) - 1.0 + } else { + *item as f32 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply f32 + Send + 'static, i32, f32> { - Apply::new(|i: &i32| -> f32 { ScaledConverterBuilder::::convert(i) }) +impl TypeConvertSupported for i8 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } } - - pub fn convert(i: &i32) -> f32 { - (*i as f32) / ((i32::MAX as f32) / 2.0) - 1.0 + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) / ((i8::MAX as f32) / 2.0) - 1.0 + } else { + *item as f32 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply u8 + Send + 'static, f32, u8> { - Apply::new(|i: &f32| -> u8 { ScaledConverterBuilder::::convert(i) }) +impl TypeConvertSupported for i16 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [f32]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } } + fn convert_item(scaled: bool, item: &Self) -> f32 { + if scaled { + (*item as f32) / ((i16::MAX as f32) / 2.0) - 1.0 + } else { + *item as f32 + } + } +} - pub fn convert(i: &f32) -> u8 { - (*i * (u8::MAX as f32) * 0.5 + 128.0) as u8 +// f32 to integer (scaled) +impl TypeConvertSupported for f32 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [u8]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } + } + fn convert_item(scaled: bool, item: &Self) -> u8 { + if scaled { + (*item * (u8::MAX as f32) * 0.5 + 128.0) as u8 + } else { + *item as u8 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply i8 + Send + 'static, f32, i8> { - Apply::new(|i: &f32| -> i8 { ScaledConverterBuilder::::convert(i) }) +impl TypeConvertSupported for f32 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [i8]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } } + fn convert_item(scaled: bool, item: &Self) -> i8 { + if scaled { + (*item * (i8::MAX as f32)) as i8 + } else { + *item as i8 + } + } +} - pub fn convert(i: &f32) -> i8 { - (*i * (i8::MAX as f32)) as i8 +impl TypeConvertSupported for f32 { + fn convert_slice(scaled: bool, input: &[Self], output: &mut [i16]) { + let n = input.len().min(output.len()); + for i in 0..n { + output[i] = Self::convert_item(scaled, &input[i]); + } + } + fn convert_item(scaled: bool, item: &Self) -> i16 { + if scaled { + (*item * (i16::MAX as f32)) as i16 + } else { + *item as i16 + } } } -impl ScaledConverterBuilder { - pub fn build(self) -> Apply i16 + Send + 'static, f32, i16> { - Apply::new(|i: &f32| -> i16 { ScaledConverterBuilder::::convert(i) }) +#[doc(hidden)] +impl Kernel for TypeConverter +where + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy + TypeConvertSupported, + B: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, + I: CpuBufferReader, + O: CpuBufferWriter, +{ + async fn work( + &mut self, + io: &mut WorkIo, + _mio: &mut MessageOutputs, + _meta: &mut BlockMeta, + ) -> Result<()> { + let i = self.input.slice(); + let o = self.output.slice(); + + let n = i.len().min(o.len()); + if n > 0 { + A::convert_slice(self.scaled, &i[..n], &mut o[..n]); + self.input.consume(n); + self.output.produce(n); + } + + if self.input.finished() && self.input.slice().is_empty() { + io.finished = true; + } + + Ok(()) } +} - pub fn convert(i: &f32) -> i16 { - (*i * (i16::MAX as f32)) as i16 +impl ConverterBuilder +where + A: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy + TypeConvertSupported, + B: Send + Sync + Default + Clone + std::fmt::Debug + 'static + Copy, +{ + pub fn build(self) -> TypeConverter { + TypeConverter::new(self.scaled) } } diff --git a/tests/stream/deinterleave.rs b/tests/stream/deinterleave.rs index d60f284..d50ca79 100644 --- a/tests/stream/deinterleave.rs +++ b/tests/stream/deinterleave.rs @@ -12,7 +12,7 @@ fn deinterleave_u8() -> Result<()> { let deinterleaver = Deinterleave::::new(); - let orig: Vec = vec![0, 1, 0, 1, 0, 1, 0, 1, 0, 1]; + let orig: Vec = (0..100).map(|i| (i % 2) as u8).collect(); let src = VectorSource::::new(orig.clone()); let vect_sink_0 = VectorSink::::new(1024); let vect_sink_1 = VectorSink::::new(1024); @@ -30,10 +30,42 @@ fn deinterleave_u8() -> Result<()> { let snk_1 = vect_sink_1.get()?; let snk_1 = snk_1.items(); - assert_eq!(snk_0.len(), orig.len() / 2); - assert_eq!(snk_0.len(), snk_1.len()); + assert_eq!(snk_0.len(), 50); + assert_eq!(snk_1.len(), 50); assert!(snk_0.iter().all(|v| *v == 0)); assert!(snk_1.iter().all(|v| *v == 1)); Ok(()) } + +#[test] +fn deinterleave_odd_f32() -> Result<()> { + let mut fg = Flowgraph::new(); + + let deinterleaver = Deinterleave::::new(); + + let orig: Vec = vec![0.0, 1.0, 2.0, 3.0, 4.0]; + let src = VectorSource::::new(orig.clone()); + let vect_sink_0 = VectorSink::::new(1024); + let vect_sink_1 = VectorSink::::new(1024); + + connect!(fg, + src > deinterleaver; + deinterleaver.out0 > vect_sink_0; + deinterleaver.out1 > vect_sink_1; + ); + Runtime::new().run(fg)?; + + let snk_0 = vect_sink_0.get()?; + let snk_0 = snk_0.items(); + + let snk_1 = vect_sink_1.get()?; + let snk_1 = snk_1.items(); + + assert_eq!(snk_0.len(), 3); + assert_eq!(snk_1.len(), 2); + assert_eq!(snk_0, &[0.0, 2.0, 4.0]); + assert_eq!(snk_1, &[1.0, 3.0]); + + Ok(()) +} diff --git a/tests/type_converters.rs b/tests/type_converters.rs index ba1cdf5..1f35167 100644 --- a/tests/type_converters.rs +++ b/tests/type_converters.rs @@ -32,6 +32,35 @@ fn convert_u8_f32() -> Result<()> { Ok(()) } +#[test] +fn scale_convert_u8_f32() -> Result<()> { + let mut fg = Flowgraph::new(); + + let convert_u8_f32 = TypeConvertersBuilder::scale_convert::().build(); + + let orig: Vec = vec![0, 127, 128, 255]; + let src = VectorSource::::new(orig.clone()); + let vect_sink = VectorSink::::new(1024); + + connect!(fg, + src > convert_u8_f32 > vect_sink; + ); + Runtime::new().run(fg)?; + + let snk = vect_sink.get()?; + let v = snk.items(); + + assert_eq!(v.len(), orig.len()); + // 0 -> -1.0 + // 127.5 -> 0.0 (but u8 is integer, so 127 or 128) + // 255 -> 1.0 + + assert!((v[0] - (-1.0)).abs() < 1e-6); + assert!((v[3] - 1.0).abs() < 1e-6); + + Ok(()) +} + // #[test] // fn convert_u8_f32_with_scale_3() -> Result<()> { // const SCALE_FACTOR: f32 = 3.0;