Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 62 additions & 22 deletions backend/src/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,22 @@ def _sanitize_filename(self, filename: str) -> str:
return "default_sanitized_filename"
return sanitized

async def _is_safe_url(self, url: str) -> bool:
async def _resolve_and_validate(self, hostname: str) -> Optional[str]:
"""
Validates URL to prevent SSRF by resolving hostname and checking for private/loopback IPs.
Resolves hostname to IP and checks if it's a safe (public) IP.
Returns the first safe IP address found, or None if validation fails.
"""
try:
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ('http', 'https'):
logger.warning(f"Unsafe scheme in URL for hostname: {parsed.hostname}")
return False
hostname = parsed.hostname
if not hostname:
return False

# Run blocking DNS resolution in executor
loop = asyncio.get_running_loop()
# Use getaddrinfo to support both IPv4 and IPv6
try:
infos = await loop.run_in_executor(None, socket.getaddrinfo, hostname, None)
except socket.gaierror:
logger.warning(f"Could not resolve hostname: {hostname}")
return False
return None

safe_ip = None
for info in infos:
ip = info[4][0]
# Strip IPv6 scope ID if present (e.g. fe80::1%eth0 -> fe80::1)
Expand All @@ -118,15 +112,36 @@ async def _is_safe_url(self, url: str) -> bool:
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_loopback or ip_obj.is_private or ip_obj.is_link_local:
logger.warning(f"Blocked attempt to access private IP: {ip} for hostname: {hostname}")
return False
return None # Fail immediately if ANY resolved IP is private
Comment on lines 113 to +115
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security gap: Multicast IP addresses (224.0.0.0/4 for IPv4, ff00::/8 for IPv6) are not explicitly blocked. While these are unlikely to be useful for SSRF attacks, the ipaddress module provides is_multicast property that should be checked alongside is_loopback, is_private, and is_link_local for complete protection. Add 'or ip_obj.is_multicast' to the condition.

Copilot uses AI. Check for mistakes.
Comment on lines 113 to +115
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security gap: Reserved IP addresses (0.0.0.0/8, 240.0.0.0/4, etc.) are not explicitly blocked. The ipaddress module provides is_reserved property for IPv4 addresses. While these addresses are typically not routable, they could potentially be used in certain network configurations. Consider adding 'or ip_obj.is_reserved' to the validation check for defense in depth.

Copilot uses AI. Check for mistakes.
except ValueError:
# Fail securely if IP cannot be parsed
logger.warning(f"Could not parse IP address: {ip} for hostname: {hostname}")
return False
return True
return None

if safe_ip is None:
safe_ip = ip

return safe_ip
except Exception as e:
# Don't log full URL in exception either
logger.error(f"Error validating URL for hostname {parsed.hostname if 'parsed' in locals() else 'unknown'}: {e}")
logger.error(f"Error resolving/validating hostname {hostname}: {e}")
return None

async def _is_safe_url(self, url: str) -> bool:
"""
Validates URL to prevent SSRF by resolving hostname and checking for private/loopback IPs.
Wrapper for backward compatibility.
"""
try:
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ('http', 'https'):
logger.warning(f"Unsafe scheme in URL for hostname: {parsed.hostname}")
return False
hostname = parsed.hostname
if not hostname:
return False
return await self._resolve_and_validate(hostname) is not None
except Exception as e:
logger.error(f"Error validating URL: {e}")
return False

def validate_upload(self, file: UploadFile) -> ValidationResult:
Expand Down Expand Up @@ -615,15 +630,40 @@ async def download_from_url(self, url: str, job_id: str) -> DownloadResult:

# Manual redirect handling with SSRF protection
for _ in range(5): # Max 5 redirects
if not await self._is_safe_url(current_url):
# Sanitize URL for logging (strip query params/fragments)
parsed_unsafe = urllib.parse.urlparse(current_url)
sanitized_unsafe = f"{parsed_unsafe.scheme}://{parsed_unsafe.hostname}{parsed_unsafe.path}"
msg = f"Unsafe URL detected: {sanitized_unsafe}"
parsed_current = urllib.parse.urlparse(current_url)
hostname = parsed_current.hostname

