From d2d05e07488e03cb8808c3709a0da65d3d746371 Mon Sep 17 00:00:00 2001 From: Anon Ray Date: Sun, 29 Mar 2020 04:57:58 +0530 Subject: [PATCH] support HTTP and architectural refactor - add HTTP support too. Modular transport design - required architectural refactor --- Pipfile | 13 ++ Pipfile.lock | 72 ++++++++++ README.md | 126 +++++++++++++----- example.py | 48 ------- examples/http_example.py | 25 ++++ examples/ws_example.py | 39 ++++++ graphql_client/__init__.py | 158 +--------------------- graphql_client/client.py | 37 ++++++ graphql_client/transport/__init__.py | 3 + graphql_client/transport/base.py | 18 +++ graphql_client/transport/http.py | 38 ++++++ graphql_client/transport/websocket.py | 182 ++++++++++++++++++++++++++ setup.py | 8 +- 13 files changed, 529 insertions(+), 238 deletions(-) create mode 100644 Pipfile create mode 100644 Pipfile.lock delete mode 100644 example.py create mode 100644 examples/http_example.py create mode 100644 examples/ws_example.py create mode 100644 graphql_client/client.py create mode 100644 graphql_client/transport/__init__.py create mode 100644 graphql_client/transport/base.py create mode 100644 graphql_client/transport/http.py create mode 100644 graphql_client/transport/websocket.py diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..1db1a44 --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +websocket-client = "*" +requests = "*" + +[requires] +python_version = "3.8" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..51093bb --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,72 @@ +{ + "_meta": { + "hash": { + "sha256": "0f7fe85a3c8c88cf2fd64f5f9793f2e6147ee0cdac8a4f359cee402b4237691a" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.8" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "certifi": { + "hashes": [ + "sha256:017c25db2a153ce562900032d5bc68e9f191e44e9a0f762f373977de9df1fbb3", + "sha256:25b64c7da4cd7479594d035c08c2d809eb4aab3a26e5a990ea98cc450c320f1f" + ], + "version": "==2019.11.28" + }, + "chardet": { + "hashes": [ + "sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", + "sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691" + ], + "version": "==3.0.4" + }, + "idna": { + "hashes": [ + "sha256:7588d1c14ae4c77d74036e8c22ff447b26d0fde8f007354fd48a7814db15b7cb", + "sha256:a068a21ceac8a4d63dbfd964670474107f541babbd2250d61922f029858365fa" + ], + "version": "==2.9" + }, + "requests": { + "hashes": [ + "sha256:43999036bfa82904b6af1d99e4882b560e5e2c68e5c4b0aa03b655f3d7d73fee", + "sha256:b3f43d496c6daba4493e7c431722aeb7dbc6288f52a6e04e7b6023b0247817e6" + ], + "index": "pypi", + "version": "==2.23.0" + }, + "six": { + "hashes": [ + "sha256:236bdbdce46e6e6a3d61a337c0f8b763ca1e8717c03b369e87a7ec7ce1319c0a", + "sha256:8f3cd2e254d8f793e7f3d6d9df77b92252b52637291d0f0da013c76ea2724b6c" + ], + "version": "==1.14.0" + }, + "urllib3": { + "hashes": [ + "sha256:2f3db8b19923a873b3e5256dc9c2dedfa883e33d87c690d9c7913e1f40673cdc", + "sha256:87716c2d2a7121198ebcb7ce7cccf6ce5e9ba539041cfbaeecfb641dc0bf6acc" + ], + "version": "==1.25.8" + }, + "websocket-client": { + "hashes": [ + "sha256:0fc45c961324d79c781bab301359d5a1b00b13ad1b10415a4780229ef71a5549", + "sha256:d735b91d6d1692a6a181f2a8c9e0238e5f6373356f561bb9dc4c7af36f452010" + ], + "index": "pypi", + "version": "==0.57.0" + } + }, + "develop": {} +} diff --git a/README.md b/README.md index ab57a71..df19fdf 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # py-graphql-client -Dead-simple to use GraphQL client over websocket. Using the -[apollo-transport-ws](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md) -protocol. + +Dead-simple to use GraphQL client over websocket. Using the [apollo-transport-ws](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md) protocol. Supports HTTP too. ## Install @@ -11,52 +10,47 @@ pip install py-graphql-client ## Examples -### Setup subscriptions super easily +### Super easy to setup and run subscriptions ```python -from graphql_client import GraphQLClient +from graphql_client import GraphQLClient, WebsocketTransport + +transport = WebsocketTransport('ws://localhost:8080/graphql') +client = GraphQLClient(transport) -ws = GraphQLClient('ws://localhost:8080/graphql') def callback(_id, data): - print("got new data..") - print(f"msg id: {_id}. data: {data}") + print(f"Got new data on: Operation ID: {_id}. Data: {data}") -query = """ - subscription { - notifications { +latest_notifications = """ + subscription getLatestNotifications($limit: Int!) { + notifications (last: $limit) { id title content } } """ -sub_id = ws.subscribe(query, callback=callback) +sub_id = client.subscribe(latest_notifications, variables={'limit': 5}, callback=callback) + ... # later stop the subscription -ws.stop_subscribe(sub_id) -ws.close() +client.stop_subscription(sub_id) +client.close() ``` -### Variables can be passed +### Set session context via HTTP headers ```python -from graphql_client import GraphQLClient +from graphql_client import GraphQLClient, WebsocketTransport -ws = GraphQLClient('ws://localhost:8080/graphql') -def callback(_id, data): - print("got new data..") - print(f"msg id: {_id}. data: {data}") +transport = WebsocketTransport('ws://localhost:8080/graphql') +client = GraphQLClient(transport) -query = """ - subscription ($limit: Int!) { - notifications (order_by: {created: "desc"}, limit: $limit) { - id - title - content - } - } -""" -sub_id = ws.subscribe(query, variables={'limit': 10}, callback=callback) +... + +client.set_session(headers={'Authorization': 'Bearer xxxxx'}) + +sub_id = client.subscribe(latest_notifications, variables={'limit': 5}, callback=callback) ``` ### Normal queries and mutations work too @@ -64,7 +58,9 @@ sub_id = ws.subscribe(query, variables={'limit': 10}, callback=callback) ```python from graphql_client import GraphQLClient -ws = GraphQLClient('ws://localhost:8080/graphql') +transport = WebsocketTransport('ws://localhost:8080/graphql') +client = GraphQLClient('ws://localhost:8080/graphql') + query = """ query ($limit: Int!) { notifications (order_by: {created: "desc"}, limit: $limit) { @@ -76,11 +72,75 @@ query = """ """ res = ws.query(query, variables={'limit': 10}) print(res) -ws.close() + +mutation = """ + mutation ($username: String!) { + registerUser (username: $username) { + userId + username + } + } +""" +res = client.mutate(mutation, variables={'username': 'alice'}) +print(res) + +client.close() ``` +### Using HTTP transport + +```python +from graphql_client import GraphQLClient, HttpTransport + +# first create a `HttpTransport` +transport = HttpTransport('https://countries.trevorblades.com/') +# then create a `GraphQLClient` which uses the `HttpTransport` +client = GraphQLClient(transport) + +query = """ +query getCountry($code: ID!) { + country(code: $code) { + code + name + capital + } +} +""" + +client.set_session(headers={'Authorization': 'Bearer xxxx'}) +res = client.query(query, variables={'code': 'IN'}) +print(res) + +# unset all headers +client.set_session(headers=None) +res = client.query(query, variables={'code': 'IN'}) +print(res) + +# mutations work too + +create_user = """ + mutation createUser($username: String!, $password: String!) { + insertUser(username: $username, password: $password) { + userId + username + } + } +""" +res = client.mutate(create_user, variables={'username': 'alice', 'password': 'p@55w0rd'}) +print(res) + +client.close() +``` ## TODO - tests -- support http as well - should use asyncio websocket library? + + diff --git a/example.py b/example.py deleted file mode 100644 index e61e3e9..0000000 --- a/example.py +++ /dev/null @@ -1,48 +0,0 @@ -import time -import websocket -from graphql_client import GraphQLClient - -# some sample GraphQL server which supports websocket transport and subscription - ws = GraphQLClient('ws://localhost:5000/graphql') - -## Simple Query Example ## - -# query example with GraphQL variables -query = """ -query getUser($userId: Int!) { - user (id: $userId) { - id - username - } -} -""" - -# This is a blocking call, you receive response in the `res` variable - -res = ws.query(q1, variables={'userId': 2}) -print(res) - - -## Subscription Example ## - -subscription_query = """ -subscription getUser { - user (id: 2) { - id - username - } -} -""" - -def my_callback(_id, data): - print(f"Got data for Sub ID: {_id}. Data: {data}") - -# sub_id = ws.subscribe(subscription_query, callback=my_callback) -sub_id = ws.subscribe(s1, variables={'userId': 2}, callback=my_callback) - -# do some operation while the subscription is running... -print(f'started subscriptions, will keep it alive for 4 seconds') -time.sleep(4) -print(f'4 seconds over, stopping the subscription') -ws.stop_subscribe(sub_id) -ws.close() diff --git a/examples/http_example.py b/examples/http_example.py new file mode 100644 index 0000000..e987180 --- /dev/null +++ b/examples/http_example.py @@ -0,0 +1,25 @@ +from graphql_client import GraphQLClient, HttpTransport + +# first create a `HttpTransport` +transport = HttpTransport('https://countries.trevorblades.com/') + +# then create a `GraphQLClient` which uses the `HttpTransport` +client = GraphQLClient(transport) + +query = """ +query getCountry($code: ID!) { + country(code: $code) { + code + name + capital + } +} +""" + +# changing session context via headers +client.set_session(headers={'Authorization': 'Bearer xxxx'}) + +# # This is a blocking call, you receive response in the `res` variable +res = client.query(query, variables={'code': 'IN'}) +print(res) +client.close() diff --git a/examples/ws_example.py b/examples/ws_example.py new file mode 100644 index 0000000..1695f44 --- /dev/null +++ b/examples/ws_example.py @@ -0,0 +1,39 @@ +import time +import random + +from graphql_client import GraphQLClient, WebsocketTransport + +transport = WebsocketTransport('wss://hge-testing.herokuapp.com/v1/graphql') +client = GraphQLClient(transport) + +query = """ +query getUser($userId: Int!) { + users(where:{u_id: {_eq: $userId}}) { + u_id + u_name + } +} +""" + +res = client.query(query, variables={'userId': 2}) +print(res) + +subscription_query = """ +subscription getUser { + user (id: 2) { + id + username + } +} +""" + +def my_callback(_id, data): + print(f": Got data for Sub ID: {_id}. Data: {data}") + +sub_id = client.subscribe(subscription_query, variables={'userId': 2}, callback=my_callback) + +# do some operation while the subscription is running... +time.sleep(3) + +client.stop_subscription(sub_id) +client.close() diff --git a/graphql_client/__init__.py b/graphql_client/__init__.py index cf1b641..6467330 100644 --- a/graphql_client/__init__.py +++ b/graphql_client/__init__.py @@ -1,156 +1,8 @@ # -*- coding: utf-8 -*- +"""A simple GraphQL client that works over both Websocket and HTTP as the transport protocol. +Over HTTP it follows the general convention/protocol: https://graphql.github.io/learn/serving-over-http/ +Over websocket it follows the apollo protocol: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md """ -A simple GraphQL client that works over Websocket as the transport -protocol, instead of HTTP. -This follows the Apollo protocol. -https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md -""" - -import sys -import string -import random -import json -import threading - -import websocket - - -GQL_WS_SUBPROTOCOL = "graphql-ws" - -# all the message types -GQL_CONNECTION_INIT = 'connection_init' -GQL_START = 'start' -GQL_STOP = 'stop' -GQL_CONNECTION_TERMINATE = 'connection_terminate' -GQL_CONNECTION_ERROR = 'connection_error' -GQL_CONNECTION_ACK = 'connection_ack' -GQL_DATA = 'data' -GQL_ERROR = 'error' -GQL_COMPLETE = 'complete' -GQL_CONNECTION_KEEP_ALIVE = 'ka' - - -class ConnectionException(Exception): - """Exception thrown during connection errors to the GraphQL server""" - - -class GraphQLClient(): - """ - A simple GraphQL client that works over Websocket as the transport - protocol, instead of HTTP. - This follows the Apollo protocol. - https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md - """ - - def __init__(self, url): - self.ws_url = url - self._connection = websocket.create_connection(self.ws_url, - on_message=self._on_message, - subprotocols=[GQL_WS_SUBPROTOCOL]) - self._connection.on_message = self._on_message - self._subscription_running = False - self._st_id = None - - def _on_message(self, message, skip_ka=True): - """ default callback handler when message arrives """ - data = json.loads(message) - # skip keepalive messages - if not(skip_ka and data['type'] == GQL_CONNECTION_KEEP_ALIVE): - print(message) - - # wait for any valid message, while ignoring GQL_CONNECTION_KEEP_ALIVE - def _receive(self): - """the recieve function of the client. Which validates response from the - server and returns data """ - res = self._connection.recv() - try: - msg = json.loads(res) - except json.JSONDecodeError as err: - print(f'Server sent invalid JSON data: {res} \n {err}') - sys.exit(1) - if msg['type'] != GQL_CONNECTION_KEEP_ALIVE: - return msg - # FIXME: this can lead to potential inifinite loop when server doesn't - # send anything other than GQL_CONNECTION_KEEP_ALIVE - return self._receive() - - def _connection_init(self, headers=None): - # send the `connection_init` message with the payload - payload = {'type': 'connection_init', 'payload': {'headers': headers}} - self._connection.send(json.dumps(payload)) - - res = self._receive() - if res['type'] == 'connection_error': - err = res['payload'] if 'payload' in res else 'unknown error' - raise ConnectionException(err) - if res['type'] == 'connection_ack': - return None - - err_msg = "Unknown message from server, this client did not understand. " + \ - "Original message: " + res['type'] - raise ConnectionException(err_msg) - - def _start(self, payload): - _id = gen_id() - frame = {'id': _id, 'type': GQL_START, 'payload': payload} - self._connection.send(json.dumps(frame)) - return _id - - def _stop(self, _id): - # print('inside _stop()') - payload = {'id': _id, 'type': GQL_STOP} - self._connection.send(json.dumps(payload)) - # print(f'sent message {GQL_STOP}') - # print('inside _stop :: recving...') - resp = self._connection.recv() - # print('inside _stop :: recvd resp', resp) - return resp - - def query(self, query, variables=None, headers=None): - self._connection_init(headers) - payload = {'headers': headers, 'query': query, 'variables': variables} - _id = self._start(payload) - res = self._receive() - self._stop(_id) - return res - - - def _subscription_recieve_thread(self, sub_id, callback): - # print('inside the subs method') - while self._subscription_running: - # print('subscription is running') - # print('recving...') - resp = self._connection.recv() - res = json.loads(resp) - # print('inside subs:: recved resp', resp) - if res['type'] == GQL_CONNECTION_KEEP_ALIVE: - continue - if res['type'] == GQL_ERROR or res['type'] == GQL_COMPLETE: - # print(res) - self.stop_subscribe(sub_id) - break - elif res['type'] == GQL_DATA: - callback(sub_id, res) - - def subscribe(self, query, variables=None, headers=None, callback=None): - self._connection_init(headers) - payload = {'headers': headers, 'query': query, 'variables': variables} - callback = self._on_message if not callback else callback - sub_id = self._start(payload) - self._subscription_running = True - self._st_id = threading.Thread(target=self._subscription_recieve_thread, args=(sub_id, callback,)) - self._st_id.start() - return sub_id - - def stop_subscribe(self, _id): - self._subscription_running = False - self._stop(_id) - self._st_id.join(3) - - def close(self): - self._connection.close() - -# generate random alphanumeric id -def gen_id(size=6, chars=string.ascii_letters + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) +from graphql_client.client import GraphQLClient +from graphql_client.transport import WebsocketTransport, HttpTransport diff --git a/graphql_client/client.py b/graphql_client/client.py new file mode 100644 index 0000000..a12ebb3 --- /dev/null +++ b/graphql_client/client.py @@ -0,0 +1,37 @@ +from graphql_client.transport import GraphQLTransport, WebsocketTransport, TransportException + +class GraphQLClient(): + def __init__(self, transport: GraphQLTransport) -> None: + self.transport = transport + self.headers = None + + def set_session(self, headers: dict = None) -> None: + self.headers = headers + self.transport.set_session(headers) + + def query(self, operation: str, operation_name: str = None, variables: dict = None) -> dict: + return self.transport.execute(operation, + operation_name=operation_name, + variables=variables) + + def mutate(self, operation: str, operation_name: str = None, variables: dict = None) -> dict: + return self.transport.execute(operation, + operation_name=operation_name, + variables=variables) + + def subscribe(self, operation: str, operation_name: str = None, variables: dict = None, + callback=None) -> str: + if not isinstance(self.transport, WebsocketTransport): + raise TransportException('Only `WebsocketTransport` can be used for subscriptions') + return self.transport.subscribe(operation, operation_name, variables, callback) + + def stop_subscription(self, sub_id: str) -> None: + if not isinstance(self.transport, WebsocketTransport): + raise TransportException('Only `WebsocketTransport` can be used for subscriptions') + self.transport.stop_subscription(sub_id) + + def stop(self) -> None: + self.transport.stop_all_operations() + + def close(self) -> None: + pass diff --git a/graphql_client/transport/__init__.py b/graphql_client/transport/__init__.py new file mode 100644 index 0000000..7d2b4e2 --- /dev/null +++ b/graphql_client/transport/__init__.py @@ -0,0 +1,3 @@ +from graphql_client.transport.base import TransportException, GraphQLTransport +from graphql_client.transport.websocket import WebsocketTransport +from graphql_client.transport.http import HttpTransport diff --git a/graphql_client/transport/base.py b/graphql_client/transport/base.py new file mode 100644 index 0000000..063ec6a --- /dev/null +++ b/graphql_client/transport/base.py @@ -0,0 +1,18 @@ +import abc + + +class TransportException(Exception): + """...""" + +class GraphQLTransport(abc.ABC): + @abc.abstractmethod + def set_session(self, headers: dict = None) -> None: + pass + + @abc.abstractmethod + def execute(self, operation: str, operation_name: str = None, variables: dict = None) -> dict: + pass + + @abc.abstractmethod + def stop_all_operations(self): + pass diff --git a/graphql_client/transport/http.py b/graphql_client/transport/http.py new file mode 100644 index 0000000..5baa6eb --- /dev/null +++ b/graphql_client/transport/http.py @@ -0,0 +1,38 @@ +import requests + +from graphql_client.transport.base import GraphQLTransport, TransportException + +class HttpTransport(GraphQLTransport): + + def __init__(self, url): + self.url = url + self.client = None + self.headers = None + + def set_session(self, headers=None): + self.headers = headers + self._make_client() + self.client.headers = headers + + def _make_client(self): + if not self.client: + self.client = requests.Session() + + def execute(self, operation: str, operation_name: str = None, variables: dict = None) -> dict: + self._make_client() + payload = { + 'query': operation, + 'variables': variables, + 'operation_name': operation_name + } + resp = self.client.post(self.url, json=payload) + + if resp.status_code <= 200 and resp.status_code >= 400: + err = f'Received non-200 HTTP response: {resp.status_code}. Body: {resp.text}' + raise TransportException(err) + + res = resp.json() + return res + + def stop_all_operations(self): + self.client.close() diff --git a/graphql_client/transport/websocket.py b/graphql_client/transport/websocket.py new file mode 100644 index 0000000..378708c --- /dev/null +++ b/graphql_client/transport/websocket.py @@ -0,0 +1,182 @@ +""" +The Websocket transport implementation for the apollo-ws-transport protocol +Apollo protocol: https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md +""" + +import string +import os +import random +import sys +import json +import logging +import threading + +import websocket + +from graphql_client.transport.base import GraphQLTransport, TransportException + +# subprotocol header +GQL_WS_SUBPROTOCOL = "graphql-ws" + +# all the protocol message types +# https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/message-types.ts +GQL_CONNECTION_INIT = 'connection_init' # Client -> Server +GQL_CONNECTION_ACK = 'connection_ack' # Server -> Client +GQL_CONNECTION_ERROR = 'connection_error' # Server -> Client + +GQL_CONNECTION_KEEP_ALIVE = 'ka' # Server -> Client + +GQL_CONNECTION_TERMINATE = 'connection_terminate' # Client -> Server +GQL_START = 'start' # Client -> Server +GQL_DATA = 'data' # Server -> Client +GQL_ERROR = 'error' # Server -> Client +GQL_STOP = 'stop' # Client -> Server +GQL_COMPLETE = 'complete' # Server -> Client + +DEBUG = os.getenv('DEBUG', False) +if DEBUG: + logging.basicConfig(level=logging.DEBUG) + + +class ConnectionException(Exception): + """Exception thrown during connection errors to the GraphQL server""" + + +class WebsocketClient(): + def __init__(self, url): + self.ws_url = url + self._connection = websocket.create_connection(self.ws_url, + subprotocols=[GQL_WS_SUBPROTOCOL]) + + def _start_server_receive_thread(self): + """start a thread, which keeps receiving messages from the server and puts it + in a queue""" + + while True: + _upstream_data = self._connection.recv() + try: + payload = json.loads(_upstream_data) + except json.JSONDecodeError as err: + # JUST BLOW UP! + print(f'Server sent invalid JSON data: {_upstream_data} \n {err}') + sys.exit(1) + print(payload) + + def send(self, payload): + self._connection.send(json.dumps(payload)) + + def receive(self): + return self._connection.recv() + + def close(self): + return self._connection.close() + + +class WebsocketTransport(GraphQLTransport): + def __init__(self, url): + self.server_url = url + self.client = None + self.headers = None + self._connection_init_done = False + self._operation_map = {} + + def _make_client(self): + if not self.client: + self.client = WebsocketClient(self.server_url) + + def _wait_for(self, message_types, operation_id=None, retries=10): + if retries == 0: + raise TransportException('unexpected error: retries over; no response from server') + resp = self.client.receive() + logging.debug(f'server frame <= {resp}') + res = json.loads(resp) + if operation_id and not self._operation_map[operation_id]['running']: + return res + if res['type'] in message_types: + return res + return self._wait_for(message_types, operation_id, retries=retries-1) + + def _send_msg(self, msg, payload=None, operation_id=None): + self._make_client() + frame = {'type': msg} + if payload: + frame['payload'] = payload + if operation_id: + frame['id'] = operation_id + logging.debug(f'client frame => {frame}') + self.client.send(frame) + + def set_session(self, headers=None): + self.headers = headers + self._send_msg(GQL_CONNECTION_INIT, payload={'headers': headers}) + res = self._wait_for([GQL_CONNECTION_ACK, GQL_CONNECTION_ERROR]) + if res['type'] == GQL_CONNECTION_ACK: + self._connection_init_done = True + elif res['type'] == GQL_CONNECTION_ERROR: + self._connection_init_done = False + raise ConnectionException(f'could not initialise session with headers: {headers}') + + def execute(self, operation, operation_name=None, variables=None) -> dict: + if not self._connection_init_done: + self.set_session() + + self._send_msg(GQL_START, operation_id=gen_id(), + payload={'query': operation, 'variables': variables, + 'operation_name': operation_name}) + + res = self._wait_for([GQL_DATA, GQL_ERROR, GQL_CONNECTION_ERROR]) + if res['type'] == GQL_DATA: + return res + if res['type'] == GQL_ERROR: + return res + if res['type'] == GQL_CONNECTION_ERROR: + print(f'unexpected connection error {res["payload"]}') + + def subscribe(self, operation, operation_name=None, variables=None, callback=None): + if not self._connection_init_done: + self.set_session() + + # logging.debug(f'Operation Map (before starting sub): {self._operation_map}') + op_id = gen_id() + self._send_msg(GQL_START, operation_id=op_id, + payload={'query': operation, 'variables': variables, + 'operation_name': operation_name}) + + thread_id = threading.Thread(target=self._subscription_recieve_thread, + args=(op_id, callback,)) + self._operation_map[op_id] = {'thread_id': thread_id, 'running': True} + # logging.debug(f'Operation Map (after starting sub): {self._operation_map}') + thread_id.start() + return op_id + + def _subscription_recieve_thread(self, op_id, callback): + while self._operation_map[op_id]['running']: + res = self._wait_for([GQL_DATA, GQL_ERROR, GQL_CONNECTION_ERROR, GQL_COMPLETE], + operation_id=op_id) + if res['type'] == GQL_DATA: + callback(op_id, res) + if res['type'] == GQL_ERROR: + callback(op_id, res) + if res['type'] == GQL_CONNECTION_ERROR: + print(f'unexpected connection error {res["payload"]}') + break + + def stop_subscription(self, op_id): + # print('', op_id) + # logging.debug(f'Operation Map (before stopping sub): {self._operation_map}') + self._operation_map[op_id]['running'] = False + # logging.debug(f'Operation Map (after altering): {self._operation_map}') + self._send_msg(GQL_STOP, operation_id=op_id) + self._operation_map[op_id]['thread_id'].join(10) + del(self._operation_map[op_id]) + # logging.debug(f'Operation Map (after stopping sub): {self._operation_map}') + + def _default_callback(self, sub_id, data): + print(f' Inside default callback: SubID: {sub_id}. Data: {data}') + + def stop_all_operations(self): + self.client.close() + +# generate random alphanumeric id +def gen_id(size=6, chars=string.ascii_letters + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) diff --git a/setup.py b/setup.py index 9d55504..85d4002 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,15 @@ # -*- coding: utf-8 -*- from setuptools import find_packages, setup - -__version__ = "0.1.1-beta.1" +__version__ = "0.2.0-beta.1" __desc__ = "A dead-simple graphql client that supports subscriptions over websockets." with open('README.md') as readme_file: readme = readme_file.read() requirements = [ - 'websocket-client==0.54.0' + 'websocket-client==0.54.0', + 'requests' ] test_requirements = [] @@ -37,7 +37,7 @@ 'License :: OSI Approved :: BSD License', 'Natural Language :: English', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Environment :: Console', 'Environment :: Web Environment', 'Environment :: Other Environment',