feat: add memory-aware collector for automatic buffer flushing #3010

Draft · wants to merge 3 commits into devel

Conversation

@djudjuu (Collaborator) commented Aug 19, 2025

This PR introduces a memory-aware collector that monitors RAM usage and automatically flushes buffered data when memory consumption approaches a user-defined limit. This helps prevent OOM errors without sacrificing write performance, since buffers are still allowed to fill up to a safe threshold before being flushed.

Key Features

  • Configurable memory limits via environment variables, config.toml, or programmatically
  • Automatic buffer flushing when the memory threshold is exceeded (default 80%)
  • Graceful degradation when psutil is unavailable
  • Works with thread-based parallelism (extract/load stages)
  • Uses weakref.WeakSet to track active writers without preventing GC

Configuration options:

  • max_memory_mb: Memory limit in MB (required to enable)
  • memory_check_interval: Check frequency in seconds (default: 2.0)
  • flush_threshold_percent: Flush trigger percentage (default: 0.8)
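
A minimal sketch of how these options could be declared as a config spec, assuming the field names and defaults listed above; the class name is illustrative and the wiring to the `[data_writer]` section is omitted:

```python
from typing import Optional

from dlt.common.configuration import configspec
from dlt.common.configuration.specs import BaseConfiguration


@configspec
class MemoryCollectorConfiguration(BaseConfiguration):
    # memory limit in MB; the collector stays inactive while this is None
    max_memory_mb: Optional[int] = None
    # seconds between memory checks
    memory_check_interval: float = 2.0
    # flush buffers once usage exceeds this fraction of max_memory_mb
    flush_threshold_percent: float = 0.8
```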

Usage:

```python
import os
import dlt

os.environ["DATA_WRITER__MAX_MEMORY_MB"] = "2048"

pipeline = dlt.pipeline(progress="memory_aware")
```

TOML Configuration

```toml
[data_writer]
max_memory_mb = 2048
memory_check_interval = 5.0
flush_threshold_percent = 0.8
```

Components added:

  • dlt/common/runtime/memory_collector.py: Core memory monitoring
  • Documentation and examples in performance guide

Components modified:

  • dlt/common/data_writers/buffered.py: Writer registration
  • dlt/pipeline/progress.py: Added "memory_aware" collector option

### 🛡️ **Robust Implementation**
- Uses `psutil` for accurate memory monitoring, with a graceful fallback when it is not installed (see the sketch after this list)
- `weakref.WeakSet` tracking prevents memory leaks
- Extends existing `LogCollector` for consistency with dlt's collector architecture
- Thread-safe operation with dlt's parallelism
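
A minimal sketch of the graceful fallback, assuming memory is read from the process RSS via psutil; the helper name is illustrative:

```python
try:
    import psutil
except ImportError:
    psutil = None  # monitoring degrades to a no-op instead of raising


def _current_rss_mb() -> float:
    """Return the current resident set size in MB, or 0.0 when psutil is missing."""
    if psutil is None:
        return 0.0
    return psutil.Process().memory_info().rss / (1024 * 1024)
```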



## Technical Implementation Details

### Core Components

1. **`MemoryAwareCollector`** (`dlt/common/runtime/memory_collector.py`)
   - Extends `LogCollector` for consistency
   - Configurable via `@configspec` and `@with_config` decorators
   - Periodically checks memory usage during `update()` calls
   - Triggers buffer flushes when thresholds are exceeded (see the combined sketch after this list)

2. **Writer Registration System**
   - Global `weakref.WeakSet` tracks active `BufferedDataWriter` instances
   - Automatic registration in `BufferedDataWriter.__init__()`
   - Safe cleanup when writers are garbage collected

3. **Memory Monitoring Logic**
   - Uses `psutil.Process().memory_info().rss` for accurate RSS memory measurement
   - Configurable check intervals to balance monitoring overhead vs. responsiveness
   - Threshold-based flushing with user-defined percentages
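
A minimal sketch of how these three pieces could fit together; class, helper, and method names are illustrative (for example, the exact flush hook on `BufferedDataWriter` may differ), and psutil error handling is left out for brevity:

```python
import time
import weakref

import psutil

from dlt.common.runtime.collector import LogCollector

# module-level registry: weak references do not keep writers alive
_ACTIVE_WRITERS: "weakref.WeakSet" = weakref.WeakSet()


def register_writer(writer) -> None:
    # called from BufferedDataWriter.__init__() so the collector can reach live buffers
    _ACTIVE_WRITERS.add(writer)


class MemoryAwareCollector(LogCollector):
    def __init__(self, max_memory_mb: int, memory_check_interval: float = 2.0,
                 flush_threshold_percent: float = 0.8) -> None:
        super().__init__()
        self.max_memory_mb = max_memory_mb
        self.memory_check_interval = memory_check_interval
        self.flush_threshold_percent = flush_threshold_percent
        self._last_check = 0.0

    def update(self, name, inc=1, *args, **kwargs) -> None:
        # piggyback the memory check on regular progress updates
        super().update(name, inc, *args, **kwargs)
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        now = time.monotonic()
        if now - self._last_check < self.memory_check_interval:
            return
        self._last_check = now
        rss_mb = psutil.Process().memory_info().rss / (1024 * 1024)
        if rss_mb >= self.max_memory_mb * self.flush_threshold_percent:
            for writer in list(_ACTIVE_WRITERS):
                writer._flush_items()  # assumed flush hook; the PR's actual method may differ
```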

### Integration Points

- **`dlt/pipeline/progress.py`**: Added `"memory_aware"` collector option
- **`dlt/common/data_writers/buffered.py`**: Writer registration with fallback for missing dependency
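
A hedged sketch of the `progress.py` integration: the resolver function below is hypothetical and only illustrates where the new string option could slot in next to the existing collector names; with `@with_config`, the limits would be picked up from config/env rather than passed explicitly.

```python
from dlt.common.runtime.collector import Collector, LogCollector


def _resolve_collector(progress: str) -> Collector:
    # hypothetical helper: dlt already maps names like "log" or "tqdm" to collectors;
    # this shows how "memory_aware" could be added alongside them
    if progress == "memory_aware":
        from dlt.common.runtime.memory_collector import MemoryAwareCollector
        return MemoryAwareCollector()  # limits injected from config/env via @with_config
    if progress == "log":
        return LogCollector()
    raise ValueError(f"unknown progress collector: {progress}")
```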

### Memory Coverage

✅ **Covers**: extract and load stages (thread-based parallelism in the main process)

❌ **Does not cover**:
- Normalize stage: only the main process is monitored, so memory used by process-based parallelism is not covered
- External library memory usage (pandas, pyarrow internal buffers)

## Future Enhancements

__purely AI-generated suggestions, I am just leaving them here__

Potential future improvements (not included in this PR):
- Process-based memory monitoring for normalize stage
- Integration with external memory monitoring tools (implement callbacks)
- Adaptive buffer sizing based on available memory


netlify bot commented Aug 19, 2025

Deploy Preview for dlt-hub-docs canceled.

- Latest commit: 174ee35
- Latest deploy log: https://app.netlify.com/projects/dlt-hub-docs/deploys/68a4941346457400073b432f

@djudjuu marked this pull request as draft on August 19, 2025 08:42