diff --git a/CHANGELOG.md b/CHANGELOG.md index ed1a1777e..c59c6802d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Added + +- Support for PTP devices as sources (subject to the "ptp" compile flag, enabled by default). See docs/guide/ptp.md for details. + ## [1.6.1] - 2025-07-16 ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 6b23f553a..e7b9784f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -598,6 +598,7 @@ dependencies = [ "libc", "ntp-proto", "pps-time", + "ptp-time", "rand", "rustls", "serde", @@ -690,6 +691,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "ptp-time" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86c9caac3b5aca362c2c897a5ae1c86222c703c8e9881dd7407c343e3adacb92" +dependencies = [ + "libc", +] + [[package]] name = "quote" version = "1.0.40" diff --git a/Cargo.toml b/Cargo.toml index 92bb7cd7a..0fa6723ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ toml = { version = ">=0.6.0,<0.9.0", default-features = false, features = ["pars timestamped-socket = "0.2.2" clock-steering = "0.2.1" pps-time = "0.2.3" +ptp-time = "0.1.0" # TLS rustls23 = { package = "rustls", version = "0.23.16", features = ["logging", "std"] } diff --git a/docs/development/code-structure.md b/docs/development/code-structure.md index 041d73cb7..64cb2dc94 100644 --- a/docs/development/code-structure.md +++ b/docs/development/code-structure.md @@ -14,8 +14,9 @@ ntpd-rs is split into several crates with three goals in mind: straightforward to verify. The main `ntp-proto` and `ntpd` crates are set up such that neither contains any -unsafe code. Unsafe code is limited to the (external) `timestamped-socket`, `clock-steering` and `pps-time` crates, -which are purposefully kept small and only offer a safe API. +unsafe code. Unsafe code is limited to the (external) `timestamped-socket`, +`clock-steering`, `pps-time`, and `ptp-time` crates, which are purposefully kept +small and only offer a safe API. ### ntp-proto @@ -47,6 +48,11 @@ clock. Touching the system clock uses `libc` and is inherently unsafe. The [`pps-time` crate](https://github.com/pendulum-project/pps-time) wraps the system calls needed to interact with a PPS device. To interact with PPS devices ioctl system calls are used, which uses `libc` and is inherently unsafe. +### ptp-time + +The [`ptp-time` crate](https://github.com/paulgear/ptp-time) wraps the system calls needed to interact with a PTP device. To +interact with PTP devices ioctl system calls are used, which uses `libc` and is inherently unsafe. + ### ntpd The `ntpd` crate contains the code for all three end-user binaries that our @@ -140,4 +146,3 @@ an example of how to interact with this socket. Because this task reads from its socket, it is advised to restrict the permissions on this socket. - diff --git a/docs/development/prompts/ptp/README.md b/docs/development/prompts/ptp/README.md new file mode 100644 index 000000000..33c72defa --- /dev/null +++ b/docs/development/prompts/ptp/README.md @@ -0,0 +1 @@ +This directory contains some of the prompts which were used in the AI-assisted development of the PTP driver. They may be deleted when no longer considered useful. diff --git a/docs/development/prompts/ptp/driver-analysis-combined.md b/docs/development/prompts/ptp/driver-analysis-combined.md new file mode 100644 index 000000000..348b21455 --- /dev/null +++ b/docs/development/prompts/ptp/driver-analysis-combined.md @@ -0,0 +1,442 @@ +# NTP Source Driver Analysis Report + +## Executive Summary + +The ntpd-rs codebase implements a modular architecture for NTP time sources through a spawner/source pattern. There are two main categories of drivers: + +1. **Network-based sources** (Standard, Pool, NTS) - Event-driven with async I/O +2. **Hardware/local sources** (Socket, PPS) - Event-driven with blocking I/O wrapped in async tasks + +All sources follow a consistent event-driven architecture using Tokio's async runtime, with no traditional polling mechanisms. However, the system provides guidance for implementing polled drivers when needed. + +## Architecture Overview + +### Core Components + +- **Spawners** (`ntpd/src/daemon/spawn/`): Responsible for creating and managing source instances +- **Sources** (`ntpd/src/daemon/`): Handle actual time measurement and communication +- **System** (`ntpd/src/daemon/system.rs`): Coordinates all sources and manages the time synchronization algorithm + +### Communication Pattern + +``` +Spawner -> SpawnEvent -> System -> SourceCreateParameters -> Source Task + | +System <- MsgForSystem <- Source Task (measurements/status) +``` + +## Source Driver Analysis + +### 1. Standard NTP Source (`standard.rs`) + +**Type**: Network-based, single server + +**Architecture**: Event-driven with timer-based polling + +**Location**: `ntpd/src/daemon/spawn/standard.rs` (spawner), `ntpd/src/daemon/ntp_source.rs` (source) + +**Key Characteristics**: +- Creates a single NTP source for a given server address +- Handles DNS resolution with retry on failure +- Re-resolves DNS on unreachable errors +- Uses UDP sockets with timestamping support + +**Event Mechanism**: +- `tokio::select!` on timer, socket receive, and system updates +- Timer-driven polling intervals determined by NTP algorithm +- Socket events trigger packet processing +- System events update source configuration + +**Interface with System**: +- Spawner implements `Spawner` trait +- Source communicates via `MsgForSystem` enum +- Receives system updates via broadcast channel + +### 2. Pool NTP Source (`pool.rs`) + +**Type**: Network-based, multiple servers from pool + +**Architecture**: Event-driven with timer-based polling + +**Location**: `ntpd/src/daemon/spawn/pool.rs` (spawner), uses same `ntp_source.rs` + +**Key Characteristics**: +- Manages multiple sources from a single pool address +- DNS resolution returns multiple IPs +- Maintains desired count of active sources +- Supports ignore list for problematic servers + +**Event Mechanism**: +- Same as Standard source for individual connections +- Spawner manages lifecycle of multiple source instances +- Automatic replacement of failed sources + +**Interface with System**: +- Spawner creates multiple `NtpSourceCreateParameters` +- Each spawned source operates independently +- Pool logic handled entirely in spawner + +### 3. NTS (Network Time Security) Source (`nts.rs`) + +**Type**: Network-based, encrypted NTP + +**Architecture**: Event-driven with timer-based polling + +**Location**: `ntpd/src/daemon/spawn/nts.rs` (spawner), uses same `ntp_source.rs` + +**Key Characteristics**: +- Performs TLS key exchange before NTP communication +- Single source per NTS server +- Timeout handling for key exchange (5 seconds) +- Certificate validation support + +**Event Mechanism**: +- Key exchange phase: async TLS connection with timeout +- NTP phase: same as Standard source +- Failure triggers re-key exchange + +**Interface with System**: +- Spawner handles key exchange complexity +- Passes NTS data to source via `SourceNtsData` +- Source handles encrypted NTP packets + +### 4. Socket Source (`sock.rs`) + +**Type**: Local hardware/GPS via Unix socket + +**Architecture**: Event-driven (async wrapper around blocking I/O) + +**Location**: `ntpd/src/daemon/spawn/sock.rs` (spawner), `ntpd/src/daemon/sock_source.rs` (source) + +**Key Characteristics**: +- Receives time data from GPSd via Unix domain socket +- Processes binary time samples (40-byte format) +- One-way time source (no polling) +- Creates and manages Unix socket lifecycle + +**Event Mechanism**: +- `tokio::select!` on socket receive and system updates +- Socket events trigger sample processing +- No outbound polling - purely reactive + +**Interface with System**: +- Uses `OneWaySourceUpdate` instead of `NtpSourceUpdate` +- Communicates via `MsgForSystem::OneWaySourceUpdate` +- No bidirectional communication + +### 5. PPS (Pulse Per Second) Source (`pps.rs`) + +**Type**: Hardware timing source + +**Architecture**: Event-driven with blocking I/O in separate thread + +**Location**: `ntpd/src/daemon/spawn/pps.rs` (spawner), `ntpd/src/daemon/pps_source.rs` (source) + +**Key Characteristics**: +- Interfaces with PPS hardware devices +- Blocking I/O handled in separate thread +- High precision timing source +- Requires PPS_CANWAIT capability + +**Event Mechanism**: +- Blocking `fetch_blocking()` in dedicated thread +- Thread communicates via mpsc channel +- Main task uses `tokio::select!` on channel and system updates + +**Interface with System**: +- Similar to Socket source - one-way updates +- Uses `OneWaySourceUpdate` pattern +- No polling - event-driven by hardware pulses + +## Notable Differences Between Drivers + +### 1. Communication Patterns + +**Network Sources** (Standard, Pool, NTS): +- Bidirectional communication (send/receive) +- Timer-based polling intervals +- Socket-based I/O with timestamping +- Use `NtpSourceUpdate` for measurements + +**Local Sources** (Socket, PPS): +- Unidirectional (receive only) +- No polling - purely event-driven +- Use `OneWaySourceUpdate` for measurements +- Different measurement delay types (NtpDuration vs ()) + +### 2. Error Handling + +**Network Sources**: +- Network error detection and recovery +- DNS resolution retry logic +- Connection state management +- Unreachable/demobilize states + +**Local Sources**: +- Hardware/file system error handling +- No network-related error recovery +- Simpler state management + +### 3. Threading Models + +**Network Sources**: +- Single async task per source +- All I/O is async + +**Socket Source**: +- Single async task +- Unix socket I/O is async + +**PPS Source**: +- Dual-task model: blocking thread + async coordinator +- Required due to blocking PPS API + +### 4. Lifecycle Management + +**Standard/NTS**: +- Single source per spawner +- Restart on failure + +**Pool**: +- Multiple sources per spawner +- Dynamic source replacement + +**Socket/PPS**: +- Single source per spawner +- Restart on failure + +## System Integration + +### Spawner Interface + +All spawners implement the `Spawner` trait: + +```rust +trait Spawner { + async fn try_spawn(&mut self, action_tx: &mpsc::Sender) -> Result<(), Self::Error>; + fn is_complete(&self) -> bool; + async fn handle_source_removed(&mut self, event: SourceRemovedEvent) -> Result<(), Self::Error>; + async fn handle_registered(&mut self, event: SourceCreateParameters) -> Result<(), Self::Error>; + fn get_id(&self) -> SpawnerId; + fn get_addr_description(&self) -> String; + fn get_description(&self) -> &str; +} +``` + +### Source Communication + +Sources communicate with the system via: + +```rust +enum MsgForSystem { + MustDemobilize(SourceId), + NetworkIssue(SourceId), + Unreachable(SourceId), + SourceUpdate(SourceId, NtpSourceUpdate), + OneWaySourceUpdate(SourceId, OneWaySourceUpdate), +} +``` + +### Event Loop Integration + +All sources use `tokio::select!` for event multiplexing: +- Timer events (network sources only) +- I/O events (socket/channel receive) +- System update events (configuration changes) + +## Driver Comparison Table + +| Driver Type | Architecture | Communication | Polling | Measurement Type | Threading Model | +|-------------|--------------|---------------|---------|------------------|-----------------| +| Standard NTP | Event-driven with polling | Two-way (request/response) | Timer-based | `NtpDuration` | Single async task | +| Pool NTP | Event-driven with polling | Two-way (request/response) | Timer-based | `NtpDuration` | Single async task | +| NTS | Event-driven with polling | Two-way (request/response) | Timer-based | `NtpDuration` | Single async task | +| Socket | Event-driven (async wrapper) | One-way (receive only) | None | `()` | Single async task | +| PPS | Event-driven (blocking thread) | One-way (receive only) | None | `()` | Dual-task model | + +## Guidelines for Implementing a New Polled NTP Source Driver + +### 1. Architecture Decision + +**Event-Driven Approach (Recommended)**: +- Follow existing patterns using `tokio::select!` +- Implement timer-based polling if needed +- Use async I/O where possible + +**Key Considerations**: +- All existing sources are event-driven, not traditionally polled +- The system expects async operation +- Blocking I/O requires separate thread (see PPS example) + +### 2. Implementation Steps + +#### Step 1: Create Spawner + +```rust +pub struct MySourceSpawner { + config: MySourceConfig, + source_config: SourceConfig, + id: SpawnerId, + has_spawned: bool, +} + +impl Spawner for MySourceSpawner { + type Error = MySpawnError; + + async fn try_spawn(&mut self, action_tx: &mpsc::Sender) -> Result<(), Self::Error> { + // Create source parameters + // Send SpawnEvent::Create + // Set has_spawned = true + } + + fn is_complete(&self) -> bool { + self.has_spawned + } + + async fn handle_source_removed(&mut self, event: SourceRemovedEvent) -> Result<(), Self::Error> { + // Reset has_spawned if not demobilized + // Handle any cleanup + } + + // Implement other required methods +} +``` + +#### Step 2: Create Source Task + +**For Network-like Sources**: +```rust +pub struct MySourceTask { + index: SourceId, + clock: C, + channels: SourceChannels, + source: NtpSource, // or OneWaySource + // Your specific fields +} + +impl MySourceTask { + async fn run(&mut self) { + loop { + tokio::select! { + // Timer event (if needed) + () = &mut self.poll_timer => { + // Handle polling + } + // I/O event + result = self.receive_data() => { + // Process received data + } + // System updates + result = self.channels.system_update_receiver.recv() => { + // Handle system updates + } + } + } + } +} +``` + +**For Hardware/Local Sources**: +```rust +// Similar to Socket/PPS pattern +// Use OneWaySource instead of NtpSource +// Send OneWaySourceUpdate instead of SourceUpdate +``` + +#### Step 3: Integrate with System + +1. Add config type to `NtpSourceConfig` enum +2. Add spawner creation in `system.rs` +3. Add source creation parameters if needed +4. Update configuration parsing + +### 3. Key Design Patterns + +#### Timer Management +```rust +// For polled sources, use tokio timer +let mut poll_timer = tokio::time::interval(poll_interval); + +tokio::select! { + _ = poll_timer.tick() => { + // Perform poll operation + } + // Other events... +} +``` + +#### Error Handling +```rust +// Network errors should trigger restart +self.channels.msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await.ok(); + +// Unreachable should trigger address re-resolution +self.channels.msg_for_system_sender + .send(MsgForSystem::Unreachable(self.index)) + .await.ok(); +``` + +#### Measurement Reporting +```rust +// For two-way sources +let update = NtpSourceUpdate { /* ... */ }; +self.channels.msg_for_system_sender + .send(MsgForSystem::SourceUpdate(self.index, update)) + .await.ok(); + +// For one-way sources +let update = OneWaySourceUpdate { /* ... */ }; +self.channels.msg_for_system_sender + .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) + .await.ok(); +``` + +### 4. Testing Considerations + +- Follow existing test patterns in each source module +- Test spawner lifecycle (create, remove, restart) +- Test source communication with system +- Mock external dependencies +- Test error conditions and recovery + +### 5. Configuration Integration + +Add to configuration types: +```rust +// In config module +#[derive(Debug, Clone)] +pub struct MySourceConfig { + // Your configuration fields +} + +// In NtpSourceConfig enum +pub enum NtpSourceConfig { + // Existing variants... + MySource(MySourceConfig), +} +``` + +### 6. Common Pitfalls to Avoid + +1. **Don't use traditional polling loops** - use event-driven patterns +2. **Handle all error cases** - network, I/O, parsing errors +3. **Implement proper cleanup** - remove from snapshots on exit +4. **Use appropriate update types** - NtpSourceUpdate vs OneWaySourceUpdate +5. **Follow async patterns** - don't block the runtime +6. **Test spawner state management** - handle restart scenarios + +### 7. Performance Considerations + +- Use efficient I/O patterns (async where possible) +- Minimize allocations in hot paths +- Consider batching if processing many measurements +- Use appropriate buffer sizes +- Profile memory usage and async task overhead + +## Conclusion + +The ntpd-rs source driver architecture is consistently event-driven across all source types. New drivers should follow this pattern rather than implementing traditional polling loops. The modular spawner/source design provides good separation of concerns and makes it straightforward to add new source types while maintaining consistency with the existing codebase. + +The key to successful implementation is understanding the async event-driven patterns used throughout the codebase and following the established communication protocols between spawners, sources, and the system coordinator. diff --git a/docs/development/prompts/ptp/prd-ptp-driver.md b/docs/development/prompts/ptp/prd-ptp-driver.md new file mode 100644 index 000000000..0636dc57f --- /dev/null +++ b/docs/development/prompts/ptp/prd-ptp-driver.md @@ -0,0 +1,80 @@ +# Product Requirements Document: PTP Driver for ntpd-rs + +## Introduction/Overview +This document specifies the requirements for implementing a new PTP (Precision Time Protocol) source driver for ntpd-rs. The driver will interface with Linux PTP Hardware Clocks (PHC) using the ptp-time crate, providing high-precision time measurements similar to the existing PPS driver but with timer-based polling instead of device-triggered events. + +## Goals +1. Enable ntpd-rs to utilize Linux PTP Hardware Clocks for high-precision time synchronization +2. Follow the established driver patterns in ntpd-rs while adapting to PTP-specific requirements +3. Provide configurable polling intervals for optimal performance across different use cases +4. Maintain consistency with existing source driver architecture and error handling patterns + +## User Stories +1. **As a system administrator**, I want to configure ntpd-rs to use a PTP hardware clock so that I can achieve sub-microsecond time synchronization accuracy. +2. **As a system administrator**, I want to configure the polling interval for PTP sources so that I can balance precision and system resource usage. +3. **As a system administrator**, I want the PTP driver to handle hardware errors gracefully so that my NTP service remains stable even when PTP devices are temporarily unavailable. + +## Functional Requirements +1. **PTP Device Interface** + - The driver must interface with Linux PTP Hardware Clocks via `/dev/ptp*` device files + - Must use the ptp-time crate for safe access to PTP ioctls + - Should support standard PTP device capabilities detection + +2. **Timer-Based Polling Architecture** + - Implement timer-based polling instead of device-triggered events (unlike PPS driver) + - Use configurable polling intervals with default bounds of 0.5s (2^-1) to 64s (2^6) + - Follow the dual-task threading model pattern established by the PPS driver + +3. **Source Communication Pattern** + - Implement one-way communication (receive-only) like the PPS driver + - Use `OneWaySourceUpdate` for measurements instead of `NtpSourceUpdate` + - Provide high-precision time measurements with appropriate timestamping + +4. **Configuration Support** + - Accept device path configuration (e.g., `/dev/ptp0`) + - Support configurable polling interval settings + - Include precision estimation based on PTP capabilities + +5. **Error Handling** + - Graceful handling of PTP device unavailability + - Proper error reporting to system coordinator + - Recovery mechanisms for transient hardware issues + +6. **Timestamp Capability Detection** + - Auto-detect available timestamping capabilities on driver initialization + - Prefer precise timestamps when available + - Fall back to extended timestamps if precise timestamps are not available + - Use standard timestamps only as last resort + - Do not attempt capability detection on every poll + +## Non-Goals (Out of Scope) +1. Support for non-Linux PTP implementations or non-Hardware Clock sources +2. Full NTP client/server protocol implementation - only time measurement capabilities +3. Complex PTP network synchronization features beyond simple timestamp acquisition +4. Integration with PTPv2 or PTPv3 protocol layers (focus on hardware clock access) +5. Support for PTP device pin configuration or event generation + +## Design Considerations +- Follow the same pattern as the PPS driver for consistency with existing codebase +- Use the dual-task threading model with blocking I/O in separate thread +- Implement timer-based polling using Tokio's interval mechanism +- Maintain compatibility with existing ntpd-rs source management architecture +- Support multiple PTP devices simultaneously through separate user configurations + +## Technical Considerations +1. **Dependencies**: Must integrate with ptp-time crate (version specified in Cargo.toml) +2. **Threading Model**: Dual-task model similar to PPS driver - async coordinator with blocking thread for device I/O +3. **Timing Precision**: Should provide sub-microsecond timing precision comparable to PPS driver +4. **System Integration**: Follow established `MsgForSystem::OneWaySourceUpdate` communication pattern +5. **Capability Detection**: Auto-detect timestamp capabilities at initialization time only + +## Success Metrics +1. **Performance**: Achieve sub-microsecond timing precision consistent with PPS driver +2. **Reliability**: 99.9% uptime for PTP device operations when devices are available +3. **Configuration**: Support all required configuration parameters through ntpd-rs config system +4. **Error Handling**: Graceful degradation when PTP devices are unavailable or error + +## Open Questions +1. Should the PTP driver support multiple PTP devices simultaneously? (Answer: Yes, separate user-supplied configuration for each device) +2. How should precision estimation be handled? (Answer: Use the pattern from the PPS driver) +3. What timestamp capability fallback behavior should be implemented? (Answer: Precise -> Extended -> Standard timestamps with auto-detection at initialization only) diff --git a/docs/development/prompts/ptp/ptp-driver-prompt-1.md b/docs/development/prompts/ptp/ptp-driver-prompt-1.md new file mode 100644 index 000000000..769bd28b8 --- /dev/null +++ b/docs/development/prompts/ptp/ptp-driver-prompt-1.md @@ -0,0 +1,8 @@ +Using the analysis in plans/driver-analysis-combined.md, create a PRD for a new source driver for ntpd-rs. It should use the pps-time crate from crates.io, and should follow mostly the same pattern as the PPS driver. That is, a blocking thread architecture, one-way (receive-only) communication, and a dual-task threading model. However, the invocation must be timer-based rather than device-triggered. +Use the GitHub or fetch URL tool to retrieve an example of using the ptp-time crate. It is available at https://raw.githubusercontent.com/paulgear/ptp-time/refs/heads/main/examples/demo.rs +It should have configurable polling intervals like other network sources. The default polling interval bounds should be 2^-1 (0.5 seconds) for the minimum and 2^6 (64 seconds) for the maximum. +Please update the PRD with the following responses to open questions: +1. The PTP driver should support multiple PTP devices simultaneously. A separate, user-supplied configuration for each device is required. +2. Use the pattern from the PPS driver for precision estimation. +3. The PTP capability for precise timestamps is the main target. If precise timestamps are not available it should fall back to extended timestamps, and only use standard timestamps if both of the above fail. This should be auto-detected on driver initialisation and not attempted on every poll. +Then proceed to write the PRD to the target file as requested. diff --git a/docs/development/prompts/ptp/tasks-prd-ptp-driver.md b/docs/development/prompts/ptp/tasks-prd-ptp-driver.md new file mode 100644 index 000000000..fd354f591 --- /dev/null +++ b/docs/development/prompts/ptp/tasks-prd-ptp-driver.md @@ -0,0 +1,53 @@ +## Tasks + +- [x] 1. Create PTP Driver Spawner + - [x] 1.1 Implement `PtpSourceSpawner` struct with device path and polling interval configuration + - [x] 1.2 Implement `Spawner` trait methods for PTP driver (try_spawn, is_complete, etc.) + - [x] 1.3 Add PTP source configuration to `NtpSourceConfig` enum + - [x] 1.4 Integrate PTP spawner creation in system coordinator + - [x] 1.5 Create unit tests for spawner lifecycle management + - [x] 1.6 Run unit tests for spawner and fix any problems encountered. +- [x] 2. Implement PTP Source Task with Dual-Thread Architecture + - [x] 2.1 Create `PtpSourceTask` struct with blocking thread channel communication + - [x] 2.2 Implement blocking I/O thread for PTP device access using ptp-time crate + - [x] 2.3 Implement async coordinator task that manages polling timer + - [x] 2.4 Implement timestamp capability detection at initialization only + - [x] 2.5 Implement measurement reporting via `OneWaySourceUpdate` + - [x] 2.6 Create integration tests for dual-thread communication pattern + - [x] 2.7 Run integration tests and fix any problems encountered. +- [x] 3. Add Configuration Support and Error Handling + - [x] 3.1 Implement PTP configuration parsing from ntpd-rs config files + - [x] 3.3 Implement graceful error handling for device unavailability + - [x] 3.5 Test configuration loading and error scenarios + - [x] 3.6 Run configuration tests and fix any problems encountered. +- [x] 4. Integrate with System Coordinator and Communication Patterns + - [x] 4.1 Implement `MsgForSystem::OneWaySourceUpdate` communication pattern + - [x] 4.2 Follow established error handling patterns (NetworkIssue, Unreachable) + - [x] 4.3 Ensure compatibility with existing source management architecture + - [x] 4.4 Test system integration and message passing + - [x] 4.5 Run system integration tests and fix any problems encountered. +- [x] 5. Create Integration Tests for PTP Driver Implementation + - [x] 5.1 Review the changes made in tasks 1-4 + - [x] 5.2 Review the patterns and techniques used elsewhere in the codebase + - [x] 5.3 Create comprehensive integration tests for the PTP driver + - [x] 5.4 Run the integration tests and fix any problems encountered. + +## Relevant Files + +- `ntp-proto/src/config.rs` - Configuration type definitions +- `ntpd/src/daemon/config/mod.rs` - Main configuration module +- `ntpd/src/daemon/config/ntp_source.rs` - NTP source configuration data structures and parsing +- `ntpd/src/daemon/pps_source.rs` - Reference implementation for dual-task threading pattern +- `ntpd/src/daemon/ptp_source.rs` - Source task implementation with dual-threading model +- `ntpd/src/daemon/sock_source.rs` - Reference for one-way communication pattern +- `ntpd/src/daemon/spawn/mod.rs` - System spawning controllers and data structures +- `ntpd/src/daemon/spawn/pps.rs` - Reference spawner for dual-task pattern +- `ntpd/src/daemon/spawn/ptp.rs` - Main spawner implementation for PTP driver +- `ntpd/src/daemon/spawn/sock.rs` - Reference spawner for one-way pattern +- `ntpd/src/daemon/system.rs` - System coordinator integration +- `ntpd/src/daemon/ptp_source.rs` - Updated to use fixed poll interval from configuration instead of adaptive polling +- `ntpd/src/daemon/config/ntp_source.rs` - Added comprehensive PTP configuration parsing test +- `ntpd/src/daemon/ptp_source.rs` - Enhanced error handling with consecutive error tracking, recovery detection, and proper NetworkIssue/Unreachable patterns +- `ntpd/src/daemon/ptp_source_integration_test.rs` - Comprehensive integration tests for PTP driver system integration and message passing patterns +- `ntpd/src/daemon/ptp_integration_test.rs` - Existing integration tests for PTP spawner and source task behavior +- `ntpd/src/daemon/mod.rs` - Added PTP integration test modules diff --git a/docs/guide/ptp.md b/docs/guide/ptp.md new file mode 100644 index 000000000..1207d3214 --- /dev/null +++ b/docs/guide/ptp.md @@ -0,0 +1,38 @@ +# PTP time sources + +## Time source +It is possible to use local PTP devices as time sources for ntpd-rs in addition to the usual NTP sources. The most common use case for this is virtual machines which use the KVM, Hyper-V, or AWS Nitro hypervisors, all of which supply a PTP Hardware Clock (PHC) device for use by guests (VMs). At the time of this writing, only KVM PTP devices have been tested. + +To configure a KVM guest, simply run: +```sh +sudo modprobe ptp_kvm +``` + +This will typically create a PTP device file in `/dev` that is immediately available for use: +``` +# ls -la /dev/ptp* +crw------- 1 root root 249, 0 Aug 25 13:40 /dev/ptp0 +lrwxrwxrwx 1 root root 4 Aug 25 13:40 /dev/ptp_kvm -> ptp0 +``` + +For [AWS instance types which support the ENA PHC](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configure-ec2-ntp.html#connect-to-the-ptp-hardware-clock), place the following in a file in `/etc/modprobe.d/`: +``` +options ena phc_enable=1 +``` + +The system must be restarted for the ENA `phc_enable` option to take effect. The AWS ENA PHC may be used in conjunction with the KVM PHC on supported instances; in such cases you should see both `/dev/ptp0` and `/dev/ptp1` present on the system. + +## ntpd-rs configuration +The PTP device can then be added as a time source for ntpd-rs by adding the following to the configuration: +```toml +[[source]] +mode = "ptp" +path = "/dev/ptp0" # or any other PTP device file; mandatory parameter +delay = 0.00004 # optional, defaults to 0.0; may be used to set the delay to something close to the host's real root delay +interval = 2 # optional, defaults to 0 (1 second) +precision = 1e-7 # optional, defaults to 1 nanosecond +stratum = 1 # optional, defaults to 0; may be used to set the stratum to a value which + # makes more sense in the context of the network in which ntpd-rs operates. +``` + +ntpd-rs is unable to estimate the uncertainty of the timing data from a PTP device. Therefore, you should provide an estimate (corresponding to 1 standard deviation) of this noise yourself through the `precision` field. diff --git a/docs/includes/glossary.md b/docs/includes/glossary.md index e11ca131f..1903e3e0c 100644 --- a/docs/includes/glossary.md +++ b/docs/includes/glossary.md @@ -1,13 +1,15 @@ -*[NTS-KE]: NTS Key Exchange -*[NTP]: Network Time Protocol -*[NTS]: Network Time Security -*[TLS]: Transport Layer Security -*[TCP]: Transmission Control Protocol -*[UDP]: User Datagram Protocol *[CA]: Certificate Authority -*[OS]: Operating System -*[DNS]: Domain Name System *[CIDR]: Classless Inter-Domain Routing +*[DNS]: Domain Name System *[GPS]: Global Positioning System +*[NTP]: Network Time Protocol +*[NTS-KE]: NTS Key Exchange +*[NTS]: Network Time Security +*[OS]: Operating System +*[PHC]: PTP Hardware Clock *[PPS]: Pulse Per Second +*[PTP]: Precision Time Protocol +*[TCP]: Transmission Control Protocol +*[TLS]: Transport Layer Security +*[UDP]: User Datagram Protocol *[ioctl]: input/output control diff --git a/docs/man/ntp.toml.5.md b/docs/man/ntp.toml.5.md index 01e5902d0..714fa158b 100644 --- a/docs/man/ntp.toml.5.md +++ b/docs/man/ntp.toml.5.md @@ -52,6 +52,12 @@ with any of these options: assumed to send a pulse every rounded second. As these devices only provide periodic data, they do not count towards `minimum-agreeing-sources`. +`ptp` +: A PTP source connects to a Precision Time Protocol device, which may be + polled like an NTP source. Common PTP devices include the KVM, Hyper-V, + and AWS Nitro hypervisor PHC devices. Multiple PTP sources may be + configured provided that they use different paths. + # CONFIGURATION ## `[source-defaults]` @@ -82,8 +88,8 @@ sources. `mode` = *mode* : Specify one of the source modes that ntpd-rs supports: `server`, `pool`, - `nts`, `sock` or `pps`. For a description of the different source modes, see - the *SOURCE MODES* section. + `nts`, `sock`, `pps`, or `ptp`. For a description of the different source + modes, see the *SOURCE MODES* section. `address` = *address* : Specify the remote address of the source. For server sources this will be @@ -111,28 +117,14 @@ sources. : `pool` mode only. Specifies a list of IP addresses of servers in the pool which should not be used. For example: `["127.0.0.1"]`. Empty by default. -`measurement_noise_estimate` = *Noise variance (seconds squared)* -: `pps` and `sock` mode only. Deprecated, use `precision` instead. - -`precision` = *Noise standard deviation (seconds)* -: `pps` and `sock` mode only. Precision of the source. This should be an estimate - of the size of the expected measurement noise. Technically defined as the - 1-standard deviation bound on the measurement error. This is needed as - `sock` and `pps` sources don't have a good way to estimate their own error. - -`poll-interval-limits` = { `min` = *min*, `max` = *max* } (defaults from `[source-defaults]`) -: Specifies the limit on how often a source is queried for a new time. For - most instances the defaults will be adequate. The min and max are given as - the log2 of the number of seconds (i.e. two to the power of the interval). - An interval of 4 equates to 16 seconds, 10 results in an interval of 1024 - seconds. If only one of the two boundaries is specified, the other is - inherited from `[source-defaults]` - `initial-poll-interval` = *interval* (defaults from `[source-defaults]`) : Initial poll interval used on startup. The value is given as the log2 of the number of seconds (i.e. two to the power of the interval). The default value of 4 results in an interval of 16 seconds. +`measurement_noise_estimate` = *Noise variance (seconds squared)* +: `pps` and `sock` mode only. Deprecated, use `precision` instead. + `ntp-version` = `4` | `5` | `"auto"` (**4**) : Which NTP version to use for this source. By default this uses NTP version 4. You can use `5` to set the protocol version to the draft NTPv5 @@ -143,6 +135,23 @@ sources. NTPv5 support is currently in beta and can still change in a backwards incompatible way. +`path` = *file path* +: `pps`, `ptp` and `sock` mode only. The path to the device file or socket. + +`poll-interval-limits` = { `min` = *min*, `max` = *max* } (defaults from `[source-defaults]`) +: Specifies the limit on how often a source is queried for a new time. For + most instances the defaults will be adequate. The min and max are given as + the log2 of the number of seconds (i.e. two to the power of the interval). + An interval of 4 equates to 16 seconds, 10 results in an interval of 1024 + seconds. If only one of the two boundaries is specified, the other is + inherited from `[source-defaults]` + +`precision` = *Noise standard deviation (seconds)* +: `pps`, `ptp` and `sock` mode only. Precision of the source. This should be an + estimate of the size of the expected measurement noise. Technically defined + as the 1-standard deviation bound on the measurement error. This is needed + because these sources don't have a good way to estimate their own error. + ## `[[server]]` The NTP daemon can be configured to distribute time via any number of `[[server]]` sections. If no such sections have been defined, the daemon runs in diff --git a/mkdocs.yml b/mkdocs.yml index cafe0e905..d28c64e36 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -34,6 +34,7 @@ nav: - guide/installation.md - guide/server-setup.md - guide/gps-pps.md + - guide/ptp.md - guide/exporting-metrics.md - guide/nts.md - guide/ntpv5.md diff --git a/ntp-proto/src/identifiers.rs b/ntp-proto/src/identifiers.rs index 7f1cb7efa..7dd395e45 100644 --- a/ntp-proto/src/identifiers.rs +++ b/ntp-proto/src/identifiers.rs @@ -14,6 +14,7 @@ impl ReferenceId { pub const NONE: ReferenceId = ReferenceId(u32::from_be_bytes(*b"XNON")); pub const SOCK: ReferenceId = ReferenceId(u32::from_be_bytes(*b"SOCK")); pub const PPS: ReferenceId = ReferenceId(u32::from_be_bytes(*b"PPS\0")); + pub const PTP: ReferenceId = ReferenceId(u32::from_be_bytes(*b"XPTP")); // Network Time Security (NTS) negative-acknowledgment (NAK), from rfc8915 pub const KISS_NTSN: ReferenceId = ReferenceId(u32::from_be_bytes(*b"NTSN")); diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index 5b82c3f55..edb96f87f 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -297,6 +297,26 @@ impl Result< + OneWaySource, + ::Error, + > { + self.ensure_controller_control()?; + let controller = self.controller.add_one_way_source( + id, + source_config, + 0.0, // Assume no noise from PTP sources for now + Some(period), + ); + self.sources.insert(id, None); + Ok(OneWaySource::new(controller)) + } + #[allow(clippy::type_complexity)] pub fn create_ntp_source( &mut self, diff --git a/ntpd/Cargo.toml b/ntpd/Cargo.toml index 43646b442..41335bdaf 100644 --- a/ntpd/Cargo.toml +++ b/ntpd/Cargo.toml @@ -23,6 +23,7 @@ libc.workspace = true timestamped-socket.workspace = true clock-steering.workspace = true pps-time = { workspace = true, optional = true } +ptp-time = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true @@ -34,10 +35,11 @@ ntp-proto = { workspace = true, features = ["__internal-test",] } tokio-rustls.workspace = true [features] -default = [ "pps" ] +default = [ "pps", "ptp" ] hardware-timestamping = [] unstable_nts-pool = ["ntp-proto/nts-pool"] pps = [ "dep:pps-time" ] +ptp = [ "dep:ptp-time" ] [lib] name = "ntpd" diff --git a/ntpd/src/daemon/config/mod.rs b/ntpd/src/daemon/config/mod.rs index 8aaf9e94b..40a744004 100644 --- a/ntpd/src/daemon/config/mod.rs +++ b/ntpd/src/daemon/config/mod.rs @@ -446,6 +446,8 @@ impl Config { NtpSourceConfig::Sock(_) => count += 1, #[cfg(feature = "pps")] NtpSourceConfig::Pps(_) => {} // PPS sources don't count + #[cfg(feature = "ptp")] + NtpSourceConfig::Ptp(_) => count += 1, } } count @@ -479,7 +481,10 @@ impl Config { if self.sources.iter().any(|config| match config { NtpSourceConfig::Sock(_) => false, + #[cfg(feature = "pps")] NtpSourceConfig::Pps(_) => false, + #[cfg(feature = "ptp")] + NtpSourceConfig::Ptp(_) => false, NtpSourceConfig::Standard(config) => { matches!(config.first.ntp_version, ProtocolVersion::V5) } diff --git a/ntpd/src/daemon/config/ntp_source.rs b/ntpd/src/daemon/config/ntp_source.rs index 9f91e27b1..74620fe79 100644 --- a/ntpd/src/daemon/config/ntp_source.rs +++ b/ntpd/src/daemon/config/ntp_source.rs @@ -296,6 +296,133 @@ pub struct PpsSourceConfig { pub period: f64, } +#[cfg(feature = "ptp")] +#[derive(Debug, PartialEq, Clone)] +pub struct PtpSourceConfig { + pub delay: f64, + pub interval: PollInterval, + pub path: PathBuf, + pub precision: f64, + pub stratum: u8, +} + +#[cfg(feature = "ptp")] +impl<'de> Deserialize<'de> for PtpSourceConfig { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(field_identifier, rename_all = "snake_case")] + enum Field { + Delay, + Interval, + Path, + Precision, + Stratum, + } + + struct PtpSourceConfigVisitor; + + impl<'de> serde::de::Visitor<'de> for PtpSourceConfigVisitor { + type Value = PtpSourceConfig; + + fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result { + formatter.write_str("struct PtpSourceConfig") + } + + fn visit_map(self, mut map: V) -> Result + where + V: serde::de::MapAccess<'de>, + { + let mut delay = None; + let mut interval = None; + let mut path = None; + let mut precision = None; + let mut stratum = None; + while let Some(key) = map.next_key()? { + match key { + Field::Delay => { + if delay.is_some() { + return Err(de::Error::duplicate_field("delay")); + } + let delay_raw: f64 = map.next_value()?; + if delay_raw.partial_cmp(&0.0) != Some(core::cmp::Ordering::Greater) || delay_raw >= 16.0 { + return Err(de::Error::invalid_value( + serde::de::Unexpected::Float(delay_raw), + &"delay should be positive and less than 16", + )); + } + delay = Some(delay_raw); + } + Field::Interval => { + if interval.is_some() { + return Err(de::Error::duplicate_field("interval")); + } + let interval_value: PollInterval = map.next_value()?; + if interval_value.as_log() < 0 || interval_value.as_log() > 17 { + return Err(de::Error::invalid_value( + serde::de::Unexpected::Signed(interval_value.as_log() as i64), + &"interval should be between 0 and 17", + )); + } + interval = Some(interval_value); + } + Field::Path => { + if path.is_some() { + return Err(de::Error::duplicate_field("path")); + } + path = Some(map.next_value()?); + } + Field::Precision => { + if precision.is_some() { + return Err(de::Error::duplicate_field("precision")); + } + let precision_raw: f64 = map.next_value()?; + if precision_raw.partial_cmp(&0.0) != Some(core::cmp::Ordering::Greater) + { + return Err(de::Error::invalid_value( + serde::de::Unexpected::Float(precision_raw), + &"precision should be positive", + )); + } + precision = Some(precision_raw); + } + Field::Stratum => { + if stratum.is_some() { + return Err(de::Error::duplicate_field("stratum")); + } + let stratum_raw: u8 = map.next_value()?; + if stratum_raw >= 16 { + return Err(de::Error::invalid_value( + serde::de::Unexpected::Unsigned(stratum_raw as u64), + &"stratum should be less than 16", + )); + } + stratum = Some(stratum_raw); + } + } + } + let delay = delay.unwrap_or(0.0); + let interval = interval.unwrap_or(PollInterval::from_byte(0)); // Default to 1 second + let path = path.ok_or_else(|| serde::de::Error::missing_field("path"))?; + let precision = precision.unwrap_or(0.000000001); // Default to 1 nanosecond + let stratum = stratum.unwrap_or(0); + Ok(PtpSourceConfig { + delay, + interval, + path, + precision, + stratum, + }) + } + } + + const FIELDS: &[&str] = &["delay", "interval", "path", "precision", "stratum"]; + deserializer.deserialize_struct("PtpSourceConfig", FIELDS, PtpSourceConfigVisitor) + } +} + impl<'de> Deserialize<'de> for PpsSourceConfig { fn deserialize(deserializer: D) -> Result where @@ -415,6 +542,9 @@ pub enum NtpSourceConfig { #[cfg(feature = "pps")] #[serde(rename = "pps")] Pps(PpsSourceConfig), + #[cfg(feature = "ptp")] + #[serde(rename = "ptp")] + Ptp(PtpSourceConfig), } /// A normalized address has a host and a port part. However, the host may be @@ -669,6 +799,8 @@ mod tests { NtpSourceConfig::Sock(_c) => "".to_string(), #[cfg(feature = "pps")] NtpSourceConfig::Pps(_c) => "".to_string(), + #[cfg(feature = "ptp")] + NtpSourceConfig::Ptp(_c) => "".to_string(), } } @@ -1134,6 +1266,202 @@ mod tests { assert!(test.is_err()); } + #[cfg(feature = "ptp")] + #[test] + fn test_ptp_config_parsing() { + let TestConfig { + source: NtpSourceConfig::Ptp(test), + } = toml::from_str( + r#" + [source] + interval = 1 + mode = "ptp" + path = "/dev/ptp0" + precision = 0.000001 + "#, + ) + .unwrap() + else { + panic!("Unexpected source type"); + }; + assert_eq!(test.delay, 0.0); + assert_eq!(test.interval, ntp_proto::PollInterval::from_byte(1)); + assert_eq!(test.path, PathBuf::from("/dev/ptp0")); + assert_eq!(test.precision, 0.000001); + assert_eq!(test.stratum, 0); + + // Test with default interval + let TestConfig { + source: NtpSourceConfig::Ptp(test), + } = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp1" + precision = 0.000005 + "#, + ) + .unwrap() + else { + panic!("Unexpected source type"); + }; + assert_eq!(test.interval, ntp_proto::PollInterval::from_byte(0)); // Default interval + assert_eq!(test.path, PathBuf::from("/dev/ptp1")); + assert_eq!(test.precision, 0.000005); + + // Test validation - missing path should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + precision = 0.000001 + "#, + ); + assert!(test.is_err()); + + // Test with default precision (1 nanosecond) + let TestConfig { + source: NtpSourceConfig::Ptp(test), + } = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + "#, + ) + .unwrap() + else { + panic!("Unexpected source type"); + }; + assert_eq!(test.interval, ntp_proto::PollInterval::from_byte(0)); // Default interval + assert_eq!(test.path, PathBuf::from("/dev/ptp0")); + assert_eq!(test.precision, 0.000000001); // Default precision (1 nanosecond) + + // Test validation - negative precision should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + precision = -0.000001 + "#, + ); + assert!(test.is_err()); + + // Test validation - negative interval should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + precision = 0.000001 + interval = -1 + "#, + ); + assert!(test.is_err()); + + // Test validation - zero precision should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + precision = 0.0 + "#, + ); + assert!(test.is_err()); + + // Test validation - interval too high should fail + let test: Result = toml::from_str( + r#" + [source] + interval = 100 + mode = "ptp" + path = "/dev/ptp0" + precision = 0.000001 + "#, + ); + assert!(test.is_err()); + + // Test validation - unknown field should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + precision = 0.000001 + unknown_field = 5 + "#, + ); + assert!(test.is_err()); + + // Test validation - stratum >= 16 should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + stratum = 16 + "#, + ); + assert!(test.is_err()); + + // Test validation - non-integral stratum should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + stratum = 4.972 + "#, + ); + assert!(test.is_err()); + + // Test validation - non-numeric stratum should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + stratum = teststratum + "#, + ); + assert!(test.is_err()); + + // Test validation - stratum < 0 should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + stratum = -3 + "#, + ); + assert!(test.is_err()); + + // Test validation - negative delay should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + delay = -1.0 + "#, + ); + assert!(test.is_err()); + + // Test validation - delay >= 16 should fail + let test: Result = toml::from_str( + r#" + [source] + mode = "ptp" + path = "/dev/ptp0" + delay = 16.0 + "#, + ); + assert!(test.is_err()); + } + #[test] fn test_normalize_addr() { let addr = NormalizedAddress::from_string_ntp("[::1]:456".into()).unwrap(); diff --git a/ntpd/src/daemon/mod.rs b/ntpd/src/daemon/mod.rs index f380bbdbb..b34b8404c 100644 --- a/ntpd/src/daemon/mod.rs +++ b/ntpd/src/daemon/mod.rs @@ -7,6 +7,12 @@ pub mod nts_key_provider; pub mod observer; #[cfg(feature = "pps")] mod pps_source; +#[cfg(feature = "ptp")] +mod ptp_source; +#[cfg(feature = "ptp")] +mod ptp_integration_test; +#[cfg(feature = "ptp")] +mod ptp_source_integration_test; mod server; mod sock_source; pub mod sockets; diff --git a/ntpd/src/daemon/ptp_integration_test.rs b/ntpd/src/daemon/ptp_integration_test.rs new file mode 100644 index 000000000..49bde10b4 --- /dev/null +++ b/ntpd/src/daemon/ptp_integration_test.rs @@ -0,0 +1,158 @@ +#[cfg(test)] +mod tests { + use std::{collections::HashMap, path::PathBuf, sync::{Arc, RwLock}, time::Duration}; + + use ntp_proto::{ + AlgorithmConfig, KalmanClockController, NtpClock, NtpDuration, NtpLeapIndicator, + NtpTimestamp, SourceConfig, SynchronizationConfig, + }; + use tokio::sync::mpsc; + + use crate::daemon::{ + config::PtpSourceConfig, + ntp_source::{MsgForSystem, SourceChannels}, + ptp_source::PtpSourceTask, + spawn::{SourceId, ptp::PtpSpawner, Spawner}, + util::EPOCH_OFFSET, + }; + + #[derive(Debug, Clone, Default)] + struct TestClock {} + + impl NtpClock for TestClock { + type Error = std::time::SystemTimeError; + + fn now(&self) -> std::result::Result { + let cur = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH)?; + + Ok(NtpTimestamp::from_seconds_nanos_since_ntp_era( + EPOCH_OFFSET.wrapping_add(cur.as_secs() as u32), + cur.subsec_nanos(), + )) + } + + fn set_frequency(&self, _freq: f64) -> Result { + self.now() + } + + fn get_frequency(&self) -> Result { + Ok(0.0) + } + + fn step_clock(&self, _offset: NtpDuration) -> Result { + panic!("Shouldn't be called by source"); + } + + fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> { + Ok(()) + } + + fn error_estimate_update( + &self, + _est_error: NtpDuration, + _max_error: NtpDuration, + ) -> Result<(), Self::Error> { + panic!("Shouldn't be called by source"); + } + + fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> { + Ok(()) + } + } + + #[tokio::test] + async fn test_ptp_spawner_integration() { + // Test that PTP spawner properly creates spawn events + let device_path = PathBuf::from("/dev/ptp0"); + let mut spawner = PtpSpawner::new( + PtpSourceConfig { + delay: 0.0, + interval: ntp_proto::PollInterval::from_byte(0), + path: device_path.clone(), + precision: 1e-9, + stratum: 0, + }, + SourceConfig::default(), + ); + + let (action_tx, mut action_rx) = mpsc::channel(1); + + // Initially not complete + assert!(!spawner.is_complete()); + + // Try to spawn should succeed + spawner.try_spawn(&action_tx).await.unwrap(); + + // Should now be complete + assert!(spawner.is_complete()); + + // Should receive spawn event + let event = action_rx.recv().await.unwrap(); + assert_eq!(event.id, spawner.get_id()); + + // Verify spawn action contains correct parameters + match event.action { + crate::daemon::spawn::SpawnAction::Create( + crate::daemon::spawn::SourceCreateParameters::Ptp(params) + ) => { + assert_eq!(params.path, device_path); + assert_eq!(params.interval, ntp_proto::PollInterval::from_byte(0)); + } + _ => panic!("Expected PTP source create parameters"), + } + } + + #[tokio::test] + async fn test_ptp_source_message_passing() { + // Test that PTP source properly handles system messages + // Note: This test will fail if no PTP device is available, which is expected + let (_system_update_sender, system_update_receiver) = tokio::sync::broadcast::channel(1); + let (msg_for_system_sender, mut msg_for_system_receiver) = mpsc::channel(1); + + let index = SourceId::new(); + let clock = TestClock {}; + let mut system: ntp_proto::System<_, KalmanClockController<_, _>> = + ntp_proto::System::new( + clock.clone(), + SynchronizationConfig::default(), + AlgorithmConfig::default(), + Arc::new([]), + ).unwrap(); + + let device_path = PathBuf::from("/dev/ptp0"); + + // Create PTP source - this will likely fail due to no device, but that's expected + let source = system.create_ptp_source(index, SourceConfig::default(), 1.0).unwrap(); + + let handle = PtpSourceTask::spawn( + index, + device_path, + ntp_proto::PollInterval::from_byte(0), + clock, + SourceChannels { + msg_for_system_sender, + system_update_receiver, + source_snapshots: Arc::new(RwLock::new(HashMap::new())), + }, + source, + 0, + 0.0, + ); + + // Should receive NetworkIssue message due to device unavailability + let msg = tokio::time::timeout(Duration::from_millis(100), msg_for_system_receiver.recv()) + .await + .unwrap() + .unwrap(); + + match msg { + MsgForSystem::NetworkIssue(source_id) => { + assert_eq!(source_id, index); + } + _ => panic!("Expected NetworkIssue message due to device unavailability"), + } + + handle.abort(); + } +} diff --git a/ntpd/src/daemon/ptp_source.rs b/ntpd/src/daemon/ptp_source.rs new file mode 100644 index 000000000..03b7254b8 --- /dev/null +++ b/ntpd/src/daemon/ptp_source.rs @@ -0,0 +1,390 @@ +use std::path::PathBuf; + +use ntp_proto::{ + Measurement, NtpClock, NtpDuration, NtpInstant, NtpLeapIndicator, OneWaySource, + OneWaySourceSnapshot, OneWaySourceUpdate, ReferenceId, SourceController, SystemSourceUpdate, +}; +use ptp_time::{PtpDevice, ptp::{ptp_sys_offset_precise, ptp_sys_offset_extended, ptp_sys_offset}}; +use tokio::sync::mpsc; +use tracing::{Instrument, Span, debug, error, info, instrument, warn}; + +use crate::daemon::ntp_source::MsgForSystem; + +use super::{ntp_source::SourceChannels, spawn::SourceId}; + +#[derive(Debug, Clone)] +enum TimestampCapability { + Precise, + Extended, + Standard, +} + +enum PtpTimestamp { + Precise(ptp_sys_offset_precise), + Extended(ptp_sys_offset_extended), + Standard(ptp_sys_offset), +} + +impl PtpTimestamp { + // Convert PTP timestamp to NTP duration (seconds) + fn calculate_offset(&self) -> Option { + match self { + PtpTimestamp::Precise(precise) => { + let ptp_time = precise.device.sec as f64 + (precise.device.nsec as f64 / 1_000_000_000.0); + let sys_time = precise.sys_realtime.sec as f64 + (precise.sys_realtime.nsec as f64 / 1_000_000_000.0); + Some(sys_time - ptp_time) + } + PtpTimestamp::Extended(extended) => { + if extended.n_samples > 0 { + let ptp_time = extended.ts[0][1].sec as f64 + (extended.ts[0][1].nsec as f64 / 1_000_000_000.0); + let sys_time = extended.ts[0][0].sec as f64 + (extended.ts[0][0].nsec as f64 / 1_000_000_000.0); + Some(sys_time - ptp_time) + } else { + None + } + } + PtpTimestamp::Standard(standard) => { + if standard.n_samples > 0 { + let ptp_time = standard.ts[1].sec as f64 + (standard.ts[1].nsec as f64 / 1_000_000_000.0); + let sys_time = standard.ts[0].sec as f64 + (standard.ts[0].nsec as f64 / 1_000_000_000.0); + Some(sys_time - ptp_time) + } else { + None + } + } + } + } +} + +impl std::fmt::Debug for PtpTimestamp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PtpTimestamp::Precise(precise) => { + f.debug_struct("Precise") + .field("offset", &format_args!("{:.9}s", self.calculate_offset().unwrap_or(0.0))) + .field("device", &format_args!("{}.{:09}", precise.device.sec, precise.device.nsec)) + .field("sys_realtime", &format_args!("{}.{:09}", precise.sys_realtime.sec, precise.sys_realtime.nsec)) + .finish() + } + PtpTimestamp::Extended(extended) => { + f.debug_struct("Extended") + .field("offset", &format_args!("{:.9}s", self.calculate_offset().unwrap_or(0.0))) + .field("n_samples", &extended.n_samples) + .field("samples", &extended.ts.iter().take(extended.n_samples as usize).map(|ts| + format!("sys:{}.{:09} dev:{}.{:09}", ts[0].sec, ts[0].nsec, ts[1].sec, ts[1].nsec) + ).collect::>()) + .finish() + } + PtpTimestamp::Standard(standard) => { + f.debug_struct("Standard") + .field("offset", &format_args!("{:.9}s", self.calculate_offset().unwrap_or(0.0))) + .field("n_samples", &standard.n_samples) + .field("sys_time", &format_args!("{}.{:09}", standard.ts[0].sec, standard.ts[0].nsec)) + .field("dev_time", &format_args!("{}.{:09}", standard.ts[1].sec, standard.ts[1].nsec)) + .finish() + } + } + } +} + +struct PtpDeviceFetchTask { + ptp: PtpDevice, + fetch_sender: mpsc::Sender>, + poll_receiver: mpsc::Receiver<()>, + device_path: PathBuf, + capability: TimestampCapability, +} + +impl PtpDeviceFetchTask { + fn run(&mut self) { + info!("PTP device fetch task started for {:?}", self.device_path); + + loop { + // Wait for poll request from coordinator + if self.poll_receiver.blocking_recv().is_none() { + info!("PTP device fetch task terminating: coordinator disconnected"); + break; // Channel closed, exit + } + + let result = match self.capability { + TimestampCapability::Precise => { + self.ptp.get_sys_offset_precise().map(PtpTimestamp::Precise) + } + TimestampCapability::Extended => { + self.ptp.get_sys_offset_extended().map(PtpTimestamp::Extended) + } + TimestampCapability::Standard => { + self.ptp.get_sys_offset().map(PtpTimestamp::Standard) + } + }; + + match result { + Err(e) => { + let error_msg = format!("PTP device error: {}", e); + error!("{}", error_msg); + if self.fetch_sender.blocking_send(Err(error_msg)).is_err() { + info!("PTP device fetch task terminating: coordinator disconnected"); + break; + } + } + Ok(data) => { + if self.fetch_sender.blocking_send(Ok(data)).is_err() { + info!("PTP device fetch task terminating: coordinator disconnected"); + break; + } + } + } + } + + info!("PTP device fetch task terminated for {:?}", self.device_path); + } +} + +pub(crate) struct PtpSourceTask< + C: 'static + NtpClock + Send, + Controller: SourceController, +> { + index: SourceId, + clock: C, + channels: SourceChannels, + path: PathBuf, + source: OneWaySource, + fetch_receiver: mpsc::Receiver>, + poll_sender: mpsc::Sender<()>, + poll_interval: ntp_proto::PollInterval, + stratum: u8, + delay: f64, +} + +impl> PtpSourceTask +where + C: 'static + NtpClock + Send + Sync, +{ + async fn run(&mut self) { + let mut poll_timer = tokio::time::interval(self.poll_interval.as_system_duration()); + poll_timer.tick().await; // Skip first immediate tick + + loop { + enum SelectResult { + Timer, + PtpRecv(Option>), + SystemUpdate( + Result< + SystemSourceUpdate, + tokio::sync::broadcast::error::RecvError, + >, + ), + } + + let selected: SelectResult = tokio::select! { + _ = poll_timer.tick() => { + SelectResult::Timer + }, + result = self.fetch_receiver.recv() => { + SelectResult::PtpRecv(result) + }, + result = self.channels.system_update_receiver.recv() => { + SelectResult::SystemUpdate(result) + } + }; + + match selected { + SelectResult::Timer => { + debug!("PTP poll timer triggered"); + // Send poll request to blocking thread + if self.poll_sender.send(()).await.is_err() { + warn!("PTP device fetch task terminated, attempting to restart source"); + self.channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await + .ok(); + break; + } + } + SelectResult::PtpRecv(result) => match result { + Some(Ok(data)) => { + debug!("received {:?}", data); + + let time = match self.clock.now() { + Ok(time) => time, + Err(e) => { + error!(error = ?e, "There was an error retrieving the current time"); + continue; + } + }; + + let offset_seconds = match data.calculate_offset() { + Some(offset) => offset, + None => { + warn!("Timestamp has no samples"); + continue; + } + }; + + let measurement = Measurement { + delay: (), + offset: NtpDuration::from_seconds(offset_seconds), + localtime: time, + monotime: NtpInstant::now(), + stratum: self.stratum, + root_delay: NtpDuration::from_seconds(self.delay), + root_dispersion: NtpDuration::ZERO, + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }; + + let controller_message = self.source.handle_measurement(measurement); + + let update = OneWaySourceUpdate { + snapshot: OneWaySourceSnapshot { + source_id: ReferenceId::PTP, + stratum: self.stratum, + }, + message: controller_message, + }; + + self.channels + .msg_for_system_sender + .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) + .await + .ok(); + + // Create custom ObservableSourceState with correct poll interval + let mut observable_state = self.source.observe( + "PTP device".to_string(), + self.path.display().to_string(), + self.index, + ); + observable_state.poll_interval = self.poll_interval; + + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .insert(self.index, observable_state); + } + Some(Err(error_msg)) => { + error!("PTP device error: {}", error_msg); + self.channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await + .ok(); + break; + } + None => { + warn!("PTP device fetch task terminated, attempting to restart source"); + self.channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await + .ok(); + break; + } + }, + SelectResult::SystemUpdate(result) => match result { + Ok(update) => { + self.source.handle_message(update.message); + } + Err(e) => { + error!("Error receiving system update: {:?}", e) + } + }, + }; + } + } + + #[allow(clippy::too_many_arguments)] + #[instrument(level = tracing::Level::ERROR, name = "Ptp Source", skip(clock, channels, source))] + pub fn spawn( + index: SourceId, + device_path: PathBuf, + poll_interval: ntp_proto::PollInterval, + clock: C, + channels: SourceChannels, + source: OneWaySource, + stratum: u8, + delay: f64, + ) -> tokio::task::JoinHandle<()> { + // Handle device opening errors gracefully + let ptp = match PtpDevice::new(device_path.clone()) { + Ok(ptp) => { + info!("Successfully opened PTP device at {:?}", device_path); + ptp + } + Err(e) => { + error!(error = ?e, "Failed to open PTP device at {:?}", device_path); + // Send a NetworkIssue message to trigger system coordinator recovery + tokio::spawn(async move { + channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(index)) + .await + .ok(); + }); + // Return a dummy task that completes immediately + return tokio::spawn(async { + info!("PTP source task terminated due to device unavailability"); + }); + } + }; + + // Detect timestamp capabilities at initialization + let capability = detect_timestamp_capability(&ptp); + + let (fetch_sender, fetch_receiver) = mpsc::channel(1); + let (poll_sender, poll_receiver) = mpsc::channel(1); + let device_path_clone = device_path.clone(); + + tokio::task::spawn_blocking(move || { + let mut process = PtpDeviceFetchTask { + ptp, + fetch_sender, + poll_receiver, + device_path: device_path_clone, + capability, + }; + + process.run(); + }); + + tokio::spawn( + (async move { + let mut process = PtpSourceTask { + index, + clock, + channels, + path: device_path, + source, + fetch_receiver, + poll_sender, + poll_interval, + stratum, + delay, + }; + + process.run().await; + }) + .instrument(Span::current()), + ) + } +} + +fn detect_timestamp_capability(ptp: &PtpDevice) -> TimestampCapability { + // Try precise timestamps first + if ptp.get_sys_offset_precise().is_ok() { + info!("PTP device supports precise timestamps"); + return TimestampCapability::Precise; + } + + // Fall back to extended timestamps + if ptp.get_sys_offset_extended().is_ok() { + info!("PTP device supports extended timestamps"); + return TimestampCapability::Extended; + } + + // Fall back to standard timestamps + info!("PTP device using standard timestamps"); + TimestampCapability::Standard +} diff --git a/ntpd/src/daemon/ptp_source_integration_test.rs b/ntpd/src/daemon/ptp_source_integration_test.rs new file mode 100644 index 000000000..a1d1ce837 --- /dev/null +++ b/ntpd/src/daemon/ptp_source_integration_test.rs @@ -0,0 +1,252 @@ +#[cfg(test)] +mod tests { + use std::path::PathBuf; + use std::time::Duration; + use tokio::sync::mpsc; + use tokio::time::timeout; + + use ntp_proto::SourceConfig; + use crate::daemon::{ + config::PtpSourceConfig, + ntp_source::MsgForSystem, + spawn::{ + SourceCreateParameters, SpawnAction, Spawner, + ptp::PtpSpawner, SourceId, SourceRemovedEvent, SourceRemovalReason, + }, + system::MESSAGE_BUFFER_SIZE, + }; + + #[tokio::test] + async fn test_ptp_spawner_system_integration() { + // Create a temporary path for testing + let test_path = std::env::temp_dir().join("test_ptp_device"); + + // Create PTP spawner + let mut spawner = PtpSpawner::new( + PtpSourceConfig { + delay: 0.0, + interval: ntp_proto::PollInterval::from_byte(0), + path: test_path.clone(), + precision: 1e-6, + stratum: 0, + }, + SourceConfig::default(), + ); + + let spawner_id = spawner.get_id(); + let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); + + // Test spawner creation + assert!(!spawner.is_complete()); + assert_eq!(spawner.get_description(), "PTP"); + assert_eq!(spawner.get_addr_description(), test_path.display().to_string()); + + // Test spawn action + spawner.try_spawn(&action_tx).await.unwrap(); + assert!(spawner.is_complete()); + + // Verify spawn event + let spawn_event = action_rx.try_recv().unwrap(); + assert_eq!(spawn_event.id, spawner_id); + + let SpawnAction::Create(create_params) = spawn_event.action; + let SourceCreateParameters::Ptp(ptp_params) = create_params else { + panic!("Expected PTP source create parameters"); + }; + + assert_eq!(ptp_params.path, test_path); + assert_eq!(ptp_params.interval, ntp_proto::PollInterval::from_byte(0)); + } + + #[tokio::test] + async fn test_ptp_spawner_source_removal_handling() { + let test_path = std::env::temp_dir().join("test_ptp_device_removal"); + + let mut spawner = PtpSpawner::new( + PtpSourceConfig { + delay: 0.0, + interval: ntp_proto::PollInterval::from_byte(1), + path: test_path, + precision: 1e-6, + stratum: 0, + }, + SourceConfig::default(), + ); + + let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); + + // Spawn a source + spawner.try_spawn(&action_tx).await.unwrap(); + assert!(spawner.is_complete()); + + // Consume the spawn event + let _spawn_event = action_rx.try_recv().unwrap(); + + // Test network issue removal (should allow respawn) + let source_id = SourceId::new(); + let removal_event = SourceRemovedEvent { + id: source_id, + reason: SourceRemovalReason::NetworkIssue, + }; + + spawner.handle_source_removed(removal_event).await.unwrap(); + assert!(!spawner.is_complete()); // Should be ready to respawn + + // Test demobilized removal (should not allow respawn) + spawner.try_spawn(&action_tx).await.unwrap(); + assert!(spawner.is_complete()); + + // Consume the second spawn event + let _spawn_event2 = action_rx.try_recv().unwrap(); + + let demobilized_event = SourceRemovedEvent { + id: source_id, + reason: SourceRemovalReason::Demobilized, + }; + + spawner.handle_source_removed(demobilized_event).await.unwrap(); + assert!(spawner.is_complete()); // Should remain complete + } + + #[tokio::test] + async fn test_ptp_message_passing_patterns() { + // Test that PTP sources use the correct message types + let (msg_tx, mut msg_rx) = mpsc::channel::>(MESSAGE_BUFFER_SIZE); + let source_id = SourceId::new(); + + // Simulate OneWaySourceUpdate message (what PTP should send) + let update = ntp_proto::OneWaySourceUpdate { + snapshot: ntp_proto::OneWaySourceSnapshot { + source_id: ntp_proto::ReferenceId::PTP, + stratum: 0, + }, + message: Some(()), + }; + + msg_tx.send(MsgForSystem::OneWaySourceUpdate(source_id, update)).await.unwrap(); + + // Verify message was received correctly + let received_msg = msg_rx.try_recv().unwrap(); + match received_msg { + MsgForSystem::OneWaySourceUpdate(id, update) => { + assert_eq!(id, source_id); + assert_eq!(update.snapshot.source_id, ntp_proto::ReferenceId::PTP); + assert_eq!(update.snapshot.stratum, 0); + } + _ => panic!("Expected OneWaySourceUpdate message"), + } + } + + #[tokio::test] + async fn test_ptp_error_handling_patterns() { + let (msg_tx, mut msg_rx) = mpsc::channel::>(MESSAGE_BUFFER_SIZE); + let source_id = SourceId::new(); + + // Test NetworkIssue error pattern + msg_tx.send(MsgForSystem::NetworkIssue(source_id)).await.unwrap(); + + let received_msg = msg_rx.try_recv().unwrap(); + match received_msg { + MsgForSystem::NetworkIssue(id) => { + assert_eq!(id, source_id); + } + _ => panic!("Expected NetworkIssue message"), + } + + // Test Unreachable error pattern + msg_tx.send(MsgForSystem::Unreachable(source_id)).await.unwrap(); + + let received_msg = msg_rx.try_recv().unwrap(); + match received_msg { + MsgForSystem::Unreachable(id) => { + assert_eq!(id, source_id); + } + _ => panic!("Expected Unreachable message"), + } + } + + #[tokio::test] + async fn test_ptp_source_create_parameters() { + let test_path = PathBuf::from("/dev/ptp0"); + let source_id = SourceId::new(); + let config = SourceConfig::default(); + let interval = ntp_proto::PollInterval::from_byte(0); + + let params = SourceCreateParameters::Ptp(crate::daemon::spawn::PtpSourceCreateParameters { + id: source_id, + path: test_path.clone(), + config, + interval, + stratum: 0, + delay: 0.0, + }); + + // Test parameter accessors + assert_eq!(params.get_id(), source_id); + assert_eq!(params.get_addr(), test_path.display().to_string()); + + // Test parameter extraction + let SourceCreateParameters::Ptp(ptp_params) = params else { + panic!("Expected PTP parameters"); + }; + + assert_eq!(ptp_params.id, source_id); + assert_eq!(ptp_params.path, test_path); + assert_eq!(ptp_params.interval, interval); + } + + #[tokio::test] + async fn test_ptp_spawner_lifecycle() { + let test_path = std::env::temp_dir().join("test_ptp_lifecycle"); + + let mut spawner = PtpSpawner::new( + PtpSourceConfig { + delay: 0.0, + interval: ntp_proto::PollInterval::from_byte(2), + path: test_path.clone(), + precision: 1e-9, + stratum: 0, + }, + SourceConfig::default(), + ); + + let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); + + // Initial state + assert!(!spawner.is_complete()); + + // Spawn source + spawner.try_spawn(&action_tx).await.unwrap(); + assert!(spawner.is_complete()); + + // Verify spawn event details + let spawn_event = action_rx.try_recv().unwrap(); + let SpawnAction::Create(SourceCreateParameters::Ptp(params)) = spawn_event.action else { + panic!("Expected PTP create parameters"); + }; + + assert_eq!(params.path, test_path); + assert_eq!(params.interval, ntp_proto::PollInterval::from_byte(2)); + + // Test that spawner doesn't spawn again when complete + let result = timeout(Duration::from_millis(100), action_rx.recv()).await; + assert!(result.is_err()); // Should timeout since no new spawn events + + // Test source removal and respawn capability + let removal_event = SourceRemovedEvent { + id: params.id, + reason: SourceRemovalReason::NetworkIssue, + }; + + spawner.handle_source_removed(removal_event).await.unwrap(); + assert!(!spawner.is_complete()); + + // Should be able to spawn again + spawner.try_spawn(&action_tx).await.unwrap(); + assert!(spawner.is_complete()); + + // Verify second spawn event + let second_spawn_event = action_rx.try_recv().unwrap(); + assert!(matches!(second_spawn_event.action, SpawnAction::Create(SourceCreateParameters::Ptp(_)))); + } +} diff --git a/ntpd/src/daemon/spawn/mod.rs b/ntpd/src/daemon/spawn/mod.rs index d3446965b..d398afa36 100644 --- a/ntpd/src/daemon/spawn/mod.rs +++ b/ntpd/src/daemon/spawn/mod.rs @@ -15,6 +15,8 @@ pub mod nts_pool; pub mod pool; #[cfg(feature = "pps")] pub mod pps; +#[cfg(feature = "ptp")] +pub mod ptp; pub mod sock; pub mod standard; @@ -142,6 +144,8 @@ pub enum SourceCreateParameters { Sock(SockSourceCreateParameters), #[cfg(feature = "pps")] Pps(PpsSourceCreateParameters), + #[cfg(feature = "ptp")] + Ptp(PtpSourceCreateParameters), } impl SourceCreateParameters { @@ -151,6 +155,8 @@ impl SourceCreateParameters { Self::Sock(params) => params.id, #[cfg(feature = "pps")] Self::Pps(params) => params.id, + #[cfg(feature = "ptp")] + Self::Ptp(params) => params.id, } } @@ -160,6 +166,8 @@ impl SourceCreateParameters { Self::Sock(params) => params.path.display().to_string(), #[cfg(feature = "pps")] Self::Pps(params) => params.path.display().to_string(), + #[cfg(feature = "ptp")] + Self::Ptp(params) => params.path.display().to_string(), } } } @@ -192,6 +200,17 @@ pub struct PpsSourceCreateParameters { pub period: f64, } +#[cfg(feature = "ptp")] +#[derive(Debug)] +pub struct PtpSourceCreateParameters { + pub id: SourceId, + pub path: PathBuf, + pub config: SourceConfig, + pub interval: ntp_proto::PollInterval, + pub stratum: u8, + pub delay: f64, +} + pub trait Spawner { type Error: std::error::Error + Send; diff --git a/ntpd/src/daemon/spawn/ptp.rs b/ntpd/src/daemon/spawn/ptp.rs new file mode 100644 index 000000000..b18004387 --- /dev/null +++ b/ntpd/src/daemon/spawn/ptp.rs @@ -0,0 +1,130 @@ +use ntp_proto::SourceConfig; +use tokio::sync::mpsc; + +use crate::daemon::config::PtpSourceConfig; + +use super::{ + PtpSourceCreateParameters, SourceCreateParameters, SourceId, SourceRemovalReason, + SourceRemovedEvent, SpawnAction, SpawnEvent, Spawner, SpawnerId, standard::StandardSpawnError, +}; + +#[cfg(feature = "ptp")] +pub struct PtpSpawner { + config: PtpSourceConfig, + source_config: SourceConfig, + id: SpawnerId, + has_spawned: bool, +} + +#[cfg(feature = "ptp")] +impl PtpSpawner { + pub fn new(config: PtpSourceConfig, source_config: SourceConfig) -> PtpSpawner { + PtpSpawner { + config, + source_config, + id: Default::default(), + has_spawned: false, + } + } +} + +#[cfg(feature = "ptp")] +impl Spawner for PtpSpawner { + type Error = StandardSpawnError; + + async fn try_spawn( + &mut self, + action_tx: &mpsc::Sender, + ) -> Result<(), StandardSpawnError> { + action_tx + .send(SpawnEvent::new( + self.id, + SpawnAction::Create(SourceCreateParameters::Ptp(PtpSourceCreateParameters { + id: SourceId::new(), + path: self.config.path.clone(), + config: self.source_config, + interval: self.config.interval, + stratum: self.config.stratum, + delay: self.config.delay, + })), + )) + .await?; + self.has_spawned = true; + Ok(()) + } + + fn is_complete(&self) -> bool { + self.has_spawned + } + + async fn handle_source_removed( + &mut self, + removed_source: SourceRemovedEvent, + ) -> Result<(), StandardSpawnError> { + if removed_source.reason != SourceRemovalReason::Demobilized { + self.has_spawned = false; + } + Ok(()) + } + + fn get_id(&self) -> SpawnerId { + self.id + } + + fn get_addr_description(&self) -> String { + self.config.path.display().to_string() + } + + fn get_description(&self) -> &str { + "PTP" + } +} + +#[cfg(test)] +mod tests { + use ntp_proto::SourceConfig; + use tokio::sync::mpsc; + + use crate::{ + daemon::{ + config::PtpSourceConfig, + spawn::{SourceCreateParameters, SpawnAction, Spawner, ptp::PtpSpawner}, + system::MESSAGE_BUFFER_SIZE, + }, + test::alloc_port, + }; + + #[tokio::test] + async fn creates_a_source() { + let socket_path = std::env::temp_dir().join(format!("ntp-test-stream-{}", alloc_port())); + let precision = 1e-3; + let mut spawner = PtpSpawner::new( + PtpSourceConfig { + path: socket_path.clone(), + precision, + interval: ntp_proto::PollInterval::from_byte(0), + stratum: 0, + delay: 0.0, + }, + SourceConfig::default(), + ); + let spawner_id = spawner.get_id(); + let (action_tx, mut action_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); + + assert!(!spawner.is_complete()); + spawner.try_spawn(&action_tx).await.unwrap(); + let res = action_rx.try_recv().unwrap(); + assert_eq!(res.id, spawner_id); + + let SpawnAction::Create(create_params) = res.action; + assert_eq!(create_params.get_addr(), socket_path.display().to_string()); + + let SourceCreateParameters::Ptp(params) = create_params else { + panic!("did not receive PTP source create parameters!"); + }; + assert_eq!(params.path, socket_path); + + // Should be complete after spawning + assert!(spawner.is_complete()); + } +} diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index b8bf1de17..48c4bcbc3 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -20,6 +20,10 @@ use super::{ #[cfg(feature = "pps")] use super::spawn::pps::PpsSpawner; +#[cfg(feature = "ptp")] +use super::spawn::ptp::PtpSpawner; +#[cfg(feature = "ptp")] +use crate::daemon::ptp_source::PtpSourceTask; use std::{ collections::HashMap, @@ -159,6 +163,10 @@ pub async fn spawn { system.add_spawner(PpsSpawner::new(cfg.clone(), source_defaults_config)); } + #[cfg(feature = "ptp")] + NtpSourceConfig::Ptp(cfg) => { + system.add_spawner(PtpSpawner::new(cfg.clone(), source_defaults_config)); + } } } @@ -553,6 +561,28 @@ impl { + let source = self.system.create_ptp_source( + source_id, + params.config, + params.interval.as_duration().to_seconds(), + )?; + PtpSourceTask::spawn( + source_id, + params.path.clone(), + params.interval, + self.clock.clone(), + SourceChannels { + msg_for_system_sender: self.msg_for_system_tx.clone(), + system_update_receiver: self.system_update_sender.subscribe(), + source_snapshots: self.source_snapshots.clone(), + }, + source, + params.stratum, + params.delay, + ); + } }; // Try and find a related spawner and notify that spawner. diff --git a/ntpd/src/force_sync/mod.rs b/ntpd/src/force_sync/mod.rs index 58768a119..6a0fad4d6 100644 --- a/ntpd/src/force_sync/mod.rs +++ b/ntpd/src/force_sync/mod.rs @@ -136,6 +136,8 @@ pub(crate) fn force_sync(config: Option) -> std::io::Result { | config::NtpSourceConfig::Sock(_) => total_sources += 1, #[cfg(feature = "pps")] config::NtpSourceConfig::Pps(_) => {} // PPS sources don't count + #[cfg(feature = "ptp")] + config::NtpSourceConfig::Ptp(_) => total_sources += 1, config::NtpSourceConfig::Pool(cfg) => total_sources += cfg.first.count, #[cfg(feature = "unstable_nts-pool")] config::NtpSourceConfig::NtsPool(cfg) => total_sources += cfg.first.count,