From 7aa17fda51fe89a67a4d8400d1a462af28f5e7cd Mon Sep 17 00:00:00 2001 From: Hiroaki KAWAI Date: Thu, 23 Feb 2017 11:19:59 +0000 Subject: [PATCH 1/5] Add ENABLE_PUSH flag in the Upgrade HTTP2-Settings header Push flag required for the case the initial upgrade request triggered server push. --- hyper/common/connection.py | 3 +- hyper/http11/connection.py | 5 + test/test_abstraction.py | 1 + test/test_hyper.py | 190 ++++++++++++++++++++++++++++++++++++- 4 files changed, 194 insertions(+), 5 deletions(-) diff --git a/hyper/common/connection.py b/hyper/common/connection.py index dee18d68..507a8ad7 100644 --- a/hyper/common/connection.py +++ b/hyper/common/connection.py @@ -62,7 +62,8 @@ def __init__(self, self._port = port self._h1_kwargs = { 'secure': secure, 'ssl_context': ssl_context, - 'proxy_host': proxy_host, 'proxy_port': proxy_port + 'proxy_host': proxy_host, 'proxy_port': proxy_port, + 'enable_push': enable_push } self._h2_kwargs = { 'window_manager': window_manager, 'enable_push': enable_push, diff --git a/hyper/http11/connection.py b/hyper/http11/connection.py index 48bde2f0..8025c740 100644 --- a/hyper/http11/connection.py +++ b/hyper/http11/connection.py @@ -78,6 +78,7 @@ def __init__(self, host, port=None, secure=None, ssl_context=None, # only send http upgrade headers for non-secure connection self._send_http_upgrade = not self.secure + self._enable_push = kwargs.get('enable_push') self.ssl_context = ssl_context self._sock = None @@ -276,6 +277,10 @@ def _add_upgrade_headers(self, headers): # Settings header. http2_settings = SettingsFrame(0) http2_settings.settings[SettingsFrame.INITIAL_WINDOW_SIZE] = 65535 + if self._enable_push is not None: + http2_settings.settings[SettingsFrame.ENABLE_PUSH] = ( + int(self._enable_push) + ) encoded_settings = base64.urlsafe_b64encode( http2_settings.serialize_body() ) diff --git a/test/test_abstraction.py b/test/test_abstraction.py index cd0e0645..7c2cad1a 100644 --- a/test/test_abstraction.py +++ b/test/test_abstraction.py @@ -19,6 +19,7 @@ def test_h1_kwargs(self): 'proxy_host': False, 'proxy_port': False, 'other_kwarg': True, + 'enable_push': True, } def test_h2_kwargs(self): diff --git a/test/test_hyper.py b/test/test_hyper.py index 6a18d592..09847511 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -9,6 +9,7 @@ PingFrame, FRAME_MAX_ALLOWED_LEN ) from hpack.hpack_compat import Encoder +from hyper.common.connection import HTTPConnection from hyper.http20.connection import HTTP20Connection from hyper.http20.response import HTTP20Response, HTTP20Push from hyper.http20.exceptions import ConnectionError, StreamResetError @@ -731,8 +732,8 @@ def add_data_frame(self, stream_id, data, end_stream=False): frame.flags.add('END_STREAM') self.frames.append(frame) - def request(self): - self.conn = HTTP20Connection('www.google.com', enable_push=True) + def request(self, enable_push=True): + self.conn = HTTP20Connection('www.google.com', enable_push=enable_push) self.conn._sock = DummySocket() self.conn._sock.buffer = BytesIO( b''.join([frame.serialize() for frame in self.frames]) @@ -934,8 +935,7 @@ def test_reset_pushed_streams_when_push_disabled(self): 1, [(':status', '200'), ('content-type', 'text/html')] ) - self.request() - self.conn._enable_push = False + self.request(False) self.conn.get_response() f = RstStreamFrame(2) @@ -1303,6 +1303,188 @@ def test_resetting_streams_after_close(self): c._single_read() +class TestUpgradingPush(object): + http101 = (b"HTTP/1.1 101 Switching Protocols\r\n" + b"Connection: upgrade\r\n" + b"Upgrade: h2c\r\n" + b"\r\n") + + def setup_method(self, method): + self.frames = [SettingsFrame(0)] # Server side preface + self.encoder = Encoder() + self.conn = None + + def add_push_frame(self, stream_id, promised_stream_id, headers, + end_block=True): + frame = PushPromiseFrame(stream_id) + frame.promised_stream_id = promised_stream_id + frame.data = self.encoder.encode(headers) + if end_block: + frame.flags.add('END_HEADERS') + self.frames.append(frame) + + def add_headers_frame(self, stream_id, headers, end_block=True, + end_stream=False): + frame = HeadersFrame(stream_id) + frame.data = self.encoder.encode(headers) + if end_block: + frame.flags.add('END_HEADERS') + if end_stream: + frame.flags.add('END_STREAM') + self.frames.append(frame) + + def add_data_frame(self, stream_id, data, end_stream=False): + frame = DataFrame(stream_id) + frame.data = data + if end_stream: + frame.flags.add('END_STREAM') + self.frames.append(frame) + + def request(self, enable_push=True): + self.conn = HTTPConnection('www.google.com', enable_push=enable_push) + self.conn._conn._sock = DummySocket() + self.conn._conn._sock.buffer = BytesIO( + self.http101 + b''.join([frame.serialize() + for frame in self.frames]) + ) + self.conn.request('GET', '/') + + def assert_response(self): + self.response = self.conn.get_response() + assert self.response.status == 200 + assert dict(self.response.headers) == {b'content-type': [b'text/html']} + + def assert_pushes(self): + self.pushes = list(self.conn.get_pushes()) + assert len(self.pushes) == 1 + assert self.pushes[0].method == b'GET' + assert self.pushes[0].scheme == b'http' + assert self.pushes[0].authority == b'www.google.com' + assert self.pushes[0].path == b'/' + expected_headers = {b'accept-encoding': [b'gzip']} + assert dict(self.pushes[0].request_headers) == expected_headers + + def assert_push_response(self): + push_response = self.pushes[0].get_response() + assert push_response.status == 200 + assert dict(push_response.headers) == { + b'content-type': [b'application/javascript'] + } + assert push_response.read() == b'bar' + + def test_promise_before_headers(self): + # Current implementation only support get_pushes call + # after get_response + pass + + def test_promise_after_headers(self): + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_data_frame(1, b'foo', end_stream=True) + self.add_headers_frame( + 2, [(':status', '200'), ('content-type', 'application/javascript')] + ) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + self.assert_response() + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_promise_after_data(self): + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + self.add_data_frame(1, b'fo') + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_data_frame(1, b'o', end_stream=True) + self.add_headers_frame( + 2, [(':status', '200'), ('content-type', 'application/javascript')] + ) + self.add_data_frame(2, b'bar', end_stream=True) + + self.request() + self.assert_response() + assert self.response.read() == b'foo' + self.assert_pushes() + self.assert_push_response() + + def test_capture_all_promises(self): + # Current implementation does not support capture_all + # for h2c upgrading connection. + pass + + def test_cancel_push(self): + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + + self.request() + self.conn.get_response() + list(self.conn.get_pushes())[0].cancel() + + f = RstStreamFrame(2) + f.error_code = 8 + assert self.conn._sock.queue[-1] == f.serialize() + + def test_reset_pushed_streams_when_push_disabled(self): + self.add_push_frame( + 1, + 2, + [ + (':method', 'GET'), + (':path', '/'), + (':authority', 'www.google.com'), + (':scheme', 'http'), + ('accept-encoding', 'gzip') + ] + ) + self.add_headers_frame( + 1, [(':status', '200'), ('content-type', 'text/html')] + ) + + self.request(False) + self.conn.get_response() + + f = RstStreamFrame(2) + f.error_code = 7 + assert self.conn._sock.queue[-1].endswith(f.serialize()) + + # Some utility classes for the tests. class NullEncoder(object): @staticmethod From 085975fbc0b25cfe9d0333a029822d971b8f961d Mon Sep 17 00:00:00 2001 From: Hiroaki KAWAI Date: Thu, 23 Feb 2017 11:22:33 +0000 Subject: [PATCH 2/5] fix test synchronization Some tests requires synchronization to let the connection state machine working. --- test/test_integration.py | 119 +++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 37 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 96a8af76..f78107aa 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,6 +12,7 @@ import hyper import hyper.http11.connection import pytest +from contextlib import contextmanager from mock import patch from h2.frame_buffer import FrameBuffer from hyper.compat import ssl @@ -64,17 +65,30 @@ def frame_buffer(): return buffer +@contextmanager +def reusable_frame_buffer(buffer): + # FrameBuffer does not return new iterator for iteration. + data = buffer.data + yield buffer + buffer.data = data + + def receive_preamble(sock): # Receive the HTTP/2 'preamble'. - first = sock.recv(65535) + client_preface = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' + timeout = time.time() + 5 + got = b'' + while len(got) < len(client_preface) and time.time() < timeout: + got += sock.recv(len(client_preface) - len(got)) + + assert got == client_preface, "client preface mismatch" - # Work around some bugs: if the first message received was only the PRI - # string, aim to receive a settings frame as well. - if len(first) <= len(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'): - sock.recv(65535) + # Send server side HTTP/2 preface sock.send(SettingsFrame(0).serialize()) - sock.recv(65535) - return + # Drain to let the client proceed. + # Note that in the lower socket level, this method is not + # just doing "receive". + return sock.recv(65535) @patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS) @@ -138,7 +152,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.connect() - send_event.wait() + send_event.wait(5) # Get the chunk of data after the preamble and decode it into frames. # We actually expect two, but only the second one contains ENABLE_PUSH. @@ -242,7 +256,7 @@ def socket_handler(listener): f = SettingsFrame(0) sock.send(f.serialize()) - send_event.wait() + send_event.wait(5) sock.recv(65535) sock.close() @@ -260,6 +274,7 @@ def socket_handler(listener): def test_closed_responses_remove_their_streams_from_conn(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -270,6 +285,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 @@ -282,6 +299,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Close the response. @@ -296,6 +314,7 @@ def socket_handler(listener): def test_receiving_responses_with_no_body(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -306,6 +325,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has no body f = build_headers_frame( [(':status', '204'), ('content-length', '0')] @@ -321,6 +342,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -338,6 +360,7 @@ def socket_handler(listener): def test_receiving_trailers(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -350,6 +373,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame( [(':status', '200'), ('content-length', '14')], @@ -372,12 +397,13 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -396,13 +422,14 @@ def socket_handler(listener): assert len(resp.trailers) == 2 # Awesome, we're done now. - recv_event.wait(5) + recv_event.set() self.tear_down() def test_receiving_trailers_before_reading(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() wait_event = threading.Event() @@ -416,6 +443,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. f = build_headers_frame( [(':status', '200'), ('content-length', '14')], @@ -449,6 +478,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -647,6 +677,7 @@ def test_resetting_stream_with_frames_in_flight(self): """ self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -657,6 +688,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has no # body. f = build_headers_frame( @@ -673,6 +706,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() stream_id = conn.request('GET', '/') + req_event.set() # Now, trigger the RST_STREAM frame by closing the stream. conn._send_rst_frame(stream_id, 0) @@ -696,6 +730,7 @@ def test_stream_can_be_reset_multiple_times(self): """ self.set_up() + req_event = threading.Event() recv_event = threading.Event() def socket_handler(listener): @@ -706,6 +741,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send two RST_STREAM frames. for _ in range(0, 2): f = RstStreamFrame(1) @@ -718,6 +755,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() # Now, eat the Rst frames. These should not cause an exception. conn._single_read() @@ -737,6 +775,7 @@ def socket_handler(listener): def test_read_chunked_http2(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() wait_event = threading.Event() @@ -748,6 +787,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has a body. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 @@ -777,6 +818,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -805,6 +847,7 @@ def socket_handler(listener): def test_read_delayed(self): self.set_up() + req_event = threading.Event() recv_event = threading.Event() wait_event = threading.Event() @@ -816,6 +859,8 @@ def socket_handler(listener): receive_preamble(sock) sock.recv(65535) + # Wait for request + req_event.wait(5) # Now, send the headers for the response. This response has a body. f = build_headers_frame([(':status', '200')]) f.stream_id = 1 @@ -845,6 +890,7 @@ def socket_handler(listener): self._start_server(socket_handler) conn = self.get_connection() conn.request('GET', '/') + req_event.set() resp = conn.get_response() # Confirm the status code. @@ -958,6 +1004,8 @@ def socket_handler(listener): receive_preamble(sock) + # Wait for the message from the main thread. + send_event.wait() # Send the headers for the response. This response has no body. f = build_headers_frame( [(':status', '200'), ('content-length', '0')] @@ -965,9 +1013,6 @@ def socket_handler(listener): f.flags.add('END_STREAM') f.stream_id = 1 sock.sendall(f.serialize()) - - # Wait for the message from the main thread. - send_event.wait() sock.close() self._start_server(socket_handler) @@ -996,7 +1041,7 @@ def socket_handler(listener): data += sock.recv(65535) assert b'upgrade: h2c\r\n' in data - send_event.wait() + send_event.wait(5) # We need to send back a response. resp = ( @@ -1038,7 +1083,7 @@ class TestRequestsAdapter(SocketLevelTest): # This uses HTTP/2. h2 = True - def test_adapter_received_values(self, monkeypatch): + def test_adapter_received_values(self, monkeypatch, frame_buffer): self.set_up() # We need to patch the ssl_wrap_socket method to ensure that we @@ -1051,17 +1096,20 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - data = [] - send_event = threading.Event() - def socket_handler(listener): sock = listener.accept()[0] # Do the handshake: conn header, settings, send settings, recv ack. - receive_preamble(sock) + frame_buffer.add_data(receive_preamble(sock)) # Now expect some data. One headers frame. - data.append(sock.recv(65535)) + req_wait = True + while req_wait: + frame_buffer.add_data(sock.recv(65535)) + with reusable_frame_buffer(frame_buffer) as fr: + for f in fr: + if isinstance(f, HeadersFrame): + req_wait = False # Respond! h = HeadersFrame(1) @@ -1078,8 +1126,6 @@ def socket_handler(listener): d.data = b'1234567890' * 2 d.flags.add('END_STREAM') sock.send(d.serialize()) - - send_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1093,11 +1139,9 @@ def socket_handler(listener): assert r.headers[b'Content-Type'] == b'not/real' assert r.content == b'1234567890' * 2 - send_event.set() - self.tear_down() - def test_adapter_sending_values(self, monkeypatch): + def test_adapter_sending_values(self, monkeypatch, frame_buffer): self.set_up() # We need to patch the ssl_wrap_socket method to ensure that we @@ -1110,17 +1154,20 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - data = [] - def socket_handler(listener): sock = listener.accept()[0] # Do the handshake: conn header, settings, send settings, recv ack. - receive_preamble(sock) + frame_buffer.add_data(receive_preamble(sock)) # Now expect some data. One headers frame and one data frame. - data.append(sock.recv(65535)) - data.append(sock.recv(65535)) + req_wait = True + while req_wait: + frame_buffer.add_data(sock.recv(65535)) + with reusable_frame_buffer(frame_buffer) as fr: + for f in fr: + if isinstance(f, DataFrame): + req_wait = False # Respond! h = HeadersFrame(1) @@ -1137,7 +1184,6 @@ def socket_handler(listener): d.data = b'1234567890' * 2 d.flags.add('END_STREAM') sock.send(d.serialize()) - sock.close() self._start_server(socket_handler) @@ -1152,11 +1198,10 @@ def socket_handler(listener): # Assert about the sent values. assert r.status_code == 200 - f = decode_frame(data[0]) - assert isinstance(f, HeadersFrame) + frames = list(frame_buffer) + assert isinstance(frames[-2], HeadersFrame) - f = decode_frame(data[1]) - assert isinstance(f, DataFrame) - assert f.data == b'hi there' + assert isinstance(frames[-1], DataFrame) + assert frames[-1].data == b'hi there' self.tear_down() From a974eec5f3f2f110ab78e797a703f1adb30bd109 Mon Sep 17 00:00:00 2001 From: Hiroaki KAWAI Date: Thu, 23 Feb 2017 16:04:55 +0000 Subject: [PATCH 3/5] update test introduced FrameEncoderMixin helper class. fix waits for not displaying unwanted EOF errors. --- test/test_hyper.py | 49 ++++++------------ test/test_integration.py | 105 ++++++++++++++++++++++++--------------- 2 files changed, 80 insertions(+), 74 deletions(-) diff --git a/test/test_hyper.py b/test/test_hyper.py index 09847511..27f4e7b2 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -48,6 +48,7 @@ def frame_buffer(): class TestHyperConnection(object): + def test_connections_accept_hosts_and_ports(self): c = HTTP20Connection(host='www.google.com', port=8080) assert c.host == 'www.google.com' @@ -499,6 +500,7 @@ def test_headers_with_continuation(self): def test_send_tolerate_peer_gone(self): class ErrorSocket(DummySocket): + def sendall(self, data): raise socket.error(errno.EPIPE) @@ -700,7 +702,8 @@ def test_incrementing_window_after_close(self): assert len(originally_sent_data) + 1 == len(c._sock.queue) -class TestServerPush(object): +class FrameEncoderMixin(object): + def setup_method(self, method): self.frames = [] self.encoder = Encoder() @@ -732,6 +735,9 @@ def add_data_frame(self, stream_id, data, end_stream=False): frame.flags.add('END_STREAM') self.frames.append(frame) + +class TestServerPush(FrameEncoderMixin): + def request(self, enable_push=True): self.conn = HTTP20Connection('www.google.com', enable_push=enable_push) self.conn._sock = DummySocket() @@ -957,6 +963,7 @@ def test_pushed_requests_ignore_unexpected_headers(self): class TestResponse(object): + def test_status_is_stripped_from_headers(self): headers = HTTPHeaderMap([(':status', '200')]) resp = HTTP20Response(headers, None) @@ -1107,6 +1114,7 @@ def test_response_version(self): class TestHTTP20Adapter(object): + def test_adapter_reuses_connections(self): a = HTTP20Adapter() conn1 = a.get_connection('http2bin.org', 80, 'http') @@ -1130,6 +1138,7 @@ def test_adapter_accept_client_certificate(self): class TestUtilities(object): + def test_combining_repeated_headers(self): test_headers = [ (b'key1', b'val1'), @@ -1303,44 +1312,14 @@ def test_resetting_streams_after_close(self): c._single_read() -class TestUpgradingPush(object): +class TestUpgradingPush(FrameEncoderMixin): http101 = (b"HTTP/1.1 101 Switching Protocols\r\n" b"Connection: upgrade\r\n" b"Upgrade: h2c\r\n" b"\r\n") - def setup_method(self, method): - self.frames = [SettingsFrame(0)] # Server side preface - self.encoder = Encoder() - self.conn = None - - def add_push_frame(self, stream_id, promised_stream_id, headers, - end_block=True): - frame = PushPromiseFrame(stream_id) - frame.promised_stream_id = promised_stream_id - frame.data = self.encoder.encode(headers) - if end_block: - frame.flags.add('END_HEADERS') - self.frames.append(frame) - - def add_headers_frame(self, stream_id, headers, end_block=True, - end_stream=False): - frame = HeadersFrame(stream_id) - frame.data = self.encoder.encode(headers) - if end_block: - frame.flags.add('END_HEADERS') - if end_stream: - frame.flags.add('END_STREAM') - self.frames.append(frame) - - def add_data_frame(self, stream_id, data, end_stream=False): - frame = DataFrame(stream_id) - frame.data = data - if end_stream: - frame.flags.add('END_STREAM') - self.frames.append(frame) - def request(self, enable_push=True): + self.frames = [SettingsFrame(0)] + self.frames # Server side preface self.conn = HTTPConnection('www.google.com', enable_push=enable_push) self.conn._conn._sock = DummySocket() self.conn._conn._sock.buffer = BytesIO( @@ -1487,6 +1466,7 @@ def test_reset_pushed_streams_when_push_disabled(self): # Some utility classes for the tests. class NullEncoder(object): + @staticmethod def encode(headers): @@ -1503,6 +1483,7 @@ def to_str(v): class FixedDecoder(object): + def __init__(self, result): self.result = result @@ -1511,6 +1492,7 @@ def decode(self, headers): class DummySocket(object): + def __init__(self): self.queue = [] self._buffer = BytesIO() @@ -1548,6 +1530,7 @@ def fill(self): class DummyStream(object): + def __init__(self, data, trailers=None): self.data = data self.data_frames = [] diff --git a/test/test_integration.py b/test/test_integration.py index f78107aa..273b4c18 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -255,9 +255,9 @@ def socket_handler(listener): # We need to send back a SettingsFrame. f = SettingsFrame(0) sock.send(f.serialize()) + sock.recv(65535) send_event.wait(5) - sock.recv(65535) sock.close() self._start_server(socket_handler) @@ -293,7 +293,7 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -305,7 +305,7 @@ def socket_handler(listener): # Close the response. resp.close() - recv_event.wait(5) + recv_event.set() assert not conn.streams @@ -336,7 +336,7 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -353,8 +353,7 @@ def socket_handler(listener): assert resp._stream._in_window_manager.document_size == 0 # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_receiving_trailers(self): @@ -423,15 +422,14 @@ def socket_handler(listener): # Awesome, we're done now. recv_event.set() - self.tear_down() def test_receiving_trailers_before_reading(self): self.set_up() req_event = threading.Event() - recv_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -472,7 +470,7 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -500,8 +498,7 @@ def socket_handler(listener): assert resp._stream._in_window_manager.document_size == 14 # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_clean_shut_down(self): @@ -522,7 +519,7 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -533,8 +530,7 @@ def socket_handler(listener): assert conn._sock is None # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_unexpected_shut_down(self): @@ -555,8 +551,8 @@ def socket_handler(listener): sock.send(f.serialize()) # Wait for the message from the main thread. + recv_event.wait(5) sock.close() - recv_event.set() self._start_server(socket_handler) conn = self.get_connection() @@ -568,15 +564,15 @@ def socket_handler(listener): assert conn._sock is None # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_insecure_connection(self): self.set_up(secure=False) data = [] - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -584,7 +580,7 @@ def socket_handler(listener): receive_preamble(sock) data.append(sock.recv(65535)) - send_event.wait(5) + req_event.wait(5) h = HeadersFrame(1) h.data = self.get_encoder().encode( @@ -603,12 +599,13 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) + recv_event.wait(5) sock.close() self._start_server(socket_handler) c = self.get_connection() c.request('GET', '/') - send_event.set() + req_event.set() r = c.get_response() assert r.status == 200 @@ -619,13 +616,15 @@ def socket_handler(listener): assert r.read() == b'nsaislistening' + recv_event.set() self.tear_down() def test_proxy_connection(self): self.set_up(proxy=True) data = [] - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -633,7 +632,7 @@ def socket_handler(listener): receive_preamble(sock) data.append(sock.recv(65535)) - send_event.wait(5) + req_event.wait(5) h = HeadersFrame(1) h.data = self.get_encoder().encode( @@ -652,12 +651,13 @@ def socket_handler(listener): d.flags.add('END_STREAM') sock.send(d.serialize()) + recv_event.wait(5) sock.close() self._start_server(socket_handler) c = self.get_connection() c.request('GET', '/') - send_event.set() + req_event.set() r = c.get_response() assert r.status == 200 @@ -668,6 +668,7 @@ def socket_handler(listener): assert r.read() == b'thisisaproxy' + recv_event.set() self.tear_down() def test_resetting_stream_with_frames_in_flight(self): @@ -720,7 +721,6 @@ def socket_handler(listener): # Awesome, we're done now. recv_event.set() - self.tear_down() def test_stream_can_be_reset_multiple_times(self): @@ -812,7 +812,7 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -840,7 +840,7 @@ def socket_handler(listener): assert third_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) + recv_event.set() self.tear_down() @@ -848,8 +848,8 @@ def test_read_delayed(self): self.set_up() req_event = threading.Event() - recv_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -884,7 +884,7 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -904,15 +904,14 @@ def socket_handler(listener): assert second_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_upgrade(self): self.set_up(secure=False) - recv_event = threading.Event() wait_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -961,7 +960,7 @@ def socket_handler(listener): sock.sendall(f.serialize()) # Wait for the message from the main thread. - recv_event.set() + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -980,8 +979,7 @@ def socket_handler(listener): assert second_chunk == b'world' # Awesome, we're done now. - recv_event.wait(5) - + recv_event.set() self.tear_down() def test_version_after_tls_upgrade(self, monkeypatch): @@ -997,15 +995,16 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] receive_preamble(sock) - # Wait for the message from the main thread. - send_event.wait() + # Wait for the request + req_event.wait(5) # Send the headers for the response. This response has no body. f = build_headers_frame( [(':status', '200'), ('content-length', '0')] @@ -1013,6 +1012,9 @@ def socket_handler(listener): f.flags.add('END_STREAM') f.stream_id = 1 sock.sendall(f.serialize()) + + # wait for the message from the main thread + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1021,16 +1023,18 @@ def socket_handler(listener): assert c.version is HTTPVersion.http11 assert c.version is not HTTPVersion.http20 c.request('GET', '/') - send_event.set() + req_event.set() assert c.version is HTTPVersion.http20 + recv_event.set() self.tear_down() def test_version_after_http_upgrade(self): self.set_up() self.secure = False - send_event = threading.Event() + req_event = threading.Event() + recv_event = threading.Event() def socket_handler(listener): sock = listener.accept()[0] @@ -1041,7 +1045,7 @@ def socket_handler(listener): data += sock.recv(65535) assert b'upgrade: h2c\r\n' in data - send_event.wait(5) + req_event.wait(5) # We need to send back a response. resp = ( @@ -1065,15 +1069,22 @@ def socket_handler(listener): f.flags.add('END_STREAM') sock.sendall(f.serialize()) + # keep the socket open for clean shutdown + recv_event.wait(5) + sock.close() + self._start_server(socket_handler) c = hyper.HTTPConnection(self.host, self.port) assert c.version is HTTPVersion.http11 + c.request('GET', '/') - send_event.set() + req_event.set() + resp = c.get_response() assert c.version is HTTPVersion.http20 assert resp.version is HTTPVersion.http20 + recv_event.set() self.tear_down() @@ -1096,6 +1107,8 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) + recv_event = threading.Event() + def socket_handler(listener): sock = listener.accept()[0] @@ -1126,6 +1139,9 @@ def socket_handler(listener): d.data = b'1234567890' * 2 d.flags.add('END_STREAM') sock.send(d.serialize()) + + # keep the socket open for clean shutdown + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1139,6 +1155,7 @@ def socket_handler(listener): assert r.headers[b'Content-Type'] == b'not/real' assert r.content == b'1234567890' * 2 + recv_event.set() self.tear_down() def test_adapter_sending_values(self, monkeypatch, frame_buffer): @@ -1154,6 +1171,8 @@ def wrap(*args): monkeypatch.setattr(hyper.http11.connection, 'wrap_socket', wrap) + recv_event = threading.Event() + def socket_handler(listener): sock = listener.accept()[0] @@ -1184,6 +1203,9 @@ def socket_handler(listener): d.data = b'1234567890' * 2 d.flags.add('END_STREAM') sock.send(d.serialize()) + + # keep the socket open for clean shutdown + recv_event.wait(5) sock.close() self._start_server(socket_handler) @@ -1204,4 +1226,5 @@ def socket_handler(listener): assert isinstance(frames[-1], DataFrame) assert frames[-1].data == b'hi there' + recv_event.set() self.tear_down() From 8089d972a2e0e72d20700de043ea8991e0cc1244 Mon Sep 17 00:00:00 2001 From: Hiroaki KAWAI Date: Thu, 23 Feb 2017 16:50:40 +0000 Subject: [PATCH 4/5] revert autopep8 blank lines --- test/test_hyper.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/test/test_hyper.py b/test/test_hyper.py index 27f4e7b2..66ff6f42 100644 --- a/test/test_hyper.py +++ b/test/test_hyper.py @@ -48,7 +48,6 @@ def frame_buffer(): class TestHyperConnection(object): - def test_connections_accept_hosts_and_ports(self): c = HTTP20Connection(host='www.google.com', port=8080) assert c.host == 'www.google.com' @@ -500,7 +499,6 @@ def test_headers_with_continuation(self): def test_send_tolerate_peer_gone(self): class ErrorSocket(DummySocket): - def sendall(self, data): raise socket.error(errno.EPIPE) @@ -703,7 +701,6 @@ def test_incrementing_window_after_close(self): class FrameEncoderMixin(object): - def setup_method(self, method): self.frames = [] self.encoder = Encoder() @@ -737,7 +734,6 @@ def add_data_frame(self, stream_id, data, end_stream=False): class TestServerPush(FrameEncoderMixin): - def request(self, enable_push=True): self.conn = HTTP20Connection('www.google.com', enable_push=enable_push) self.conn._sock = DummySocket() @@ -963,7 +959,6 @@ def test_pushed_requests_ignore_unexpected_headers(self): class TestResponse(object): - def test_status_is_stripped_from_headers(self): headers = HTTPHeaderMap([(':status', '200')]) resp = HTTP20Response(headers, None) @@ -1114,7 +1109,6 @@ def test_response_version(self): class TestHTTP20Adapter(object): - def test_adapter_reuses_connections(self): a = HTTP20Adapter() conn1 = a.get_connection('http2bin.org', 80, 'http') @@ -1138,7 +1132,6 @@ def test_adapter_accept_client_certificate(self): class TestUtilities(object): - def test_combining_repeated_headers(self): test_headers = [ (b'key1', b'val1'), @@ -1466,7 +1459,6 @@ def test_reset_pushed_streams_when_push_disabled(self): # Some utility classes for the tests. class NullEncoder(object): - @staticmethod def encode(headers): @@ -1483,7 +1475,6 @@ def to_str(v): class FixedDecoder(object): - def __init__(self, result): self.result = result @@ -1492,7 +1483,6 @@ def decode(self, headers): class DummySocket(object): - def __init__(self): self.queue = [] self._buffer = BytesIO() @@ -1530,7 +1520,6 @@ def fill(self): class DummyStream(object): - def __init__(self, data, trailers=None): self.data = data self.data_frames = [] From 3a02fe19df0791b7d195e5a784d7313631c92d6a Mon Sep 17 00:00:00 2001 From: Hiroaki KAWAI Date: Thu, 23 Feb 2017 17:09:53 +0000 Subject: [PATCH 5/5] fix socket.recv usage --- test/test_integration.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 273b4c18..72da6156 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -76,10 +76,11 @@ def reusable_frame_buffer(buffer): def receive_preamble(sock): # Receive the HTTP/2 'preamble'. client_preface = b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n' - timeout = time.time() + 5 got = b'' - while len(got) < len(client_preface) and time.time() < timeout: - got += sock.recv(len(client_preface) - len(got)) + while len(got) < len(client_preface): + tmp = sock.recv(len(client_preface) - len(got)) + assert len(tmp) > 0, "unexpected EOF" + got += tmp assert got == client_preface, "client preface mismatch"