if not hostname:
return DownloadResult(success=False, message="Invalid URL: Missing hostname")

# Resolve and validate IP to prevent SSRF
safe_ip = await self._resolve_and_validate(hostname)
if not safe_ip:
sanitized_unsafe = f"{parsed_current.scheme}://{parsed_current.hostname}{parsed_current.path}"
msg = f"Unsafe URL detected or resolution failed: {sanitized_unsafe}"
logger.warning(msg)
return DownloadResult(success=False, message="Unsafe URL detected")

response = await client.get(current_url, follow_redirects=False, timeout=30.0)
if parsed_current.scheme == "http":
# TOCTOU Protection for HTTP: Use IP directly
ip_netloc = f"[{safe_ip}]" if ":" in safe_ip else safe_ip
if parsed_current.port:
ip_netloc += f":{parsed_current.port}"

# Preserve credentials if present
final_netloc = ip_netloc
if "@" in parsed_current.netloc:
auth_part = parsed_current.netloc.rsplit("@", 1)[0]
final_netloc = f"{auth_part}@{ip_netloc}"
Comment on lines +653 to +657
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security risk: Credentials are preserved when rewriting HTTP URLs to use IP addresses (lines 653-657). This means user:password will be sent to the resolved IP address with only the Host header indicating the original hostname. If DNS is compromised or returns a malicious IP during validation, credentials could be leaked to an attacker-controlled server. Consider either rejecting URLs with embedded credentials or adding a warning in the security documentation.

Copilot uses AI. Check for mistakes.

url_with_ip = parsed_current._replace(netloc=final_netloc).geturl()
headers = {"Host": hostname}
logger.info(f"Using DNS pinning for HTTP: {hostname} -> {safe_ip}")
response = await client.get(url_with_ip, headers=headers, follow_redirects=False, timeout=30.0)
else:
# HTTPS: Cannot rewrite URL without breaking SSL verification (TOCTOU risk remains)
logger.warning(f"HTTPS URL used: {hostname}. TOCTOU protection limited.")
response = await client.get(current_url, follow_redirects=False, timeout=30.0)
Comment on lines +663 to +666
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation gap: The HTTPS limitation warning (line 665) is only logged but not documented in the function's docstring or returned to the caller. Users of the API won't know that HTTPS URLs have reduced SSRF protection compared to HTTP URLs. Consider adding this to the DownloadResult or documenting it prominently in the API documentation.

Copilot uses AI. Check for mistakes.

if 300 <= response.status_code < 400:
location = response.headers.get("Location")
Expand Down
21 changes: 17 additions & 4 deletions backend/src/tests/unit/test_file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
def file_processor():
"""Pytest fixture to provide a FileProcessor instance."""
fp = FileProcessor()
# Mock _is_safe_url to always return True by default for existing tests to avoid network calls
fp._is_safe_url = mock.AsyncMock(return_value=True)
# Mock _resolve_and_validate to return a safe IP by default
fp._resolve_and_validate = mock.AsyncMock(return_value="93.184.216.34")
return fp


Expand Down Expand Up @@ -97,9 +97,10 @@ async def mock_aiter_bytes():
# But wait, we mocked _is_safe_url, so the loop runs.
# The loop calls client.get(..., follow_redirects=False).

# Verify that get was called with follow_redirects=False
# Verify that get was called with follow_redirects=False and IP pinning
expected_url = "http://93.184.216.34/download.zip"
MockAsyncClient.return_value.__aenter__.return_value.get.assert_called_with(
url, follow_redirects=False, timeout=30.0
expected_url, headers={"Host": "example.com"}, follow_redirects=False, timeout=30.0
)

@pytest.mark.asyncio
Expand Down Expand Up @@ -135,6 +136,12 @@ async def mock_aiter_bytes():
assert result.file_name == "another_example.jar"
assert expected_file_path.read_bytes() == b"jar content"

# Verify IP pinning
expected_url = "http://93.184.216.34/another_example.jar"
MockAsyncClient.return_value.__aenter__.return_value.get.assert_called_with(
expected_url, headers={"Host": "example.com"}, follow_redirects=False, timeout=30.0
)

