diff --git a/.dockerignore b/.dockerignore index bbe15fd0..222f5482 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,5 +4,3 @@ dist venv*/ .coverage tests/ -testkit/ -testkitbackend/ diff --git a/testkit/.dockerignore b/testkit/.dockerignore index f104652b..ebc6b8d6 100644 --- a/testkit/.dockerignore +++ b/testkit/.dockerignore @@ -1 +1,2 @@ *.py +!backend.py diff --git a/testkit/Dockerfile b/testkit/Dockerfile index e61135da..01bd384d 100644 --- a/testkit/Dockerfile +++ b/testkit/Dockerfile @@ -1,11 +1,12 @@ -FROM ubuntu:20.04 +ARG TIME_WARP="" +FROM ubuntu:20.04 AS base ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && \ apt-get install -y locales && \ apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \ - localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 \ - && rm -rf /var/lib/apt/lists/* + localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8 && \ + rm -rf /var/lib/apt/lists/* ENV LANG=en_US.UTF-8 # Using apt-get update alone in a RUN statement causes caching issues and subsequent apt-get install instructions fail. @@ -29,21 +30,24 @@ RUN apt-get update && \ ca-certificates && \ apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* -# Install our own CAs on the image. -# Assumes Linux Debian based image. -COPY CAs/* /usr/local/share/ca-certificates/ -# Store custom CAs somewhere where the backend can find them later. -COPY CustomCAs/* /usr/local/share/custom-ca-certificates/ -RUN update-ca-certificates - # Install pyenv RUN git clone https://github.com/pyenv/pyenv.git .pyenv ENV PYENV_ROOT=/.pyenv ENV PATH="$PYENV_ROOT/shims:$PYENV_ROOT/bin:$PATH" +ENV PIP_NO_CACHE_DIR=1 + + +FROM base AS base-py-arg +# Install all supported Python versions +ARG PYTHON_VERSIONS="3.13 3.12 3.11 3.10 3.9 3.8 3.7" + -# Setup python version -ENV PYTHON_VERSIONS="3.13 3.12 3.11 3.10 3.9 3.8 3.7" +FROM base AS base-py-arg-single-python +# Only install Python 3.7 in time warp mode +ARG PYTHON_VERSIONS="3.7" + +FROM base-py-arg${TIME_WARP:+"-single-python"} AS base-py-install RUN for version in $PYTHON_VERSIONS; do \ pyenv install $version; \ done @@ -57,3 +61,34 @@ RUN for version in $PYTHON_VERSIONS; do \ python$version -m pip install -U pip && \ python$version -m pip install -U coverage tox; \ done + + +FROM base-py-install AS backend-timewarp +WORKDIR /home/root/testkit +COPY requirements*.txt . +COPY testkit/backend.py testkit/build.py testkit/_common.py . +RUN sed -i 's|-e \..*$||' requirements*.txt + +ARG TIME_WARP +RUN for version in $PYTHON_VERSIONS; do \ + TEST_BACKEND_VERSION="python$version" python build.py && \ + python$version -m pip install --force-reinstall neo4j==${TIME_WARP}; \ + done +COPY testkitbackend ./testkitbackend +ENTRYPOINT ["python", "backend.py"] + + +FROM base-py-install AS backend +# Install our own CAs on the image. +# Assumes Linux Debian based image. +COPY CAs/* /usr/local/share/ca-certificates/ +# Store custom CAs somewhere where the backend can find them later. +COPY CustomCAs/* /usr/local/share/custom-ca-certificates/ +RUN update-ca-certificates + + +FROM backend${TIME_WARP:+"-timewarp"} AS final +ARG TIME_WARP +ENV DRIVER_TIME_WARP=$TIME_WARP +WORKDIR /home/root/testkit +EXPOSE 9876/tcp diff --git a/testkit/backend.py b/testkit/backend.py index 1bd38768..89dcc069 100644 --- a/testkit/backend.py +++ b/testkit/backend.py @@ -23,4 +23,5 @@ cmd = ["-m", "testkitbackend"] if "TEST_BACKEND_SERVER" in os.environ: cmd.append(os.environ["TEST_BACKEND_SERVER"]) - run_python(cmd) + is_time_warp = bool(os.environ.get("DRIVER_TIME_WARP")) + run_python(cmd, warning_as_error=not is_time_warp) diff --git a/testkitbackend/_async/requests.py b/testkitbackend/_async/requests.py index f08bb30d..b896e854 100644 --- a/testkitbackend/_async/requests.py +++ b/testkitbackend/_async/requests.py @@ -17,26 +17,16 @@ from __future__ import annotations import datetime -import json import re -import ssl import typing as t import warnings -from os import path from freezegun import freeze_time import neo4j import neo4j.api -import neo4j.auth_management from neo4j._async_compat.util import AsyncUtil from neo4j._routing import RoutingTable -from neo4j.auth_management import ( - AsyncAuthManager, - AsyncAuthManagers, - AsyncClientCertificateProvider, - ExpiringAuth, -) from .. import ( fromtestkit, @@ -44,7 +34,26 @@ totestkit, ) from .._warning_check import warnings_check -from ..exceptions import MarkdAsDriverError +from ..exceptions import ( + MarkdAsDriverError, + TimeWarpError, +) +from ..test_config import ( + FEATURES, + SKIPPED_TESTS, +) +from ..time_warp_compat import ( + BMM_SUPPORT, + EXECUTE_QUERY_STABILIZED, + EXECUTE_QUERY_SUPPORT, + LIVENESS_CHECK_SUPPORT, + MTLS_SUPPORT, + NOTIFICATION_WARNINGS_SUPPORTED, + SESSION_AUTH_STABILIZED, + SESSION_AUTH_SUPPORTED, + TELEMETRY_SUPPORT, + VERSION, +) if t.TYPE_CHECKING: @@ -99,20 +108,6 @@ class FrontendError(Exception): pass -def load_config(): - config_path = path.join(path.dirname(__file__), "..", "test_config.json") - with open(config_path, encoding="utf-8") as fd: - config = json.load(fd) - skips = config["skips"] - features = [k for k, v in config["features"].items() if v is True] - if ssl.HAS_TLSv1_3: - features += ["Feature:TLS:1.3"] - return skips, features - - -SKIPPED_TESTS, FEATURES = load_config() - - def _get_skip_reason(test_name): for skip_pattern, reason in SKIPPED_TESTS.items(): if skip_pattern[0] == skip_pattern[-1] == "'": @@ -185,23 +180,33 @@ async def new_driver(backend, data): data["resolverRegistered"], data["domainNameResolverRegistered"], ) - for timeout_testkit, timeout_driver in ( + timeout_configs = [ ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), - ("livenessCheckTimeoutMs", "liveness_check_timeout"), ("maxConnectionLifetimeMs", "max_connection_lifetime"), - ): + ] + if LIVENESS_CHECK_SUPPORT: + timeout_configs.append( + ("livenessCheckTimeoutMs", "liveness_check_timeout"), + ) + elif data.get("livenessCheckTimeoutMs") is not None: + raise TimeWarpError("liveness check") + for timeout_testkit, timeout_driver in timeout_configs: if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 for k in ("sessionConnectionTimeoutMs", "updateRoutingTableTimeoutMs"): if k in data: data.mark_item_as_read_if_equals(k, None) - for conf_name, data_name in ( + copy_configs = [ ("max_connection_pool_size", "maxConnectionPoolSize"), ("fetch_size", "fetchSize"), - ("telemetry_disabled", "telemetryDisabled"), - ): + ] + if TELEMETRY_SUPPORT: + copy_configs.append(("telemetry_disabled", "telemetryDisabled")) + elif data.get("telemetryDisabled") is not None: + raise TimeWarpError("telemetry") + for conf_name, data_name in copy_configs: if data.get(data_name): kwargs[conf_name] = data[data_name] for conf_name, data_name in (("encrypted", "encrypted"),): @@ -220,18 +225,22 @@ async def new_driver(backend, data): kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - expected_warnings.append( - ( - neo4j.PreviewWarning, - r"notification warnings are a preview feature\.", + if NOTIFICATION_WARNINGS_SUPPORTED: + kwargs["warn_notification_severity"] = ( + neo4j.NotificationMinimumSeverity.OFF ) - ) + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"notification warnings are a preview feature\.", + ) + ) + with warnings_check(expected_warnings): driver = neo4j.AsyncGraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], - warn_notification_severity=neo4j.NotificationMinimumSeverity.OFF, **kwargs, ) key = backend.next_key() @@ -239,221 +248,222 @@ async def new_driver(backend, data): await backend.send_response("Driver", {"id": key}) -@request_handler -async def new_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() +if SESSION_AUTH_STABILIZED: + from neo4j.auth_management import ( + AsyncAuthManager, + AsyncAuthManagers, + ExpiringAuth, + ) + + @request_handler + async def new_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + class TestKitAuthManager(AsyncAuthManager): + async def get_auth(self): + key = backend.next_key() + await backend.send_response( + "AuthTokenManagerGetAuthRequest", + { + "id": key, + "authTokenManagerId": auth_token_manager_id, + }, + ) + if not await backend.process_request(): + # connection was closed before end of next message + return None + if key not in backend.auth_token_supplies: + raise RuntimeError( + "Backend did not receive expected " + "AuthTokenManagerGetAuthCompleted message for id " + f"{key}" + ) + return backend.auth_token_supplies.pop(key) + + async def handle_security_exception(self, auth, error): + key = backend.next_key() + await backend.send_response( + "AuthTokenManagerHandleSecurityExceptionRequest", + { + "id": key, + "authTokenManagerId": auth_token_manager_id, + "auth": totestkit.auth_token(auth), + "errorCode": error.code, + }, + ) + if not await backend.process_request(): + # connection was closed before end of next message + return None + if key not in backend.auth_token_on_expiration_supplies: + raise RuntimeError( + "Backend did not receive expected " + "AuthTokenManagerHandleSecurityExceptionCompleted " + f"message for id {key}" + ) + return backend.auth_token_on_expiration_supplies.pop(key) + + auth_manager = TestKitAuthManager() + backend.auth_token_managers[auth_token_manager_id] = auth_manager + await backend.send_response( + "AuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + async def auth_token_manager_get_auth_completed(backend, data): + auth_token = fromtestkit.to_auth_token(data, "auth") - class TestKitAuthManager(AsyncAuthManager): - async def get_auth(self): + backend.auth_token_supplies[data["requestId"]] = auth_token + + @request_handler + async def auth_token_manager_handle_security_exception_completed( + backend, data + ): + handled = data["handled"] + backend.auth_token_on_expiration_supplies[data["requestId"]] = handled + + @request_handler + async def auth_token_manager_close(backend, data): + auth_token_manager_id = data["id"] + del backend.auth_token_managers[auth_token_manager_id] + await backend.send_response( + "AuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + async def new_basic_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + async def auth_token_provider(): key = backend.next_key() await backend.send_response( - "AuthTokenManagerGetAuthRequest", + "BasicAuthTokenProviderRequest", { "id": key, - "authTokenManagerId": auth_token_manager_id, + "basicAuthTokenManagerId": auth_token_manager_id, }, ) if not await backend.process_request(): # connection was closed before end of next message return None - if key not in backend.auth_token_supplies: + if key not in backend.basic_auth_token_supplies: raise RuntimeError( "Backend did not receive expected " - f"AuthTokenManagerGetAuthCompleted message for id {key}" + "BasicAuthTokenManagerCompleted message for id " + f"{key}" ) - return backend.auth_token_supplies.pop(key) + return backend.basic_auth_token_supplies.pop(key) + + auth_manager = AsyncAuthManagers.basic(auth_token_provider) + backend.auth_token_managers[auth_token_manager_id] = auth_manager + await backend.send_response( + "BasicAuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + async def basic_auth_token_provider_completed(backend, data): + auth = fromtestkit.to_auth_token(data, "auth") + backend.basic_auth_token_supplies[data["requestId"]] = auth - async def handle_security_exception(self, auth, error): + @request_handler + async def new_bearer_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + async def auth_token_provider(): key = backend.next_key() await backend.send_response( - "AuthTokenManagerHandleSecurityExceptionRequest", + "BearerAuthTokenProviderRequest", { "id": key, - "authTokenManagerId": auth_token_manager_id, - "auth": totestkit.auth_token(auth), - "errorCode": error.code, + "bearerAuthTokenManagerId": auth_token_manager_id, }, ) if not await backend.process_request(): # connection was closed before end of next message - return None - if key not in backend.auth_token_on_expiration_supplies: + return neo4j.auth_management.ExpiringAuth(None, None) + if key not in backend.expiring_auth_token_supplies: raise RuntimeError( "Backend did not receive expected " - "AuthTokenManagerHandleSecurityExceptionCompleted message " - f"for id {key}" + "BearerAuthTokenManagerCompleted message for id " + f"{key}" ) - return backend.auth_token_on_expiration_supplies.pop(key) - - auth_manager = TestKitAuthManager() - backend.auth_token_managers[auth_token_manager_id] = auth_manager - await backend.send_response( - "AuthTokenManager", {"id": auth_token_manager_id} - ) - - -@request_handler -async def auth_token_manager_get_auth_completed(backend, data): - auth_token = fromtestkit.to_auth_token(data, "auth") - - backend.auth_token_supplies[data["requestId"]] = auth_token - - -@request_handler -async def auth_token_manager_handle_security_exception_completed( - backend, data -): - handled = data["handled"] - backend.auth_token_on_expiration_supplies[data["requestId"]] = handled - + return backend.expiring_auth_token_supplies.pop(key) -@request_handler -async def auth_token_manager_close(backend, data): - auth_token_manager_id = data["id"] - del backend.auth_token_managers[auth_token_manager_id] - await backend.send_response( - "AuthTokenManager", {"id": auth_token_manager_id} - ) - - -@request_handler -async def new_basic_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() - - async def auth_token_provider(): - key = backend.next_key() + auth_manager = AsyncAuthManagers.bearer(auth_token_provider) + backend.auth_token_managers[auth_token_manager_id] = auth_manager await backend.send_response( - "BasicAuthTokenProviderRequest", - { - "id": key, - "basicAuthTokenManagerId": auth_token_manager_id, - }, + "BearerAuthTokenManager", {"id": auth_token_manager_id} ) - if not await backend.process_request(): - # connection was closed before end of next message - return None - if key not in backend.basic_auth_token_supplies: - raise RuntimeError( - "Backend did not receive expected " - "BasicAuthTokenManagerCompleted message for id " - f"{key}" - ) - return backend.basic_auth_token_supplies.pop(key) - - auth_manager = AsyncAuthManagers.basic(auth_token_provider) - backend.auth_token_managers[auth_token_manager_id] = auth_manager - await backend.send_response( - "BasicAuthTokenManager", {"id": auth_token_manager_id} - ) - -@request_handler -async def basic_auth_token_provider_completed(backend, data): - auth = fromtestkit.to_auth_token(data, "auth") - backend.basic_auth_token_supplies[data["requestId"]] = auth - - -@request_handler -async def new_bearer_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() - - async def auth_token_provider(): - key = backend.next_key() - await backend.send_response( - "BearerAuthTokenProviderRequest", - { - "id": key, - "bearerAuthTokenManagerId": auth_token_manager_id, - }, + @request_handler + async def bearer_auth_token_provider_completed(backend, data): + temp_auth_data = data["auth"] + temp_auth_data.mark_item_as_read_if_equals( + "name", "AuthTokenAndExpiration" ) - if not await backend.process_request(): - # connection was closed before end of next message - return neo4j.auth_management.ExpiringAuth(None, None) - if key not in backend.expiring_auth_token_supplies: - raise RuntimeError( - "Backend did not receive expected " - "BearerAuthTokenManagerCompleted message for id " - f"{key}" - ) - return backend.expiring_auth_token_supplies.pop(key) - - auth_manager = AsyncAuthManagers.bearer(auth_token_provider) - backend.auth_token_managers[auth_token_manager_id] = auth_manager - await backend.send_response( - "BearerAuthTokenManager", {"id": auth_token_manager_id} - ) + temp_auth_data = temp_auth_data["data"] + auth_token = fromtestkit.to_auth_token(temp_auth_data, "auth") + expiring_auth = ExpiringAuth(auth_token) + if temp_auth_data["expiresInMs"] is not None: + expires_in = temp_auth_data["expiresInMs"] / 1000 + expiring_auth = expiring_auth.expires_in(expires_in) + backend.expiring_auth_token_supplies[data["requestId"]] = expiring_auth -@request_handler -async def bearer_auth_token_provider_completed(backend, data): - temp_auth_data = data["auth"] - temp_auth_data.mark_item_as_read_if_equals( - "name", "AuthTokenAndExpiration" - ) - temp_auth_data = temp_auth_data["data"] - auth_token = fromtestkit.to_auth_token(temp_auth_data, "auth") - expiring_auth = ExpiringAuth(auth_token) - if temp_auth_data["expiresInMs"] is not None: - expires_in = temp_auth_data["expiresInMs"] / 1000 - expiring_auth = expiring_auth.expires_in(expires_in) - backend.expiring_auth_token_supplies[data["requestId"]] = expiring_auth +if MTLS_SUPPORT: + from neo4j.auth_management import AsyncClientCertificateProvider + class TestKitClientCertificateProvider(AsyncClientCertificateProvider): + def __init__(self, backend): + self.id = backend.next_key() + self._backend = backend -class TestKitClientCertificateProvider(AsyncClientCertificateProvider): - def __init__(self, backend): - self.id = backend.next_key() - self._backend = backend - - async def get_certificate(self) -> ClientCertificate | None: - request_id = self._backend.next_key() - await self._backend.send_response( - "ClientCertificateProviderRequest", - { - "id": request_id, - "clientCertificateProviderId": self.id, - }, - ) - if not await self._backend.process_request(): - # connection was closed before end of next message - return None - if request_id not in self._backend.client_cert_supplies: - raise RuntimeError( - "Backend did not receive expected " - "ClientCertificateProviderCompleted message for id " - f"{request_id}" + async def get_certificate(self) -> ClientCertificate | None: + request_id = self._backend.next_key() + await self._backend.send_response( + "ClientCertificateProviderRequest", + { + "id": request_id, + "clientCertificateProviderId": self.id, + }, ) - return self._backend.client_cert_supplies.pop(request_id) - - -@request_handler -async def new_client_certificate_provider(backend, data): - provider = TestKitClientCertificateProvider(backend) - backend.client_cert_providers[provider.id] = provider - await backend.send_response( - "ClientCertificateProvider", {"id": provider.id} - ) - + if not await self._backend.process_request(): + # connection was closed before end of next message + return None + if request_id not in self._backend.client_cert_supplies: + raise RuntimeError( + "Backend did not receive expected " + "ClientCertificateProviderCompleted message for id " + f"{request_id}" + ) + return self._backend.client_cert_supplies.pop(request_id) -@request_handler -async def client_certificate_provider_close(backend, data): - client_cert_provider_id = data["id"] - del backend.client_cert_providers[client_cert_provider_id] - await backend.send_response( - "ClientCertificateProvider", {"id": client_cert_provider_id} - ) + @request_handler + async def new_client_certificate_provider(backend, data): + provider = TestKitClientCertificateProvider(backend) + backend.client_cert_providers[provider.id] = provider + await backend.send_response( + "ClientCertificateProvider", {"id": provider.id} + ) + @request_handler + async def client_certificate_provider_close(backend, data): + client_cert_provider_id = data["id"] + del backend.client_cert_providers[client_cert_provider_id] + await backend.send_response( + "ClientCertificateProvider", {"id": client_cert_provider_id} + ) -@request_handler -async def client_certificate_provider_completed(backend, data): - has_update = data["hasUpdate"] - request_id = data["requestId"] - if not has_update: - data.mark_item_as_read("clientCertificate", recursive=True) - backend.client_cert_supplies[request_id] = None - return - client_cert = fromtestkit.to_client_cert(data, "clientCertificate") - backend.client_cert_supplies[request_id] = client_cert + @request_handler + async def client_certificate_provider_completed(backend, data): + has_update = data["hasUpdate"] + request_id = data["requestId"] + if not has_update: + data.mark_item_as_read("clientCertificate", recursive=True) + backend.client_cert_supplies[request_id] = None + return + client_cert = fromtestkit.to_client_cert(data, "clientCertificate") + backend.client_cert_supplies[request_id] = client_cert @request_handler @@ -485,72 +495,114 @@ async def get_server_info(backend, data): async def check_multi_db_support(backend, data): driver_id = data["driverId"] driver = backend.drivers[driver_id] - available = await driver.supports_multi_db() + expected_warnings = [] + if VERSION < (5, 8): + expected_warnings.append( + ( + neo4j.ExperimentalWarning, + "Feature support query, based on Bolt protocol version", + ) + ) + with warnings_check(expected_warnings): + available = await driver.supports_multi_db() await backend.send_response( "MultiDBSupport", {"id": backend.next_key(), "available": available} ) -@request_handler -async def verify_authentication(backend, data): - driver_id = data["driverId"] - driver = backend.drivers[driver_id] - auth = fromtestkit.to_auth_token(data, "authorizationToken") - authenticated = await driver.verify_authentication(auth=auth) - await backend.send_response( - "DriverIsAuthenticated", - {"id": backend.next_key(), "authenticated": authenticated}, - ) +if SESSION_AUTH_SUPPORTED: + @request_handler + async def verify_authentication(backend, data): + driver_id = data["driverId"] + driver = backend.drivers[driver_id] + auth = fromtestkit.to_auth_token(data, "authorizationToken") + expected_warnings = [] + if not SESSION_AUTH_STABILIZED: + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"User switching is a preview feature\.", + ) + ) + with warnings_check(expected_warnings): + authenticated = await driver.verify_authentication(auth=auth) + await backend.send_response( + "DriverIsAuthenticated", + {"id": backend.next_key(), "authenticated": authenticated}, + ) -@request_handler -async def check_session_auth_support(backend, data): - driver_id = data["driverId"] - driver = backend.drivers[driver_id] - available = await driver.supports_session_auth() - await backend.send_response( - "SessionAuthSupport", - {"id": backend.next_key(), "available": available}, - ) + @request_handler + async def check_session_auth_support(backend, data): + driver_id = data["driverId"] + driver = backend.drivers[driver_id] + available = await driver.supports_session_auth() + await backend.send_response( + "SessionAuthSupport", + {"id": backend.next_key(), "available": available}, + ) -@request_handler -async def execute_query(backend, data): - driver = backend.drivers[data["driverId"]] - cypher, params = fromtestkit.to_cypher_and_params(data) - config = data.get("config", {}) - kwargs = {} - for config_key, kwargs_key in ( - ("database", "database_"), - ("routing", "routing_"), - ("impersonatedUser", "impersonated_user_"), - ): - value = config.get(config_key, None) - if value is not None: - kwargs[kwargs_key] = value - tx_kwargs = fromtestkit.to_tx_kwargs(config) - query = neo4j.Query(cypher, **tx_kwargs) if tx_kwargs else cypher - bookmark_manager_id = config.get("bookmarkManagerId") - if bookmark_manager_id is not None: - if bookmark_manager_id == -1: - kwargs["bookmark_manager_"] = None - else: - bookmark_manager = backend.bookmark_managers[bookmark_manager_id] - kwargs["bookmark_manager_"] = bookmark_manager - if "authorizationToken" in config: - kwargs["auth_"] = fromtestkit.to_auth_token( - config, "authorizationToken" - ) +if EXECUTE_QUERY_SUPPORT: + + @request_handler + async def execute_query(backend, data): + driver = backend.drivers[data["driverId"]] + cypher, params = fromtestkit.to_cypher_and_params(data) + config = data.get("config", {}) + expected_warnings = [] + kwargs = {} + for config_key, kwargs_key in ( + ("database", "database_"), + ("routing", "routing_"), + ("impersonatedUser", "impersonated_user_"), + ): + value = config.get(config_key, None) + if value is not None: + kwargs[kwargs_key] = value + tx_kwargs = fromtestkit.to_tx_kwargs(config) + query = neo4j.Query(cypher, **tx_kwargs) if tx_kwargs else cypher + bookmark_manager_id = config.get("bookmarkManagerId") + if bookmark_manager_id is not None: + if bookmark_manager_id == -1: + kwargs["bookmark_manager_"] = None + else: + bookmark_manager = backend.bookmark_managers[ + bookmark_manager_id + ] + kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + if SESSION_AUTH_SUPPORTED: + kwargs["auth_"] = fromtestkit.to_auth_token( + config, "authorizationToken" + ) + if not SESSION_AUTH_STABILIZED and kwargs["auth_"] is not None: + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"User switching is a preview feature\.", + ) + ) + else: + raise TimeWarpError("session auth") + + if not EXECUTE_QUERY_STABILIZED: + expected_warnings.append( + ( + neo4j.ExperimentalWarning, + r"Driver\.execute_query is experimental\.", + ) + ) - eager_result = await driver.execute_query(query, params, **kwargs) - await backend.send_response( - "EagerResult", - { - "keys": eager_result.keys, - "records": list(map(totestkit.record, eager_result.records)), - "summary": totestkit.summary(eager_result.summary), - }, - ) + eager_result = await driver.execute_query(query, params, **kwargs) + await backend.send_response( + "EagerResult", + { + "keys": eager_result.keys, + "records": list(map(totestkit.record, eager_result.records)), + "summary": totestkit.summary(eager_result.summary), + }, + ) def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False): @@ -617,91 +669,96 @@ async def domain_name_resolution_completed(backend, data): backend.dns_resolutions[data["requestId"]] = data["addresses"] -@request_handler -async def new_bookmark_manager(backend, data): - bookmark_manager_id = backend.next_key() - - bmm_kwargs = {} - data.mark_item_as_read("initialBookmarks", recursive=True) - bmm_kwargs["initial_bookmarks"] = data.get("initialBookmarks") - if data.get("bookmarksSupplierRegistered"): - bmm_kwargs["bookmarks_supplier"] = bookmarks_supplier( - backend, bookmark_manager_id - ) - if data.get("bookmarksConsumerRegistered"): - bmm_kwargs["bookmarks_consumer"] = bookmarks_consumer( - backend, bookmark_manager_id - ) - - bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(**bmm_kwargs) - backend.bookmark_managers[bookmark_manager_id] = bookmark_manager - await backend.send_response("BookmarkManager", {"id": bookmark_manager_id}) - +if BMM_SUPPORT: -@request_handler -async def bookmark_manager_close(backend, data): - bookmark_manager_id = data["id"] - del backend.bookmark_managers[bookmark_manager_id] - await backend.send_response("BookmarkManager", {"id": bookmark_manager_id}) + @request_handler + async def new_bookmark_manager(backend, data): + bookmark_manager_id = backend.next_key() + bmm_kwargs = {} + data.mark_item_as_read("initialBookmarks", recursive=True) + bmm_kwargs["initial_bookmarks"] = data.get("initialBookmarks") + if data.get("bookmarksSupplierRegistered"): + bmm_kwargs["bookmarks_supplier"] = bookmarks_supplier( + backend, bookmark_manager_id + ) + if data.get("bookmarksConsumerRegistered"): + bmm_kwargs["bookmarks_consumer"] = bookmarks_consumer( + backend, bookmark_manager_id + ) -def bookmarks_supplier(backend, bookmark_manager_id): - async def supplier(): - key = backend.next_key() + bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager( + **bmm_kwargs + ) + backend.bookmark_managers[bookmark_manager_id] = bookmark_manager await backend.send_response( - "BookmarksSupplierRequest", - { - "id": key, - "bookmarkManagerId": bookmark_manager_id, - }, + "BookmarkManager", + {"id": bookmark_manager_id}, ) - if not await backend.process_request(): - # connection was closed before end of next message - return [] - if key not in backend.bookmarks_supplies: - raise RuntimeError( - "Backend did not receive expected " - f"BookmarksSupplierCompleted message for id {key}" - ) - return backend.bookmarks_supplies.pop(key) - - return supplier + @request_handler + async def bookmark_manager_close(backend, data): + bookmark_manager_id = data["id"] + del backend.bookmark_managers[bookmark_manager_id] + await backend.send_response( + "BookmarkManager", + {"id": bookmark_manager_id}, + ) -@request_handler -async def bookmarks_supplier_completed(backend, data): - backend.bookmarks_supplies[data["requestId"]] = ( - neo4j.Bookmarks.from_raw_values(data["bookmarks"]) - ) + def bookmarks_supplier(backend, bookmark_manager_id): + async def supplier(): + key = backend.next_key() + await backend.send_response( + "BookmarksSupplierRequest", + { + "id": key, + "bookmarkManagerId": bookmark_manager_id, + }, + ) + if not await backend.process_request(): + # connection was closed before end of next message + return [] + if key not in backend.bookmarks_supplies: + raise RuntimeError( + "Backend did not receive expected " + f"BookmarksSupplierCompleted message for id {key}" + ) + return backend.bookmarks_supplies.pop(key) + return supplier -def bookmarks_consumer(backend, bookmark_manager_id): - async def consumer(bookmarks): - key = backend.next_key() - await backend.send_response( - "BookmarksConsumerRequest", - { - "id": key, - "bookmarkManagerId": bookmark_manager_id, - "bookmarks": list(bookmarks.raw_values), - }, + @request_handler + async def bookmarks_supplier_completed(backend, data): + backend.bookmarks_supplies[data["requestId"]] = ( + neo4j.Bookmarks.from_raw_values(data["bookmarks"]) ) - if not await backend.process_request(): - # connection was closed before end of next message - return - if key not in backend.bookmarks_consumptions: - raise RuntimeError( - "Backend did not receive expected " - f"BookmarksConsumerCompleted message for id {key}" - ) - del backend.bookmarks_consumptions[key] - return consumer + def bookmarks_consumer(backend, bookmark_manager_id): + async def consumer(bookmarks): + key = backend.next_key() + await backend.send_response( + "BookmarksConsumerRequest", + { + "id": key, + "bookmarkManagerId": bookmark_manager_id, + "bookmarks": list(bookmarks.raw_values), + }, + ) + if not await backend.process_request(): + # connection was closed before end of next message + return + if key not in backend.bookmarks_consumptions: + raise RuntimeError( + "Backend did not receive expected " + f"BookmarksConsumerCompleted message for id {key}" + ) + del backend.bookmarks_consumptions[key] + return consumer -@request_handler -async def bookmarks_consumer_completed(backend, data): - backend.bookmarks_consumptions[data["requestId"]] = True + @request_handler + async def bookmarks_consumer_completed(backend, data): + backend.bookmarks_consumptions[data["requestId"]] = True @request_handler @@ -748,10 +805,13 @@ async def new_session(backend, data): config["bookmarks"] = neo4j.Bookmarks.from_raw_values( data["bookmarks"] ) - if data.get("bookmarkManagerId") is not None: - config["bookmark_manager"] = backend.bookmark_managers[ - data["bookmarkManagerId"] - ] + if BMM_SUPPORT: + if data.get("bookmarkManagerId") is not None: + config["bookmark_manager"] = backend.bookmark_managers[ + data["bookmarkManagerId"] + ] + elif data.get("bookmarkManagerId") is not None: + raise TimeWarpError("bookmark managers") for conf_name, data_name in ( ("fetch_size", "fetchSize"), ("impersonated_user", "impersonatedUser"), diff --git a/testkitbackend/_preview_imports.py b/testkitbackend/_preview_imports.py index 20f1bb9c..22f747b5 100644 --- a/testkitbackend/_preview_imports.py +++ b/testkitbackend/_preview_imports.py @@ -14,15 +14,23 @@ # limitations under the License. -import neo4j - from ._warning_check import warning_check +from .time_warp_compat import ( + GQL_STATUS_SUPPORT, + PREVIEW_WARNING_SUPPORTED, +) + + +__all__ = [] -with warning_check(neo4j.PreviewWarning, r".*\bGQLSTATUS\b.*"): - from neo4j import NotificationDisabledClassification +if PREVIEW_WARNING_SUPPORTED: + from neo4j import PreviewWarning + if GQL_STATUS_SUPPORT: + with warning_check(PreviewWarning, r".*\bGQLSTATUS\b.*"): + from neo4j import NotificationDisabledClassification -__all__ = [ - "NotificationDisabledClassification", -] + __all__ += [ + "NotificationDisabledClassification", + ] diff --git a/testkitbackend/_sync/requests.py b/testkitbackend/_sync/requests.py index e960ad8f..8ea15f71 100644 --- a/testkitbackend/_sync/requests.py +++ b/testkitbackend/_sync/requests.py @@ -17,26 +17,16 @@ from __future__ import annotations import datetime -import json import re -import ssl import typing as t import warnings -from os import path from freezegun import freeze_time import neo4j import neo4j.api -import neo4j.auth_management from neo4j._async_compat.util import Util from neo4j._routing import RoutingTable -from neo4j.auth_management import ( - AuthManager, - AuthManagers, - ClientCertificateProvider, - ExpiringAuth, -) from .. import ( fromtestkit, @@ -44,7 +34,26 @@ totestkit, ) from .._warning_check import warnings_check -from ..exceptions import MarkdAsDriverError +from ..exceptions import ( + MarkdAsDriverError, + TimeWarpError, +) +from ..test_config import ( + FEATURES, + SKIPPED_TESTS, +) +from ..time_warp_compat import ( + BMM_SUPPORT, + EXECUTE_QUERY_STABILIZED, + EXECUTE_QUERY_SUPPORT, + LIVENESS_CHECK_SUPPORT, + MTLS_SUPPORT, + NOTIFICATION_WARNINGS_SUPPORTED, + SESSION_AUTH_STABILIZED, + SESSION_AUTH_SUPPORTED, + TELEMETRY_SUPPORT, + VERSION, +) if t.TYPE_CHECKING: @@ -99,20 +108,6 @@ class FrontendError(Exception): pass -def load_config(): - config_path = path.join(path.dirname(__file__), "..", "test_config.json") - with open(config_path, encoding="utf-8") as fd: - config = json.load(fd) - skips = config["skips"] - features = [k for k, v in config["features"].items() if v is True] - if ssl.HAS_TLSv1_3: - features += ["Feature:TLS:1.3"] - return skips, features - - -SKIPPED_TESTS, FEATURES = load_config() - - def _get_skip_reason(test_name): for skip_pattern, reason in SKIPPED_TESTS.items(): if skip_pattern[0] == skip_pattern[-1] == "'": @@ -185,23 +180,33 @@ def new_driver(backend, data): data["resolverRegistered"], data["domainNameResolverRegistered"], ) - for timeout_testkit, timeout_driver in ( + timeout_configs = [ ("connectionTimeoutMs", "connection_timeout"), ("maxTxRetryTimeMs", "max_transaction_retry_time"), ("connectionAcquisitionTimeoutMs", "connection_acquisition_timeout"), - ("livenessCheckTimeoutMs", "liveness_check_timeout"), ("maxConnectionLifetimeMs", "max_connection_lifetime"), - ): + ] + if LIVENESS_CHECK_SUPPORT: + timeout_configs.append( + ("livenessCheckTimeoutMs", "liveness_check_timeout"), + ) + elif data.get("livenessCheckTimeoutMs") is not None: + raise TimeWarpError("liveness check") + for timeout_testkit, timeout_driver in timeout_configs: if data.get(timeout_testkit) is not None: kwargs[timeout_driver] = data[timeout_testkit] / 1000 for k in ("sessionConnectionTimeoutMs", "updateRoutingTableTimeoutMs"): if k in data: data.mark_item_as_read_if_equals(k, None) - for conf_name, data_name in ( + copy_configs = [ ("max_connection_pool_size", "maxConnectionPoolSize"), ("fetch_size", "fetchSize"), - ("telemetry_disabled", "telemetryDisabled"), - ): + ] + if TELEMETRY_SUPPORT: + copy_configs.append(("telemetry_disabled", "telemetryDisabled")) + elif data.get("telemetryDisabled") is not None: + raise TimeWarpError("telemetry") + for conf_name, data_name in copy_configs: if data.get(data_name): kwargs[conf_name] = data[data_name] for conf_name, data_name in (("encrypted", "encrypted"),): @@ -220,18 +225,22 @@ def new_driver(backend, data): kwargs["trusted_certificates"] = neo4j.TrustCustomCAs(*cert_paths) fromtestkit.set_notifications_config(kwargs, data) - expected_warnings.append( - ( - neo4j.PreviewWarning, - r"notification warnings are a preview feature\.", + if NOTIFICATION_WARNINGS_SUPPORTED: + kwargs["warn_notification_severity"] = ( + neo4j.NotificationMinimumSeverity.OFF ) - ) + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"notification warnings are a preview feature\.", + ) + ) + with warnings_check(expected_warnings): driver = neo4j.GraphDatabase.driver( data["uri"], auth=auth, user_agent=data["userAgent"], - warn_notification_severity=neo4j.NotificationMinimumSeverity.OFF, **kwargs, ) key = backend.next_key() @@ -239,221 +248,222 @@ def new_driver(backend, data): backend.send_response("Driver", {"id": key}) -@request_handler -def new_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() +if SESSION_AUTH_STABILIZED: + from neo4j.auth_management import ( + AuthManager, + AuthManagers, + ExpiringAuth, + ) + + @request_handler + def new_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + class TestKitAuthManager(AuthManager): + def get_auth(self): + key = backend.next_key() + backend.send_response( + "AuthTokenManagerGetAuthRequest", + { + "id": key, + "authTokenManagerId": auth_token_manager_id, + }, + ) + if not backend.process_request(): + # connection was closed before end of next message + return None + if key not in backend.auth_token_supplies: + raise RuntimeError( + "Backend did not receive expected " + "AuthTokenManagerGetAuthCompleted message for id " + f"{key}" + ) + return backend.auth_token_supplies.pop(key) + + def handle_security_exception(self, auth, error): + key = backend.next_key() + backend.send_response( + "AuthTokenManagerHandleSecurityExceptionRequest", + { + "id": key, + "authTokenManagerId": auth_token_manager_id, + "auth": totestkit.auth_token(auth), + "errorCode": error.code, + }, + ) + if not backend.process_request(): + # connection was closed before end of next message + return None + if key not in backend.auth_token_on_expiration_supplies: + raise RuntimeError( + "Backend did not receive expected " + "AuthTokenManagerHandleSecurityExceptionCompleted " + f"message for id {key}" + ) + return backend.auth_token_on_expiration_supplies.pop(key) + + auth_manager = TestKitAuthManager() + backend.auth_token_managers[auth_token_manager_id] = auth_manager + backend.send_response( + "AuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + def auth_token_manager_get_auth_completed(backend, data): + auth_token = fromtestkit.to_auth_token(data, "auth") - class TestKitAuthManager(AuthManager): - def get_auth(self): + backend.auth_token_supplies[data["requestId"]] = auth_token + + @request_handler + def auth_token_manager_handle_security_exception_completed( + backend, data + ): + handled = data["handled"] + backend.auth_token_on_expiration_supplies[data["requestId"]] = handled + + @request_handler + def auth_token_manager_close(backend, data): + auth_token_manager_id = data["id"] + del backend.auth_token_managers[auth_token_manager_id] + backend.send_response( + "AuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + def new_basic_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + def auth_token_provider(): key = backend.next_key() backend.send_response( - "AuthTokenManagerGetAuthRequest", + "BasicAuthTokenProviderRequest", { "id": key, - "authTokenManagerId": auth_token_manager_id, + "basicAuthTokenManagerId": auth_token_manager_id, }, ) if not backend.process_request(): # connection was closed before end of next message return None - if key not in backend.auth_token_supplies: + if key not in backend.basic_auth_token_supplies: raise RuntimeError( "Backend did not receive expected " - f"AuthTokenManagerGetAuthCompleted message for id {key}" + "BasicAuthTokenManagerCompleted message for id " + f"{key}" ) - return backend.auth_token_supplies.pop(key) + return backend.basic_auth_token_supplies.pop(key) + + auth_manager = AuthManagers.basic(auth_token_provider) + backend.auth_token_managers[auth_token_manager_id] = auth_manager + backend.send_response( + "BasicAuthTokenManager", {"id": auth_token_manager_id} + ) + + @request_handler + def basic_auth_token_provider_completed(backend, data): + auth = fromtestkit.to_auth_token(data, "auth") + backend.basic_auth_token_supplies[data["requestId"]] = auth - def handle_security_exception(self, auth, error): + @request_handler + def new_bearer_auth_token_manager(backend, data): + auth_token_manager_id = backend.next_key() + + def auth_token_provider(): key = backend.next_key() backend.send_response( - "AuthTokenManagerHandleSecurityExceptionRequest", + "BearerAuthTokenProviderRequest", { "id": key, - "authTokenManagerId": auth_token_manager_id, - "auth": totestkit.auth_token(auth), - "errorCode": error.code, + "bearerAuthTokenManagerId": auth_token_manager_id, }, ) if not backend.process_request(): # connection was closed before end of next message - return None - if key not in backend.auth_token_on_expiration_supplies: + return neo4j.auth_management.ExpiringAuth(None, None) + if key not in backend.expiring_auth_token_supplies: raise RuntimeError( "Backend did not receive expected " - "AuthTokenManagerHandleSecurityExceptionCompleted message " - f"for id {key}" + "BearerAuthTokenManagerCompleted message for id " + f"{key}" ) - return backend.auth_token_on_expiration_supplies.pop(key) - - auth_manager = TestKitAuthManager() - backend.auth_token_managers[auth_token_manager_id] = auth_manager - backend.send_response( - "AuthTokenManager", {"id": auth_token_manager_id} - ) - - -@request_handler -def auth_token_manager_get_auth_completed(backend, data): - auth_token = fromtestkit.to_auth_token(data, "auth") - - backend.auth_token_supplies[data["requestId"]] = auth_token - - -@request_handler -def auth_token_manager_handle_security_exception_completed( - backend, data -): - handled = data["handled"] - backend.auth_token_on_expiration_supplies[data["requestId"]] = handled - + return backend.expiring_auth_token_supplies.pop(key) -@request_handler -def auth_token_manager_close(backend, data): - auth_token_manager_id = data["id"] - del backend.auth_token_managers[auth_token_manager_id] - backend.send_response( - "AuthTokenManager", {"id": auth_token_manager_id} - ) - - -@request_handler -def new_basic_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() - - def auth_token_provider(): - key = backend.next_key() + auth_manager = AuthManagers.bearer(auth_token_provider) + backend.auth_token_managers[auth_token_manager_id] = auth_manager backend.send_response( - "BasicAuthTokenProviderRequest", - { - "id": key, - "basicAuthTokenManagerId": auth_token_manager_id, - }, + "BearerAuthTokenManager", {"id": auth_token_manager_id} ) - if not backend.process_request(): - # connection was closed before end of next message - return None - if key not in backend.basic_auth_token_supplies: - raise RuntimeError( - "Backend did not receive expected " - "BasicAuthTokenManagerCompleted message for id " - f"{key}" - ) - return backend.basic_auth_token_supplies.pop(key) - - auth_manager = AuthManagers.basic(auth_token_provider) - backend.auth_token_managers[auth_token_manager_id] = auth_manager - backend.send_response( - "BasicAuthTokenManager", {"id": auth_token_manager_id} - ) - -@request_handler -def basic_auth_token_provider_completed(backend, data): - auth = fromtestkit.to_auth_token(data, "auth") - backend.basic_auth_token_supplies[data["requestId"]] = auth - - -@request_handler -def new_bearer_auth_token_manager(backend, data): - auth_token_manager_id = backend.next_key() - - def auth_token_provider(): - key = backend.next_key() - backend.send_response( - "BearerAuthTokenProviderRequest", - { - "id": key, - "bearerAuthTokenManagerId": auth_token_manager_id, - }, + @request_handler + def bearer_auth_token_provider_completed(backend, data): + temp_auth_data = data["auth"] + temp_auth_data.mark_item_as_read_if_equals( + "name", "AuthTokenAndExpiration" ) - if not backend.process_request(): - # connection was closed before end of next message - return neo4j.auth_management.ExpiringAuth(None, None) - if key not in backend.expiring_auth_token_supplies: - raise RuntimeError( - "Backend did not receive expected " - "BearerAuthTokenManagerCompleted message for id " - f"{key}" - ) - return backend.expiring_auth_token_supplies.pop(key) - - auth_manager = AuthManagers.bearer(auth_token_provider) - backend.auth_token_managers[auth_token_manager_id] = auth_manager - backend.send_response( - "BearerAuthTokenManager", {"id": auth_token_manager_id} - ) + temp_auth_data = temp_auth_data["data"] + auth_token = fromtestkit.to_auth_token(temp_auth_data, "auth") + expiring_auth = ExpiringAuth(auth_token) + if temp_auth_data["expiresInMs"] is not None: + expires_in = temp_auth_data["expiresInMs"] / 1000 + expiring_auth = expiring_auth.expires_in(expires_in) + backend.expiring_auth_token_supplies[data["requestId"]] = expiring_auth -@request_handler -def bearer_auth_token_provider_completed(backend, data): - temp_auth_data = data["auth"] - temp_auth_data.mark_item_as_read_if_equals( - "name", "AuthTokenAndExpiration" - ) - temp_auth_data = temp_auth_data["data"] - auth_token = fromtestkit.to_auth_token(temp_auth_data, "auth") - expiring_auth = ExpiringAuth(auth_token) - if temp_auth_data["expiresInMs"] is not None: - expires_in = temp_auth_data["expiresInMs"] / 1000 - expiring_auth = expiring_auth.expires_in(expires_in) - backend.expiring_auth_token_supplies[data["requestId"]] = expiring_auth +if MTLS_SUPPORT: + from neo4j.auth_management import ClientCertificateProvider + class TestKitClientCertificateProvider(ClientCertificateProvider): + def __init__(self, backend): + self.id = backend.next_key() + self._backend = backend -class TestKitClientCertificateProvider(ClientCertificateProvider): - def __init__(self, backend): - self.id = backend.next_key() - self._backend = backend - - def get_certificate(self) -> ClientCertificate | None: - request_id = self._backend.next_key() - self._backend.send_response( - "ClientCertificateProviderRequest", - { - "id": request_id, - "clientCertificateProviderId": self.id, - }, - ) - if not self._backend.process_request(): - # connection was closed before end of next message - return None - if request_id not in self._backend.client_cert_supplies: - raise RuntimeError( - "Backend did not receive expected " - "ClientCertificateProviderCompleted message for id " - f"{request_id}" + def get_certificate(self) -> ClientCertificate | None: + request_id = self._backend.next_key() + self._backend.send_response( + "ClientCertificateProviderRequest", + { + "id": request_id, + "clientCertificateProviderId": self.id, + }, ) - return self._backend.client_cert_supplies.pop(request_id) - - -@request_handler -def new_client_certificate_provider(backend, data): - provider = TestKitClientCertificateProvider(backend) - backend.client_cert_providers[provider.id] = provider - backend.send_response( - "ClientCertificateProvider", {"id": provider.id} - ) - + if not self._backend.process_request(): + # connection was closed before end of next message + return None + if request_id not in self._backend.client_cert_supplies: + raise RuntimeError( + "Backend did not receive expected " + "ClientCertificateProviderCompleted message for id " + f"{request_id}" + ) + return self._backend.client_cert_supplies.pop(request_id) -@request_handler -def client_certificate_provider_close(backend, data): - client_cert_provider_id = data["id"] - del backend.client_cert_providers[client_cert_provider_id] - backend.send_response( - "ClientCertificateProvider", {"id": client_cert_provider_id} - ) + @request_handler + def new_client_certificate_provider(backend, data): + provider = TestKitClientCertificateProvider(backend) + backend.client_cert_providers[provider.id] = provider + backend.send_response( + "ClientCertificateProvider", {"id": provider.id} + ) + @request_handler + def client_certificate_provider_close(backend, data): + client_cert_provider_id = data["id"] + del backend.client_cert_providers[client_cert_provider_id] + backend.send_response( + "ClientCertificateProvider", {"id": client_cert_provider_id} + ) -@request_handler -def client_certificate_provider_completed(backend, data): - has_update = data["hasUpdate"] - request_id = data["requestId"] - if not has_update: - data.mark_item_as_read("clientCertificate", recursive=True) - backend.client_cert_supplies[request_id] = None - return - client_cert = fromtestkit.to_client_cert(data, "clientCertificate") - backend.client_cert_supplies[request_id] = client_cert + @request_handler + def client_certificate_provider_completed(backend, data): + has_update = data["hasUpdate"] + request_id = data["requestId"] + if not has_update: + data.mark_item_as_read("clientCertificate", recursive=True) + backend.client_cert_supplies[request_id] = None + return + client_cert = fromtestkit.to_client_cert(data, "clientCertificate") + backend.client_cert_supplies[request_id] = client_cert @request_handler @@ -485,72 +495,114 @@ def get_server_info(backend, data): def check_multi_db_support(backend, data): driver_id = data["driverId"] driver = backend.drivers[driver_id] - available = driver.supports_multi_db() + expected_warnings = [] + if VERSION < (5, 8): + expected_warnings.append( + ( + neo4j.ExperimentalWarning, + "Feature support query, based on Bolt protocol version", + ) + ) + with warnings_check(expected_warnings): + available = driver.supports_multi_db() backend.send_response( "MultiDBSupport", {"id": backend.next_key(), "available": available} ) -@request_handler -def verify_authentication(backend, data): - driver_id = data["driverId"] - driver = backend.drivers[driver_id] - auth = fromtestkit.to_auth_token(data, "authorizationToken") - authenticated = driver.verify_authentication(auth=auth) - backend.send_response( - "DriverIsAuthenticated", - {"id": backend.next_key(), "authenticated": authenticated}, - ) +if SESSION_AUTH_SUPPORTED: + @request_handler + def verify_authentication(backend, data): + driver_id = data["driverId"] + driver = backend.drivers[driver_id] + auth = fromtestkit.to_auth_token(data, "authorizationToken") + expected_warnings = [] + if not SESSION_AUTH_STABILIZED: + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"User switching is a preview feature\.", + ) + ) + with warnings_check(expected_warnings): + authenticated = driver.verify_authentication(auth=auth) + backend.send_response( + "DriverIsAuthenticated", + {"id": backend.next_key(), "authenticated": authenticated}, + ) -@request_handler -def check_session_auth_support(backend, data): - driver_id = data["driverId"] - driver = backend.drivers[driver_id] - available = driver.supports_session_auth() - backend.send_response( - "SessionAuthSupport", - {"id": backend.next_key(), "available": available}, - ) + @request_handler + def check_session_auth_support(backend, data): + driver_id = data["driverId"] + driver = backend.drivers[driver_id] + available = driver.supports_session_auth() + backend.send_response( + "SessionAuthSupport", + {"id": backend.next_key(), "available": available}, + ) -@request_handler -def execute_query(backend, data): - driver = backend.drivers[data["driverId"]] - cypher, params = fromtestkit.to_cypher_and_params(data) - config = data.get("config", {}) - kwargs = {} - for config_key, kwargs_key in ( - ("database", "database_"), - ("routing", "routing_"), - ("impersonatedUser", "impersonated_user_"), - ): - value = config.get(config_key, None) - if value is not None: - kwargs[kwargs_key] = value - tx_kwargs = fromtestkit.to_tx_kwargs(config) - query = neo4j.Query(cypher, **tx_kwargs) if tx_kwargs else cypher - bookmark_manager_id = config.get("bookmarkManagerId") - if bookmark_manager_id is not None: - if bookmark_manager_id == -1: - kwargs["bookmark_manager_"] = None - else: - bookmark_manager = backend.bookmark_managers[bookmark_manager_id] - kwargs["bookmark_manager_"] = bookmark_manager - if "authorizationToken" in config: - kwargs["auth_"] = fromtestkit.to_auth_token( - config, "authorizationToken" - ) +if EXECUTE_QUERY_SUPPORT: + + @request_handler + def execute_query(backend, data): + driver = backend.drivers[data["driverId"]] + cypher, params = fromtestkit.to_cypher_and_params(data) + config = data.get("config", {}) + expected_warnings = [] + kwargs = {} + for config_key, kwargs_key in ( + ("database", "database_"), + ("routing", "routing_"), + ("impersonatedUser", "impersonated_user_"), + ): + value = config.get(config_key, None) + if value is not None: + kwargs[kwargs_key] = value + tx_kwargs = fromtestkit.to_tx_kwargs(config) + query = neo4j.Query(cypher, **tx_kwargs) if tx_kwargs else cypher + bookmark_manager_id = config.get("bookmarkManagerId") + if bookmark_manager_id is not None: + if bookmark_manager_id == -1: + kwargs["bookmark_manager_"] = None + else: + bookmark_manager = backend.bookmark_managers[ + bookmark_manager_id + ] + kwargs["bookmark_manager_"] = bookmark_manager + if "authorizationToken" in config: + if SESSION_AUTH_SUPPORTED: + kwargs["auth_"] = fromtestkit.to_auth_token( + config, "authorizationToken" + ) + if not SESSION_AUTH_STABILIZED and kwargs["auth_"] is not None: + expected_warnings.append( + ( + neo4j.PreviewWarning, + r"User switching is a preview feature\.", + ) + ) + else: + raise TimeWarpError("session auth") + + if not EXECUTE_QUERY_STABILIZED: + expected_warnings.append( + ( + neo4j.ExperimentalWarning, + r"Driver\.execute_query is experimental\.", + ) + ) - eager_result = driver.execute_query(query, params, **kwargs) - backend.send_response( - "EagerResult", - { - "keys": eager_result.keys, - "records": list(map(totestkit.record, eager_result.records)), - "summary": totestkit.summary(eager_result.summary), - }, - ) + eager_result = driver.execute_query(query, params, **kwargs) + backend.send_response( + "EagerResult", + { + "keys": eager_result.keys, + "records": list(map(totestkit.record, eager_result.records)), + "summary": totestkit.summary(eager_result.summary), + }, + ) def resolution_func(backend, custom_resolver=False, custom_dns_resolver=False): @@ -617,91 +669,96 @@ def domain_name_resolution_completed(backend, data): backend.dns_resolutions[data["requestId"]] = data["addresses"] -@request_handler -def new_bookmark_manager(backend, data): - bookmark_manager_id = backend.next_key() - - bmm_kwargs = {} - data.mark_item_as_read("initialBookmarks", recursive=True) - bmm_kwargs["initial_bookmarks"] = data.get("initialBookmarks") - if data.get("bookmarksSupplierRegistered"): - bmm_kwargs["bookmarks_supplier"] = bookmarks_supplier( - backend, bookmark_manager_id - ) - if data.get("bookmarksConsumerRegistered"): - bmm_kwargs["bookmarks_consumer"] = bookmarks_consumer( - backend, bookmark_manager_id - ) - - bookmark_manager = neo4j.GraphDatabase.bookmark_manager(**bmm_kwargs) - backend.bookmark_managers[bookmark_manager_id] = bookmark_manager - backend.send_response("BookmarkManager", {"id": bookmark_manager_id}) - +if BMM_SUPPORT: -@request_handler -def bookmark_manager_close(backend, data): - bookmark_manager_id = data["id"] - del backend.bookmark_managers[bookmark_manager_id] - backend.send_response("BookmarkManager", {"id": bookmark_manager_id}) + @request_handler + def new_bookmark_manager(backend, data): + bookmark_manager_id = backend.next_key() + bmm_kwargs = {} + data.mark_item_as_read("initialBookmarks", recursive=True) + bmm_kwargs["initial_bookmarks"] = data.get("initialBookmarks") + if data.get("bookmarksSupplierRegistered"): + bmm_kwargs["bookmarks_supplier"] = bookmarks_supplier( + backend, bookmark_manager_id + ) + if data.get("bookmarksConsumerRegistered"): + bmm_kwargs["bookmarks_consumer"] = bookmarks_consumer( + backend, bookmark_manager_id + ) -def bookmarks_supplier(backend, bookmark_manager_id): - def supplier(): - key = backend.next_key() + bookmark_manager = neo4j.GraphDatabase.bookmark_manager( + **bmm_kwargs + ) + backend.bookmark_managers[bookmark_manager_id] = bookmark_manager backend.send_response( - "BookmarksSupplierRequest", - { - "id": key, - "bookmarkManagerId": bookmark_manager_id, - }, + "BookmarkManager", + {"id": bookmark_manager_id}, ) - if not backend.process_request(): - # connection was closed before end of next message - return [] - if key not in backend.bookmarks_supplies: - raise RuntimeError( - "Backend did not receive expected " - f"BookmarksSupplierCompleted message for id {key}" - ) - return backend.bookmarks_supplies.pop(key) - - return supplier + @request_handler + def bookmark_manager_close(backend, data): + bookmark_manager_id = data["id"] + del backend.bookmark_managers[bookmark_manager_id] + backend.send_response( + "BookmarkManager", + {"id": bookmark_manager_id}, + ) -@request_handler -def bookmarks_supplier_completed(backend, data): - backend.bookmarks_supplies[data["requestId"]] = ( - neo4j.Bookmarks.from_raw_values(data["bookmarks"]) - ) + def bookmarks_supplier(backend, bookmark_manager_id): + def supplier(): + key = backend.next_key() + backend.send_response( + "BookmarksSupplierRequest", + { + "id": key, + "bookmarkManagerId": bookmark_manager_id, + }, + ) + if not backend.process_request(): + # connection was closed before end of next message + return [] + if key not in backend.bookmarks_supplies: + raise RuntimeError( + "Backend did not receive expected " + f"BookmarksSupplierCompleted message for id {key}" + ) + return backend.bookmarks_supplies.pop(key) + return supplier -def bookmarks_consumer(backend, bookmark_manager_id): - def consumer(bookmarks): - key = backend.next_key() - backend.send_response( - "BookmarksConsumerRequest", - { - "id": key, - "bookmarkManagerId": bookmark_manager_id, - "bookmarks": list(bookmarks.raw_values), - }, + @request_handler + def bookmarks_supplier_completed(backend, data): + backend.bookmarks_supplies[data["requestId"]] = ( + neo4j.Bookmarks.from_raw_values(data["bookmarks"]) ) - if not backend.process_request(): - # connection was closed before end of next message - return - if key not in backend.bookmarks_consumptions: - raise RuntimeError( - "Backend did not receive expected " - f"BookmarksConsumerCompleted message for id {key}" - ) - del backend.bookmarks_consumptions[key] - return consumer + def bookmarks_consumer(backend, bookmark_manager_id): + def consumer(bookmarks): + key = backend.next_key() + backend.send_response( + "BookmarksConsumerRequest", + { + "id": key, + "bookmarkManagerId": bookmark_manager_id, + "bookmarks": list(bookmarks.raw_values), + }, + ) + if not backend.process_request(): + # connection was closed before end of next message + return + if key not in backend.bookmarks_consumptions: + raise RuntimeError( + "Backend did not receive expected " + f"BookmarksConsumerCompleted message for id {key}" + ) + del backend.bookmarks_consumptions[key] + return consumer -@request_handler -def bookmarks_consumer_completed(backend, data): - backend.bookmarks_consumptions[data["requestId"]] = True + @request_handler + def bookmarks_consumer_completed(backend, data): + backend.bookmarks_consumptions[data["requestId"]] = True @request_handler @@ -748,10 +805,13 @@ def new_session(backend, data): config["bookmarks"] = neo4j.Bookmarks.from_raw_values( data["bookmarks"] ) - if data.get("bookmarkManagerId") is not None: - config["bookmark_manager"] = backend.bookmark_managers[ - data["bookmarkManagerId"] - ] + if BMM_SUPPORT: + if data.get("bookmarkManagerId") is not None: + config["bookmark_manager"] = backend.bookmark_managers[ + data["bookmarkManagerId"] + ] + elif data.get("bookmarkManagerId") is not None: + raise TimeWarpError("bookmark managers") for conf_name, data_name in ( ("fetch_size", "fetchSize"), ("impersonated_user", "impersonatedUser"), diff --git a/testkitbackend/exceptions.py b/testkitbackend/exceptions.py index bacd901d..d49b15a9 100644 --- a/testkitbackend/exceptions.py +++ b/testkitbackend/exceptions.py @@ -14,9 +14,27 @@ # limitations under the License. +from .time_warp_compat import VERSION + + class MarkdAsDriverError(Exception): """Wrap any error as DriverException.""" def __init__(self, wrapped_exc): super().__init__() self.wrapped_exc = wrapped_exc + + +class TimeWarpError(Exception): + """ + Request cannot be fulfilled with in the current time warp mode. + + The backend understood the request, but is running in time warp mode + against an older driver that does not support the requested feature. + """ + + def __init__(self, feature_name: str): + super().__init__( + f"{feature_name.capitalize()} is not supported in time warp mode " + f"(driver version {'.'.join(map(str, VERSION))})." + ) diff --git a/testkitbackend/fromtestkit.py b/testkitbackend/fromtestkit.py index 438e08f4..4e09c4dd 100644 --- a/testkitbackend/fromtestkit.py +++ b/testkitbackend/fromtestkit.py @@ -16,17 +16,13 @@ from __future__ import annotations +import typing as t from datetime import timedelta import pytz import neo4j -from neo4j import ( - NotificationDisabledCategory, - NotificationMinimumSeverity, - Query, -) -from neo4j.auth_management import ClientCertificate +from neo4j import Query from neo4j.spatial import ( CartesianPoint, WGS84Point, @@ -38,7 +34,16 @@ Time, ) -from ._preview_imports import NotificationDisabledClassification +from .exceptions import TimeWarpError +from .time_warp_compat import ( + GQL_STATUS_SUPPORT, + MTLS_SUPPORT, + NOTIFICATION_FILTER_SUPPORTED, +) + + +if t.TYPE_CHECKING: + from neo4j.auth_management import ClientCertificate def to_cypher_and_params(data): @@ -221,24 +226,48 @@ def to_client_cert(data, key) -> ClientCertificate | None: if data[key] is None: return None data[key].mark_item_as_read_if_equals("name", "ClientCertificate") - cert_data = data[key]["data"] - return ClientCertificate( - cert_data["certfile"], cert_data["keyfile"], cert_data["password"] - ) + if MTLS_SUPPORT: + from neo4j.auth_management import ClientCertificate + + cert_data = data[key]["data"] + return ClientCertificate( + cert_data["certfile"], + cert_data["keyfile"], + cert_data["password"], + ) + else: + raise TimeWarpError("certificates for mTLS") def set_notifications_config(config, data): - if "notificationsMinSeverity" in data: - config["notifications_min_severity"] = NotificationMinimumSeverity[ - data["notificationsMinSeverity"] - ] - if "notificationsDisabledCategories" in data: - config["notifications_disabled_categories"] = [ - NotificationDisabledCategory[c] - for c in data["notificationsDisabledCategories"] - ] - if "notificationsDisabledClassifications" in data: - config["notifications_disabled_classifications"] = [ - NotificationDisabledClassification[c] - for c in data["notificationsDisabledClassifications"] - ] + if NOTIFICATION_FILTER_SUPPORTED: + from neo4j import ( + NotificationDisabledCategory, + NotificationMinimumSeverity, + ) + + if "notificationsMinSeverity" in data: + config["notifications_min_severity"] = NotificationMinimumSeverity[ + data["notificationsMinSeverity"] + ] + if "notificationsDisabledCategories" in data: + config["notifications_disabled_categories"] = [ + NotificationDisabledCategory[c] + for c in data["notificationsDisabledCategories"] + ] + elif ( + "notificationsMinSeverity" in data + or "notificationsDisabledCategories" in data + ): + raise TimeWarpError("notification filtering") + + if GQL_STATUS_SUPPORT: + from ._preview_imports import NotificationDisabledClassification + + if "notificationsDisabledClassifications" in data: + config["notifications_disabled_classifications"] = [ + NotificationDisabledClassification[c] + for c in data["notificationsDisabledClassifications"] + ] + elif "notificationsDisabledClassifications" in data: + raise TimeWarpError("GQL status object filtering") diff --git a/testkitbackend/test_config.json b/testkitbackend/test_config.json index 1762678c..a13f3223 100644 --- a/testkitbackend/test_config.json +++ b/testkitbackend/test_config.json @@ -60,7 +60,7 @@ "Feature:Bolt:5.2": true, "Feature:Bolt:5.3": true, "Feature:Bolt:5.4": true, - "Feature:Bolt:5.5": true, + "Feature:Bolt:5.5": "Version was never released in a server", "Feature:Bolt:5.6": true, "Feature:Bolt:5.7": true, "Feature:Bolt:5.8": true, diff --git a/testkitbackend/test_config.py b/testkitbackend/test_config.py new file mode 100644 index 00000000..bc0b935f --- /dev/null +++ b/testkitbackend/test_config.py @@ -0,0 +1,46 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [https://neo4j.com] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json +import ssl +from os import path + +from .time_warp_compat import ( + BLOCKED_TESTKIT_FEATURES, + EXTRA_TESTKIT_FEATURES, +) + + +__all__ = ["FEATURES", "SKIPPED_TESTS"] + + +def _load_config(): + config_path = path.join(path.dirname(__file__), "test_config.json") + with open(config_path, encoding="utf-8") as fd: + config = json.load(fd) + skips = config["skips"] + features = [ + k + for k, v in config["features"].items() + if v is True and k not in BLOCKED_TESTKIT_FEATURES + ] + features.extend(EXTRA_TESTKIT_FEATURES) + if ssl.HAS_TLSv1_3: + features += ["Feature:TLS:1.3"] + return skips, features + + +SKIPPED_TESTS, FEATURES = _load_config() diff --git a/testkitbackend/time_warp_compat.py b/testkitbackend/time_warp_compat.py new file mode 100644 index 00000000..946e96d4 --- /dev/null +++ b/testkitbackend/time_warp_compat.py @@ -0,0 +1,169 @@ +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [https://neo4j.com] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +import os +import typing as t + + +if t.TYPE_CHECKING: + import typing_extensions as te + + +__all__ = [ + "BLOCKED_TESTKIT_FEATURES", + "BMM_SUPPORT", + "EXECUTE_QUERY_STABILIZED", + "EXECUTE_QUERY_SUPPORT", + "EXTRA_TESTKIT_FEATURES", + "GQL_ERROR_SUPPORT", + "GQL_STATUS_SUPPORT", + "LIVENESS_CHECK_SUPPORT", + "MTLS_SUPPORT", + "NOTIFICATION_FILTER_SUPPORTED", + "NOTIFICATION_WARNINGS_SUPPORTED", + "PREVIEW_WARNING_SUPPORTED", + "RESULT_FAILED_ERROR_SUPPORT", + "SESSION_AUTH_STABILIZED", + "SESSION_AUTH_SUPPORTED", + "SUMMARY_NOTIFICATION_SUPPORTED", + "TELEMETRY_SUPPORT", + "VERSION", + "is_gql_error", + "is_result_failed_error", +] + + +def _get_time_warp_version() -> tuple[float, ...]: + time_warp_env = os.environ.get("DRIVER_TIME_WARP") + if not time_warp_env: + return (float("inf"),) + return tuple(int(e) for e in time_warp_env.split(".")) + + +VERSION: te.Final[tuple[float, ...]] = _get_time_warp_version() +# was added as preview in 5.0, but changed in 5.3 +BMM_SUPPORT: te.Final[bool] = VERSION >= (5, 3) +EXECUTE_QUERY_SUPPORT: te.Final[bool] = VERSION >= (5, 5) +SUMMARY_NOTIFICATION_SUPPORTED: te.Final[bool] = VERSION >= (5, 7) +NOTIFICATION_FILTER_SUPPORTED: te.Final[bool] = VERSION >= (5, 7) +EXECUTE_QUERY_STABILIZED: te.Final[bool] = VERSION >= (5, 8) +PREVIEW_WARNING_SUPPORTED: te.Final[bool] = VERSION >= (5, 8) +SESSION_AUTH_SUPPORTED: te.Final[bool] = VERSION >= (5, 8) +TELEMETRY_SUPPORT: te.Final[bool] = VERSION >= (5, 13) +SESSION_AUTH_STABILIZED: te.Final[bool] = VERSION >= (5, 14) +RESULT_FAILED_ERROR_SUPPORT: te.Final[bool] = VERSION >= (5, 14) +LIVENESS_CHECK_SUPPORT: te.Final[bool] = VERSION >= (5, 14) +MTLS_SUPPORT: te.Final[bool] = VERSION >= (5, 19) +NOTIFICATION_WARNINGS_SUPPORTED: te.Final[bool] = VERSION >= (5, 21) +GQL_STATUS_SUPPORT: te.Final[bool] = VERSION >= (5, 22) +GQL_ERROR_SUPPORT: te.Final[bool] = VERSION >= (5, 26) + + +def _get_blocked_testkit_features() -> frozenset[str]: + blocked: list[str] = [] + if not BMM_SUPPORT: # 5.3 + blocked.extend(("Feature:API:BookmarkManager",)) + if not NOTIFICATION_FILTER_SUPPORTED: # 5.7 + blocked.extend( + ( + "Feature:API:Driver:NotificationsConfig", + "Feature:API:Session:NotificationsConfig", + ) + ) + if VERSION < (5, 7): + blocked.extend( + ( + "Feature:API:Session:AuthConfig", + "Feature:Bolt:5.1", + "Feature:Bolt:5.2", + "Optimization:AuthPipelining", + ) + ) + if VERSION < (5, 8): + blocked.extend( + ( + "Feature:API:Driver.VerifyAuthentication", + "Feature:API:Driver.SupportsSessionAuth", + "Feature:API:Driver.ExecuteQuery:WithAuth", + "Feature:API:Session:AuthConfig", + ) + ) + if VERSION < (5, 9): + blocked.extend(("Feature:Bolt:5.3",)) + if VERSION < (5, 11): + blocked.extend(("Optimization:ExecuteQueryPipelining",)) + if not SESSION_AUTH_STABILIZED: # 5.12 + blocked.extend(("Feature:Auth:Managed",)) + if not TELEMETRY_SUPPORT: # 5.13 + blocked.extend(("Feature:Bolt:5.4",)) + if not LIVENESS_CHECK_SUPPORT: # 5.14 + blocked.extend(("Feature:API:Liveness.Check",)) + if not MTLS_SUPPORT: # 5.19 + blocked.extend(("Feature:API:SSLClientCertificate",)) + if not GQL_STATUS_SUPPORT: # 5.22 + blocked.extend(("Feature:API:Summary:GqlStatusObjects",)) + if VERSION < (5, 23): + blocked.extend(("Feature:Bolt:5.6",)) + if VERSION < (5, 26): + blocked.extend(("Feature:Bolt:5.7",)) + if VERSION < (5, 28): + blocked.extend( + ( + "Feature:Bolt:HandshakeManifestV1", + "Feature:Bolt:5.8", + "Optimization:HomeDatabaseCache", + "Optimization:HomeDbCacheBasicPrincipalIsImpersonatedUser", + ) + ) + + return frozenset(blocked) + + +def _get_extra_testkit_features() -> frozenset[str]: + extra: list[str] = [] + if VERSION < (5, 28): + extra.extend(("Feature:Bolt:4.0",)) + return frozenset(extra) + + +BLOCKED_TESTKIT_FEATURES: te.Final[frozenset[str]] = ( + _get_blocked_testkit_features() +) +EXTRA_TESTKIT_FEATURES: te.Final[frozenset[str]] = frozenset() + + +if GQL_ERROR_SUPPORT: + from neo4j.exceptions import GqlError + + def is_gql_error(exc): + return isinstance(exc, GqlError) +else: + + def is_gql_error(exc): + return False + + +if RESULT_FAILED_ERROR_SUPPORT: + from neo4j.exceptions import ResultFailedError + + def is_result_failed_error(exc): + return isinstance(exc, ResultFailedError) +else: + + def is_result_failed_error(exc): + return False diff --git a/testkitbackend/totestkit.py b/testkitbackend/totestkit.py index eac8aec3..5d774eac 100644 --- a/testkitbackend/totestkit.py +++ b/testkitbackend/totestkit.py @@ -19,11 +19,7 @@ import math import neo4j -from neo4j.exceptions import ( - GqlError, - Neo4jError, - ResultFailedError, -) +from neo4j.exceptions import Neo4jError from neo4j.graph import ( Node, Path, @@ -42,6 +38,12 @@ from ._warning_check import warning_check from .exceptions import MarkdAsDriverError +from .time_warp_compat import ( + GQL_STATUS_SUPPORT, + is_gql_error, + is_result_failed_error, + SUMMARY_NOTIFICATION_SUPPORTED, +) def record(rec): @@ -72,6 +74,8 @@ def serialize_notifications() -> list[dict] | None: if summary_.notifications is None: gql_aware_protocol = summary_.server.protocol_version >= (5, 5) return [] if gql_aware_protocol else None + if not SUMMARY_NOTIFICATION_SUPPORTED: + return summary_.notifications return [ serialize_notification(n) for n in summary_.summary_notifications ] @@ -147,7 +151,9 @@ def format_address(address: neo4j.Address): ), "database": summary_.database, "notifications": serialize_notifications(), - "gqlStatusObjects": serialize_gql_status_objects(), + "gqlStatusObjects": ( + serialize_gql_status_objects() if GQL_STATUS_SUPPORT else [] + ), "plan": summary_.plan, "profile": summary_.profile, "query": { @@ -316,7 +322,7 @@ def driver_exc(exc, id_=None): payload["msg"] = _exc_msg(exc) if isinstance(exc, Neo4jError): payload["code"] = exc.code - if isinstance(exc, GqlError): + if is_gql_error(exc): with warning_check(neo4j.PreviewWarning, r".*\bGQLSTATUS\b.*"): payload["gqlStatus"] = exc.gql_status with warning_check(neo4j.PreviewWarning, r".*\bGQLSTATUS\b.*"): @@ -341,7 +347,7 @@ def _exc_msg(exc, max_depth=10): return str(exc.message) depth = 0 - if isinstance(exc, GqlError): + if is_gql_error(exc): if isinstance(exc, Neo4jError): res = str(exc.message) if exc.message is not None else str(exc) else: @@ -354,12 +360,12 @@ def _exc_msg(exc, max_depth=10): if ( # Not including GqlError in the chain as they will be serialized # separately in the `cause` field. - isinstance(exc.__cause__, GqlError) + is_gql_error(exc.__cause__) # Special case for ResultFailedError: # Always serialize the cause in the message to please TestKit. # Else, the cause's class name will get lost (can't be serialized # as a field in of an error cause). - and not isinstance(exc, ResultFailedError) + and not is_result_failed_error(exc) ): break depth += 1 @@ -375,7 +381,7 @@ def driver_exc_cause(exc, max_depth=10): return None if max_depth <= 0: return None - if not isinstance(exc, GqlError): + if not is_gql_error(exc): return driver_exc_cause( getattr(exc, "__cause__", None), max_depth=max_depth - 1 )