134 changes: 89 additions & 45 deletions dev_kit_mcp_server/tools/explore/search_text.py
@@ -1,10 +1,12 @@
"""Module for searching text content in files."""

import asyncio
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiofiles
import git

from ...core import AsyncOperation
@@ -16,7 +18,70 @@

name = "search_text"

def _search_text(
async def _process_file_async(
self,
file_path: Path,
compiled_pattern: re.Pattern[str],
context: Optional[int] = None,
) -> tuple[List[Dict[str, Any]], int]:
"""Process a single file asynchronously for pattern matches.

Args:
file_path: Path to the file to process
compiled_pattern: Compiled regex pattern to search for
context: Number of context lines to include before/after matches

Returns:
Tuple of (matches found in file, number of lines searched)

"""
matches = []
lines_searched = 0

try:
async with aiofiles.open(file_path, "r", encoding="utf-8", errors="ignore") as f:
lines = await f.readlines()

lines_searched = len(lines)

# Find matching lines
for line_num, line in enumerate(lines, 1):
if compiled_pattern.search(line):
# Get relative path from project root
try:
relative_path = file_path.relative_to(self._root_path)
except ValueError:
relative_path = file_path

match_data = {
"file": str(relative_path),
"line_number": line_num,
"line": line.rstrip("\n\r"),
}

# Add context lines if requested
if context is not None and context > 0:
start_line = max(0, line_num - 1 - context)
end_line = min(len(lines), line_num + context)

context_lines = []
for i in range(start_line, end_line):
context_lines.append({
"line_number": i + 1,
"line": lines[i].rstrip("\n\r"),
"is_match": i == line_num - 1,
})
match_data["context"] = context_lines

matches.append(match_data)

except (UnicodeDecodeError, OSError, PermissionError):
# Skip binary files or files with access issues
pass

return matches, lines_searched

async def _search_text(
self,
pattern: str,
files: Optional[List[str]] = None,
@@ -78,54 +143,33 @@
raise ValueError(f"Path is not a file: {file_str}")
search_files.append(file_path)

# Search for matches
# Process files concurrently with limited concurrency
# Use a semaphore to limit concurrent file operations
max_concurrent_files = min(20, len(search_files)) # Limit to 20 concurrent files
semaphore = asyncio.Semaphore(max_concurrent_files)

async def process_file_with_semaphore(file_path: Path) -> tuple[List[Dict[str, Any]], int]:
async with semaphore:
return await self._process_file_async(file_path, compiled_pattern, context)

# Process all files concurrently
tasks = [process_file_with_semaphore(file_path) for file_path in search_files]
results = await asyncio.gather(*tasks, return_exceptions=True)

# Collect results and handle any exceptions
matches: List[Dict[str, Any]] = []
total_files_searched = 0
total_lines_searched = 0

for file_path in search_files:
total_files_searched += 1
try:
# Try to read as text file
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()

total_lines_searched += len(lines)

# Find matching lines
for line_num, line in enumerate(lines, 1):
if compiled_pattern.search(line):
# Get relative path from project root
try:
relative_path = file_path.relative_to(self._root_path)
except ValueError:
relative_path = file_path

match_data = {
"file": str(relative_path),
"line_number": line_num,
"line": line.rstrip("\n\r"),
}

# Add context lines if requested
if context is not None and context > 0:
start_line = max(0, line_num - 1 - context)
end_line = min(len(lines), line_num + context)

context_lines = []
for i in range(start_line, end_line):
context_lines.append({
"line_number": i + 1,
"line": lines[i].rstrip("\n\r"),
"is_match": i == line_num - 1,
})
match_data["context"] = context_lines

matches.append(match_data)

except (UnicodeDecodeError, OSError, PermissionError):
# Skip binary files or files with access issues
for result in results:
if isinstance(result, BaseException):
# Skip files whose task raised; continue with the remaining results
continue
else:
file_matches, lines_count = result
matches.extend(file_matches)
total_files_searched += 1
total_lines_searched += lines_count

# Prepare output
content_lines = [f"Text search results for pattern '{pattern}':", ""]
@@ -182,7 +226,7 @@

"""
try:
result = self._search_text(pattern, files, context, max_chars)
result = await self._search_text(pattern, files, context, max_chars)
return {
"status": "success",
"message": (
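Reviewer note: the bounded-concurrency pattern in this diff — a semaphore capping simultaneous open files, gathered with return_exceptions=True — is easy to verify in isolation. A minimal sketch, using illustrative names (bounded_gather, worker, paths) that are not in the PR:

import asyncio

async def bounded_gather(paths, worker, limit=20):
    # Cap in-flight coroutines, mirroring the semaphore use in _search_text
    semaphore = asyncio.Semaphore(limit)

    async def run_one(path):
        async with semaphore:  # waits here once `limit` workers are active
            return await worker(path)

    # return_exceptions=True keeps one failing file from aborting the rest
    return await asyncio.gather(*(run_one(p) for p in paths), return_exceptions=True)

asyncio.gather preserves input order, which is why the result loop above can count files and lines without tracking task identity.
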
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -32,7 +32,8 @@ dependencies = [
"mcp>=1.8.1",
"fastmcp>=2.3.3",
"gitpython>=3.1.43",
"toml>=0.10.2"
"toml>=0.10.2",
"aiofiles>=24.1.0"

]
[project.optional-dependencies]
@@ -51,6 +52,7 @@ dev = [
"pytest-parametrization>=2022",
"ruff>=0.8.2",
"pytest-asyncio>=0.23.5",
"types-aiofiles>=24.1.0",
]
docs = [
"sphinx>=8.2.0; python_version >= '3.11'",
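The new runtime dependency is the async file API used in search_text.py; a two-line sanity check of the call shape used there (the path is illustrative):

import asyncio
import aiofiles  # the runtime dependency added above

async def read_lines(path: str) -> list[str]:
    # Mirrors search_text.py: async open, then an awaited readlines()
    async with aiofiles.open(path, "r", encoding="utf-8", errors="ignore") as f:
        return await f.readlines()

print(asyncio.run(read_lines("pyproject.toml"))[:2])  # illustrative usage
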
157 changes: 157 additions & 0 deletions tests/tools/explore/test_search_text_performance.py
@@ -0,0 +1,157 @@
"""Performance tests for SearchTextOperation async implementation."""

import asyncio
import tempfile
import time
from pathlib import Path

import git
import pytest

from dev_kit_mcp_server.tools import SearchTextOperation


@pytest.fixture
def large_test_setup():
"""Create a test environment with many files for performance testing."""
with tempfile.TemporaryDirectory() as temp_dir:
# Initialize git repository
git.Repo.init(temp_dir)

# Create multiple files with varying content
files_created = []

# Create 50 files with content to search
for i in range(50):
file_path = Path(temp_dir) / f"test_file_{i:03d}.py"
with open(file_path, "w") as f:
f.write(f"# Test file {i}\n")
f.write("import os\n")
f.write("import sys\n")
f.write(f"def function_{i}():\n")
f.write(f' """This is function {i}"""\n')
f.write(f" return {i}\n")
f.write("\n")
f.write("if __name__ == '__main__':\n")
f.write(f" print(function_{i}())\n")
# Add some random content to make files different sizes
for j in range(i % 10):
f.write(f"# Additional line {j} in file {i}\n")
files_created.append(str(file_path))

# Also create some larger files
for i in range(5):
file_path = Path(temp_dir) / f"large_file_{i}.txt"
with open(file_path, "w") as f:
for line_num in range(1000):
if line_num % 100 == 0:
f.write(f"SEARCH_TARGET line {line_num} in large file {i}\n")
else:
f.write(f"Regular line {line_num} in large file {i}\n")
files_created.append(str(file_path))

yield temp_dir, files_created


@pytest.mark.asyncio
async def test_search_text_performance_many_files(large_test_setup):
"""Test that async search performs well with many files."""
temp_dir, files_created = large_test_setup

operation = SearchTextOperation(root_dir=temp_dir)

# Test searching for a pattern that will be found in many files
start_time = time.time()
result = await operation(pattern="import")
end_time = time.time()

duration = end_time - start_time

# Verify the search worked correctly
assert result["status"] == "success"
assert result["matches_found"] > 50 # Should find imports in the Python files
assert result["files_searched"] >= len(files_created) # Might find additional files like .git files

# Performance should be reasonable (this is a basic check)
# With async processing, it should complete within a reasonable time
assert duration < 5.0 # Should complete within 5 seconds

print(f"Search of {len(files_created)} files completed in {duration:.3f} seconds")
print(f"Found {result['matches_found']} matches in {result['files_searched']} files")


@pytest.mark.asyncio
async def test_search_text_performance_specific_files(large_test_setup):
"""Test async search performance with specific files."""
temp_dir, files_created = large_test_setup

operation = SearchTextOperation(root_dir=temp_dir)

# Get relative paths for just the large files
large_files = [f"large_file_{i}.txt" for i in range(5)]

start_time = time.time()
result = await operation(pattern="SEARCH_TARGET", files=large_files)
end_time = time.time()

duration = end_time - start_time

# Verify the search worked correctly
assert result["status"] == "success"
assert result["matches_found"] == 50 # 10 matches per file * 5 files
assert result["files_searched"] == 5

# Performance check
assert duration < 2.0 # Should complete quickly for just 5 files

print(f"Search of 5 large files completed in {duration:.3f} seconds")
print(f"Found {result['matches_found']} matches")


@pytest.mark.asyncio
async def test_search_text_concurrency_behavior(large_test_setup):
"""Test that the async implementation handles concurrent operations properly."""
temp_dir, files_created = large_test_setup

operation = SearchTextOperation(root_dir=temp_dir)

# Run multiple concurrent searches
async def search_task(pattern):
return await operation(pattern=pattern)

start_time = time.time()
results = await asyncio.gather(
search_task("import"), search_task("def"), search_task("return"), return_exceptions=True
)
end_time = time.time()

duration = end_time - start_time

# Verify all searches completed successfully
for result in results:
assert not isinstance(result, Exception)
assert result["status"] == "success"
assert result["matches_found"] > 0

# Concurrent execution should be efficient
assert duration < 10.0 # Should complete within reasonable time

print(f"3 concurrent searches completed in {duration:.3f} seconds")


@pytest.mark.asyncio
async def test_search_text_error_handling_async(large_test_setup):
"""Test that async error handling works correctly."""
temp_dir, files_created = large_test_setup

operation = SearchTextOperation(root_dir=temp_dir)

# Test with invalid regex - should handle error gracefully
result = await operation(pattern="[invalid")
assert result["status"] == "error"
assert "Invalid regex pattern" in result["message"]

# Test with non-existent file
result = await operation(pattern="test", files=["nonexistent.txt"])
assert result["status"] == "error"
assert "does not exist" in result["message"]