@pytest.mark.asyncio
@mock.patch("file_processor.httpx.AsyncClient")
async def test_download_from_url_success_content_type_extension(
Expand Down Expand Up @@ -166,6 +173,12 @@ async def mock_aiter_bytes():
assert result.file_name == "some_file_no_ext"
assert expected_file_path.read_bytes() == b"java archive"

# Verify IP pinning
expected_url = "http://93.184.216.34/some_file_no_ext"
MockAsyncClient.return_value.__aenter__.return_value.get.assert_called_with(
expected_url, headers={"Host": "example.com"}, follow_redirects=False, timeout=30.0
)

@pytest.mark.asyncio
@pytest.mark.parametrize(
"status_code, error_type_expected",
Expand Down
110 changes: 110 additions & 0 deletions backend/src/tests/unit/test_file_processor_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest
import socket
import logging
from unittest import mock
import httpx
from file_processor import FileProcessor

@pytest.fixture
def file_processor():
return FileProcessor()

@pytest.mark.asyncio
async def test_resolve_and_validate_public_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock public IP: 93.184.216.34 (example.com)
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80))]

result = await file_processor._resolve_and_validate("example.com")
assert result == "93.184.216.34"

@pytest.mark.asyncio
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]
Comment on lines +22 to +25
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for comprehensive private IP ranges. The test only covers 192.168.1.1 (line 25), but should also test 10.0.0.0/8 and 172.16.0.0/12 ranges. Additionally, edge cases like 10.0.0.1, 172.16.0.1, 192.168.255.254 should be tested to ensure the entire private IP space is properly blocked.

Suggested change
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]
@pytest.mark.parametrize(
"ip",
[
# 192.168.0.0/16 range
"192.168.1.1",
"192.168.255.254",
# 10.0.0.0/8 range (including edge case)
"10.0.0.1",
"10.123.45.67",
# 172.16.0.0/12 range (including edge case)
"172.16.0.1",
"172.31.255.254",
],
)
async def test_resolve_and_validate_private_ip_ranges(file_processor, ip):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP from RFC1918 ranges
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, "", (ip, 80))
]

Copilot uses AI. Check for mistakes.

result = await file_processor._resolve_and_validate("internal.local")
assert result is None

@pytest.mark.asyncio
async def test_resolve_and_validate_mixed_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock mixed IPs: one public, one private (should fail)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))
]

result = await file_processor._resolve_and_validate("mixed.local")
assert result is None
Comment on lines +14 to +40
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test implementation issue: The mock for socket.getaddrinfo should be 'file_processor.socket.getaddrinfo' to match the import in file_processor.py (line 6: import socket), not just 'socket.getaddrinfo'. The current mocking may not actually patch the socket module used by the FileProcessor class, potentially making these tests ineffective at verifying the DNS resolution logic.

Copilot uses AI. Check for mistakes.

@pytest.mark.asyncio
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test coverage gap: The mixed IPs test (lines 31-40) verifies that ANY private IP in DNS results causes rejection, but doesn't test what happens when a hostname resolves to ONLY public IPs (e.g., multiple public IPs for load balancing). The implementation should use the first public IP found, but this behavior isn't tested.

Suggested change
@pytest.mark.asyncio
@pytest.mark.asyncio
async def test_resolve_and_validate_multiple_public_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock multiple public IPs: should select the first public IP
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.35', 80))
]
result = await file_processor._resolve_and_validate("multi.example.com")
assert result == "93.184.216.34"
@pytest.mark.asyncio

Copilot uses AI. Check for mistakes.
async def test_download_from_url_http_rewrites_to_ip(file_processor):
job_id = "test_job_http"
url = "http://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with IP URL and Host header
expected_url = "http://93.184.216.34/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0
)

@pytest.mark.asyncio
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for URLs with non-standard ports. The implementation handles ports (lines 650-651), but there are no tests verifying that http://example.com:8080/file.zip correctly rewrites to http://93.184.216.34:8080/file.zip with Host: example.com header. This is important for SSRF protection as port-based services are common attack targets.

