
feat: add memory-aware collector for automatic buffer flushing #3010


Draft: wants to merge 3 commits into `devel`
26 changes: 26 additions & 0 deletions Dockerfile.ram-demo
@@ -0,0 +1,26 @@
# Dockerfile for dlt memory management demo
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy the entire dlt repository
COPY . .

# Install dlt with duckdb and rest_api support, plus psutil for memory monitoring
RUN pip install -e ".[duckdb,rest_api]" psutil

# Copy demo scripts
COPY playground/ram-demo/demo_memory_pressure.py /app/demo_memory_pressure.py
COPY playground/ram-demo/jaffle_source.py /app/jaffle_source.py

# Set default environment
ENV PYTHONUNBUFFERED=1

# Default command runs the demo script
CMD ["python", "demo_memory_pressure.py"]
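
To exercise the demo (the image tag and cap size below are illustrative, not part of this diff): build from the repository root with `docker build -f Dockerfile.ram-demo -t dlt-ram-demo .`, then run it under a container memory limit, e.g. `docker run --rm --memory=256m dlt-ram-demo`, so the collector actually sees memory pressure.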
12 changes: 12 additions & 0 deletions dlt/common/data_writers/buffered.py
@@ -18,6 +18,14 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.utils import uniq_id

# Import memory monitoring registration function
try:
from dlt.common.runtime.memory_collector import register_buffered_writer
except ImportError:
# Fallback if memory collector is not available
def register_buffered_writer(writer: Any) -> None:
pass


def new_file_id() -> str:
"""Creates new file id which is globally unique within table_name scope"""
@@ -76,6 +84,10 @@ def __init__(
self._created: float = None
self._last_modified: float = None
self._closed = False

# Register with memory monitoring system
register_buffered_writer(self)

try:
self._rotate_file()
except TypeError:
191 changes: 191 additions & 0 deletions dlt/common/runtime/memory_collector.py
@@ -0,0 +1,191 @@
import os
import sys
import logging
import time
import weakref
from typing import (
Any,
Dict,
List,
Optional,
Set,
TextIO,
Union,
)

from dlt.common import logger as dlt_logger
from dlt.common.configuration import with_config, known_sections, configspec
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.runtime.collector import LogCollector

# Global registry to track active BufferedDataWriter instances
_ACTIVE_WRITERS: Optional[weakref.WeakSet[Any]] = None


def register_buffered_writer(writer: Any) -> None:
"""Register a BufferedDataWriter instance for memory-based flushing"""
global _ACTIVE_WRITERS
if _ACTIVE_WRITERS is None:
_ACTIVE_WRITERS = weakref.WeakSet()
_ACTIVE_WRITERS.add(writer)


def get_active_writers() -> Set[Any]:
"""Get all currently active BufferedDataWriter instances"""
global _ACTIVE_WRITERS
if _ACTIVE_WRITERS is None:
return set()
return set(_ACTIVE_WRITERS)


@configspec
class MemoryAwareCollectorConfiguration(BaseConfiguration):
"""Configuration for memory-aware collector"""

max_memory_mb: Optional[int] = None # RAM limit in MB (None disables monitoring)
memory_check_interval: Optional[float] = 2.0 # How often to check memory (seconds)
flush_threshold_percent: Optional[float] = 0.8 # Flush when this % of limit is reached


class MemoryAwareCollector(LogCollector):
"""A Collector that monitors RAM usage and flushes buffers when memory limits are exceeded"""

@with_config(spec=MemoryAwareCollectorConfiguration, sections=known_sections.DATA_WRITER)
def __init__(
self,
log_period: float = 1.0,
logger: Union[logging.Logger, TextIO] = sys.stdout,
log_level: int = logging.INFO,
dump_system_stats: bool = True,
max_memory_mb: Optional[int] = None,
memory_check_interval: Optional[float] = 2.0,
flush_threshold_percent: Optional[float] = 0.8,
) -> None:
"""
Memory-aware collector that extends LogCollector with RAM monitoring.

Args:
max_memory_mb: Maximum memory usage in MB before triggering buffer flushes
            memory_check_interval: How often to check memory usage (seconds). Defaults to 2.0
flush_threshold_percent: Flush buffers when this percentage of memory limit is reached. Defaults to 0.8 (80%)
"""
super().__init__(log_period, logger, log_level, dump_system_stats)

# Initialize parent class state (normally done in _start)
from collections import defaultdict

self.counters = defaultdict(int)
self.counter_info = {}
self.messages = {}
self.last_log_time = time.time()
self.step = "Memory Monitoring" # Set a default step name

# Set memory monitoring configuration
self.max_memory_mb = max_memory_mb
self.memory_check_interval = memory_check_interval
self.flush_threshold_percent = flush_threshold_percent
self.last_memory_check: float = 0

# Track if we've enabled memory monitoring
# Check if psutil is available
try:
import psutil

self.memory_monitoring_enabled = self.max_memory_mb is not None

if self.memory_monitoring_enabled:
self._log(
logging.INFO,
f"Memory monitoring enabled with limit: {self.max_memory_mb}MB "
f"(flush threshold: {self.flush_threshold_percent*100:.1f}%)",
)
except ImportError:
self.memory_monitoring_enabled = False
self._log(logging.WARNING, "`psutil` not available. Memory monitoring disabled.")

def update(
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
"""Update counters and check memory usage if enabled"""
# Call parent update method
super().update(name, inc, total, inc_total, message, label)

# Check memory usage periodically
if self.memory_monitoring_enabled:
self._check_memory_usage()

def _check_memory_usage(self) -> None:
"""Check current memory usage and flush buffers if necessary"""
        current_time = time.time()
        interval = self.memory_check_interval or 0.0  # treat a None interval as "check on every update"
        if current_time - self.last_memory_check < interval:
            return

self.last_memory_check = current_time

try:
import psutil

process = psutil.Process(os.getpid())
current_mem_mb = process.memory_info().rss / (1024**2)

threshold_mb = self.max_memory_mb * self.flush_threshold_percent

if current_mem_mb > threshold_mb:
self._log(
logging.WARNING,
f"Memory usage ({current_mem_mb:.1f}MB) exceeds threshold "
f"({threshold_mb:.1f}MB). Flushing buffers...",
)
self._flush_all_buffers()

# Check again after flushing
new_mem_mb = psutil.Process(os.getpid()).memory_info().rss / (1024**2)
freed_mb = current_mem_mb - new_mem_mb
self._log(
logging.INFO,
f"Buffer flush completed. Memory freed: {freed_mb:.1f}MB. "
f"Current usage: {new_mem_mb:.1f}MB",
)

except ImportError:
# psutil not available, disable memory monitoring
self.memory_monitoring_enabled = False
self._log(logging.WARNING, "psutil not available. Memory monitoring disabled.")
except Exception as e:
self._log(logging.ERROR, f"Error checking memory usage: {e}")

def _flush_all_buffers(self) -> None:
"""Flush all registered BufferedDataWriter instances"""
active_writers = get_active_writers()
flushed_count = 0
total_items_flushed = 0

for writer in active_writers:
try:
if hasattr(writer, "_flush_items") and hasattr(writer, "_buffered_items"):
buffered_count = getattr(writer, "_buffered_items_count", 0)

if writer._buffered_items: # Only flush if there are items
writer._flush_items()
flushed_count += 1
total_items_flushed += buffered_count
except Exception as e:
self._log(logging.ERROR, f"Error flushing buffer for writer {writer}: {e}")

if flushed_count > 0:
self._log(
logging.INFO,
f"Flushed {flushed_count} buffered writers ({total_items_flushed} items) due to"
" memory pressure",
)
elif len(active_writers) > 0:
self._log(
logging.DEBUG,
f"No buffered data found to flush in {len(active_writers)} active writers",
)
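
For reviewers, a minimal standalone sketch (not part of this diff) of the `weakref.WeakSet` behavior the registry above relies on: writers vanish from the set as soon as nothing else references them, so the collector never flushes dead writers.

import gc
import weakref

class Writer:
    """Stand-in for BufferedDataWriter; only identity matters here."""

registry: "weakref.WeakSet[Writer]" = weakref.WeakSet()

w = Writer()
registry.add(w)
print(len(registry))  # 1: the live writer is visible to the collector

del w
gc.collect()  # CPython usually frees immediately; collect() makes it deterministic
print(len(registry))  # 0: garbage-collected writers drop out on their own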
5 changes: 4 additions & 1 deletion dlt/pipeline/progress.py
@@ -7,9 +7,10 @@
EnlightenCollector as enlighten,
AliveCollector as alive_progress,
)
from dlt.common.runtime.memory_collector import MemoryAwareCollector as memory_aware
from dlt.common.runtime.collector import Collector as _Collector, NULL_COLLECTOR as _NULL_COLLECTOR

TSupportedCollectors = Literal["tqdm", "enlighten", "log", "alive_progress"]
TSupportedCollectors = Literal["tqdm", "enlighten", "log", "alive_progress", "memory_aware"]
TCollectorArg = Union[_Collector, TSupportedCollectors]


@@ -27,5 +28,7 @@ def _from_name(collector: TCollectorArg) -> _Collector:
return log()
if collector == "alive_progress":
return alive_progress()
if collector == "memory_aware":
return memory_aware()
raise ValueError(collector)
return collector
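
With the name registered, selecting the collector is a one-liner. A sketch, assuming configuration (`config.toml` or env vars) supplies `max_memory_mb`:

import dlt

# "memory_aware" resolves through _from_name to MemoryAwareCollector()
pipeline = dlt.pipeline(
    pipeline_name="demo",
    destination="duckdb",
    progress="memory_aware",
)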
34 changes: 34 additions & 0 deletions docs/website/docs/reference/performance.md
@@ -106,6 +106,40 @@
The default buffer is actually set to a moderately low value (**5000 items**), so if you are not running `dlt`
on IoT sensors or other tiny infrastructures, you might actually want to increase it to speed up
processing.

### Controlling memory usage

`dlt` provides automatic memory management to help prevent out-of-memory errors when processing large datasets. The **memory-aware collector** monitors the process's RAM usage and automatically flushes buffered data once consumption crosses a configurable threshold.

:::info
This feature requires the `psutil` package (e.g. `pip install psutil`) for memory monitoring.
:::

#### Enabling memory monitoring

<!--@@@DLT_SNIPPET ./performance_snippets/performance-snippets.py::memory_management-->

#### Configuration options

You can configure memory monitoring behavior using the following options in your `config.toml` under the `[data_writer]` section:

<!--@@@DLT_SNIPPET ./performance_snippets/toml-snippets.toml::memory_management_toml-->

| Option | Description | Default |
|--------|-------------|---------|
| `max_memory_mb` | Maximum memory usage in MB before triggering buffer flushes | None (disabled) |
| `memory_check_interval` | How often to check memory usage (seconds) | 2.0 |
| `flush_threshold_percent` | Percentage of memory limit that triggers flushing | 0.8 (80%) |
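
A worked example with illustrative values: with `max_memory_mb = 2048` and the default `flush_threshold_percent` of 0.8, buffers are flushed once the process RSS exceeds 2048 × 0.8 ≈ 1638 MB.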

:::tip
Memory monitoring works for all threads spawned by the `dlt` main process, so multi-threading during extract and load stages is covered.
Memory monitoring for multi-processing (normalize) is not supported yet.
:::

:::note
If `psutil` is not available, memory monitoring will be automatically disabled and `dlt` will continue to work normally.
:::


### Controlling intermediary file size and rotation
`dlt` writes data to intermediary files. You can control the file size and the number of created files by setting the maximum number of data items stored in a single file or the maximum single file size. Keep in mind that the file size is computed after compression has been performed.
* `dlt` uses a custom version of the [JSON file format](../dlt-ecosystem/file-formats/jsonl.md) between the **extract** and **normalize** stages.
docs/website/docs/reference/performance_snippets/performance-snippets.py
@@ -193,5 +193,33 @@
# @@@DLT_SNIPPET_END parallel_pipelines


def memory_management_snippet() -> None:
# @@@DLT_SNIPPET_START memory_management
import os
import dlt

# Set memory limit via environment variable
os.environ["DATA_WRITER__MAX_MEMORY_MB"] = "2048"

# Use the memory-aware collector
pipeline = dlt.pipeline(
pipeline_name="memory_limited_pipeline",
destination="duckdb",
progress="memory_aware", # Enable memory monitoring
)

# Alternative: Create custom memory-aware collector
from dlt.common.runtime.memory_collector import MemoryAwareCollector

collector = MemoryAwareCollector(
max_memory_mb=1024, # 1GB memory limit
flush_threshold_percent=0.8, # Flush when 80% of limit reached
memory_check_interval=3.0, # Check memory every 3 seconds
)

pipeline = dlt.pipeline(progress=collector)
# @@@DLT_SNIPPET_END memory_management


def test_toml_snippets() -> None:
parse_toml_file("./toml-snippets.toml")
docs/website/docs/reference/performance_snippets/toml-snippets.toml
@@ -113,4 +113,12 @@ next_item_mode="fifo"
# @@@DLT_SNIPPET_START compression_toml
[normalize.data_writer]
disable_compression=true
# @@@DLT_SNIPPET_END compression_toml
# @@@DLT_SNIPPET_END compression_toml


# @@@DLT_SNIPPET_START memory_management_toml
[data_writer]
max_memory_mb = 2048 # Maximum memory usage in MB (required)
memory_check_interval = 5.0 # How often to check memory usage in seconds
flush_threshold_percent = 0.8 # Flush buffers at 80% of memory limit
# @@@DLT_SNIPPET_END memory_management_toml
4 changes: 4 additions & 0 deletions playground/.dlt/config.toml
@@ -0,0 +1,4 @@
[data_writer]
max_memory_mb = 200
memory_check_interval = 1.0
flush_threshold_percent = 0.8