Skip to content

Comments

🛡️ Sentinel: [HIGH] Fix SSRF TOCTOU vulnerability in file downloads#597

Open
anchapin wants to merge 1 commit intomainfrom
sentinel-fix-ssrf-toctou-11002891422496069375
Open

🛡️ Sentinel: [HIGH] Fix SSRF TOCTOU vulnerability in file downloads#597
anchapin wants to merge 1 commit intomainfrom
sentinel-fix-ssrf-toctou-11002891422496069375

Conversation

@anchapin
Copy link
Owner

  • 🚨 Severity: HIGH
  • 💡 Vulnerability: SSRF via DNS Rebinding (TOCTOU) in FileProcessor.download_from_url. An attacker could control a DNS server to return a safe IP during check and a private IP during fetch.
  • 🎯 Impact: Access to internal network services via the backend.
  • 🔧 Fix: Implemented DNS pinning for HTTP requests. Hostname is resolved and validated once, then the resolved IP is used for the request while preserving the Host header. For HTTPS, a warning is logged as full mitigation requires more complex transport changes.
  • ✅ Verification: Added backend/src/tests/unit/test_file_processor_security.py and updated backend/src/tests/unit/test_file_processor.py. Verified that requests use the validated IP address.

PR created automatically by Jules for task 11002891422496069375 started by @anchapin

- Implemented DNS pinning for HTTP downloads in `FileProcessor`
- Extracted `_resolve_and_validate` for reusable DNS security checks
- Updated tests to verify IP-based routing and Host header preservation
- Added specific security tests for DNS resolution logic

Co-authored-by: anchapin <6326294+anchapin@users.noreply.github.com>
Copilot AI review requested due to automatic review settings February 22, 2026 00:35
@google-labs-jules
Copy link
Contributor

👋 Jules, reporting for duty! I'm here to lend a hand with this pull request.

When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down.

I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job!

For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with @jules. You can find this option in the Pull Request section of your global Jules UI settings. You can always switch back!

New to Jules? Learn more at jules.google/docs.


For security, I will only act on instructions from the user who triggered this task.

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @anchapin, you have reached your weekly rate limit of 500000 diff characters.

Please try again later or upgrade to continue using Sourcery

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a HIGH severity SSRF vulnerability via DNS rebinding (TOCTOU - Time-of-Check-Time-of-Use) in the FileProcessor.download_from_url method. The fix implements DNS pinning for HTTP requests by resolving and validating hostnames once, then using the resolved IP for the actual request while preserving the Host header. For HTTPS, full mitigation isn't possible without complex transport changes, so a warning is logged.

Changes:

  • Refactored _is_safe_url to use new _resolve_and_validate method that returns the validated IP address
  • Modified download_from_url to rewrite HTTP URLs to use validated IPs with Host header preservation
  • Added comprehensive security test suite in test_file_processor_security.py
  • Updated existing tests in test_file_processor.py to verify IP pinning behavior

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 16 comments.

File Description
backend/src/file_processor.py Core SSRF fix: splits URL validation into DNS resolution + IP validation, rewrites HTTP URLs to use validated IPs for TOCTOU protection
backend/src/tests/unit/test_file_processor_security.py New security test suite covering DNS resolution, IP validation, and HTTP/HTTPS download behavior
backend/src/tests/unit/test_file_processor.py Updated existing tests to mock new _resolve_and_validate method and verify IP pinning in HTTP requests

Comment on lines +1 to +110
import pytest
import socket
import logging
from unittest import mock
import httpx
from file_processor import FileProcessor

@pytest.fixture
def file_processor():
return FileProcessor()

@pytest.mark.asyncio
async def test_resolve_and_validate_public_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock public IP: 93.184.216.34 (example.com)
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80))]

result = await file_processor._resolve_and_validate("example.com")
assert result == "93.184.216.34"

@pytest.mark.asyncio
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]

result = await file_processor._resolve_and_validate("internal.local")
assert result is None

@pytest.mark.asyncio
async def test_resolve_and_validate_mixed_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock mixed IPs: one public, one private (should fail)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))
]

result = await file_processor._resolve_and_validate("mixed.local")
assert result is None

@pytest.mark.asyncio
async def test_download_from_url_http_rewrites_to_ip(file_processor):
job_id = "test_job_http"
url = "http://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with IP URL and Host header
expected_url = "http://93.184.216.34/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0
)