Suggested change
@pytest.mark.asyncio
@pytest.mark.asyncio
async def test_download_from_url_http_with_non_standard_port_rewrites_to_ip(file_processor):
job_id = "test_job_http_non_standard_port"
url = "http://example.com:8080/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34:8080/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response
# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# Check that client.get was called with IP URL (preserving port) and Host header
expected_url = "http://93.184.216.34:8080/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0,
)
@pytest.mark.asyncio

Copilot uses AI. Check for mistakes.
async def test_download_from_url_https_uses_hostname(file_processor):
job_id = "test_job_https"
url = "https://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("https://example.com/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with ORIGINAL URL (no IP rewrite for HTTPS)
mock_client.get.assert_called_with(
url,
follow_redirects=False,
timeout=30.0
)
Comment on lines +1 to +110
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing critical test coverage for SSRF protection during redirects. The implementation validates each redirect URL in the loop (lines 632-683 in file_processor.py), but there are no tests verifying that a malicious redirect to a private IP is blocked. An attacker could use a public URL that redirects to http://192.168.1.1/admin or http://127.0.0.1:8080/internal to bypass the initial validation.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for IPv6 addresses. The implementation handles IPv6 (line 107-109 strips scope IDs, line 649 wraps IPv6 in brackets), but there are no tests verifying that IPv6 loopback (::1), link-local (fe80::), or unique local addresses (fc00::/7, fd00::/8) are properly blocked. These IPv6 ranges can be used for SSRF attacks just like private IPv4 addresses.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for DNS resolution failures. While _resolve_and_validate returns None on socket.gaierror (lines 100-102 in file_processor.py), there's no test verifying that download_from_url properly handles this case and returns a meaningful error message to the user.

Suggested change
)
)
@pytest.mark.asyncio
async def test_download_from_url_dns_resolution_failure(file_processor):
job_id = "test_job_dns_failure"
url = "http://unresolvable.local/file.zip"
# Simulate DNS resolution failure by having _resolve_and_validate return None
with mock.patch.object(
file_processor,
"_resolve_and_validate",
new_callable=mock.AsyncMock,
) as mock_resolve:
mock_resolve.return_value = None
# Patch AsyncClient to ensure no HTTP request is attempted
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
result = await file_processor.download_from_url(url, job_id)
# download_from_url should fail gracefully when DNS resolution fails
assert result.success is False
# _resolve_and_validate should have been called with the hostname
mock_resolve.assert_awaited_once_with("unresolvable.local")
# Ensure no outbound HTTP GET is performed on DNS failure
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_client.get.assert_not_called()

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edge case: If DNS resolution succeeds but returns an empty list (infos is empty), safe_ip remains None and the function returns None. While this is secure (fail-closed), there's no test coverage for this edge case. Add a test where getaddrinfo returns an empty list to verify the behavior.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for loopback IP addresses (127.0.0.1, 127.0.0.x, ::1). While the implementation uses ipaddress.is_loopback (line 113), there are no tests verifying this protection works. Loopback addresses can be used to access services running on the same machine as the backend, which is a common SSRF attack vector.

