From 7941f4c5898f1e48ecf253372b6b60005b75846a Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Thu, 19 May 2022 18:13:02 +0200 Subject: [PATCH 01/14] Improve resilience and performance of do_bulk_inference In case of errors, the `InferenceClient.do_bulk_inference` method will now return `None` for the affected objects instead of aborting the entire bulk inference operation (and discarding any successfully processed objects). Fixes issue #68 The fix for #68 is different than what is described in #68. Instead of using a generator based approach which will require the SDK consumer to implement the error handling themselves, the SDK itself now handles the errors. The downside of not using a generator is a larger memory footprint to accumulate the results in a list. As an alternative, we can consider using a generator to either yield the successfully processed inference results or the list containing `None`. This approach will save memory. Additionally, this commit introduces parallel processing in `InferenceClient.do_bulk_inference`. This will greatly improve performance. Due to the non-lazy implementation of `ThreadPoolProcessor.map`, this increases memory usage slightly ([cpython issue #74028]) [cpython issue #74028]: https://github.com/python/cpython/issues/74028 --- .pre-commit-config.yaml | 2 +- sap/aibus/dar/client/inference_client.py | 46 +++++++++++++------ tests/sap/aibus/dar/client/test_exceptions.py | 29 +++++++----- .../aibus/dar/client/test_inference_client.py | 38 +++++++++++++++ 4 files changed, 89 insertions(+), 26 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a00f8a7..b68f51d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ default_language_version: python: python3.7 repos: - repo: https://github.com/ambv/black - rev: 21.6b0 + rev: 22.3.0 hooks: - id: black language_version: python3.7 diff --git a/sap/aibus/dar/client/inference_client.py b/sap/aibus/dar/client/inference_client.py index 23c9c93..fc738ec 100644 --- a/sap/aibus/dar/client/inference_client.py +++ b/sap/aibus/dar/client/inference_client.py @@ -1,9 +1,13 @@ """ Client API for the Inference microservice. """ -from typing import List +from concurrent.futures import ThreadPoolExecutor +from typing import List, Union + +from requests import RequestException from sap.aibus.dar.client.base_client import BaseClientWithSession +from sap.aibus.dar.client.exceptions import DARHTTPException from sap.aibus.dar.client.inference_constants import InferencePaths from sap.aibus.dar.client.util.lists import split_list @@ -73,7 +77,7 @@ def do_bulk_inference( objects: List[dict], top_n: int = TOP_N, retry: bool = True, - ) -> List[dict]: + ) -> List[Union[dict, None]]: """ Performs bulk inference for larger collections. @@ -88,15 +92,11 @@ def do_bulk_inference( This method calls the inference endpoint multiple times to process all data. For non-trial service instances, each call will incur a cost. - If one of the calls fails, this method will raise an Exception and the - progress will be lost. In this case, all calls until the Exception happened - will be charged. - To reduce the likelihood of a failed request terminating the bulk inference process, this method will retry failed requests. There is a small chance that even retried requests will be charged, e.g. - if a problem occurs with the request on the client side outside of the + if a problem occurs with the request on the client side outside the control of the service and after the service has processed the request. To disable `retry` behavior simply pass `retry=False` to the method. @@ -114,10 +114,30 @@ def do_bulk_inference( :param retry: whether to retry on errors. Default: True :return: the aggregated ObjectPrediction dictionaries """ - result = [] # type: List[dict] - for work_package in split_list(objects, LIMIT_OBJECTS_PER_CALL): - response = self.create_inference_request( - model_name, work_package, top_n=top_n, retry=retry + + def predict_call(work_package): + try: + response = self.create_inference_request( + model_name, work_package, top_n=top_n, retry=retry + ) + return response["predictions"] + except (DARHTTPException, RequestException) as exc: + self.log.warning( + "Caught %s during bulk inference. " + "Setting results to None for this batch!", + exc, + exc_info=True, + ) + return [None for _ in range(len(work_package))] + + results = [] + + with ThreadPoolExecutor(max_workers=4) as pool: + results_iterator = pool.map( + predict_call, split_list(objects, LIMIT_OBJECTS_PER_CALL) ) - result.extend(response["predictions"]) - return result + + for predictions in results_iterator: + results.extend(predictions) + + return results diff --git a/tests/sap/aibus/dar/client/test_exceptions.py b/tests/sap/aibus/dar/client/test_exceptions.py index 4d0e81c..5f054ef 100644 --- a/tests/sap/aibus/dar/client/test_exceptions.py +++ b/tests/sap/aibus/dar/client/test_exceptions.py @@ -8,22 +8,28 @@ url = "http://localhost:4321/test/" +correlation_id = "412d84ae-0eb5-4421-863d-956570c2da54" +vcap_request_id = "d9cd7dec-4d74-4a7a-a953-4ca583c8d912" + + +def create_mock_response_404(): + mock_response = create_mock_response() + + mock_response.headers["X-Correlation-ID"] = correlation_id + mock_response.headers["X-Vcap-Request-Id"] = vcap_request_id + mock_response.headers["Server"] = "Gunicorn" + mock_response.headers["X-Cf-Routererror"] = "unknown_route" + mock_response.status_code = 404 + mock_response.request.method = "GET" + mock_response.reason = b"\xc4\xd6\xdc Not Found" + return mock_response + class TestDARHTTPException: url = "http://localhost:4321/test/" def test_basic(self): - mock_response = create_mock_response() - - correlation_id = "412d84ae-0eb5-4421-863d-956570c2da54" - mock_response.headers["X-Correlation-ID"] = correlation_id - vcap_request_id = "d9cd7dec-4d74-4a7a-a953-4ca583c8d912" - mock_response.headers["X-Vcap-Request-Id"] = vcap_request_id - mock_response.headers["Server"] = "Gunicorn" - mock_response.headers["X-Cf-Routererror"] = "unknown_route" - mock_response.status_code = 404 - mock_response.request.method = "GET" - mock_response.reason = b"\xc4\xd6\xdc Not Found" + mock_response = create_mock_response_404() exception = DARHTTPException.create_from_response(url, mock_response) @@ -130,7 +136,6 @@ class TestDARHTTPExceptionReason: # status line: https://tools.ietf.org/html/rfc7230#section-3.1.2 def test_reason_works_iso8859_1(self): - mock_response = create_mock_response() # ÄÖÜ encoded as ISO-8859-1 mock_response.reason = b"\xc4\xd6\xdc" diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index 49e9398..a8b0ef0 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -7,12 +7,15 @@ from unittest.mock import call import pytest +from requests import RequestException, Timeout +from sap.aibus.dar.client.exceptions import DARHTTPException from sap.aibus.dar.client.inference_client import InferenceClient from tests.sap.aibus.dar.client.test_data_manager_client import ( AbstractDARClientConstruction, prepare_client, ) +from tests.sap.aibus.dar.client.test_exceptions import create_mock_response_404 class TestInferenceClientConstruction(AbstractDARClientConstruction): @@ -203,3 +206,38 @@ def _assert_bulk_inference_works( inference_client.session.post_to_endpoint.call_args_list == expected_calls_to_post ) + + def test_bulk_inference_error(self, inference_client: InferenceClient): + """ + Tests if do_bulk_inference method will recover from errors. + """ + + response_404 = create_mock_response_404() + url = "http://localhost:4321/test/" + + exception_404 = DARHTTPException.create_from_response(url, response_404) + + exceptions = [exception_404, RequestException, Timeout] + # Try different exceptions + for exc in exceptions: + inference_client.session.post_to_endpoint.return_value.json.side_effect = [ + self.inference_response(50), + exc, + self.inference_response(40), + ] + + many_objects = [self.objects[0] for _ in range(50 + 50 + 40)] + assert len(many_objects) == 50 + 50 + 40 + + response = inference_client.do_bulk_inference( + model_name="test-model", + objects=many_objects, + top_n=4, + ) + + expected_response = [] + expected_response.extend(self.inference_response(50)["predictions"]) + expected_response.extend(None for _ in range(50)) + expected_response.extend(self.inference_response(40)["predictions"]) + + assert response == expected_response From b2934aa1e3e8b3c596738ee6ca8bb99ca49ed3fd Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Mon, 23 May 2022 18:45:31 +0200 Subject: [PATCH 02/14] fix: bulk inference error response should include object_id This also contains a documentation formatting fix. --- sap/aibus/dar/client/inference_client.py | 56 ++++++++++- sap/aibus/dar/client/model_manager_client.py | 2 +- .../aibus/dar/client/test_inference_client.py | 98 +++++++++++++++---- 3 files changed, 130 insertions(+), 26 deletions(-) diff --git a/sap/aibus/dar/client/inference_client.py b/sap/aibus/dar/client/inference_client.py index f8087c5..ce3f359 100644 --- a/sap/aibus/dar/client/inference_client.py +++ b/sap/aibus/dar/client/inference_client.py @@ -84,16 +84,42 @@ def do_bulk_inference( For *objects* collections larger than *LIMIT_OBJECTS_PER_CALL*, splits the data into several smaller Inference requests. + Requests are executed in parallel. + Returns the aggregated values of the *predictions* of the original API response - as returned by :meth:`create_inference_request`. + as returned by :meth:`create_inference_request`. If one of the inference + requests to the service fails, an artificial prediction object is inserted with + the `labels` key set to `None` for each of the objects in the failing request. + + Example of a prediction object which indicates an error: + + .. code-block:: python + + { + 'objectId': 'b5cbcb34-7ab9-4da5-b7ec-654c90757eb9', + 'labels': None, + '_sdk_error': 'RequestException: Request Error' + } + + In case the `objects` passed to this method do not contain the `objectId` field, + the value is set to `None` in the error prediction object: + + .. code-block:: python + + { + 'objectId': None, + 'labels': None, + '_sdk_error': 'RequestException: Request Error' + } + .. note:: This method calls the inference endpoint multiple times to process all data. For non-trial service instances, each call will incur a cost. - To reduce the likelihood of a failed request terminating the bulk inference - process, this method will retry failed requests. + To reduce the impact of a failed request, this method will retry failed + requests. There is a small chance that even retried requests will be charged, e.g. if a problem occurs with the request on the client side outside the @@ -107,6 +133,19 @@ def do_bulk_inference( The default for the `retry` parameter changed from `retry=False` to `retry=True` for increased reliability in day-to-day operations. + .. versionchanged:: 0.12.0 + Requests are now executed in parallel with up to four threads. + + Errors are now handled in this method instead of raising an exception and + discarding inference results from previous requests. For objects where the + inference request did not succeed, a replacement `dict` object is placed in + the returned `list`. + This `dict` follows the format of the `ObjectPrediction` object sent by the + service. To indicate that this is a client-side generated placeholder, the + `labels` key for all ObjectPrediction dicts of the failed inference request + has value `None`. + A `_sdk_error` key is added with the Exception details. + :param model_name: name of the model used for inference :param objects: Objects to be classified @@ -128,7 +167,16 @@ def predict_call(work_package): exc, exc_info=True, ) - return [None for _ in range(len(work_package))] + + prediction_error = [ + { + "objectId": inference_object.get("objectId", None), + "labels": None, + "_sdk_error": f"{exc.__class__.__name__}: {exc}", + } + for inference_object in work_package + ] + return prediction_error results = [] diff --git a/sap/aibus/dar/client/model_manager_client.py b/sap/aibus/dar/client/model_manager_client.py index ca94e38..fd7113c 100644 --- a/sap/aibus/dar/client/model_manager_client.py +++ b/sap/aibus/dar/client/model_manager_client.py @@ -174,7 +174,7 @@ def create_job( :param model_template_id: Model template ID for training :param business_blueprint_id: Business Blueprint template ID for training :raises CreateTrainingJobFailed: When business_blueprint_id - and model_template_id are provided or when both are not provided + and model_template_id are provided or when both are not provided :return: newly created Job as dict """ self.log.info( diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index 0df1b4d..8ebef37 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -2,8 +2,7 @@ # The pragma above causes mypy to ignore this file: # mypy cannot deal with some of the monkey-patching we do below. # https://github.com/python/mypy/issues/2427 - - +from typing import Optional from unittest.mock import call import pytest @@ -31,20 +30,24 @@ def inference_client(): class TestInferenceClient: - @property - def objects(self): + def objects( + self, object_id: Optional[str] = "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9" + ): """ Returns sample Objects used as classification inputs. """ return [ { - "objectId": "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9", + "objectId": object_id, "features": [{"name": "manufacturer", "value": "ACME"}], } ] @staticmethod - def inference_response(prediction_count): + def inference_response( + prediction_count, + object_id: Optional[str] = "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9", + ): """ Returns a sample InferenceResponseSchema with the given number of predictions. @@ -55,7 +58,7 @@ def inference_response(prediction_count): "processedTime": "2018-08-31T11:45:54.727934+00:00", "predictions": [ { - "objectId": "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9", + "objectId": object_id, "labels": [{"name": "category", "value": "ANVIL"}], } for _ in range(prediction_count) @@ -67,12 +70,12 @@ def test_create_inference_request(self, inference_client: InferenceClient): Checks inference call. """ response = inference_client.create_inference_request( - "my-model", objects=self.objects + "my-model", objects=self.objects() ) expected_call = call( "/inference/api/v3/models/my-model/versions/1", - payload={"topN": 1, "objects": self.objects}, + payload={"topN": 1, "objects": self.objects()}, retry=False, ) @@ -92,11 +95,11 @@ def test_create_inference_request_with_top_n( Checks if top_n parameter is passed correctly. """ response = inference_client.create_inference_request( - "a-test-model", objects=self.objects, top_n=99, retry=False + "a-test-model", objects=self.objects(), top_n=99, retry=False ) expected_call = call( "/inference/api/v3/models/a-test-model/versions/1", - payload={"topN": 99, "objects": self.objects}, + payload={"topN": 99, "objects": self.objects()}, retry=False, ) @@ -116,12 +119,12 @@ def test_create_inference_request_with_retry_enabled( Checks if retry parameter is passsed correctly. """ response = inference_client.create_inference_request( - "my-model", objects=self.objects, retry=True + "my-model", objects=self.objects(), retry=True ) expected_call = call( "/inference/api/v3/models/my-model/versions/1", - payload={"topN": 1, "objects": self.objects}, + payload={"topN": 1, "objects": self.objects()}, retry=True, ) @@ -162,7 +165,7 @@ def _assert_bulk_inference_works( # passed to InferenceClient.do_bulk_inference - the default is assumed to be # False and the internal calls to Inference.create_inference_request will # be checked for this. - many_objects = [self.objects[0] for _ in range(75)] + many_objects = [self.objects()[0] for _ in range(75)] assert len(many_objects) == 75 # On first call, return response with 50 predictions. On second call, @@ -213,12 +216,12 @@ def test_create_inference_with_url_works(self, inference_client: InferenceClient """ url = DAR_URL + "inference/api/v3/models/my-model/versions/1" response = inference_client.create_inference_request_with_url( - url, objects=self.objects + url, objects=self.objects() ) expected_call = call( url, - payload={"topN": 1, "objects": self.objects}, + payload={"topN": 1, "objects": self.objects()}, retry=False, ) @@ -238,12 +241,12 @@ def test_create_inference_request_with_url_retry_enabled( url = DAR_URL + "inference/api/v3/models/my-model/versions/1" response = inference_client.create_inference_request_with_url( - url=url, objects=self.objects, retry=True + url=url, objects=self.objects(), retry=True ) expected_call = call( url, - payload={"topN": 1, "objects": self.objects}, + payload={"topN": 1, "objects": self.objects()}, retry=True, ) @@ -264,7 +267,11 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): exception_404 = DARHTTPException.create_from_response(url, response_404) - exceptions = [exception_404, RequestException, Timeout] + exceptions = [ + exception_404, + RequestException("Request Error"), + Timeout("Timeout"), + ] # Try different exceptions for exc in exceptions: inference_client.session.post_to_endpoint.return_value.json.side_effect = [ @@ -273,7 +280,7 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): self.inference_response(40), ] - many_objects = [self.objects[0] for _ in range(50 + 50 + 40)] + many_objects = [self.objects()[0] for _ in range(50 + 50 + 40)] assert len(many_objects) == 50 + 50 + 40 response = inference_client.do_bulk_inference( @@ -282,9 +289,58 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): top_n=4, ) + expected_error_response = { + "objectId": "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9", + "labels": None, + # If this test fails, I found it can make pytest/PyCharm hang because it + # takes too much time in difflib. + "_sdk_error": f"{exc.__class__.__name__}: {exc}", + } + expected_response = [] expected_response.extend(self.inference_response(50)["predictions"]) - expected_response.extend(None for _ in range(50)) + expected_response.extend(expected_error_response for _ in range(50)) expected_response.extend(self.inference_response(40)["predictions"]) assert response == expected_response + + def test_bulk_inference_error_no_object_ids( + self, inference_client: InferenceClient + ): + response_404 = create_mock_response_404() + url = "http://localhost:4321/test/" + + exception_404 = DARHTTPException.create_from_response(url, response_404) + + inference_client.session.post_to_endpoint.return_value.json.side_effect = [ + self.inference_response(50, object_id=None), + exception_404, + self.inference_response(22, object_id=None), + ] + + inference_objects = [ + self.objects(object_id=None)[0] for _ in range(50 + 50 + 22) + ] + + response = inference_client.do_bulk_inference( + model_name="test-model", + objects=inference_objects, + top_n=4, + ) + expected_error_response = { + "objectId": None, + "labels": None, + # If this test fails, I found it can make pytest/PyCharm hang because it + # takes too much time in difflib. + "_sdk_error": f"{exception_404.__class__.__name__}: {exception_404}", + } + expected_response = [] + expected_response.extend( + self.inference_response(50, object_id=None)["predictions"] + ) + expected_response.extend(expected_error_response for _ in range(50)) + expected_response.extend( + self.inference_response(22, object_id=None)["predictions"] + ) + + assert response == expected_response From 8d2096426c0e143001f53dad46b75fe91a06975c Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Mon, 23 May 2022 18:53:40 +0200 Subject: [PATCH 03/14] chore: update CHANGELOG.md --- CHANGELOG.md | 16 ++++++++++++++++ version.txt | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a07f582..46e63c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.12.0] + +### Changed + +* `InferenceClient.do_bulk_inference` is now faster due to processing requests in parallel [#128] +* `InferenceClient.do_bulk_inference` is more resilient and handles errors internally. Instead of + raising an Exception if the inference request to the service fails, the `do_bulk_inference` method + will place a special error response object in the returned list. This can be considered a breaking API change, + because the special error response object will have a value of `None` for the `labels` key. + As this project is still versioned below 1.0.0, the breaking API change does not warrant a major version update. + See [#128] for details. + +[#128]: https://github.com/SAP/data-attribute-recommendation-python-sdk/pull/128 + ## [0.11.0] +### Added + * Support for reading training jobs using model name in `read_job_by_model_name` [#124]: https://github.com/SAP/data-attribute-recommendation-python-sdk/pull/124 diff --git a/version.txt b/version.txt index d9df1bb..ac454c6 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.11.0 +0.12.0 From 9ea705e7c258064f39c9939f412f932c7295b169 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Mon, 23 May 2022 18:56:48 +0200 Subject: [PATCH 04/14] chore: fix Python 3.5 compat --- sap/aibus/dar/client/inference_client.py | 2 +- tests/sap/aibus/dar/client/test_inference_client.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sap/aibus/dar/client/inference_client.py b/sap/aibus/dar/client/inference_client.py index ce3f359..2773718 100644 --- a/sap/aibus/dar/client/inference_client.py +++ b/sap/aibus/dar/client/inference_client.py @@ -172,7 +172,7 @@ def predict_call(work_package): { "objectId": inference_object.get("objectId", None), "labels": None, - "_sdk_error": f"{exc.__class__.__name__}: {exc}", + "_sdk_error": "{}: {}".format(exc.__class__.__name__, str(exc)), } for inference_object in work_package ] diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index 8ebef37..5e5f9ea 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -294,7 +294,7 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): "labels": None, # If this test fails, I found it can make pytest/PyCharm hang because it # takes too much time in difflib. - "_sdk_error": f"{exc.__class__.__name__}: {exc}", + "_sdk_error": "{}: {}".format(exc.__class__.__name__, str(exc)), } expected_response = [] @@ -332,7 +332,9 @@ def test_bulk_inference_error_no_object_ids( "labels": None, # If this test fails, I found it can make pytest/PyCharm hang because it # takes too much time in difflib. - "_sdk_error": f"{exception_404.__class__.__name__}: {exception_404}", + "_sdk_error": "{}: {}".format( + exception_404.__class__.__name__, str(exception_404) + ), } expected_response = [] expected_response.extend( From 4372b8f2dba93df8258b6b1031ee2f66e2fa3f5b Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Tue, 24 May 2022 11:17:52 +0200 Subject: [PATCH 05/14] chore: attempt to fix build error on Python 3.7 --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 576459c..5df0e1b 100644 --- a/tox.ini +++ b/tox.ini @@ -11,6 +11,8 @@ passenv = DAR_* TRAVIS TRAVIS_* COVERALLS_* deps = zipp<2.0.0 # for python 3.5 support + # https://stackoverflow.com/questions/59549322/importing-pytest-fails-with-attributeerror-str-object-has-no-attribute-patt + pluggy==1.0.0 # pytest dependency, specify manually to fix build error pytest==6.1.2 # latest supporting Python 3.5 pytest-cov==2.12.1 httpretty==1.1.4 From 67be9ce61a5e9cf012e631a59592da54ef5d142e Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Wed, 15 Jun 2022 18:30:56 +0200 Subject: [PATCH 06/14] Revert "chore: attempt to fix build error on Python 3.7" This reverts commit 4372b8f2dba93df8258b6b1031ee2f66e2fa3f5b. --- tox.ini | 2 -- 1 file changed, 2 deletions(-) diff --git a/tox.ini b/tox.ini index 5df0e1b..576459c 100644 --- a/tox.ini +++ b/tox.ini @@ -11,8 +11,6 @@ passenv = DAR_* TRAVIS TRAVIS_* COVERALLS_* deps = zipp<2.0.0 # for python 3.5 support - # https://stackoverflow.com/questions/59549322/importing-pytest-fails-with-attributeerror-str-object-has-no-attribute-patt - pluggy==1.0.0 # pytest dependency, specify manually to fix build error pytest==6.1.2 # latest supporting Python 3.5 pytest-cov==2.12.1 httpretty==1.1.4 From 3c60ddeb8368a4d613047d6bd37fe30c24c24f87 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Wed, 15 Jun 2022 18:37:21 +0200 Subject: [PATCH 07/14] chore: Fix build for Python 3.7 --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 576459c..285e8e6 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,8 @@ setenv = COVERAGE_FILE = test_results/{envname}/unit_coverage/.coverage passenv = DAR_* TRAVIS TRAVIS_* COVERALLS_* deps = + # Fix build on 3.7: https://github.com/pypa/setuptools/issues/3293 + py37: setuptools==60.9.0 zipp<2.0.0 # for python 3.5 support pytest==6.1.2 # latest supporting Python 3.5 pytest-cov==2.12.1 From 136592ed84fe1f7e4da6bcb698b42adfa5192077 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 08:50:28 +0200 Subject: [PATCH 08/14] Fix build on Python 3.7, second attempt setuptools must be installed much earlier in the process. --- .travis.yml | 2 ++ tox.ini | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index c60b84e..c8adc7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -43,6 +43,8 @@ stages: language: python install: + # Fix build on 3.7: https://github.com/pypa/setuptools/issues/3293 +- pip3 install setuptools==60.9.0 - pip3 install -r requirements-test.txt script: tox diff --git a/tox.ini b/tox.ini index 285e8e6..576459c 100644 --- a/tox.ini +++ b/tox.ini @@ -10,8 +10,6 @@ setenv = COVERAGE_FILE = test_results/{envname}/unit_coverage/.coverage passenv = DAR_* TRAVIS TRAVIS_* COVERALLS_* deps = - # Fix build on 3.7: https://github.com/pypa/setuptools/issues/3293 - py37: setuptools==60.9.0 zipp<2.0.0 # for python 3.5 support pytest==6.1.2 # latest supporting Python 3.5 pytest-cov==2.12.1 From 456ca8d90ea2b28ea2783f4c4c79cf38f959edba Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 09:07:27 +0200 Subject: [PATCH 09/14] Only install setuptools on Python 3.7 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index c8adc7f..1139e35 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,7 +44,7 @@ stages: language: python install: # Fix build on 3.7: https://github.com/pypa/setuptools/issues/3293 -- pip3 install setuptools==60.9.0 +- pip3 install 'setuptools==60.9.0;python_version==3.7' - pip3 install -r requirements-test.txt script: tox From 3d0432c397f8091401217de213ae0188ba62b6d9 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 09:14:43 +0200 Subject: [PATCH 10/14] Fix syntax --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1139e35..13a52d5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -44,7 +44,7 @@ stages: language: python install: # Fix build on 3.7: https://github.com/pypa/setuptools/issues/3293 -- pip3 install 'setuptools==60.9.0;python_version==3.7' +- pip3 install 'setuptools==60.9.0;python_version=="3.7"' - pip3 install -r requirements-test.txt script: tox From e9403e739a2df46c8e60462b78ece9ed5e14a0d4 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 09:31:22 +0200 Subject: [PATCH 11/14] chore: make pytest output verbose --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 576459c..7d7d920 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,7 @@ commands = -o junit_suite_name={envname} \ -o console_output_style=classic \ -o junit_family=xunit2 \ + -vv \ tests/ \ sap/ cov: coveralls From accc46bb75f0f262dd5c98bf55968de9814dd06c Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 12:04:47 +0200 Subject: [PATCH 12/14] bulk_inference: fix tests on pypy The bulk_inference code is now multithreaded. For this reason, the trick of returning different values in the Mock based on the order of the calls no longer works. This was somewhat accidentally working on CPython, but not on pypy. --- .../aibus/dar/client/test_inference_client.py | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index 5e5f9ea..b6be6d5 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -3,7 +3,7 @@ # mypy cannot deal with some of the monkey-patching we do below. # https://github.com/python/mypy/issues/2427 from typing import Optional -from unittest.mock import call +from unittest.mock import call, Mock import pytest from requests import RequestException, Timeout @@ -267,20 +267,41 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): exception_404 = DARHTTPException.create_from_response(url, response_404) + # The old trick to return different values in a Mock based on the call order + # does not work here because the code is concurrent. Instead, we use a different + # objectId for those objects where we want the request to fail + def make_mock_post(exc): + def post_to_endpoint(*args, **kwargs): + payload = kwargs.pop("payload") + object_id = payload["objects"][0]["objectId"] + if object_id == "expected-to-fail": + raise exc + elif object_id == "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9": + response = Mock() + response.json.return_value = self.inference_response( + len(payload["objects"]) + ) + return response + else: + raise ValueError(f"objectId '{object_id}' not handled in test.") + + return post_to_endpoint + + # Try different exceptions exceptions = [ exception_404, RequestException("Request Error"), Timeout("Timeout"), ] - # Try different exceptions for exc in exceptions: - inference_client.session.post_to_endpoint.return_value.json.side_effect = [ - self.inference_response(50), - exc, - self.inference_response(40), - ] + inference_client.session.post_to_endpoint.side_effect = make_mock_post(exc) - many_objects = [self.objects()[0] for _ in range(50 + 50 + 40)] + many_objects = [] + many_objects.extend([self.objects()[0] for _ in range(50)]) + many_objects.extend( + [self.objects(object_id="expected-to-fail")[0] for _ in range(50)] + ) + many_objects.extend([self.objects()[0] for _ in range(40)]) assert len(many_objects) == 50 + 50 + 40 response = inference_client.do_bulk_inference( @@ -290,7 +311,7 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): ) expected_error_response = { - "objectId": "b5cbcb34-7ab9-4da5-b7ec-654c90757eb9", + "objectId": "expected-to-fail", "labels": None, # If this test fails, I found it can make pytest/PyCharm hang because it # takes too much time in difflib. @@ -302,6 +323,7 @@ def test_bulk_inference_error(self, inference_client: InferenceClient): expected_response.extend(expected_error_response for _ in range(50)) expected_response.extend(self.inference_response(40)["predictions"]) + assert len(response) == len(expected_response) assert response == expected_response def test_bulk_inference_error_no_object_ids( From f0d4169a1203d2d7d80413fcc0f1812b886a72b8 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 15:00:18 +0200 Subject: [PATCH 13/14] fix: Python 3.5 has no f strings --- tests/sap/aibus/dar/client/test_inference_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index b6be6d5..cfd47a6 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -283,7 +283,7 @@ def post_to_endpoint(*args, **kwargs): ) return response else: - raise ValueError(f"objectId '{object_id}' not handled in test.") + raise ValueError("objectId '%s' not handled in test." % object_id) return post_to_endpoint From 5bbf4a27c42bdd81f9fd62d7e511c01bca99fcb4 Mon Sep 17 00:00:00 2001 From: Michael Haas Date: Fri, 17 Jun 2022 17:34:30 +0200 Subject: [PATCH 14/14] do_bulk_inference: add worker_count parameter This is mainly useful to fix the tests which rely on the mocks being called in a certain order. One of the tests supports concurrency by mocking in a better way, but this was not feasible for the other tests. This commit also updates the documentation build tools to the latest version to fix the documentation build on my local machine. --- docs/requirements.txt | 4 +- sap/aibus/dar/client/exceptions.py | 8 ++++ sap/aibus/dar/client/inference_client.py | 25 +++++++++++- system_tests/workflow/test_end_to_end.py | 7 ++++ tests/sap/aibus/dar/client/test_exceptions.py | 5 ++- .../aibus/dar/client/test_inference_client.py | 39 ++++++++++++++++++- 6 files changed, 81 insertions(+), 7 deletions(-) diff --git a/docs/requirements.txt b/docs/requirements.txt index 05b0310..4fdec42 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,3 @@ # docs -sphinx==2.4.1 -sphinx_rtd_theme==0.5.0 +sphinx==5.0.2 +sphinx_rtd_theme==1.0.0 diff --git a/sap/aibus/dar/client/exceptions.py b/sap/aibus/dar/client/exceptions.py index 8541efb..ccf6b47 100644 --- a/sap/aibus/dar/client/exceptions.py +++ b/sap/aibus/dar/client/exceptions.py @@ -123,6 +123,14 @@ class JobNotFound(DARException): pass +class InvalidWorkerCount(DARException): + """ + Invalid worker_count parameter is specified. + + .. versionadded:: 0.12.0 + """ + + class ModelAlreadyExists(DARException): """ Model already exists and must be deleted first. diff --git a/sap/aibus/dar/client/inference_client.py b/sap/aibus/dar/client/inference_client.py index 2773718..3f8c218 100644 --- a/sap/aibus/dar/client/inference_client.py +++ b/sap/aibus/dar/client/inference_client.py @@ -7,7 +7,7 @@ from requests import RequestException from sap.aibus.dar.client.base_client import BaseClientWithSession -from sap.aibus.dar.client.exceptions import DARHTTPException +from sap.aibus.dar.client.exceptions import DARHTTPException, InvalidWorkerCount from sap.aibus.dar.client.inference_constants import InferencePaths from sap.aibus.dar.client.util.lists import split_list @@ -17,6 +17,8 @@ #: How many labels to predict for a single object by default TOP_N = 1 +# pylint: disable=too-many-arguments + class InferenceClient(BaseClientWithSession): """ @@ -77,6 +79,7 @@ def do_bulk_inference( objects: List[dict], top_n: int = TOP_N, retry: bool = True, + worker_count: int = 4, ) -> List[Union[dict, None]]: """ Performs bulk inference for larger collections. @@ -146,14 +149,32 @@ def do_bulk_inference( has value `None`. A `_sdk_error` key is added with the Exception details. + .. versionadded:: 0.12.0 + The `worker_count` parameter allows to fine-tune the number of concurrent + request threads. Set `worker_count` to `1` to disable concurrent execution of + requests. + :param model_name: name of the model used for inference :param objects: Objects to be classified :param top_n: How many predictions to return per object :param retry: whether to retry on errors. Default: True + :param worker_count: maximum number of concurrent requests + :raises: InvalidWorkerCount if worker_count param is incorrect :return: the aggregated ObjectPrediction dictionaries """ + if worker_count is None: + raise InvalidWorkerCount("worker_count cannot be None!") + + if worker_count > 4: + msg = "worker_count too high: %s. Up to 4 allowed." % worker_count + raise InvalidWorkerCount(msg) + + if worker_count <= 0: + msg = "worker_count must be greater than 0!" + raise InvalidWorkerCount(msg) + def predict_call(work_package): try: response = self.create_inference_request( @@ -180,7 +201,7 @@ def predict_call(work_package): results = [] - with ThreadPoolExecutor(max_workers=4) as pool: + with ThreadPoolExecutor(max_workers=worker_count) as pool: results_iterator = pool.map( predict_call, split_list(objects, LIMIT_OBJECTS_PER_CALL) ) diff --git a/system_tests/workflow/test_end_to_end.py b/system_tests/workflow/test_end_to_end.py index e8b3d39..28829b4 100644 --- a/system_tests/workflow/test_end_to_end.py +++ b/system_tests/workflow/test_end_to_end.py @@ -249,12 +249,19 @@ def _assert_inference_works(self, inference_client, model_name): # One object has been classified assert len(response["predictions"]) == 1 + # do_bulk_inference with concurrency big_to_be_classified = [to_be_classified[0] for _ in range(123)] response = inference_client.do_bulk_inference( model_name=model_name, objects=big_to_be_classified ) assert len(response) == 123 + # do_bulk_inference without concurrency + response = inference_client.do_bulk_inference( + model_name=model_name, objects=big_to_be_classified, worker_count=1 + ) + assert len(response) == 123 + url = os.environ["DAR_URL"] if url[-1] == "/": url = url[:-1] diff --git a/tests/sap/aibus/dar/client/test_exceptions.py b/tests/sap/aibus/dar/client/test_exceptions.py index 5f054ef..7bf17a8 100644 --- a/tests/sap/aibus/dar/client/test_exceptions.py +++ b/tests/sap/aibus/dar/client/test_exceptions.py @@ -1,7 +1,10 @@ import datetime from unittest.mock import PropertyMock -from sap.aibus.dar.client.exceptions import DARHTTPException, ModelAlreadyExists +from sap.aibus.dar.client.exceptions import ( + DARHTTPException, + ModelAlreadyExists, +) from tests.sap.aibus.dar.client.test_dar_session import create_mock_response # TODO: test __str__ diff --git a/tests/sap/aibus/dar/client/test_inference_client.py b/tests/sap/aibus/dar/client/test_inference_client.py index cfd47a6..1577b6e 100644 --- a/tests/sap/aibus/dar/client/test_inference_client.py +++ b/tests/sap/aibus/dar/client/test_inference_client.py @@ -8,7 +8,7 @@ import pytest from requests import RequestException, Timeout -from sap.aibus.dar.client.exceptions import DARHTTPException +from sap.aibus.dar.client.exceptions import DARHTTPException, InvalidWorkerCount from sap.aibus.dar.client.inference_client import InferenceClient from tests.sap.aibus.dar.client.test_data_manager_client import ( AbstractDARClientConstruction, @@ -180,7 +180,11 @@ def _assert_bulk_inference_works( retry_kwarg["retry"] = retry_flag response = inference_client.do_bulk_inference( - model_name="test-model", objects=many_objects, top_n=4, **retry_kwarg + model_name="test-model", + objects=many_objects, + top_n=4, + worker_count=1, # Disable concurrency to make tests deterministic. + **retry_kwarg, ) # The return value is the concatenation of all 'predictions' of the individual @@ -348,6 +352,7 @@ def test_bulk_inference_error_no_object_ids( model_name="test-model", objects=inference_objects, top_n=4, + worker_count=1, # disable concurrency to make tests deterministic ) expected_error_response = { "objectId": None, @@ -368,3 +373,33 @@ def test_bulk_inference_error_no_object_ids( ) assert response == expected_response + + def test_worker_count_validation(self, inference_client: InferenceClient): + + many_objects = [self.objects()[0] for _ in range(75)] + + with pytest.raises(InvalidWorkerCount) as context: + inference_client.do_bulk_inference( + model_name="test-model", objects=many_objects, worker_count=5 + ) + assert "worker_count too high: 5. Up to 4 allowed." in str(context.value) + + with pytest.raises(InvalidWorkerCount) as context: + inference_client.do_bulk_inference( + model_name="test-model", objects=many_objects, worker_count=0 + ) + assert "worker_count must be greater than 0" in str(context.value) + + with pytest.raises(InvalidWorkerCount) as context: + inference_client.do_bulk_inference( + model_name="test-model", objects=many_objects, worker_count=-1 + ) + assert "worker_count must be greater than 0" in str(context.value) + + with pytest.raises(InvalidWorkerCount) as context: + inference_client.do_bulk_inference( + model_name="test-model", + objects=many_objects, + worker_count=None, + ) + assert "worker_count cannot be None" in str(context.value)