@pytest.mark.asyncio
async def test_download_from_url_https_uses_hostname(file_processor):
job_id = "test_job_https"
url = "https://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("https://example.com/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with ORIGINAL URL (no IP rewrite for HTTPS)
mock_client.get.assert_called_with(
url,
follow_redirects=False,
timeout=30.0
)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing critical test coverage for SSRF protection during redirects. The implementation validates each redirect URL in the loop (lines 632-683 in file_processor.py), but there are no tests verifying that a malicious redirect to a private IP is blocked. An attacker could use a public URL that redirects to http://192.168.1.1/admin or http://127.0.0.1:8080/internal to bypass the initial validation.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
import pytest
import socket
import logging
from unittest import mock
import httpx
from file_processor import FileProcessor

@pytest.fixture
def file_processor():
return FileProcessor()

@pytest.mark.asyncio
async def test_resolve_and_validate_public_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock public IP: 93.184.216.34 (example.com)
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80))]

result = await file_processor._resolve_and_validate("example.com")
assert result == "93.184.216.34"

@pytest.mark.asyncio
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]

result = await file_processor._resolve_and_validate("internal.local")
assert result is None

@pytest.mark.asyncio
async def test_resolve_and_validate_mixed_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock mixed IPs: one public, one private (should fail)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))
]

result = await file_processor._resolve_and_validate("mixed.local")
assert result is None

@pytest.mark.asyncio
async def test_download_from_url_http_rewrites_to_ip(file_processor):
job_id = "test_job_http"
url = "http://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with IP URL and Host header
expected_url = "http://93.184.216.34/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0
)

@pytest.mark.asyncio
async def test_download_from_url_https_uses_hostname(file_processor):
job_id = "test_job_https"
url = "https://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("https://example.com/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with ORIGINAL URL (no IP rewrite for HTTPS)
mock_client.get.assert_called_with(
url,
follow_redirects=False,
timeout=30.0
)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for IPv6 addresses. The implementation handles IPv6 (line 107-109 strips scope IDs, line 649 wraps IPv6 in brackets), but there are no tests verifying that IPv6 loopback (::1), link-local (fe80::), or unique local addresses (fc00::/7, fd00::/8) are properly blocked. These IPv6 ranges can be used for SSRF attacks just like private IPv4 addresses.

Copilot uses AI. Check for mistakes.
url,
follow_redirects=False,
timeout=30.0
)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for DNS resolution failures. While _resolve_and_validate returns None on socket.gaierror (lines 100-102 in file_processor.py), there's no test verifying that download_from_url properly handles this case and returns a meaningful error message to the user.

Suggested change
)
)
@pytest.mark.asyncio
async def test_download_from_url_dns_resolution_failure(file_processor):
job_id = "test_job_dns_failure"
url = "http://unresolvable.local/file.zip"
# Simulate DNS resolution failure by having _resolve_and_validate return None
with mock.patch.object(
file_processor,
"_resolve_and_validate",
new_callable=mock.AsyncMock,
) as mock_resolve:
mock_resolve.return_value = None
# Patch AsyncClient to ensure no HTTP request is attempted
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
result = await file_processor.download_from_url(url, job_id)
# download_from_url should fail gracefully when DNS resolution fails
assert result.success is False
# _resolve_and_validate should have been called with the hostname
mock_resolve.assert_awaited_once_with("unresolvable.local")
# Ensure no outbound HTTP GET is performed on DNS failure
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_client.get.assert_not_called()

Copilot uses AI. Check for mistakes.
Comment on lines +14 to +40
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock public IP: 93.184.216.34 (example.com)
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80))]

result = await file_processor._resolve_and_validate("example.com")
assert result == "93.184.216.34"

@pytest.mark.asyncio
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]

result = await file_processor._resolve_and_validate("internal.local")
assert result is None

@pytest.mark.asyncio
async def test_resolve_and_validate_mixed_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock mixed IPs: one public, one private (should fail)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))
]

result = await file_processor._resolve_and_validate("mixed.local")
assert result is None
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test implementation issue: The mock for socket.getaddrinfo should be 'file_processor.socket.getaddrinfo' to match the import in file_processor.py (line 6: import socket), not just 'socket.getaddrinfo'. The current mocking may not actually patch the socket module used by the FileProcessor class, potentially making these tests ineffective at verifying the DNS resolution logic.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +110
import pytest
import socket
import logging
from unittest import mock
import httpx
from file_processor import FileProcessor

@pytest.fixture
def file_processor():
return FileProcessor()

@pytest.mark.asyncio
async def test_resolve_and_validate_public_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock public IP: 93.184.216.34 (example.com)
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80))]

result = await file_processor._resolve_and_validate("example.com")
assert result == "93.184.216.34"

@pytest.mark.asyncio
async def test_resolve_and_validate_private_ip(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock private IP: 192.168.1.1
mock_getaddrinfo.return_value = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))]