Suggested change
)
)
@pytest.mark.asyncio
async def test_resolve_and_validate_loopback_ipv4_localhost(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock loopback IPv4: 127.0.0.1 (localhost)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.1", 80))
]
result = await file_processor._resolve_and_validate("localhost")
# Loopback IPs should be rejected
assert result is None
@pytest.mark.asyncio
async def test_resolve_and_validate_loopback_ipv4_other_127_range(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock another loopback IPv4 in 127.0.0.0/8: 127.0.0.5
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, "", ("127.0.0.5", 80))
]
result = await file_processor._resolve_and_validate("loopback.local")
# Any 127.0.0.0/8 address should be treated as loopback and rejected
assert result is None
@pytest.mark.asyncio
async def test_resolve_and_validate_loopback_ipv6(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock loopback IPv6: ::1
mock_getaddrinfo.return_value = [
(
socket.AF_INET6,
socket.SOCK_STREAM,
6,
"",
("::1", 80, 0, 0),
)
]
result = await file_processor._resolve_and_validate("ipv6-loopback.local")
# IPv6 loopback should also be rejected
assert result is None

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for link-local IP addresses (169.254.0.0/16 for IPv4, fe80::/10 for IPv6). The implementation checks is_link_local (line 113), but there are no tests validating this protection. Link-local addresses can be used to access other machines on the local network segment.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test organization: Consider organizing security tests into a TestClass (e.g., TestFileProcessorSecurity) to match the pattern used in test_file_processor.py (class TestFileProcessor at line 53). This improves test organization and makes it easier to share fixtures and setup/teardown logic across security-specific tests.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for protocol downgrade attacks. An HTTPS URL could redirect to an HTTP URL (or vice versa). While both are validated for SSRF, there's no test verifying this scenario works correctly. Additionally, redirects from HTTP to HTTPS would lose the TOCTOU protection provided by DNS pinning, which should be tested.

Suggested change
)
)
@pytest.mark.asyncio
async def test_download_from_url_https_redirects_to_http_uses_ip_pinning(file_processor):
job_id = "test_job_https_to_http"
url = "https://example.com/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
# First response: HTTPS redirect to HTTP
redirect_response = mock.AsyncMock(spec=httpx.Response)
redirect_response.status_code = 301
redirect_response.headers = {
"Location": "http://example.com/file_redirected.zip"
}
redirect_response.url = httpx.URL("https://example.com/file.zip")
# Final response: HTTP 200 OK serving the file
final_response = mock.AsyncMock(spec=httpx.Response)
final_response.status_code = 200
final_response.headers = {
"Content-Disposition": 'attachment; filename="file_redirected.zip"'
}
final_response.url = httpx.URL("http://example.com/file_redirected.zip")
async def final_aiter_bytes():
yield b"redirected-content"
final_response.aiter_bytes = final_aiter_bytes
# Side effect to return redirect first, then final response
responses = [redirect_response, final_response]
async def get_side_effect(*args, **kwargs):
return responses.pop(0)
mock_client.get.side_effect = get_side_effect
# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# We expect two requests: initial HTTPS, then HTTP pinned to the validated IP
assert len(mock_client.get.call_args_list) == 2
first_call_url = str(mock_client.get.call_args_list[0].args[0])
second_call_url = str(mock_client.get.call_args_list[1].args[0])
assert first_call_url == url
# After downgrade to HTTP, the URL should be rewritten to use the validated IP
assert second_call_url.startswith("http://93.184.216.34")
assert second_call_url.endswith("/file_redirected.zip")
@pytest.mark.asyncio
async def test_download_from_url_http_redirects_to_https_loses_dns_pinning(file_processor):
job_id = "test_job_http_to_https"
url = "http://example.com/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
# First response: HTTP redirect to HTTPS
redirect_response = mock.AsyncMock(spec=httpx.Response)
redirect_response.status_code = 301
redirect_response.headers = {
"Location": "https://example.com/file_secure.zip"
}
redirect_response.url = httpx.URL("http://example.com/file.zip")
# Final response: HTTPS 200 OK serving the file
final_response = mock.AsyncMock(spec=httpx.Response)
final_response.status_code = 200
final_response.headers = {
"Content-Disposition": 'attachment; filename="file_secure.zip"'
}
final_response.url = httpx.URL("https://example.com/file_secure.zip")
async def final_aiter_bytes():
yield b"secure-content"
final_response.aiter_bytes = final_aiter_bytes
# Side effect to return redirect first, then final response
responses = [redirect_response, final_response]
async def get_side_effect(*args, **kwargs):
return responses.pop(0)
mock_client.get.side_effect = get_side_effect
# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# We expect two requests: initial HTTP pinned to IP, then HTTPS using hostname
assert len(mock_client.get.call_args_list) == 2
first_call_url = str(mock_client.get.call_args_list[0].args[0])
second_call_url = str(mock_client.get.call_args_list[1].args[0])
# HTTP request should be rewritten to use the validated IP for TOCTOU protection
assert first_call_url.startswith("http://93.184.216.34")
assert first_call_url.endswith("/file.zip")
# After redirect to HTTPS, the request should use the original hostname (no IP pinning)
assert second_call_url == "https://example.com/file_secure.zip"

Copilot uses AI. Check for mistakes.