Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#651] regex compatability for headers matching #663

Merged
merged 5 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion responses/matchers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json as json_module
import re
from json.decoder import JSONDecodeError
from typing import Any
from typing import Callable
Expand Down Expand Up @@ -402,14 +403,31 @@ def header_matcher(
:return: (func) matcher
"""

def _compare_with_regex(request_headers: Union[Dict[Any, Any], Any]) -> bool:
if strict_match and len(request_headers) != len(headers):
return False

for k, v in headers.items():
if request_headers.get(k) is not None:
geetptl marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(v, re.Pattern):
if re.match(v, request_headers[k]) is None:
return False
else:
if not v == request_headers[k]:
return False
elif strict_match:
return False

return True

def match(request: PreparedRequest) -> Tuple[bool, str]:
request_headers: Union[Dict[Any, Any], Any] = request.headers or {}

if not strict_match:
# filter down to just the headers specified in the matcher
request_headers = {k: v for k, v in request_headers.items() if k in headers}

valid = sorted(headers.items()) == sorted(request_headers.items())
valid = _compare_with_regex(request_headers)

if not valid:
return False, "Headers do not match: {} doesn't match {}".format(
Expand Down
164 changes: 164 additions & 0 deletions responses/tests/test_matchers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -782,3 +783,166 @@ def test_matchers_create_key_val_str():
"a, {3: test, key1: val1, key2: 2}], test: val}"
)
assert conv_str == reference


class TestHeaderWithRegex:
url = "http://example.com/"
geetptl marked this conversation as resolved.
Show resolved Hide resolved

def _register(self):
responses.add(
method=responses.GET,
url=self.url,
body="success",
match=[
matchers.header_matcher(
{
"Accept": "text/plain",
"Message-Signature": re.compile(r'signature="\S+",created=\d+'),
},
strict_match=True,
)
],
)

def test_request_matches_headers_regex(self):
@responses.activate
def run():
# this one can not use common _register method because different headers
responses.add(
method=responses.GET,
url=self.url,
json={"success": True},
match=[
matchers.header_matcher(
{
"Message-Signature": re.compile(
r'signature="\S+",created=\d+'
),
"Authorization": "Bearer API_TOKEN",
},
strict_match=False,
)
],
)
# the actual request can contain extra headers (requests always adds some itself anyway)
resp = requests.get(
self.url,
headers={
"Message-Signature": 'signature="abc",created=1243',
"Authorization": "Bearer API_TOKEN",
},
)
assert_response(
resp, body='{"success": true}', content_type="application/json"
)

run()
assert_reset()

def test_request_matches_headers_regex_strict_match_regex_failed(self):
@responses.activate
def run():
self._register()
session = requests.Session()
# requests will add some extra headers of its own, so we have to use prepared requests
prepped = session.prepare_request(
requests.Request(
method="GET",
url=self.url,
)
)
prepped.headers.clear()
prepped.headers["Accept"] = "text/plain"
prepped.headers["Message-Signature"] = 'signature="123",created=abc'
with pytest.raises(ConnectionError) as excinfo:
session.send(prepped)
msg = str(excinfo.value)
assert (
"Headers do not match: {Accept: text/plain, Message-Signature: "
'signature="123",created=abc} '
"doesn't match {Accept: text/plain, Message-Signature: "
"re.compile('signature=\"\\\\S+\",created=\\\\d+')}"
) in msg

run()
assert_reset()

def test_request_matches_headers_regex_strict_match_mismatched_field(self):
@responses.activate
def run():
self._register()
# requests will add some extra headers of its own, so we have to use prepared requests
session = requests.Session()
prepped = session.prepare_request(
requests.Request(
method="GET",
url=self.url,
)
)
prepped.headers.clear()
prepped.headers["Accept"] = "text/plain"
prepped.headers["Accept-Charset"] = "utf-8"
# "Accept-Charset" header will fail to match to "Message-Signature"
with pytest.raises(ConnectionError) as excinfo:
session.send(prepped)
msg = str(excinfo.value)
assert (
"Headers do not match: {Accept: text/plain, Accept-Charset: utf-8} "
"doesn't match {Accept: text/plain, Message-Signature: "
"re.compile('signature=\"\\\\S+\",created=\\\\d+')}"
) in msg

run()
assert_reset()

def test_request_matches_headers_regex_strict_match_mismatched_number(self):
@responses.activate
def run():
self._register()
# requests will add some extra headers of its own, so we have to use prepared requests
session = requests.Session()
# include the "Accept-Charset" header, which will fail to match
prepped = session.prepare_request(
requests.Request(
method="GET",
url=self.url,
)
)
prepped.headers.clear()
prepped.headers["Accept"] = "text/plain"
prepped.headers["Accept-Charset"] = "utf-8"
prepped.headers["Message-Signature"] = 'signature="abc",created=1243'
with pytest.raises(ConnectionError) as excinfo:
session.send(prepped)
msg = str(excinfo.value)
assert (
"Headers do not match: {Accept: text/plain, Accept-Charset: utf-8, "
'Message-Signature: signature="abc",'
"created=1243} "
"doesn't match {Accept: text/plain, Message-Signature: "
"re.compile('signature=\"\\\\S+\",created=\\\\d+')}"
) in msg

run()
assert_reset()

def test_request_matches_headers_regex_strict_match_positive(self):
@responses.activate
def run():
self._register()
# requests will add some extra headers of its own, so we have to use prepared requests
session = requests.Session()
prepped = session.prepare_request(
requests.Request(
method="GET",
url=self.url,
)
)
prepped.headers.clear()
prepped.headers["Accept"] = "text/plain"
prepped.headers["Message-Signature"] = 'signature="abc",created=1243'
resp = session.send(prepped)
assert_response(resp, body="success", content_type="text/plain")

run()
assert_reset()