result = await file_processor._resolve_and_validate("internal.local")
assert result is None

@pytest.mark.asyncio
async def test_resolve_and_validate_mixed_ips(file_processor):
with mock.patch("socket.getaddrinfo") as mock_getaddrinfo:
# Mock mixed IPs: one public, one private (should fail)
mock_getaddrinfo.return_value = [
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('93.184.216.34', 80)),
(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.1.1', 80))
]

result = await file_processor._resolve_and_validate("mixed.local")
assert result is None

@pytest.mark.asyncio
async def test_download_from_url_http_rewrites_to_ip(file_processor):
job_id = "test_job_http"
url = "http://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with IP URL and Host header
expected_url = "http://93.184.216.34/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0
)

@pytest.mark.asyncio
async def test_download_from_url_https_uses_hostname(file_processor):
job_id = "test_job_https"
url = "https://example.com/file.zip"

with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("https://example.com/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response

# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100

result = await file_processor.download_from_url(url, job_id)

assert result.success is True
# Check that client.get was called with ORIGINAL URL (no IP rewrite for HTTPS)
mock_client.get.assert_called_with(
url,
follow_redirects=False,
timeout=30.0
)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edge case: If DNS resolution succeeds but returns an empty list (infos is empty), safe_ip remains None and the function returns None. While this is secure (fail-closed), there's no test coverage for this edge case. Add a test where getaddrinfo returns an empty list to verify the behavior.

Copilot uses AI. Check for mistakes.
timeout=30.0
)

@pytest.mark.asyncio
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for URLs with non-standard ports. The implementation handles ports (lines 650-651), but there are no tests verifying that http://example.com:8080/file.zip correctly rewrites to http://93.184.216.34:8080/file.zip with Host: example.com header. This is important for SSRF protection as port-based services are common attack targets.

Suggested change
@pytest.mark.asyncio
@pytest.mark.asyncio
async def test_download_from_url_http_with_non_standard_port_rewrites_to_ip(file_processor):
job_id = "test_job_http_non_standard_port"
url = "http://example.com:8080/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
mock_response = mock.AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {"Content-Disposition": 'attachment; filename="file.zip"'}
mock_response.url = httpx.URL("http://93.184.216.34:8080/file.zip")
async def mock_aiter_bytes():
yield b"content"
mock_response.aiter_bytes = mock_aiter_bytes
mock_client.get.return_value = mock_response
# Mock file writing to avoid disk usage/errors
with mock.patch("builtins.open", mock.mock_open()):
# Also mock Path.mkdir to avoid actual FS
with mock.patch("pathlib.Path.mkdir"):
# Mock Path.stat to return size > 0
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# Check that client.get was called with IP URL (preserving port) and Host header
expected_url = "http://93.184.216.34:8080/file.zip"
mock_client.get.assert_called_with(
expected_url,
headers={"Host": "example.com"},
follow_redirects=False,
timeout=30.0,
)
@pytest.mark.asyncio

Copilot uses AI. Check for mistakes.
Comment on lines +663 to +666
else:
# HTTPS: Cannot rewrite URL without breaking SSL verification (TOCTOU risk remains)
logger.warning(f"HTTPS URL used: {hostname}. TOCTOU protection limited.")
response = await client.get(current_url, follow_redirects=False, timeout=30.0)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation gap: The HTTPS limitation warning (line 665) is only logged but not documented in the function's docstring or returned to the caller. Users of the API won't know that HTTPS URLs have reduced SSRF protection compared to HTTP URLs. Consider adding this to the DownloadResult or documenting it prominently in the API documentation.

Copilot uses AI. Check for mistakes.
url,
follow_redirects=False,
timeout=30.0
)
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for protocol downgrade attacks. An HTTPS URL could redirect to an HTTP URL (or vice versa). While both are validated for SSRF, there's no test verifying this scenario works correctly. Additionally, redirects from HTTP to HTTPS would lose the TOCTOU protection provided by DNS pinning, which should be tested.

