From ba84eee2f89ef8b10af9108c4e304653e580ac63 Mon Sep 17 00:00:00 2001 From: Michael Atlan Date: Thu, 19 Feb 2026 10:08:09 +0100 Subject: [PATCH 1/2] Implement harmonic-domain metrics for waveform analysis Add harmonic-domain metrics including tauH, crest factor, and spectral entropy. --- .../arterial_waveform_shape_metrics.py | 376 +++++++++++------- 1 file changed, 236 insertions(+), 140 deletions(-) diff --git a/src/pipelines/arterial_waveform_shape_metrics.py b/src/pipelines/arterial_waveform_shape_metrics.py index 33f94f4..041f588 100644 --- a/src/pipelines/arterial_waveform_shape_metrics.py +++ b/src/pipelines/arterial_waveform_shape_metrics.py @@ -8,64 +8,14 @@ class ArterialSegExample(ProcessPipeline): """ Waveform-shape metrics on per-beat, per-branch, per-radius velocity waveforms. - Expected segment layout: - v_seg[t, beat, branch, radius] - i.e. v_seg shape: (n_t, n_beats, n_branches, n_radii) - - Outputs - ------- - A) Per-segment (flattened branch x radius): - by_segment/*_segment : shape (n_beats, n_segments) - n_segments = n_branches * n_radii - seg_idx = branch_idx * n_radii + radius_idx (branch-major) - - B) Aggregated: - by_segment/*_branch : shape (n_beats, n_branches) (median over radii) - by_segment/*_global : shape (n_beats,) (mean over all branches & radii) - - C) Independent global metrics (from global waveform path): - global/* : shape (n_beats,) - - Definitions (gain-invariant / shape metrics) - -------------------------------------------- - Rectification: - v <- max(v, 0) with NaNs preserved - - Basic: - tau_M1 = M1 / M0 - tau_M1_over_T = (M1/M0) / T - - RI = 1 - vmin/vmax (robust) - PI = (vmax - vmin) / mean(v) (robust) - - RVTI (paper) = D1 / (D2 + eps), split at 1/2 T (ratio_rvti = 0.5) - - New: - SF_VTI (systolic fraction) = D1_1/3 / (D1_1/3 + D2_2/3 + eps) - where D1_1/3 is integral over first 1/3 of samples, D2_2/3 over remaining 2/3 - - Normalized central moments (shape, not scale): - mu2_norm = mu2 / (M0 * T^2 + eps) (variance-like) - - - with central moments around t_bar = tau_M1: - mu2 = sum(v * (t - t_bar)^2) - - - Quantile timing (on cumulative integral): - C(t) = cumsum(v) / sum(v) - t10_over_T,t25_over_T, t50_over_T,t75_over_T, t90_over_T - - Spectral shape ratios (per beat): - Compute FFT power P(f) of v(t). Define harmonic index h = f * T (cycles/beat). - E_total = sum_{h>=0} P - E_low = sum_{h in [1..H_LOW]} P - E_high = sum_{h in [H_HIGH1..H_HIGH2]} P - Return E_low_over_E_total and E_high_over_E_total - - Default bands: - low: 1..3 harmonics - high: 4..8 harmonics + Adds harmonic-domain aggregated metrics: + - tauH (magnitude-only damping time constant proxy) + - crest_factor (from band-limited synthesis n=0..10) + - spectral_entropy (harmonic magnitude distribution entropy, n=1..10) + - phi1, phi2, phi3 (harmonic phases) + - Delta_phi1, Delta_phi2 as phase-coupling: + Delta_phi1 = wrap(phi2 - 2*phi1) (aka Δϕ2) + Delta_phi2 = wrap(phi3 - 3*phi1) (aka Δϕ3) """ description = "Waveform shape metrics (segment + aggregates + global), gain-invariant and robust." @@ -95,6 +45,9 @@ class ArterialSegExample(ProcessPipeline): H_HIGH_MIN = 4 H_HIGH_MAX = 8 + # Harmonic panel max for harmonic-domain metrics + H_MAX = 10 # use n=0..10 for synthesis; n=1..10 for distributions/aggregation + # ------------------------- # Helpers # ------------------------- @@ -132,6 +85,13 @@ def _ensure_time_by_beat(v2: np.ndarray, n_beats: int) -> np.ndarray: # Fallback: if ambiguous, assume (n_t, n_beats) return v2 + @staticmethod + def _wrap_pi(x: float) -> float: + """Wrap angle to [-pi, pi].""" + if not np.isfinite(x): + return np.nan + return float((x + np.pi) % (2.0 * np.pi) - np.pi) + def _quantile_time_over_T(self, v: np.ndarray, Tbeat: float, q: float) -> float: """ v: rectified 1D waveform (NaNs allowed) @@ -167,37 +127,19 @@ def _spectral_ratios(self, v: np.ndarray, Tbeat: float) -> tuple[float, float]: return np.nan, np.nan vv = np.where(np.isfinite(v), v, 0.0) - n = vv.size if n < 2: return np.nan, np.nan - # Remove DC? For "shape" we typically keep DC in total energy but exclude it from low/high - # Here: total includes all bins (including DC). Low/high exclude DC by construction (harmonics >= 1). fs = n / Tbeat # Hz X = np.fft.rfft(vv) P = np.abs(X) ** 2 f = np.fft.rfftfreq(n, d=1.0 / fs) # Hz - h = f * Tbeat # cycles per beat (harmonic index, continuous) - # vv_no_mean = vv - np.mean(vv) - # idx_fund = np.argmax(A[1:]) + 1 - # f1 = f[idx_fund] - # V1 = A[idx_fund] - # if V1 <= 0: - # return np.nan, np.nan, np.nan - # HRI_2_10 = float(np.nan) - """for k in range(2, 11): - target_freq = k * f1 - - # trouver le bin le plus proche - idx = np.argmin(np.abs(f - target_freq)) - - # éviter de sortir du spectre - if idx < len(A): - HRI_2_10 += A[idx] / V1""" + h = f * Tbeat # harmonic index (cycles/beat) + E_total = float(np.sum(P)) if not np.isfinite(E_total) or E_total <= 0: - return np.nan, np.nan, np.nan + return np.nan, np.nan low_mask = (h >= 1.0) & (h <= float(self.H_LOW_MAX)) high_mask = (h >= float(self.H_HIGH_MIN)) & (h <= float(self.H_HIGH_MAX)) @@ -207,6 +149,173 @@ def _spectral_ratios(self, v: np.ndarray, Tbeat: float) -> tuple[float, float]: return float(E_low / E_total), float(E_high / E_total) + def _harmonic_pack(self, v: np.ndarray, Tbeat: float) -> dict: + """ + Compute complex harmonic coefficients Vn for n=0..H, with H=min(H_MAX, n_rfft-1), + and synthesize band-limited waveform vb(t) using harmonics 0..H. + + Returns: + - V (complex array length H+1) [Fourier series-style coefficients] + - H (int) + - vb (float array length n) + - Vmax (float) = max_t vb(t) + - omega0 (float) = 2*pi/Tbeat + """ + if (not np.isfinite(Tbeat)) or Tbeat <= 0: + return {"V": None, "H": 0, "vb": None, "Vmax": np.nan, "omega0": np.nan} + + if v.size == 0 or not np.any(np.isfinite(v)): + return {"V": None, "H": 0, "vb": None, "Vmax": np.nan, "omega0": np.nan} + + vv = np.where(np.isfinite(v), v, 0.0) + n = vv.size + if n < 2: + return {"V": None, "H": 0, "vb": None, "Vmax": np.nan, "omega0": np.nan} + + # Coeffs scaled so that irfft(V*n) reconstructs the signal (Fourier-series-like) + Vfull = np.fft.rfft(vv) / float(n) + H = int(min(self.H_MAX, Vfull.size - 1)) + V = Vfull[: H + 1].copy() + + # Band-limited synthesis using harmonics 0..H + Vtrunc = np.zeros_like(Vfull) + Vtrunc[: H + 1] = V + vb = np.fft.irfft(Vtrunc * float(n), n=n) + + Vmax = float(np.nanmax(vb)) if vb.size else np.nan + omega0 = float(2.0 * np.pi / Tbeat) + + return {"V": V, "H": H, "vb": vb, "Vmax": Vmax, "omega0": omega0} + + def _tauH_from_harmonics(self, V: np.ndarray, Vmax: float, omega0: float) -> float: + """ + Magnitude-only damping proxy tauH: + Xn = Vn / Vmax + tau_|H|,n = (1/omega_n) * sqrt(1/|Xn|^2 - 1) for |Xn| in (0,1] + tauH = sum_{n=1..H} |Vn| * tau_n / sum_{n=1..H} |Vn| + """ + if V is None or (not np.isfinite(Vmax)) or Vmax <= 0 or (not np.isfinite(omega0)) or omega0 <= 0: + return np.nan + + H = int(V.size - 1) + if H < 1: + return np.nan + + weights = [] + taus = [] + + for n in range(1, H + 1): + Vn = V[n] + an = float(np.abs(Vn)) + if not np.isfinite(an) or an <= 0: + continue + + Xn = an / Vmax + if (not np.isfinite(Xn)) or Xn <= 0: + continue + + # mapping is only valid if |Xn| <= 1 + if Xn > 1.0 + 1e-9: + continue + + omega_n = float(n) * omega0 + # numeric safety + inside = (1.0 / (Xn * Xn + self.eps)) - 1.0 + if inside <= 0: + continue + + tau_n = (1.0 / omega_n) * float(np.sqrt(inside)) + if np.isfinite(tau_n) and tau_n > 0: + weights.append(an) + taus.append(tau_n) + + if len(weights) == 0: + return np.nan + + w = np.asarray(weights, dtype=float) + t = np.asarray(taus, dtype=float) + return float(np.sum(w * t) / (np.sum(w) + self.eps)) + + def _spectral_entropy_from_harmonics(self, V: np.ndarray) -> float: + """ + Spectral entropy of harmonic magnitude distribution over n=1..H: + p_n = |Vn| / sum_{k=1..H} |Vk| + Hspec = - sum p_n log(p_n) + """ + if V is None: + return np.nan + H = int(V.size - 1) + if H < 1: + return np.nan + + mags = np.abs(V[1:]) + mags = np.where(np.isfinite(mags), mags, 0.0) + s = float(np.sum(mags)) + if s <= 0: + return np.nan + + p = mags / s + p = np.clip(p, self.eps, 1.0) + return float(-np.sum(p * np.log(p))) + + def _crest_factor_from_vb(self, vb: np.ndarray) -> float: + """ + Crest factor on band-limited waveform vb: + CF = max(vb) / rms(vb) + """ + if vb is None or vb.size == 0: + return np.nan + vb = np.asarray(vb, dtype=float) + if not np.any(np.isfinite(vb)): + return np.nan + x = np.where(np.isfinite(vb), vb, 0.0) + rms = float(np.sqrt(np.mean(x * x))) + if rms <= 0: + return np.nan + return float(np.max(x) / rms) + + def _harmonic_phases(self, V: np.ndarray) -> dict: + """ + Return phi1,phi2,phi3 and Delta_phi1,Delta_phi2 (phase-coupling wrt fundamental). + Delta_phi1 = wrap(phi2 - 2*phi1) + Delta_phi2 = wrap(phi3 - 3*phi1) + """ + out = { + "phi1": np.nan, + "phi2": np.nan, + "phi3": np.nan, + "Delta_phi1": np.nan, + "Delta_phi2": np.nan, + } + if V is None: + return out + + H = int(V.size - 1) + if H < 1: + return out + + def phase_if_strong(Vn: complex) -> float: + if not np.isfinite(Vn.real) or not np.isfinite(Vn.imag): + return np.nan + if np.abs(Vn) <= self.eps: + return np.nan + return self._wrap_pi(float(np.angle(Vn))) + + phi1 = phase_if_strong(V[1]) if H >= 1 else np.nan + phi2 = phase_if_strong(V[2]) if H >= 2 else np.nan + phi3 = phase_if_strong(V[3]) if H >= 3 else np.nan + + out["phi1"] = phi1 + out["phi2"] = phi2 + out["phi3"] = phi3 + + if np.isfinite(phi1) and np.isfinite(phi2): + out["Delta_phi1"] = self._wrap_pi(phi2 - 2.0 * phi1) + if np.isfinite(phi1) and np.isfinite(phi3): + out["Delta_phi2"] = self._wrap_pi(phi3 - 3.0 * phi1) + + return out + def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: """ Canonical metric kernel: compute all waveform-shape metrics from a single 1D waveform v(t). @@ -217,13 +326,10 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: if n <= 0: return {k: np.nan for k in self._metric_keys()} - # If Tbeat invalid, many metrics become NaN if (not np.isfinite(Tbeat)) or Tbeat <= 0: return {k: np.nan for k in self._metric_keys()} - vv = np.where( - np.isfinite(v), v, np.nan - ) # vv = np.where(np.isfinite(v), v, 0.0) + vv = np.where(np.isfinite(v), v, np.nan) m0 = float(np.nansum(vv)) if m0 <= 0: return {k: np.nan for k in self._metric_keys()} @@ -238,8 +344,8 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: # RI / PI robust vmax = float(np.nanmax(vv)) - vmin = float(np.nanmin(v)) # vmin = float(np.min(vv)) - meanv = float(self._safe_nanmean(v)) # meanv = float(np.mean(vv)) + vmin = float(np.nanmin(v)) # preserve NaNs already + meanv = float(self._safe_nanmean(v)) if vmax <= 0: RI = np.nan @@ -254,7 +360,7 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: PI = (vmax - vmin) / meanv PI = float(PI) if np.isfinite(PI) else np.nan - # RVTI (paper, split 1/2) + # RVTI (split 1/2) k_rvti = int(np.ceil(n * self.ratio_rvti)) k_rvti = max(0, min(n, k_rvti)) D1_rvti = float(np.sum(vv[:k_rvti])) if k_rvti > 0 else np.nan @@ -268,23 +374,34 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: D2_sf = float(np.nansum(vv[k_sf:])) if k_sf < n else np.nan SF_VTI = D1_sf / (D1_sf + D2_sf + self.eps) - # Central moments around tau_M1 (t_bar) - # mu2 = sum(v*(t-tau)^2) + # Central moments around tau_M1 dtau = t - tau_M1 mu2 = float(np.nansum(vv * (dtau**2))) tau_M2 = np.sqrt(mu2 / m0 + self.eps) tau_M2_over_T = tau_M2 / Tbeat - # Quantile timing features (on cumulative integral) + # Quantile timing features t10_over_T = self._quantile_time_over_T(vv, Tbeat, 0.10) t25_over_T = self._quantile_time_over_T(vv, Tbeat, 0.25) t50_over_T = self._quantile_time_over_T(vv, Tbeat, 0.50) t75_over_T = self._quantile_time_over_T(vv, Tbeat, 0.75) t90_over_T = self._quantile_time_over_T(vv, Tbeat, 0.90) - # Spectral ratios + # Spectral ratios (FFT power bands) E_low_over_E_total, E_high_over_E_total = self._spectral_ratios(vv, Tbeat) + # Harmonic-domain extras (n=0..10 synthesis; n=1..10 metrics) + hp = self._harmonic_pack(np.where(np.isfinite(vv), vv, 0.0), Tbeat) + V = hp["V"] + vb = hp["vb"] + Vmax_bl = hp["Vmax"] + omega0 = hp["omega0"] + + tauH = self._tauH_from_harmonics(V, Vmax_bl, omega0) + crest_factor = self._crest_factor_from_vb(vb) + spectral_entropy = self._spectral_entropy_from_harmonics(V) + ph = self._harmonic_phases(V) + return { "tau_M1": float(tau_M1), "tau_M1_over_T": float(tau_M1_over_T), @@ -301,7 +418,16 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: "t90_over_T": float(t90_over_T), "E_low_over_E_total": float(E_low_over_E_total), "E_high_over_E_total": float(E_high_over_E_total), - # "HRI_2_10_total": float(HRI_2_10_total), + + # NEW harmonic-domain requested metrics + "tauH": float(tauH) if np.isfinite(tauH) else np.nan, + "crest_factor": float(crest_factor) if np.isfinite(crest_factor) else np.nan, + "spectral_entropy": float(spectral_entropy) if np.isfinite(spectral_entropy) else np.nan, + "phi1": float(ph["phi1"]) if np.isfinite(ph["phi1"]) else np.nan, + "phi2": float(ph["phi2"]) if np.isfinite(ph["phi2"]) else np.nan, + "phi3": float(ph["phi3"]) if np.isfinite(ph["phi3"]) else np.nan, + "Delta_phi1": float(ph["Delta_phi1"]) if np.isfinite(ph["Delta_phi1"]) else np.nan, + "Delta_phi2": float(ph["Delta_phi2"]) if np.isfinite(ph["Delta_phi2"]) else np.nan, } @staticmethod @@ -322,7 +448,16 @@ def _metric_keys() -> list[str]: "t90_over_T", "E_low_over_E_total", "E_high_over_E_total", - # "HRI_2_10_total", + + # NEW requested metrics + "tauH", + "crest_factor", + "spectral_entropy", + "phi1", + "phi2", + "phi3", + "Delta_phi1", + "Delta_phi2", ] def _compute_block_segment(self, v_block: np.ndarray, T: np.ndarray): @@ -355,11 +490,9 @@ def _compute_block_segment(self, v_block: np.ndarray, T: np.ndarray): for beat_idx in range(n_beats): Tbeat = float(T[0][beat_idx]) - # For global aggregate at this beat gl_vals = {k: [] for k in self._metric_keys()} for branch_idx in range(n_branches): - # For branch aggregate over radii br_vals = {k: [] for k in self._metric_keys()} for radius_idx in range(n_radii): @@ -372,13 +505,13 @@ def _compute_block_segment(self, v_block: np.ndarray, T: np.ndarray): br_vals[k].append(m[k]) gl_vals[k].append(m[k]) - # Branch aggregates: median over radii (nanmedian) + # Branch aggregates: median over radii for k in self._metric_keys(): br[k][beat_idx, branch_idx] = self._safe_nanmedian( np.asarray(br_vals[k], dtype=float) ) - # Global aggregates: mean over all branches & radii (nanmean) + # Global aggregates: mean over all branches & radii for k in self._metric_keys(): gl[k][beat_idx] = self._safe_nanmean( np.asarray(gl_vals[k], dtype=float) @@ -439,59 +572,20 @@ def run(self, h5file) -> ProcessResult: seg_note_b + " | WARNING: raw/band branch/radius dims differ." ) - # Helper to pack dict-of-arrays into HDF5 metric keys def pack(prefix: str, d: dict, attrs_common: dict): for k, arr in d.items(): metrics[f"{prefix}/{k}"] = with_attrs(arr, attrs_common) - # Per-segment outputs (compat dataset names) + # Per-segment outputs pack( "by_segment/bandlimited_segment", - { - "tau_M1": seg_b["tau_M1"], - "tau_M1_over_T": seg_b["tau_M1_over_T"], - "RI": seg_b["RI"], - "PI": seg_b["PI"], - "R_VTI": seg_b["R_VTI"], - "SF_VTI": seg_b["SF_VTI"], - "tau_M2_over_T": seg_b["tau_M2_over_T"], - "tau_M2": seg_b["tau_M2"], - "t10_over_T": seg_b["t10_over_T"], - "t25_over_T": seg_b["t25_over_T"], - "t50_over_T": seg_b["t50_over_T"], - "t75_over_T": seg_b["t75_over_T"], - "t90_over_T": seg_b["t90_over_T"], - "E_low_over_E_total": seg_b["E_low_over_E_total"], - "E_high_over_E_total": seg_b["E_high_over_E_total"], - # "HRI_2_10_total": seg_b["HRI_2_10_total"], - }, - { - "segment_indexing": [seg_note], - }, + seg_b, + {"segment_indexing": [seg_note]}, ) pack( "by_segment/raw_segment", - { - "tau_M1": seg_r["tau_M1"], - "tau_M1_over_T": seg_r["tau_M1_over_T"], - "RI": seg_r["RI"], - "PI": seg_r["PI"], - "R_VTI": seg_r["R_VTI"], - "SF_VTI": seg_r["SF_VTI"], - "tau_M2_over_T": seg_r["tau_M2_over_T"], - "tau_M2": seg_r["tau_M2"], - "t10_over_T": seg_r["t10_over_T"], - "t25_over_T": seg_r["t25_over_T"], - "t50_over_T": seg_r["t50_over_T"], - "t75_over_T": seg_r["t75_over_T"], - "t90_over_T": seg_r["t90_over_T"], - "E_low_over_E_total": seg_r["E_low_over_E_total"], - "E_high_over_E_total": seg_r["E_high_over_E_total"], - # "HRI_2_10_total": seg_r["HRI_2_10_total"], - }, - { - "segment_indexing": [seg_note], - }, + seg_r, + {"segment_indexing": [seg_note]}, ) # Branch aggregates (median over radii) @@ -535,6 +629,7 @@ def pack(prefix: str, d: dict, attrs_common: dict): metrics["by_segment/params/H_HIGH_MAX"] = np.asarray( self.H_HIGH_MAX, dtype=int ) + metrics["by_segment/params/H_MAX"] = np.asarray(self.H_MAX, dtype=int) # ------------------------- # Independent global metrics (raw + bandlimited) @@ -564,5 +659,6 @@ def pack(prefix: str, d: dict, attrs_common: dict): metrics["global/params/H_LOW_MAX"] = np.asarray(self.H_LOW_MAX, dtype=int) metrics["global/params/H_HIGH_MIN"] = np.asarray(self.H_HIGH_MIN, dtype=int) metrics["global/params/H_HIGH_MAX"] = np.asarray(self.H_HIGH_MAX, dtype=int) + metrics["global/params/H_MAX"] = np.asarray(self.H_MAX, dtype=int) return ProcessResult(metrics=metrics) From 947d7625d2c2a296c59c401df308bafdcedbbd79 Mon Sep 17 00:00:00 2001 From: gregoire Date: Thu, 19 Feb 2026 12:00:19 +0100 Subject: [PATCH 2/2] new harmonic metrics --- .../arterial_waveform_shape_metrics.py | 74 +++++++++++-------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/src/pipelines/arterial_waveform_shape_metrics.py b/src/pipelines/arterial_waveform_shape_metrics.py index 041f588..fc57c44 100644 --- a/src/pipelines/arterial_waveform_shape_metrics.py +++ b/src/pipelines/arterial_waveform_shape_metrics.py @@ -13,9 +13,9 @@ class ArterialSegExample(ProcessPipeline): - crest_factor (from band-limited synthesis n=0..10) - spectral_entropy (harmonic magnitude distribution entropy, n=1..10) - phi1, phi2, phi3 (harmonic phases) - - Delta_phi1, Delta_phi2 as phase-coupling: - Delta_phi1 = wrap(phi2 - 2*phi1) (aka Δϕ2) - Delta_phi2 = wrap(phi3 - 3*phi1) (aka Δϕ3) + - delta_phi2, delta_phi3 as phase-coupling: + delta_phi2 = wrap(phi2 - 2*phi1) (aka Δϕ2) + delta_phi3 = wrap(phi3 - 3*phi1) (aka Δϕ3) """ description = "Waveform shape metrics (segment + aggregates + global), gain-invariant and robust." @@ -103,7 +103,7 @@ def _quantile_time_over_T(self, v: np.ndarray, Tbeat: float, q: float) -> float: if v.size == 0 or not np.any(np.isfinite(v)): return np.nan - vv = np.where(np.isfinite(v), v, 0.0) + vv = np.where(np.isfinite(v), v, np.nan) m0 = float(np.sum(vv)) if m0 <= 0: return np.nan @@ -126,7 +126,7 @@ def _spectral_ratios(self, v: np.ndarray, Tbeat: float) -> tuple[float, float]: if v.size == 0 or not np.any(np.isfinite(v)): return np.nan, np.nan - vv = np.where(np.isfinite(v), v, 0.0) + vv = np.where(np.isfinite(v), v, np.nan) n = vv.size if n < 2: return np.nan, np.nan @@ -141,8 +141,10 @@ def _spectral_ratios(self, v: np.ndarray, Tbeat: float) -> tuple[float, float]: if not np.isfinite(E_total) or E_total <= 0: return np.nan, np.nan - low_mask = (h >= 1.0) & (h <= float(self.H_LOW_MAX)) - high_mask = (h >= float(self.H_HIGH_MIN)) & (h <= float(self.H_HIGH_MAX)) + low_mask = (h >= 0.9) & (h <= float(self.H_LOW_MAX) + 0.1) + high_mask = (h >= float(self.H_HIGH_MIN) - 0.1) & ( + h <= float(self.H_HIGH_MAX) + 0.1 + ) E_low = float(np.sum(P[low_mask])) E_high = float(np.sum(P[high_mask])) @@ -167,7 +169,7 @@ def _harmonic_pack(self, v: np.ndarray, Tbeat: float) -> dict: if v.size == 0 or not np.any(np.isfinite(v)): return {"V": None, "H": 0, "vb": None, "Vmax": np.nan, "omega0": np.nan} - vv = np.where(np.isfinite(v), v, 0.0) + vv = np.where(np.isfinite(v), v, np.nan) n = vv.size if n < 2: return {"V": None, "H": 0, "vb": None, "Vmax": np.nan, "omega0": np.nan} @@ -194,7 +196,13 @@ def _tauH_from_harmonics(self, V: np.ndarray, Vmax: float, omega0: float) -> flo tau_|H|,n = (1/omega_n) * sqrt(1/|Xn|^2 - 1) for |Xn| in (0,1] tauH = sum_{n=1..H} |Vn| * tau_n / sum_{n=1..H} |Vn| """ - if V is None or (not np.isfinite(Vmax)) or Vmax <= 0 or (not np.isfinite(omega0)) or omega0 <= 0: + if ( + V is None + or (not np.isfinite(Vmax)) + or Vmax <= 0 + or (not np.isfinite(omega0)) + or omega0 <= 0 + ): return np.nan H = int(V.size - 1) @@ -249,14 +257,14 @@ def _spectral_entropy_from_harmonics(self, V: np.ndarray) -> float: return np.nan mags = np.abs(V[1:]) - mags = np.where(np.isfinite(mags), mags, 0.0) - s = float(np.sum(mags)) + mags = np.where(np.isfinite(mags), mags, np.nan) + s = float(np.nansum(mags)) if s <= 0: return np.nan p = mags / s p = np.clip(p, self.eps, 1.0) - return float(-np.sum(p * np.log(p))) + return float(-np.nansum(p * np.log(p))) def _crest_factor_from_vb(self, vb: np.ndarray) -> float: """ @@ -268,24 +276,24 @@ def _crest_factor_from_vb(self, vb: np.ndarray) -> float: vb = np.asarray(vb, dtype=float) if not np.any(np.isfinite(vb)): return np.nan - x = np.where(np.isfinite(vb), vb, 0.0) - rms = float(np.sqrt(np.mean(x * x))) + x = np.where(np.isfinite(vb), vb, np.nan) + rms = float(np.sqrt(self._safe_nanmean(x * x))) if rms <= 0: return np.nan return float(np.max(x) / rms) def _harmonic_phases(self, V: np.ndarray) -> dict: """ - Return phi1,phi2,phi3 and Delta_phi1,Delta_phi2 (phase-coupling wrt fundamental). - Delta_phi1 = wrap(phi2 - 2*phi1) - Delta_phi2 = wrap(phi3 - 3*phi1) + Return phi1,phi2,phi3 and delta_phi2,delta_phi3 (phase-coupling wrt fundamental). + delta_phi2 = wrap(phi2 - 2*phi1) + delta_phi3 = wrap(phi3 - 3*phi1) """ out = { "phi1": np.nan, "phi2": np.nan, "phi3": np.nan, - "Delta_phi1": np.nan, - "Delta_phi2": np.nan, + "delta_phi2": np.nan, + "delta_phi3": np.nan, } if V is None: return out @@ -310,9 +318,9 @@ def phase_if_strong(Vn: complex) -> float: out["phi3"] = phi3 if np.isfinite(phi1) and np.isfinite(phi2): - out["Delta_phi1"] = self._wrap_pi(phi2 - 2.0 * phi1) + out["delta_phi2"] = self._wrap_pi(phi2 - 2.0 * phi1) if np.isfinite(phi1) and np.isfinite(phi3): - out["Delta_phi2"] = self._wrap_pi(phi3 - 3.0 * phi1) + out["delta_phi3"] = self._wrap_pi(phi3 - 3.0 * phi1) return out @@ -391,7 +399,7 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: E_low_over_E_total, E_high_over_E_total = self._spectral_ratios(vv, Tbeat) # Harmonic-domain extras (n=0..10 synthesis; n=1..10 metrics) - hp = self._harmonic_pack(np.where(np.isfinite(vv), vv, 0.0), Tbeat) + hp = self._harmonic_pack(np.where(np.isfinite(vv), vv, np.nan), Tbeat) V = hp["V"] vb = hp["vb"] Vmax_bl = hp["Vmax"] @@ -418,16 +426,23 @@ def _compute_metrics_1d(self, v: np.ndarray, Tbeat: float) -> dict: "t90_over_T": float(t90_over_T), "E_low_over_E_total": float(E_low_over_E_total), "E_high_over_E_total": float(E_high_over_E_total), - # NEW harmonic-domain requested metrics "tauH": float(tauH) if np.isfinite(tauH) else np.nan, - "crest_factor": float(crest_factor) if np.isfinite(crest_factor) else np.nan, - "spectral_entropy": float(spectral_entropy) if np.isfinite(spectral_entropy) else np.nan, + "crest_factor": float(crest_factor) + if np.isfinite(crest_factor) + else np.nan, + "spectral_entropy": float(spectral_entropy) + if np.isfinite(spectral_entropy) + else np.nan, "phi1": float(ph["phi1"]) if np.isfinite(ph["phi1"]) else np.nan, "phi2": float(ph["phi2"]) if np.isfinite(ph["phi2"]) else np.nan, "phi3": float(ph["phi3"]) if np.isfinite(ph["phi3"]) else np.nan, - "Delta_phi1": float(ph["Delta_phi1"]) if np.isfinite(ph["Delta_phi1"]) else np.nan, - "Delta_phi2": float(ph["Delta_phi2"]) if np.isfinite(ph["Delta_phi2"]) else np.nan, + "delta_phi2": float(ph["delta_phi2"]) + if np.isfinite(ph["delta_phi2"]) + else np.nan, + "delta_phi3": float(ph["delta_phi3"]) + if np.isfinite(ph["delta_phi3"]) + else np.nan, } @staticmethod @@ -448,7 +463,6 @@ def _metric_keys() -> list[str]: "t90_over_T", "E_low_over_E_total", "E_high_over_E_total", - # NEW requested metrics "tauH", "crest_factor", @@ -456,8 +470,8 @@ def _metric_keys() -> list[str]: "phi1", "phi2", "phi3", - "Delta_phi1", - "Delta_phi2", + "delta_phi2", + "delta_phi3", ] def _compute_block_segment(self, v_block: np.ndarray, T: np.ndarray):