Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 42 additions & 51 deletions onvif/zeep_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from zeep.transports import Transport
from zeep.utils import get_version
from zeep.wsdl.utils import etree_to_string

from multidict import CIMultiDict
import httpx
from aiohttp import ClientResponse, ClientSession
from aiohttp import ClientResponse, ClientSession, hdrs
from requests import Response
from requests.structures import CaseInsensitiveDict

if TYPE_CHECKING:
from lxml.etree import _Element
Expand Down Expand Up @@ -65,14 +66,23 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
async def aclose(self) -> None:
"""Close the transport session."""

def _filter_headers(self, headers: CIMultiDict[str]) -> list[tuple[str, str]]:
"""Filter out Content-Encoding header.

Since aiohttp has already decompressed the content, we need to
remove the Content-Encoding header to prevent zeep from trying
to decompress it again, which would cause a zlib error.
"""
return [(k, v) for k, v in headers.items() if k != hdrs.CONTENT_ENCODING]

def _aiohttp_to_httpx_response(
self, aiohttp_response: ClientResponse, content: bytes
) -> httpx.Response:
"""Convert aiohttp ClientResponse to httpx Response."""
# Create httpx Response with the content
httpx_response = httpx.Response(
status_code=aiohttp_response.status,
headers=httpx.Headers(aiohttp_response.headers),
headers=httpx.Headers(self._filter_headers(aiohttp_response.headers)),
content=content,
request=httpx.Request(
method=aiohttp_response.method,
Expand Down Expand Up @@ -104,7 +114,10 @@ def _aiohttp_to_requests_response(
new = Response()
new._content = content
new.status_code = aiohttp_response.status
new.headers = dict(aiohttp_response.headers)
# Use dict comprehension for requests.Response headers
new.headers = CaseInsensitiveDict(
self._filter_headers(aiohttp_response.headers)
)
# Convert aiohttp cookies to requests format
if aiohttp_response.cookies:
for name, cookie in aiohttp_response.cookies.items():
Expand All @@ -117,27 +130,10 @@ def _aiohttp_to_requests_response(
new.encoding = aiohttp_response.charset
return new

async def post(
self, address: str, message: str, headers: dict[str, str]
) -> httpx.Response:
"""
Perform async POST request.

Args:
address: The URL to send the request to
message: The message to send
headers: HTTP headers to include

Returns:
The httpx response object

"""
return await self._post(address, message, headers)

async def _post(
self, address: str, message: str, headers: dict[str, str]
) -> httpx.Response:
"""Internal POST implementation."""
async def _post_internal(
self, address: str, message: str | bytes, headers: dict[str, str]
) -> tuple[ClientResponse, bytes]:
"""Internal POST implementation that returns aiohttp response and content."""
_LOGGER.debug("HTTP Post to %s:\n%s", address, message)

# Set default headers
Expand Down Expand Up @@ -169,15 +165,31 @@ async def _post(
content,
)

# Convert to httpx Response
return self._aiohttp_to_httpx_response(response, content)
return response, content
except RuntimeError as exc:
# Handle RuntimeError which may occur if the session is closed
raise RuntimeError(f"Failed to post to {address}: {exc}") from exc

except TimeoutError as exc:
raise TimeoutError(f"Request to {address} timed out") from exc

async def post(
self, address: str, message: str, headers: dict[str, str]
) -> httpx.Response:
"""
Perform async POST request.

Args:
address: The URL to send the request to
message: The message to send
headers: HTTP headers to include

Returns:
The httpx response object

"""
response, content = await self._post_internal(address, message, headers)
return self._aiohttp_to_httpx_response(response, content)

async def post_xml(
self, address: str, envelope: _Element, headers: dict[str, str]
) -> Response:
Expand All @@ -194,8 +206,8 @@ async def post_xml(

"""
message = etree_to_string(envelope)
response = await self.post(address, message, headers)
return self._httpx_to_requests_response(response)
response, content = await self._post_internal(address, message, headers)
return self._aiohttp_to_requests_response(response, content)

async def get(
self,
Expand All @@ -215,15 +227,6 @@ async def get(
A Response object compatible with zeep

"""
return await self._get(address, params, headers)

async def _get(
self,
address: str,
params: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> Response:
"""Internal GET implementation."""
_LOGGER.debug("HTTP Get from %s", address)

# Set default headers
Expand Down Expand Up @@ -258,18 +261,6 @@ async def _get(
except TimeoutError as exc:
raise TimeoutError(f"Request to {address} timed out") from exc

def _httpx_to_requests_response(self, response: httpx.Response) -> Response:
"""Convert an httpx.Response object to a requests.Response object"""
body = response.read()

new = Response()
new._content = body
new.status_code = response.status_code
new.headers = response.headers
new.cookies = response.cookies
new.encoding = response.encoding
return new

def load(self, url: str) -> bytes:
"""
Load content from URL synchronously.
Expand Down
126 changes: 110 additions & 16 deletions tests/test_zeep_transport.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Tests for AIOHTTPTransport to ensure compatibility with zeep's AsyncTransport."""

from http.cookies import SimpleCookie
from unittest.mock import AsyncMock, Mock, patch

import aiohttp
import httpx
import pytest
from lxml import etree
from multidict import CIMultiDict
from onvif.zeep_aiohttp import AIOHTTPTransport
from requests import Response as RequestsResponse

Expand Down Expand Up @@ -460,8 +462,6 @@ async def test_cookies_in_requests_response():
transport = AIOHTTPTransport(session=mock_session)

# Mock cookies using SimpleCookie format
from http.cookies import SimpleCookie

mock_cookies = SimpleCookie()
mock_cookies["session"] = "abc123"

Expand Down Expand Up @@ -606,8 +606,6 @@ async def test_cookie_conversion_httpx_basic():
transport = AIOHTTPTransport(session=mock_session)

# Create aiohttp cookies
from http.cookies import SimpleCookie

cookies = SimpleCookie()
cookies["session"] = "abc123"
cookies["session"]["domain"] = ".example.com"
Expand Down Expand Up @@ -652,8 +650,6 @@ async def test_cookie_conversion_requests_basic():
transport = AIOHTTPTransport(session=mock_session)

# Create aiohttp cookies
from http.cookies import SimpleCookie

cookies = SimpleCookie()
cookies["token"] = "xyz789"
cookies["token"]["domain"] = ".api.example.com"
Expand Down Expand Up @@ -688,8 +684,6 @@ async def test_cookie_attributes_httpx():
transport = AIOHTTPTransport(session=mock_session)

# Create cookie with all attributes
from http.cookies import SimpleCookie

cookies = SimpleCookie()
cookies["auth"] = "secret123"
cookies["auth"]["domain"] = ".secure.com"
Expand Down Expand Up @@ -732,8 +726,6 @@ async def test_multiple_cookies():
transport = AIOHTTPTransport(session=mock_session)

# Create multiple cookies
from http.cookies import SimpleCookie

cookies = SimpleCookie()
for i in range(5):
cookie_name = f"cookie{i}"
Expand Down Expand Up @@ -773,8 +765,6 @@ async def test_empty_cookies():
transport = AIOHTTPTransport(session=mock_session)

# Mock response without cookies
from http.cookies import SimpleCookie

mock_response = Mock(spec=aiohttp.ClientResponse)
mock_response.status = 200
mock_response.headers = {}
Expand Down Expand Up @@ -803,8 +793,6 @@ async def test_cookie_encoding():
transport = AIOHTTPTransport(session=mock_session)

# Create cookies with special chars
from http.cookies import SimpleCookie

cookies = SimpleCookie()
cookies["data"] = "hello%20world%21" # URL encoded
cookies["unicode"] = "café"
Expand Down Expand Up @@ -838,8 +826,6 @@ async def test_cookie_jar_type():
mock_session = create_mock_session()
transport = AIOHTTPTransport(session=mock_session)

from http.cookies import SimpleCookie

cookies = SimpleCookie()
cookies["test"] = "value"

Expand Down Expand Up @@ -870,6 +856,114 @@ async def test_cookie_jar_type():
assert "test" in requests_result.cookies


@pytest.mark.asyncio
async def test_gzip_content_encoding_header_removed():
"""Test that Content-Encoding: gzip header is removed after aiohttp decompresses.

This fixes the issue where aiohttp automatically decompresses gzip content
but the Content-Encoding header was still passed to zeep, causing it to
attempt decompression again on already-decompressed content, resulting in
zlib errors.
"""
mock_session = create_mock_session()
transport = AIOHTTPTransport(session=mock_session)

# Mock response with Content-Encoding: gzip
# aiohttp will have already decompressed the content
mock_aiohttp_response = Mock(spec=aiohttp.ClientResponse)
mock_aiohttp_response.status = 200
# Simulate headers with Content-Encoding: gzip
headers = CIMultiDict()
headers["Content-Type"] = "application/soap+xml; charset=utf-8"
headers["Content-Encoding"] = "gzip"
headers["Server"] = "PelcoOnvifNvt"
mock_aiohttp_response.headers = headers
mock_aiohttp_response.method = "POST"
mock_aiohttp_response.url = "http://camera.local/onvif/device_service"
mock_aiohttp_response.charset = "utf-8"
mock_aiohttp_response.cookies = {}

# Content is already decompressed by aiohttp
decompressed_content = b'<?xml version="1.0"?><soap:Envelope>test</soap:Envelope>'
mock_aiohttp_response.read = AsyncMock(return_value=decompressed_content)

mock_session = Mock(spec=aiohttp.ClientSession)
mock_session.post = AsyncMock(return_value=mock_aiohttp_response)
transport.session = mock_session

# Test httpx response (from post)
httpx_result = await transport.post(
"http://camera.local/onvif/device_service", "<request/>", {}
)

# Verify Content-Encoding header was removed
assert "content-encoding" not in httpx_result.headers
assert "Content-Encoding" not in httpx_result.headers
# Other headers should still be present
assert httpx_result.headers["content-type"] == "application/soap+xml; charset=utf-8"
assert httpx_result.headers["server"] == "PelcoOnvifNvt"
# Content should be the decompressed data
assert httpx_result.read() == decompressed_content

# Test requests response (from get)
mock_session.get = AsyncMock(return_value=mock_aiohttp_response)
requests_result = await transport.get("http://camera.local/onvif/device_service")

# Verify Content-Encoding header was removed from requests response too
assert "content-encoding" not in requests_result.headers
assert "Content-Encoding" not in requests_result.headers
# Other headers should still be present
assert (
requests_result.headers["content-type"] == "application/soap+xml; charset=utf-8"
)
assert requests_result.headers["server"] == "PelcoOnvifNvt"
# Content should be the decompressed data
assert requests_result.content == decompressed_content


@pytest.mark.asyncio
async def test_multiple_duplicate_headers_preserved():
"""Test that duplicate headers (except Content-Encoding) are preserved."""
mock_session = create_mock_session()
transport = AIOHTTPTransport(session=mock_session)

# Mock response with duplicate headers
mock_aiohttp_response = Mock(spec=aiohttp.ClientResponse)
mock_aiohttp_response.status = 200

# Create headers with duplicates (like multiple Set-Cookie headers)
headers = CIMultiDict()
headers.add("Set-Cookie", "session=abc123; Path=/")
headers.add("Set-Cookie", "user=john; Path=/api")
headers.add("Set-Cookie", "token=xyz789; Secure")
headers.add("Content-Type", "text/xml")
headers.add("Content-Encoding", "gzip") # This should be removed

mock_aiohttp_response.headers = headers
mock_aiohttp_response.method = "POST"
mock_aiohttp_response.url = "http://example.com"
mock_aiohttp_response.charset = "utf-8"
mock_aiohttp_response.cookies = {}
mock_aiohttp_response.read = AsyncMock(return_value=b"test")

mock_session = Mock(spec=aiohttp.ClientSession)
mock_session.post = AsyncMock(return_value=mock_aiohttp_response)
transport.session = mock_session

# Test httpx response
httpx_result = await transport.post("http://example.com", "test", {})

# Content-Encoding should be removed
assert "content-encoding" not in httpx_result.headers

# All Set-Cookie headers should be preserved
set_cookie_values = httpx_result.headers.get_list("set-cookie")
assert len(set_cookie_values) == 3
assert "session=abc123; Path=/" in set_cookie_values
assert "user=john; Path=/api" in set_cookie_values
assert "token=xyz789; Secure" in set_cookie_values


@pytest.mark.asyncio
async def test_http_error_responses_no_exception():
"""Test that HTTP error responses (401, 500, etc.) don't raise exceptions."""
Expand Down