Suggested change
)
)
@pytest.mark.asyncio
async def test_download_from_url_https_redirects_to_http_uses_ip_pinning(file_processor):
job_id = "test_job_https_to_http"
url = "https://example.com/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
# First response: HTTPS redirect to HTTP
redirect_response = mock.AsyncMock(spec=httpx.Response)
redirect_response.status_code = 301
redirect_response.headers = {
"Location": "http://example.com/file_redirected.zip"
}
redirect_response.url = httpx.URL("https://example.com/file.zip")
# Final response: HTTP 200 OK serving the file
final_response = mock.AsyncMock(spec=httpx.Response)
final_response.status_code = 200
final_response.headers = {
"Content-Disposition": 'attachment; filename="file_redirected.zip"'
}
final_response.url = httpx.URL("http://example.com/file_redirected.zip")
async def final_aiter_bytes():
yield b"redirected-content"
final_response.aiter_bytes = final_aiter_bytes
# Side effect to return redirect first, then final response
responses = [redirect_response, final_response]
async def get_side_effect(*args, **kwargs):
return responses.pop(0)
mock_client.get.side_effect = get_side_effect
# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# We expect two requests: initial HTTPS, then HTTP pinned to the validated IP
assert len(mock_client.get.call_args_list) == 2
first_call_url = str(mock_client.get.call_args_list[0].args[0])
second_call_url = str(mock_client.get.call_args_list[1].args[0])
assert first_call_url == url
# After downgrade to HTTP, the URL should be rewritten to use the validated IP
assert second_call_url.startswith("http://93.184.216.34")
assert second_call_url.endswith("/file_redirected.zip")
@pytest.mark.asyncio
async def test_download_from_url_http_redirects_to_https_loses_dns_pinning(file_processor):
job_id = "test_job_http_to_https"
url = "http://example.com/file.zip"
with mock.patch.object(file_processor, "_resolve_and_validate", return_value="93.184.216.34") as mock_resolve:
with mock.patch("httpx.AsyncClient") as MockAsyncClient:
mock_client = MockAsyncClient.return_value.__aenter__.return_value
# First response: HTTP redirect to HTTPS
redirect_response = mock.AsyncMock(spec=httpx.Response)
redirect_response.status_code = 301
redirect_response.headers = {
"Location": "https://example.com/file_secure.zip"
}
redirect_response.url = httpx.URL("http://example.com/file.zip")
# Final response: HTTPS 200 OK serving the file
final_response = mock.AsyncMock(spec=httpx.Response)
final_response.status_code = 200
final_response.headers = {
"Content-Disposition": 'attachment; filename="file_secure.zip"'
}
final_response.url = httpx.URL("https://example.com/file_secure.zip")
async def final_aiter_bytes():
yield b"secure-content"
final_response.aiter_bytes = final_aiter_bytes
# Side effect to return redirect first, then final response
responses = [redirect_response, final_response]
async def get_side_effect(*args, **kwargs):
return responses.pop(0)
mock_client.get.side_effect = get_side_effect
# Mock file writing
with mock.patch("builtins.open", mock.mock_open()):
with mock.patch("pathlib.Path.mkdir"):
with mock.patch("pathlib.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 100
result = await file_processor.download_from_url(url, job_id)
assert result.success is True
# We expect two requests: initial HTTP pinned to IP, then HTTPS using hostname
assert len(mock_client.get.call_args_list) == 2
first_call_url = str(mock_client.get.call_args_list[0].args[0])
second_call_url = str(mock_client.get.call_args_list[1].args[0])
# HTTP request should be rewritten to use the validated IP for TOCTOU protection
assert first_call_url.startswith("http://93.184.216.34")
assert first_call_url.endswith("/file.zip")
# After redirect to HTTPS, the request should use the original hostname (no IP pinning)
assert second_call_url == "https://example.com/file_secure.zip"

Copilot uses AI. Check for mistakes.
Comment on lines 113 to +115
if ip_obj.is_loopback or ip_obj.is_private or ip_obj.is_link_local:
logger.warning(f"Blocked attempt to access private IP: {ip} for hostname: {hostname}")
return False
return None # Fail immediately if ANY resolved IP is private
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security gap: Multicast IP addresses (224.0.0.0/4 for IPv4, ff00::/8 for IPv6) are not explicitly blocked. While these are unlikely to be useful for SSRF attacks, the ipaddress module provides is_multicast property that should be checked alongside is_loopback, is_private, and is_link_local for complete protection. Add 'or ip_obj.is_multicast' to the condition.

Copilot uses AI. Check for mistakes.
Comment on lines 113 to +115
if ip_obj.is_loopback or ip_obj.is_private or ip_obj.is_link_local:
logger.warning(f"Blocked attempt to access private IP: {ip} for hostname: {hostname}")
return False
return None # Fail immediately if ANY resolved IP is private
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security gap: Reserved IP addresses (0.0.0.0/8, 240.0.0.0/4, etc.) are not explicitly blocked. The ipaddress module provides is_reserved property for IPv4 addresses. While these addresses are typically not routable, they could potentially be used in certain network configurations. Consider adding 'or ip_obj.is_reserved' to the validation check for defense in depth.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant