Skip to content

perf: implement memory optimization for file processing and content handling #409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions src/gitingest/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,28 @@ async def _write_output(tree: str, content: str, target: str | None) -> None:
The path to the output file. If ``None``, the results are not written to a file.

"""
data = f"{tree}\n{content}"
loop = asyncio.get_running_loop()

if target == "-":
await loop.run_in_executor(None, sys.stdout.write, data)
# Write to stdout in chunks to avoid large memory allocation
await loop.run_in_executor(None, sys.stdout.write, tree)
await loop.run_in_executor(None, sys.stdout.write, "\n")
await loop.run_in_executor(None, sys.stdout.write, content)
await loop.run_in_executor(None, sys.stdout.flush)
elif target is not None:
await loop.run_in_executor(None, Path(target).write_text, data, "utf-8")
# Write to file in chunks to avoid large memory allocation
target_path = Path(target)

# Define synchronous functions for file operations
def write_tree() -> None:
with target_path.open("w", encoding="utf-8") as f:
f.write(tree)
f.write("\n")

def append_content() -> None:
with target_path.open("a", encoding="utf-8") as f:
f.write(content)

# Execute file operations
await loop.run_in_executor(None, write_tree)
await loop.run_in_executor(None, append_content)
37 changes: 34 additions & 3 deletions src/gitingest/ingestion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
"""Functions to ingest and analyze a codebase directory or single file."""
"""Functions to ingest and analyze a codebase directory or single file.

Memory optimization:
- Lazy loading: File content is only loaded when accessed and cached to avoid repeated reads
- Chunked reading: Large files are read in chunks to avoid loading everything at once
- Content cache clearing: Periodically clears content cache to free memory during processing
- Memory limits: Skips files that would cause excessive memory usage
- Early termination: Stops processing when limits are reached
"""

from __future__ import annotations

Expand Down Expand Up @@ -65,7 +73,12 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:
msg = f"File {file_node.name} has no content"
raise ValueError(msg)

return format_node(file_node, query=query)
result = format_node(file_node, query=query)

# Clear content cache to free memory
file_node.clear_content_cache()

return result

root_node = FileSystemNode(
name=path.name,
Expand All @@ -78,7 +91,12 @@ def ingest_query(query: IngestionQuery) -> tuple[str, str, str]:

_process_node(node=root_node, query=query, stats=stats)

return format_node(root_node, query=query)
result = format_node(root_node, query=query)

# Clear content cache to free memory after formatting
root_node.clear_content_cache()

return result


def _process_node(node: FileSystemNode, query: IngestionQuery, stats: FileSystemStats) -> None:
Expand Down Expand Up @@ -173,6 +191,8 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat
This function checks the file's size, increments the statistics, and reads its content.
If the file size exceeds the maximum allowed, it raises an error.

Implements memory optimization by checking limits before processing files.

Parameters
----------
path : Path
Expand All @@ -194,6 +214,11 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat
print(f"Skipping file {path}: would exceed total size limit")
return

# Skip very large files that would consume too much memory
if file_size > MAX_TOTAL_SIZE_BYTES / 10: # Limit single file to 10% of total limit
print(f"Skipping file {path}: file is too large for memory-efficient processing")
return

stats.total_files += 1
stats.total_size += file_size

Expand All @@ -211,6 +236,12 @@ def _process_file(path: Path, parent_node: FileSystemNode, stats: FileSystemStat
parent_node.size += file_size
parent_node.file_count += 1

# If we've processed a lot of files, clear any cached content to free memory
if stats.total_files % 100 == 0:
for sibling in parent_node.children[:-10]: # Keep the 10 most recent files cached
if sibling.type == FileSystemNodeType.FILE:
sibling.clear_content_cache()


def limit_exceeded(stats: FileSystemStats, depth: int) -> bool:
"""Check if any of the traversal limits have been exceeded.
Expand Down
184 changes: 165 additions & 19 deletions src/gitingest/output_formatter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
"""Functions to ingest and analyze a codebase directory or single file."""
"""Functions to ingest and analyze a codebase directory or single file.

Memory optimization:
- Generator-based processing: Uses generators to process files one at a time
- Streaming approach: Avoids loading all file contents into memory at once
- Works with lazy loading: Complements the lazy loading in FileSystemNode
"""

from __future__ import annotations

from typing import TYPE_CHECKING
import gc
import io
from typing import TYPE_CHECKING, Generator

import tiktoken

Expand Down Expand Up @@ -47,12 +55,45 @@ def format_node(node: FileSystemNode, query: IngestionQuery) -> tuple[str, str,

tree = "Directory structure:\n" + _create_tree_structure(query, node=node)

content = _gather_file_contents(node)
# Estimate tokens for tree
tree_tokens = _count_tokens(tree)

# For token estimation, we need to sample some content
# We'll use a small sample to estimate without loading everything
content_sample = ""
content_generator = _gather_file_contents(node)

# Try to get a small sample for token estimation
try:
# Get first item from generator for sampling
first_item = next(content_generator)
sample_size = min(len(first_item), 10000) # Limit sample size
content_sample = first_item[:sample_size]
except StopIteration:
# No content
pass

# Estimate tokens based on sample
sample_tokens = _count_tokens(content_sample)

# If we have a sample, extrapolate total tokens based on file sizes
if sample_tokens > 0 and len(content_sample) > 0:
# Estimate tokens per byte
tokens_per_byte = sample_tokens / len(content_sample)
# Estimate total tokens based on total file size
estimated_content_tokens = int(node.size * tokens_per_byte)
total_tokens = tree_tokens + estimated_content_tokens
else:
total_tokens = tree_tokens

token_estimate = _format_token_count(tree + content)
token_estimate = _format_token_count(total_tokens)
if token_estimate:
summary += f"\nEstimated tokens: {token_estimate}"

# For backward compatibility with tests, return content as a string
# But use a more memory-efficient approach by processing files in chunks
content = _gather_content_string(node)

return summary, tree, content


Expand Down Expand Up @@ -93,28 +134,115 @@ def _create_summary_prefix(query: IngestionQuery, *, single_file: bool = False)
return "\n".join(parts) + "\n"


def _gather_file_contents(node: FileSystemNode) -> str:
def _gather_file_contents(node: FileSystemNode) -> Generator[str]:
"""Recursively gather contents of all files under the given node.

This function recursively processes a directory node and gathers the contents of all files
under that node. It returns the concatenated content of all files as a single string.
This function recursively processes a directory node and yields the contents of all files
under that node one at a time. Instead of concatenating all content into a single string,
it returns a generator that yields each file's content separately.

The implementation is memory-efficient, processing one file at a time and using
generators to avoid loading all content into memory at once.

Parameters
----------
node : FileSystemNode
The current directory or file node being processed.

Yields
------
Generator[str]
The content of each file as a string.

"""
if node.type != FileSystemNodeType.DIRECTORY:
yield node.content_string
# Clear content cache immediately after yielding to free memory
node.clear_content_cache()
else:
# Process one child at a time to avoid loading all content at once
for child in node.children:
yield from _gather_file_contents(child)


def _gather_content_string(node: FileSystemNode) -> str:
"""Gather file contents as a string, but in a memory-efficient way.

This function processes files in chunks to avoid loading all content into memory at once.
It builds the content string incrementally, clearing file content caches as it goes.

For very large repositories, it uses a more aggressive chunking strategy to minimize memory usage.

Parameters
----------
node : FileSystemNode
The file system node to process.

Returns
-------
str
The concatenated content of all files under the given node.
The combined content string.

"""
if node.type != FileSystemNodeType.DIRECTORY:
return node.content_string

# Recursively gather contents of all files under the current directory
return "\n".join(_gather_file_contents(child) for child in node.children)
# For very small repositories (less than 10MB), use simple approach
if node.size < 10 * 1024 * 1024:
content_chunks = list(_gather_file_contents(node))
return "\n".join(content_chunks)

# For medium repositories (10MB to 100MB), use chunked approach
if node.size < 100 * 1024 * 1024:
# Use a list to accumulate content chunks
content_chunks = []
chunk_size = 0
max_chunk_size = 5 * 1024 * 1024 # 5MB per chunk

# Process files in batches to limit memory usage
for content_item in _gather_file_contents(node):
content_chunks.append(content_item)
chunk_size += len(content_item)

# If we've accumulated enough content, join it and reset
if chunk_size >= max_chunk_size:
# Join the current chunks
joined_chunk = "\n".join(content_chunks)
# Reset the chunks list with just the joined chunk
content_chunks = [joined_chunk]
# Update the chunk size
chunk_size = len(joined_chunk)

# Join any remaining chunks
return "\n".join(content_chunks)

# For large repositories (over 100MB), use a hybrid approach with StringIO
# Use StringIO as a memory-efficient buffer
buffer = io.StringIO()
flush_interval = 100 # Flush to string every 100 files

# Process files and write to buffer
for i, content_item in enumerate(_gather_file_contents(node)):
buffer.write(content_item)
buffer.write("\n")

# Periodically get the current value to avoid buffer growing too large
if (i + 1) % flush_interval == 0:
# Get current value
current_value = buffer.getvalue()

# Reset buffer
buffer.close()
buffer = io.StringIO()

# Write current value back to buffer
buffer.write(current_value)

# Force garbage collection to free memory
gc.collect()

# Get final result
result = buffer.getvalue()
buffer.close()

return result


def _create_tree_structure(
Expand Down Expand Up @@ -169,25 +297,43 @@ def _create_tree_structure(
return tree_str


def _format_token_count(text: str) -> str | None:
"""Return a human-readable token-count string (e.g. 1.2k, 1.2 M).
def _count_tokens(text: str) -> int:
"""Count the number of tokens in a text string.

Parameters
----------
text : str
The text string for which the token count is to be estimated.
The text string for which to count tokens.

Returns
-------
str | None
The formatted number of tokens as a string (e.g., ``"1.2k"``, ``"1.2M"``), or ``None`` if an error occurs.
int
The number of tokens in the text, or 0 if an error occurs.

"""
try:
encoding = tiktoken.get_encoding("o200k_base") # gpt-4o, gpt-4o-mini
total_tokens = len(encoding.encode(text, disallowed_special=()))
return len(encoding.encode(text, disallowed_special=()))
except (ValueError, UnicodeEncodeError) as exc:
print(exc)
return 0


def _format_token_count(total_tokens: int) -> str | None:
"""Return a human-readable token-count string (e.g. 1.2k, 1.2 M).

Parameters
----------
total_tokens : int
The number of tokens to format.

Returns
-------
str | None
The formatted number of tokens as a string (e.g., ``"1.2k"``, ``"1.2M"``), or ``None`` if total_tokens is 0.

"""
if total_tokens == 0:
return None

for threshold, suffix in _TOKEN_THRESHOLDS:
Expand Down
Loading
Loading