This repository has been archived by the owner on Jan 18, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 432
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
In the process - "spring clean" the modules that were touched - use HttpMock when HttpMockSequence not needed - add some verifications on new HttpMock's
- Loading branch information
Showing
6 changed files
with
237 additions
and
119 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
from oauth2client import client | ||
from oauth2client import clientsecrets | ||
from oauth2client import service_account | ||
from oauth2client import transport | ||
from . import http_mock | ||
|
||
__author__ = '[email protected] (Joe Gregorio)' | ||
|
@@ -899,7 +900,7 @@ def test_token_refresh_success(self): | |
({'status': http_client.OK}, 'echo_request_headers'), | ||
]) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) | ||
self.assertFalse(self.credentials.access_token_expired) | ||
self.assertEqual(token_response, self.credentials.token_response) | ||
|
@@ -918,7 +919,7 @@ def test_recursive_authorize(self): | |
http = http_mock.HttpMock(data=encoded_response) | ||
http = self.credentials.authorize(http) | ||
http = self.credentials.authorize(http) | ||
http.request('http://example.com') | ||
transport.request(http, 'http://example.com') | ||
|
||
def test_token_refresh_failure(self): | ||
for status_code in client.REFRESH_STATUS_CODES: | ||
|
@@ -930,7 +931,7 @@ def test_token_refresh_failure(self): | |
http = self.credentials.authorize(http) | ||
with self.assertRaises( | ||
client.HttpAccessTokenRefreshError) as exc_manager: | ||
http.request('http://example.com') | ||
transport.request(http, 'http://example.com') | ||
self.assertEqual(http_client.BAD_REQUEST, | ||
exc_manager.exception.status) | ||
self.assertTrue(self.credentials.access_token_expired) | ||
|
@@ -957,7 +958,7 @@ def test_token_revoke_fallback(self): | |
def test_non_401_error_response(self): | ||
http = http_mock.HttpMock(headers={'status': http_client.BAD_REQUEST}) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(http_client.BAD_REQUEST, resp.status) | ||
self.assertEqual(None, self.credentials.token_response) | ||
|
||
|
@@ -1001,16 +1002,18 @@ def test_unicode_header_checks(self): | |
http = credentials.authorize(http_mock.HttpMock()) | ||
headers = {u'foo': 3, b'bar': True, 'baz': b'abc'} | ||
cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'} | ||
http.request(u'http://example.com', method=u'GET', headers=headers) | ||
transport.request( | ||
http, u'http://example.com', method=u'GET', headers=headers) | ||
for k, v in cleaned_headers.items(): | ||
self.assertTrue(k in http.headers) | ||
self.assertEqual(v, http.headers[k]) | ||
|
||
# Next, test that we do fail on unicode. | ||
unicode_str = six.unichr(40960) + 'abcd' | ||
with self.assertRaises(client.NonAsciiHeaderError): | ||
http.request(u'http://example.com', method=u'GET', | ||
headers={u'foo': unicode_str}) | ||
transport.request( | ||
http, u'http://example.com', method=u'GET', | ||
headers={u'foo': unicode_str}) | ||
|
||
def test_no_unicode_in_request_params(self): | ||
access_token = u'foo' | ||
|
@@ -1027,17 +1030,18 @@ def test_no_unicode_in_request_params(self): | |
|
||
http = http_mock.HttpMock() | ||
http = credentials.authorize(http) | ||
http.request(u'http://example.com', method=u'GET', | ||
headers={u'foo': u'bar'}) | ||
transport.request( | ||
http, u'http://example.com', method=u'GET', | ||
headers={u'foo': u'bar'}) | ||
for k, v in six.iteritems(http.headers): | ||
self.assertIsInstance(k, six.binary_type) | ||
self.assertIsInstance(v, six.binary_type) | ||
|
||
# Test again with unicode strings that can't simply be converted | ||
# to ASCII. | ||
with self.assertRaises(client.NonAsciiHeaderError): | ||
http.request( | ||
u'http://example.com', method=u'GET', | ||
transport.request( | ||
http, u'http://example.com', method=u'GET', | ||
headers={u'foo': u'\N{COMET}'}) | ||
|
||
self.credentials.token_response = 'foobar' | ||
|
@@ -1473,7 +1477,7 @@ def test_refresh_updates_id_token(self): | |
({'status': '200'}, 'echo_request_headers'), | ||
]) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(self.credentials.id_token, body) | ||
|
||
|
||
|
@@ -1492,7 +1496,7 @@ def test_token_refresh_success(self): | |
headers={'status': status_code}, data=b'') | ||
http = self.credentials.authorize(http) | ||
with self.assertRaises(client.AccessTokenCredentialsError): | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
|
||
def test_token_revoke_success(self): | ||
_token_revoke_test_helper( | ||
|
@@ -1507,15 +1511,15 @@ def test_token_revoke_failure(self): | |
def test_non_401_error_response(self): | ||
http = http_mock.HttpMock(headers={'status': http_client.BAD_REQUEST}) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(http_client.BAD_REQUEST, resp.status) | ||
|
||
def test_auth_header_sent(self): | ||
http = http_mock.HttpMockSequence([ | ||
({'status': '200'}, 'echo_request_headers'), | ||
]) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(b'Bearer foo', content[b'Authorization']) | ||
|
||
|
||
|
@@ -1551,7 +1555,7 @@ def test_assertion_refresh(self): | |
({'status': '200'}, 'echo_request_headers'), | ||
]) | ||
http = self.credentials.authorize(http) | ||
resp, content = http.request('http://example.com') | ||
resp, content = transport.request(http, 'http://example.com') | ||
self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) | ||
|
||
def test_token_revoke_success(self): | ||
|
@@ -1742,16 +1746,17 @@ def _step1_get_device_and_user_codes_helper( | |
device_code, user_code, None, ver_url, None) | ||
self.assertEqual(result, expected) | ||
self.assertEqual(len(http.requests), 1) | ||
self.assertEqual( | ||
http.requests[0]['uri'], oauth2client.GOOGLE_DEVICE_URI) | ||
body = http.requests[0]['body'] | ||
self.assertEqual(urllib.parse.parse_qs(body), | ||
{'client_id': [flow.client_id], | ||
'scope': [flow.scope]}) | ||
info = http.requests[0] | ||
self.assertEqual(info['uri'], oauth2client.GOOGLE_DEVICE_URI) | ||
expected_body = { | ||
'client_id': [flow.client_id], | ||
'scope': [flow.scope], | ||
} | ||
self.assertEqual(urllib.parse.parse_qs(info['body']), expected_body) | ||
headers = {'content-type': 'application/x-www-form-urlencoded'} | ||
if extra_headers is not None: | ||
headers.update(extra_headers) | ||
self.assertEqual(http.requests[0]['headers'], headers) | ||
self.assertEqual(info['headers'], headers) | ||
|
||
def test_step1_get_device_and_user_codes(self): | ||
self._step1_get_device_and_user_codes_helper() | ||
|
Oops, something went wrong.