diff --git a/Dockerfile--altlinux_10.tmpl b/Dockerfile--altlinux_10.tmpl index a75e35a0..d78b05f5 100644 --- a/Dockerfile--altlinux_10.tmpl +++ b/Dockerfile--altlinux_10.tmpl @@ -115,4 +115,4 @@ ssh-keygen -t rsa -f ~/.ssh/id_rsa -q -N ''; \ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys; \ chmod 600 ~/.ssh/authorized_keys; \ ls -la ~/.ssh/; \ -TEST_FILTER=\"TestTestgresLocal or TestOsOpsLocal or local_ops\" bash ./run_tests.sh;" +TEST_FILTER=\"TestTestgresLocal or TestOsOpsLocal or local\" bash ./run_tests.sh;" diff --git a/Dockerfile--altlinux_11.tmpl b/Dockerfile--altlinux_11.tmpl index 5b43da20..5c88585d 100644 --- a/Dockerfile--altlinux_11.tmpl +++ b/Dockerfile--altlinux_11.tmpl @@ -115,4 +115,4 @@ ssh-keygen -t rsa -f ~/.ssh/id_rsa -q -N ''; \ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys; \ chmod 600 ~/.ssh/authorized_keys; \ ls -la ~/.ssh/; \ -TEST_FILTER=\"TestTestgresLocal or TestOsOpsLocal or local_ops\" bash ./run_tests.sh;" +TEST_FILTER=\"TestTestgresLocal or TestOsOpsLocal or local\" bash ./run_tests.sh;" diff --git a/Dockerfile--ubuntu_24_04.tmpl b/Dockerfile--ubuntu_24_04.tmpl index 3bdc6640..7a559776 100644 --- a/Dockerfile--ubuntu_24_04.tmpl +++ b/Dockerfile--ubuntu_24_04.tmpl @@ -10,6 +10,7 @@ RUN apt install -y sudo curl ca-certificates RUN apt update RUN apt install -y openssh-server RUN apt install -y time +RUN apt install -y netcat-traditional RUN apt update RUN apt install -y postgresql-common diff --git a/run_tests.sh b/run_tests.sh index 8202aff5..65c17dbf 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -5,7 +5,7 @@ set -eux if [ -z ${TEST_FILTER+x} ]; \ -then export TEST_FILTER="TestTestgresLocal or (TestTestgresCommon and (not remote_ops))"; \ +then export TEST_FILTER="TestTestgresLocal or (TestTestgresCommon and (not remote))"; \ fi # fail early diff --git a/setup.py b/setup.py index 3f2474dd..b47a1d8a 100755 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ setup( version='1.10.5', name='testgres', - packages=['testgres', 'testgres.operations', 'testgres.helpers'], + packages=['testgres', 'testgres.operations'], description='Testing utility for PostgreSQL and its extensions', url='https://github.com/postgrespro/testgres', long_description=readme, diff --git a/testgres/__init__.py b/testgres/__init__.py index 665548d6..339ae62e 100644 --- a/testgres/__init__.py +++ b/testgres/__init__.py @@ -34,6 +34,7 @@ DumpFormat from .node import PostgresNode, NodeApp +from .node import PortManager from .utils import \ reserve_port, \ @@ -53,8 +54,6 @@ from .operations.local_ops import LocalOperations from .operations.remote_ops import RemoteOperations -from .helpers.port_manager import PortManager - __all__ = [ "get_new_node", "get_remote_node", @@ -64,7 +63,8 @@ "TestgresException", "ExecUtilException", "QueryException", "TimeoutException", "CatchUpException", "StartNodeException", "InitNodeException", "BackupException", "InvalidOperationException", "XLogMethod", "IsolationLevel", "NodeStatus", "ProcessType", "DumpFormat", "PostgresNode", "NodeApp", + "PortManager", "reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version", - "First", "Any", "PortManager", + "First", "Any", "OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams" ] diff --git a/testgres/exceptions.py b/testgres/exceptions.py index d61d4691..20c1a8cf 100644 --- a/testgres/exceptions.py +++ b/testgres/exceptions.py @@ -7,6 +7,10 @@ class TestgresException(Exception): pass +class PortForException(TestgresException): + pass + + @six.python_2_unicode_compatible class ExecUtilException(TestgresException): def __init__(self, message=None, command=None, exit_code=0, out=None, error=None): diff --git a/testgres/helpers/__init__.py b/testgres/helpers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/testgres/helpers/port_manager.py b/testgres/helpers/port_manager.py deleted file mode 100644 index cfc5c096..00000000 --- a/testgres/helpers/port_manager.py +++ /dev/null @@ -1,41 +0,0 @@ -import socket -import random -from typing import Set, Iterable, Optional - - -class PortForException(Exception): - pass - - -class PortManager: - def __init__(self, ports_range=(1024, 65535)): - self.ports_range = ports_range - - @staticmethod - def is_port_free(port: int) -> bool: - """Check if a port is free to use.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - try: - s.bind(("", port)) - return True - except OSError: - return False - - def find_free_port(self, ports: Optional[Set[int]] = None, exclude_ports: Optional[Iterable[int]] = None) -> int: - """Return a random unused port number.""" - if ports is None: - ports = set(range(1024, 65535)) - - assert type(ports) == set # noqa: E721 - - if exclude_ports is not None: - assert isinstance(exclude_ports, Iterable) - ports.difference_update(exclude_ports) - - sampled_ports = random.sample(tuple(ports), min(len(ports), 100)) - - for port in sampled_ports: - if self.is_port_free(port): - return port - - raise PortForException("Can't select a port") diff --git a/testgres/node.py b/testgres/node.py index 1f8fca6e..5039fc43 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -1,4 +1,6 @@ # coding: utf-8 +from __future__ import annotations + import logging import os import random @@ -10,6 +12,7 @@ from queue import Queue import time +import typing try: from collections.abc import Iterable @@ -80,14 +83,16 @@ BackupException, \ InvalidOperationException +from .port_manager import PortManager +from .port_manager import PortManager__ThisHost +from .port_manager import PortManager__Generic + from .logger import TestgresLogger from .pubsub import Publication, Subscription from .standby import First -from . import utils - from .utils import \ PgVer, \ eprint, \ @@ -126,17 +131,31 @@ def __getattr__(self, name): return getattr(self.process, name) def __repr__(self): - return '{}(ptype={}, process={})'.format(self.__class__.__name__, - str(self.ptype), - repr(self.process)) + return '{}(ptype={}, process={})'.format( + self.__class__.__name__, + str(self.ptype), + repr(self.process)) class PostgresNode(object): # a max number of node start attempts _C_MAX_START_ATEMPTS = 5 - def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionParams = ConnectionParams(), - bin_dir=None, prefix=None, os_ops=None): + _name: typing.Optional[str] + _port: typing.Optional[int] + _should_free_port: bool + _os_ops: OsOperations + _port_manager: PortManager + + def __init__(self, + name=None, + base_dir=None, + port: typing.Optional[int] = None, + conn_params: ConnectionParams = ConnectionParams(), + bin_dir=None, + prefix=None, + os_ops: typing.Optional[OsOperations] = None, + port_manager: typing.Optional[PortManager] = None): """ PostgresNode constructor. @@ -145,21 +164,26 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP port: port to accept connections. base_dir: path to node's data directory. bin_dir: path to node's binary directory. + os_ops: None or correct OS operation object. + port_manager: None or correct port manager object. """ + assert port is None or type(port) == int # noqa: E721 + assert os_ops is None or isinstance(os_ops, OsOperations) + assert port_manager is None or isinstance(port_manager, PortManager) # private if os_ops is None: - os_ops = __class__._get_os_ops(conn_params) + self._os_ops = __class__._get_os_ops(conn_params) else: assert conn_params is None + assert isinstance(os_ops, OsOperations) + self._os_ops = os_ops pass - assert os_ops is not None - assert isinstance(os_ops, OsOperations) - self._os_ops = os_ops + assert self._os_ops is not None + assert isinstance(self._os_ops, OsOperations) - self._pg_version = PgVer(get_pg_version2(os_ops, bin_dir)) - self._should_free_port = port is None + self._pg_version = PgVer(get_pg_version2(self._os_ops, bin_dir)) self._base_dir = base_dir self._bin_dir = bin_dir self._prefix = prefix @@ -167,12 +191,29 @@ def __init__(self, name=None, base_dir=None, port=None, conn_params: ConnectionP self._master = None # basic - self.name = name or generate_app_name() + self._name = name or generate_app_name() + + if port is not None: + assert type(port) == int # noqa: E721 + assert port_manager is None + self._port = port + self._should_free_port = False + self._port_manager = None + else: + if port_manager is not None: + assert isinstance(port_manager, PortManager) + self._port_manager = port_manager + else: + self._port_manager = __class__._get_port_manager(self._os_ops) - self.host = os_ops.host - self.port = port or utils.reserve_port() + assert self._port_manager is not None + assert isinstance(self._port_manager, PortManager) - self.ssh_key = os_ops.ssh_key + self._port = self._port_manager.reserve_port() # raises + assert type(self._port) == int # noqa: E721 + self._should_free_port = True + + assert type(self._port) == int # noqa: E721 # defaults for __exit__() self.cleanup_on_good_exit = testgres_config.node_cleanup_on_good_exit @@ -207,7 +248,11 @@ def __exit__(self, type, value, traceback): def __repr__(self): return "{}(name='{}', port={}, base_dir='{}')".format( - self.__class__.__name__, self.name, self.port, self.base_dir) + self.__class__.__name__, + self.name, + str(self._port) if self._port is not None else "None", + self.base_dir + ) @staticmethod def _get_os_ops(conn_params: ConnectionParams) -> OsOperations: @@ -221,19 +266,39 @@ def _get_os_ops(conn_params: ConnectionParams) -> OsOperations: return LocalOperations(conn_params) + @staticmethod + def _get_port_manager(os_ops: OsOperations) -> PortManager: + assert os_ops is not None + assert isinstance(os_ops, OsOperations) + + if isinstance(os_ops, LocalOperations): + return PortManager__ThisHost() + + # TODO: Throw the exception "Please define a port manager." ? + return PortManager__Generic(os_ops) + def clone_with_new_name_and_base_dir(self, name: str, base_dir: str): assert name is None or type(name) == str # noqa: E721 assert base_dir is None or type(base_dir) == str # noqa: E721 assert __class__ == PostgresNode + if self._port_manager is None: + raise InvalidOperationException("PostgresNode without PortManager can't be cloned.") + + assert self._port_manager is not None + assert isinstance(self._port_manager, PortManager) + assert self._os_ops is not None + assert isinstance(self._os_ops, OsOperations) + node = PostgresNode( name=name, base_dir=base_dir, conn_params=None, bin_dir=self._bin_dir, prefix=self._prefix, - os_ops=self._os_ops) + os_ops=self._os_ops, + port_manager=self._port_manager) return node @@ -243,6 +308,33 @@ def os_ops(self) -> OsOperations: assert isinstance(self._os_ops, OsOperations) return self._os_ops + @property + def name(self) -> str: + if self._name is None: + raise InvalidOperationException("PostgresNode name is not defined.") + assert type(self._name) == str # noqa: E721 + return self._name + + @property + def host(self) -> str: + assert self._os_ops is not None + assert isinstance(self._os_ops, OsOperations) + return self._os_ops.host + + @property + def port(self) -> int: + if self._port is None: + raise InvalidOperationException("PostgresNode port is not defined.") + + assert type(self._port) == int # noqa: E721 + return self._port + + @property + def ssh_key(self) -> typing.Optional[str]: + assert self._os_ops is not None + assert isinstance(self._os_ops, OsOperations) + return self._os_ops.ssh_key + @property def pid(self): """ @@ -993,6 +1085,11 @@ def start(self, params=[], wait=True): if self.is_started: return self + if self._port is None: + raise InvalidOperationException("Can't start PostgresNode. Port is not defined.") + + assert type(self._port) == int # noqa: E721 + _params = [self._get_bin_path("pg_ctl"), "-D", self.data_dir, "-l", self.pg_log_file, @@ -1023,6 +1120,8 @@ def LOCAL__raise_cannot_start_node__std(from_exception): LOCAL__raise_cannot_start_node__std(e) else: assert self._should_free_port + assert self._port_manager is not None + assert isinstance(self._port_manager, PortManager) assert __class__._C_MAX_START_ATEMPTS > 1 log_files0 = self._collect_log_files() @@ -1048,20 +1147,20 @@ def LOCAL__raise_cannot_start_node__std(from_exception): log_files0 = log_files1 logging.warning( - "Detected a conflict with using the port {0}. Trying another port after a {1}-second sleep...".format(self.port, timeout) + "Detected a conflict with using the port {0}. Trying another port after a {1}-second sleep...".format(self._port, timeout) ) time.sleep(timeout) timeout = min(2 * timeout, 5) - cur_port = self.port - new_port = utils.reserve_port() # can raise + cur_port = self._port + new_port = self._port_manager.reserve_port() # can raise try: options = {'port': new_port} self.set_auto_conf(options) except: # noqa: E722 - utils.release_port(new_port) + self._port_manager.release_port(new_port) raise - self.port = new_port - utils.release_port(cur_port) + self._port = new_port + self._port_manager.release_port(cur_port) continue break self._maybe_start_logger() @@ -1222,14 +1321,22 @@ def pg_ctl(self, params): def free_port(self): """ Reclaim port owned by this node. - NOTE: does not free auto selected ports. + NOTE: this method does not release manually defined port but reset it. """ + assert type(self._should_free_port) == bool # noqa: E721 + + if not self._should_free_port: + self._port = None + else: + assert type(self._port) == int # noqa: E721 + + assert self._port_manager is not None + assert isinstance(self._port_manager, PortManager) - if self._should_free_port: - port = self.port + port = self._port self._should_free_port = False - self.port = None - utils.release_port(port) + self._port = None + self._port_manager.release_port(port) def cleanup(self, max_attempts=3, full=False): """ diff --git a/testgres/operations/local_ops.py b/testgres/operations/local_ops.py index 35e94210..39c81405 100644 --- a/testgres/operations/local_ops.py +++ b/testgres/operations/local_ops.py @@ -6,6 +6,7 @@ import subprocess import tempfile import time +import socket import psutil @@ -436,6 +437,16 @@ def get_process_children(self, pid): assert type(pid) == int # noqa: E721 return psutil.Process(pid).children() + def is_port_free(self, number: int) -> bool: + assert type(number) == int # noqa: E721 + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("", number)) + return True + except OSError: + return False + # Database control def db_connect(self, dbname, user, password=None, host="localhost", port=5432): conn = pglib.connect( diff --git a/testgres/operations/os_ops.py b/testgres/operations/os_ops.py index 3c606871..489a7cb2 100644 --- a/testgres/operations/os_ops.py +++ b/testgres/operations/os_ops.py @@ -127,6 +127,10 @@ def get_pid(self): def get_process_children(self, pid): raise NotImplementedError() + def is_port_free(self, number: int): + assert type(number) == int # noqa: E721 + raise NotImplementedError() + # Database control def db_connect(self, dbname, user, password=None, host="localhost", port=5432): raise NotImplementedError() diff --git a/testgres/operations/remote_ops.py b/testgres/operations/remote_ops.py index a3ecf637..ee747e52 100644 --- a/testgres/operations/remote_ops.py +++ b/testgres/operations/remote_ops.py @@ -629,6 +629,54 @@ def get_process_children(self, pid): raise ExecUtilException(f"Error in getting process children. Error: {result.stderr}") + def is_port_free(self, number: int) -> bool: + assert type(number) == int # noqa: E721 + + cmd = ["nc", "-w", "5", "-z", "-v", "localhost", str(number)] + + exit_status, output, error = self.exec_command(cmd=cmd, encoding=get_default_encoding(), ignore_errors=True, verbose=True) + + assert type(output) == str # noqa: E721 + assert type(error) == str # noqa: E721 + + if exit_status == 0: + return __class__._is_port_free__process_0(error) + + if exit_status == 1: + return __class__._is_port_free__process_1(error) + + errMsg = "nc returns an unknown result code: {0}".format(exit_status) + + RaiseError.CommandExecutionError( + cmd=cmd, + exit_code=exit_status, + message=errMsg, + error=error, + out=output + ) + + @staticmethod + def _is_port_free__process_0(error: str) -> bool: + assert type(error) == str # noqa: E721 + # + # Example of error text: + # "Connection to localhost (127.0.0.1) 1024 port [tcp/*] succeeded!\n" + # + # May be here is needed to check error message? + # + return False + + @staticmethod + def _is_port_free__process_1(error: str) -> bool: + assert type(error) == str # noqa: E721 + # + # Example of error text: + # "nc: connect to localhost (127.0.0.1) port 1024 (tcp) failed: Connection refused\n" + # + # May be here is needed to check error message? + # + return True + # Database control def db_connect(self, dbname, user, password=None, host="localhost", port=5432): conn = pglib.connect( diff --git a/testgres/port_manager.py b/testgres/port_manager.py new file mode 100644 index 00000000..164661e7 --- /dev/null +++ b/testgres/port_manager.py @@ -0,0 +1,102 @@ +from .operations.os_ops import OsOperations + +from .exceptions import PortForException + +from . import utils + +import threading +import random + + +class PortManager: + def __init__(self): + super().__init__() + + def reserve_port(self) -> int: + raise NotImplementedError("PortManager::reserve_port is not implemented.") + + def release_port(self, number: int) -> None: + assert type(number) == int # noqa: E721 + raise NotImplementedError("PortManager::release_port is not implemented.") + + +class PortManager__ThisHost(PortManager): + sm_single_instance: PortManager = None + sm_single_instance_guard = threading.Lock() + + def __init__(self): + pass + + def __new__(cls) -> PortManager: + assert __class__ == PortManager__ThisHost + assert __class__.sm_single_instance_guard is not None + + if __class__.sm_single_instance is None: + with __class__.sm_single_instance_guard: + __class__.sm_single_instance = super().__new__(cls) + assert __class__.sm_single_instance + assert type(__class__.sm_single_instance) == __class__ # noqa: E721 + return __class__.sm_single_instance + + def reserve_port(self) -> int: + return utils.reserve_port() + + def release_port(self, number: int) -> None: + assert type(number) == int # noqa: E721 + return utils.release_port(number) + + +class PortManager__Generic(PortManager): + _os_ops: OsOperations + _guard: object + # TODO: is there better to use bitmap fot _available_ports? + _available_ports: set[int] + _reserved_ports: set[int] + + def __init__(self, os_ops: OsOperations): + assert os_ops is not None + assert isinstance(os_ops, OsOperations) + self._os_ops = os_ops + self._guard = threading.Lock() + self._available_ports = set[int](range(1024, 65535)) + self._reserved_ports = set[int]() + + def reserve_port(self) -> int: + assert self._guard is not None + assert type(self._available_ports) == set # noqa: E721t + assert type(self._reserved_ports) == set # noqa: E721 + + with self._guard: + t = tuple(self._available_ports) + assert len(t) == len(self._available_ports) + sampled_ports = random.sample(t, min(len(t), 100)) + t = None + + for port in sampled_ports: + assert not (port in self._reserved_ports) + assert port in self._available_ports + + if not self._os_ops.is_port_free(port): + continue + + self._reserved_ports.add(port) + self._available_ports.discard(port) + assert port in self._reserved_ports + assert not (port in self._available_ports) + return port + + raise PortForException("Can't select a port.") + + def release_port(self, number: int) -> None: + assert type(number) == int # noqa: E721 + + assert self._guard is not None + assert type(self._reserved_ports) == set # noqa: E721 + + with self._guard: + assert number in self._reserved_ports + assert not (number in self._available_ports) + self._available_ports.add(number) + self._reserved_ports.discard(number) + assert not (number in self._reserved_ports) + assert number in self._available_ports diff --git a/testgres/utils.py b/testgres/utils.py index 92383571..10ae81b6 100644 --- a/testgres/utils.py +++ b/testgres/utils.py @@ -6,6 +6,8 @@ import os import sys +import socket +import random from contextlib import contextmanager from packaging.version import Version, InvalidVersion @@ -13,7 +15,7 @@ from six import iteritems -from .helpers.port_manager import PortManager +from .exceptions import PortForException from .exceptions import ExecUtilException from .config import testgres_config as tconf from .operations.os_ops import OsOperations @@ -41,11 +43,28 @@ def internal__reserve_port(): """ Generate a new port and add it to 'bound_ports'. """ - port_mng = PortManager() - port = port_mng.find_free_port(exclude_ports=bound_ports) - bound_ports.add(port) + def LOCAL__is_port_free(port: int) -> bool: + """Check if a port is free to use.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("", port)) + return True + except OSError: + return False - return port + ports = set(range(1024, 65535)) + assert type(ports) == set # noqa: E721 + assert type(bound_ports) == set # noqa: E721 + ports.difference_update(bound_ports) + + sampled_ports = random.sample(tuple(ports), min(len(ports), 100)) + + for port in sampled_ports: + if LOCAL__is_port_free(port): + bound_ports.add(port) + return port + + raise PortForException("Can't select a port") def internal__release_port(port): @@ -53,6 +72,9 @@ def internal__release_port(port): Free port provided by reserve_port(). """ + assert type(port) == int # noqa: E721 + assert port in bound_ports + bound_ports.discard(port) diff --git a/tests/helpers/global_data.py b/tests/helpers/global_data.py new file mode 100644 index 00000000..c21d7dd8 --- /dev/null +++ b/tests/helpers/global_data.py @@ -0,0 +1,78 @@ +from ...testgres.operations.os_ops import OsOperations +from ...testgres.operations.os_ops import ConnectionParams +from ...testgres.operations.local_ops import LocalOperations +from ...testgres.operations.remote_ops import RemoteOperations + +from ...testgres.node import PortManager +from ...testgres.node import PortManager__ThisHost +from ...testgres.node import PortManager__Generic + +import os + + +class OsOpsDescr: + sign: str + os_ops: OsOperations + + def __init__(self, sign: str, os_ops: OsOperations): + assert type(sign) == str # noqa: E721 + assert isinstance(os_ops, OsOperations) + self.sign = sign + self.os_ops = os_ops + + +class OsOpsDescrs: + sm_remote_conn_params = ConnectionParams( + host=os.getenv('RDBMS_TESTPOOL1_HOST') or '127.0.0.1', + username=os.getenv('USER'), + ssh_key=os.getenv('RDBMS_TESTPOOL_SSHKEY')) + + sm_remote_os_ops = RemoteOperations(sm_remote_conn_params) + + sm_remote_os_ops_descr = OsOpsDescr("remote_ops", sm_remote_os_ops) + + sm_local_os_ops = LocalOperations() + + sm_local_os_ops_descr = OsOpsDescr("local_ops", sm_local_os_ops) + + +class PortManagers: + sm_remote_port_manager = PortManager__Generic(OsOpsDescrs.sm_remote_os_ops) + + sm_local_port_manager = PortManager__ThisHost() + + sm_local2_port_manager = PortManager__Generic(OsOpsDescrs.sm_local_os_ops) + + +class PostgresNodeService: + sign: str + os_ops: OsOperations + port_manager: PortManager + + def __init__(self, sign: str, os_ops: OsOperations, port_manager: PortManager): + assert type(sign) == str # noqa: E721 + assert isinstance(os_ops, OsOperations) + assert isinstance(port_manager, PortManager) + self.sign = sign + self.os_ops = os_ops + self.port_manager = port_manager + + +class PostgresNodeServices: + sm_remote = PostgresNodeService( + "remote", + OsOpsDescrs.sm_remote_os_ops, + PortManagers.sm_remote_port_manager + ) + + sm_local = PostgresNodeService( + "local", + OsOpsDescrs.sm_local_os_ops, + PortManagers.sm_local_port_manager + ) + + sm_local2 = PostgresNodeService( + "local2", + OsOpsDescrs.sm_local_os_ops, + PortManagers.sm_local2_port_manager + ) diff --git a/tests/helpers/os_ops_descrs.py b/tests/helpers/os_ops_descrs.py deleted file mode 100644 index 02297adb..00000000 --- a/tests/helpers/os_ops_descrs.py +++ /dev/null @@ -1,32 +0,0 @@ -from ...testgres.operations.os_ops import OsOperations -from ...testgres.operations.os_ops import ConnectionParams -from ...testgres.operations.local_ops import LocalOperations -from ...testgres.operations.remote_ops import RemoteOperations - -import os - - -class OsOpsDescr: - os_ops: OsOperations - sign: str - - def __init__(self, os_ops: OsOperations, sign: str): - assert isinstance(os_ops, OsOperations) - assert type(sign) == str # noqa: E721 - self.os_ops = os_ops - self.sign = sign - - -class OsOpsDescrs: - sm_remote_conn_params = ConnectionParams( - host=os.getenv('RDBMS_TESTPOOL1_HOST') or '127.0.0.1', - username=os.getenv('USER'), - ssh_key=os.getenv('RDBMS_TESTPOOL_SSHKEY')) - - sm_remote_os_ops = RemoteOperations(sm_remote_conn_params) - - sm_remote_os_ops_descr = OsOpsDescr(sm_remote_os_ops, "remote_ops") - - sm_local_os_ops = LocalOperations() - - sm_local_os_ops_descr = OsOpsDescr(sm_local_os_ops, "local_ops") diff --git a/tests/test_os_ops_common.py b/tests/test_os_ops_common.py index c3944c3b..7d183775 100644 --- a/tests/test_os_ops_common.py +++ b/tests/test_os_ops_common.py @@ -1,7 +1,7 @@ # coding: utf-8 -from .helpers.os_ops_descrs import OsOpsDescr -from .helpers.os_ops_descrs import OsOpsDescrs -from .helpers.os_ops_descrs import OsOperations +from .helpers.global_data import OsOpsDescr +from .helpers.global_data import OsOpsDescrs +from .helpers.global_data import OsOperations from .helpers.run_conditions import RunConditions import os @@ -10,6 +10,8 @@ import re import tempfile import logging +import socket +import threading from ..testgres import InvalidOperationException from ..testgres import ExecUtilException @@ -648,3 +650,100 @@ def test_touch(self, os_ops: OsOperations): assert os_ops.isfile(filename) os_ops.remove_file(filename) + + def test_is_port_free__true(self, os_ops: OsOperations): + assert isinstance(os_ops, OsOperations) + + C_LIMIT = 128 + + ports = set(range(1024, 65535)) + assert type(ports) == set # noqa: E721 + + ok_count = 0 + no_count = 0 + + for port in ports: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("", port)) + except OSError: + continue + + r = os_ops.is_port_free(port) + + if r: + ok_count += 1 + logging.info("OK. Port {} is free.".format(port)) + else: + no_count += 1 + logging.warning("NO. Port {} is not free.".format(port)) + + if ok_count == C_LIMIT: + return + + if no_count == C_LIMIT: + raise RuntimeError("To many false positive test attempts.") + + if ok_count == 0: + raise RuntimeError("No one free port was found.") + + def test_is_port_free__false(self, os_ops: OsOperations): + assert isinstance(os_ops, OsOperations) + + C_LIMIT = 10 + + ports = set(range(1024, 65535)) + assert type(ports) == set # noqa: E721 + + def LOCAL_server(s: socket.socket): + assert s is not None + assert type(s) == socket.socket # noqa: E721 + + try: + while True: + r = s.accept() + + if r is None: + break + except Exception as e: + assert e is not None + pass + + ok_count = 0 + no_count = 0 + + for port in ports: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("", port)) + except OSError: + continue + + th = threading.Thread(target=LOCAL_server, args=[s]) + + s.listen(10) + + assert type(th) == threading.Thread # noqa: E721 + th.start() + + try: + r = os_ops.is_port_free(port) + finally: + s.shutdown(2) + th.join() + + if not r: + ok_count += 1 + logging.info("OK. Port {} is not free.".format(port)) + else: + no_count += 1 + logging.warning("NO. Port {} does not accept connection.".format(port)) + + if ok_count == C_LIMIT: + return + + if no_count == C_LIMIT: + raise RuntimeError("To many false positive test attempts.") + + if ok_count == 0: + raise RuntimeError("No one free port was found.") diff --git a/tests/test_os_ops_local.py b/tests/test_os_ops_local.py index 2e3c30b7..f60c3fc9 100644 --- a/tests/test_os_ops_local.py +++ b/tests/test_os_ops_local.py @@ -1,6 +1,6 @@ # coding: utf-8 -from .helpers.os_ops_descrs import OsOpsDescrs -from .helpers.os_ops_descrs import OsOperations +from .helpers.global_data import OsOpsDescrs +from .helpers.global_data import OsOperations import os diff --git a/tests/test_os_ops_remote.py b/tests/test_os_ops_remote.py index 58b09242..338e49f3 100755 --- a/tests/test_os_ops_remote.py +++ b/tests/test_os_ops_remote.py @@ -1,7 +1,7 @@ # coding: utf-8 -from .helpers.os_ops_descrs import OsOpsDescrs -from .helpers.os_ops_descrs import OsOperations +from .helpers.global_data import OsOpsDescrs +from .helpers.global_data import OsOperations from ..testgres import ExecUtilException diff --git a/tests/test_testgres_common.py b/tests/test_testgres_common.py index 4e23c4af..b286a1c6 100644 --- a/tests/test_testgres_common.py +++ b/tests/test_testgres_common.py @@ -1,6 +1,7 @@ -from .helpers.os_ops_descrs import OsOpsDescr -from .helpers.os_ops_descrs import OsOpsDescrs -from .helpers.os_ops_descrs import OsOperations +from .helpers.global_data import PostgresNodeService +from .helpers.global_data import PostgresNodeServices +from .helpers.global_data import OsOperations +from .helpers.global_data import PortManager from ..testgres.node import PgVer from ..testgres.node import PostgresNode @@ -37,6 +38,8 @@ import uuid import os import re +import subprocess +import typing @contextmanager @@ -54,22 +57,25 @@ def removing(os_ops: OsOperations, f): class TestTestgresCommon: - sm_os_ops_descrs: list[OsOpsDescr] = [ - OsOpsDescrs.sm_local_os_ops_descr, - OsOpsDescrs.sm_remote_os_ops_descr + sm_node_svcs: list[PostgresNodeService] = [ + PostgresNodeServices.sm_local, + PostgresNodeServices.sm_local2, + PostgresNodeServices.sm_remote, ] @pytest.fixture( - params=[descr.os_ops for descr in sm_os_ops_descrs], - ids=[descr.sign for descr in sm_os_ops_descrs] + params=sm_node_svcs, + ids=[descr.sign for descr in sm_node_svcs] ) - def os_ops(self, request: pytest.FixtureRequest) -> OsOperations: + def node_svc(self, request: pytest.FixtureRequest) -> PostgresNodeService: assert isinstance(request, pytest.FixtureRequest) - assert isinstance(request.param, OsOperations) + assert isinstance(request.param, PostgresNodeService) + assert isinstance(request.param.os_ops, OsOperations) + assert isinstance(request.param.port_manager, PortManager) return request.param - def test_version_management(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_version_management(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) a = PgVer('10.0') b = PgVer('10') @@ -93,42 +99,42 @@ def test_version_management(self, os_ops: OsOperations): assert (g == k) assert (g > h) - version = get_pg_version2(os_ops) + version = get_pg_version2(node_svc.os_ops) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: assert (isinstance(version, six.string_types)) assert (isinstance(node.version, PgVer)) assert (node.version == PgVer(version)) - def test_double_init(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_double_init(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops).init() as node: + with __class__.helper__get_node(node_svc).init() as node: # can't initialize node more than once with pytest.raises(expected_exception=InitNodeException): node.init() - def test_init_after_cleanup(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_init_after_cleanup(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: node.init().start().execute('select 1') node.cleanup() node.init().start().execute('select 1') - def test_init_unique_system_id(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_init_unique_system_id(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) # this function exists in PostgreSQL 9.6+ - current_version = get_pg_version2(os_ops) + current_version = get_pg_version2(node_svc.os_ops) - __class__.helper__skip_test_if_util_not_exist(os_ops, "pg_resetwal") + __class__.helper__skip_test_if_util_not_exist(node_svc.os_ops, "pg_resetwal") __class__.helper__skip_test_if_pg_version_is_not_ge(current_version, '9.6') query = 'select system_identifier from pg_control_system()' with scoped_config(cache_initdb=False): - with __class__.helper__get_node(os_ops).init().start() as node0: + with __class__.helper__get_node(node_svc).init().start() as node0: id0 = node0.execute(query)[0] with scoped_config(cache_initdb=True, @@ -137,8 +143,8 @@ def test_init_unique_system_id(self, os_ops: OsOperations): assert (config.cached_initdb_unique) # spawn two nodes; ids must be different - with __class__.helper__get_node(os_ops).init().start() as node1, \ - __class__.helper__get_node(os_ops).init().start() as node2: + with __class__.helper__get_node(node_svc).init().start() as node1, \ + __class__.helper__get_node(node_svc).init().start() as node2: id1 = node1.execute(query)[0] id2 = node2.execute(query)[0] @@ -146,44 +152,44 @@ def test_init_unique_system_id(self, os_ops: OsOperations): assert (id1 > id0) assert (id2 > id1) - def test_node_exit(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_node_exit(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) with pytest.raises(expected_exception=QueryException): - with __class__.helper__get_node(os_ops).init() as node: + with __class__.helper__get_node(node_svc).init() as node: base_dir = node.base_dir node.safe_psql('select 1') # we should save the DB for "debugging" - assert (os_ops.path_exists(base_dir)) - os_ops.rmdirs(base_dir, ignore_errors=True) + assert (node_svc.os_ops.path_exists(base_dir)) + node_svc.os_ops.rmdirs(base_dir, ignore_errors=True) - with __class__.helper__get_node(os_ops).init() as node: + with __class__.helper__get_node(node_svc).init() as node: base_dir = node.base_dir # should have been removed by default - assert not (os_ops.path_exists(base_dir)) + assert not (node_svc.os_ops.path_exists(base_dir)) - def test_double_start(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_double_start(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops).init().start() as node: + with __class__.helper__get_node(node_svc).init().start() as node: # can't start node more than once node.start() assert (node.is_started) - def test_uninitialized_start(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_uninitialized_start(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: # node is not initialized yet with pytest.raises(expected_exception=StartNodeException): node.start() - def test_restart(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_restart(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: node.init().start() # restart, ok @@ -198,10 +204,10 @@ def test_restart(self, os_ops: OsOperations): node.append_conf('pg_hba.conf', 'DUMMY') node.restart() - def test_reload(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_reload(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: node.init().start() # change client_min_messages and save old value @@ -216,24 +222,24 @@ def test_reload(self, os_ops: OsOperations): assert ('debug1' == cmm_new[0][0].lower()) assert (cmm_old != cmm_new) - def test_pg_ctl(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_pg_ctl(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: node.init().start() status = node.pg_ctl(['status']) assert ('PID' in status) - def test_status(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_status(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) assert (NodeStatus.Running) assert not (NodeStatus.Stopped) assert not (NodeStatus.Uninitialized) # check statuses after each operation - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: assert (node.pid == 0) assert (node.status() == NodeStatus.Uninitialized) @@ -257,8 +263,8 @@ def test_status(self, os_ops: OsOperations): assert (node.pid == 0) assert (node.status() == NodeStatus.Uninitialized) - def test_child_pids(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_child_pids(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) master_processes = [ ProcessType.AutovacuumLauncher, @@ -269,7 +275,7 @@ def test_child_pids(self, os_ops: OsOperations): ProcessType.WalWriter, ] - postgresVersion = get_pg_version2(os_ops) + postgresVersion = get_pg_version2(node_svc.os_ops) if __class__.helper__pg_version_ge(postgresVersion, '10'): master_processes.append(ProcessType.LogicalReplicationLauncher) @@ -338,7 +344,7 @@ def LOCAL__check_auxiliary_pids__multiple_attempts( absenceList )) - with __class__.helper__get_node(os_ops).init().start() as master: + with __class__.helper__get_node(node_svc).init().start() as master: # master node doesn't have a source walsender! with pytest.raises(expected_exception=testgres_TestgresException): @@ -380,10 +386,10 @@ def test_exceptions(self): str(ExecUtilException('msg', 'cmd', 1, 'out')) str(QueryException('msg', 'query')) - def test_auto_name(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_auto_name(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(os_ops).init(allow_streaming=True).start() as m: + with __class__.helper__get_node(node_svc).init(allow_streaming=True).start() as m: with m.replicate().start() as r: # check that nodes are running assert (m.status()) @@ -417,9 +423,9 @@ def test_file_tail(self): lines = file_tail(f, 1) assert (lines[0] == s3) - def test_isolation_levels(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops).init().start() as node: + def test_isolation_levels(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc).init().start() as node: with node.connect() as con: # string levels con.begin('Read Uncommitted').commit() @@ -437,17 +443,17 @@ def test_isolation_levels(self, os_ops: OsOperations): with pytest.raises(expected_exception=QueryException): con.begin('Garbage').commit() - def test_users(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops).init().start() as node: + def test_users(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc).init().start() as node: node.psql('create role test_user login') value = node.safe_psql('select 1', username='test_user') value = __class__.helper__rm_carriage_returns(value) assert (value == b'1\n') - def test_poll_query_until(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_poll_query_until(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init().start() get_time = 'select extract(epoch from now())' @@ -507,8 +513,8 @@ def test_poll_query_until(self, os_ops: OsOperations): # check 1 arg, ok node.poll_query_until('select true') - def test_logging(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_logging(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) C_MAX_ATTEMPTS = 50 # This name is used for testgres logging, too. C_NODE_NAME = "testgres_tests." + __class__.__name__ + "test_logging-master-" + uuid.uuid4().hex @@ -529,7 +535,7 @@ def test_logging(self, os_ops: OsOperations): logger.addHandler(handler) with scoped_config(use_python_logging=True): - with __class__.helper__get_node(os_ops, name=C_NODE_NAME) as master: + with __class__.helper__get_node(node_svc, name=C_NODE_NAME) as master: logging.info("Master node is initilizing") master.init() @@ -599,9 +605,9 @@ def LOCAL__test_lines(): # GO HOME! return - def test_psql(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops).init().start() as node: + def test_psql(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc).init().start() as node: # check returned values (1 arg) res = node.psql('select 1') @@ -636,17 +642,20 @@ def test_psql(self, os_ops: OsOperations): # check psql's default args, fails with pytest.raises(expected_exception=QueryException): - node.psql() + r = node.psql() # raises! + logging.error("node.psql returns [{}]".format(r)) node.stop() # check psql on stopped node, fails with pytest.raises(expected_exception=QueryException): - node.safe_psql('select 1') + # [2025-04-03] This call does not raise exception! I do not know why. + r = node.safe_psql('select 1') # raises! + logging.error("node.safe_psql returns [{}]".format(r)) - def test_safe_psql__expect_error(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops).init().start() as node: + def test_safe_psql__expect_error(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc).init().start() as node: err = node.safe_psql('select_or_not_select 1', expect_error=True) assert (type(err) == str) # noqa: E721 assert ('select_or_not_select' in err) @@ -663,9 +672,9 @@ def test_safe_psql__expect_error(self, os_ops: OsOperations): res = node.safe_psql("select 1;", expect_error=False) assert (__class__.helper__rm_carriage_returns(res) == b'1\n') - def test_transactions(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops).init().start() as node: + def test_transactions(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc).init().start() as node: with node.connect() as con: con.begin() @@ -688,9 +697,9 @@ def test_transactions(self, os_ops: OsOperations): con.execute('drop table test') con.commit() - def test_control_data(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_control_data(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: # node is not initialized yet with pytest.raises(expected_exception=ExecUtilException): @@ -703,9 +712,9 @@ def test_control_data(self, os_ops: OsOperations): assert data is not None assert (any('pg_control' in s for s in data.keys())) - def test_backup_simple(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as master: + def test_backup_simple(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as master: # enable streaming for backups master.init(allow_streaming=True) @@ -725,9 +734,9 @@ def test_backup_simple(self, os_ops: OsOperations): res = slave.execute('select * from test order by i asc') assert (res == [(1, ), (2, ), (3, ), (4, )]) - def test_backup_multiple(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_backup_multiple(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() with node.backup(xlog_method='fetch') as backup1, \ @@ -739,9 +748,9 @@ def test_backup_multiple(self, os_ops: OsOperations): backup.spawn_primary('node2', destroy=False) as node2: assert (node1.base_dir != node2.base_dir) - def test_backup_exhaust(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_backup_exhaust(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() with node.backup(xlog_method='fetch') as backup: @@ -753,9 +762,9 @@ def test_backup_exhaust(self, os_ops: OsOperations): with pytest.raises(expected_exception=BackupException): backup.spawn_primary() - def test_backup_wrong_xlog_method(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_backup_wrong_xlog_method(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() with pytest.raises( @@ -764,11 +773,11 @@ def test_backup_wrong_xlog_method(self, os_ops: OsOperations): ): node.backup(xlog_method='wrong') - def test_pg_ctl_wait_option(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_pg_ctl_wait_option(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) C_MAX_ATTEMPTS = 50 - node = __class__.helper__get_node(os_ops) + node = __class__.helper__get_node(node_svc) assert node.status() == NodeStatus.Uninitialized node.init() assert node.status() == NodeStatus.Stopped @@ -835,9 +844,9 @@ def test_pg_ctl_wait_option(self, os_ops: OsOperations): logging.info("OK. Node is stopped.") node.cleanup() - def test_replicate(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_replicate(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() with node.replicate().start() as replica: @@ -851,14 +860,14 @@ def test_replicate(self, os_ops: OsOperations): res = node.execute('select * from test') assert (res == []) - def test_synchronous_replication(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_synchronous_replication(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - current_version = get_pg_version2(os_ops) + current_version = get_pg_version2(node_svc.os_ops) __class__.helper__skip_test_if_pg_version_is_not_ge(current_version, "9.6") - with __class__.helper__get_node(os_ops) as master: + with __class__.helper__get_node(node_svc) as master: old_version = not __class__.helper__pg_version_ge(current_version, '9.6') master.init(allow_streaming=True).start() @@ -897,14 +906,14 @@ def test_synchronous_replication(self, os_ops: OsOperations): res = standby1.safe_psql('select count(*) from abc') assert (__class__.helper__rm_carriage_returns(res) == b'1000000\n') - def test_logical_replication(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_logical_replication(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - current_version = get_pg_version2(os_ops) + current_version = get_pg_version2(node_svc.os_ops) __class__.helper__skip_test_if_pg_version_is_not_ge(current_version, "10") - with __class__.helper__get_node(os_ops) as node1, __class__.helper__get_node(os_ops) as node2: + with __class__.helper__get_node(node_svc) as node1, __class__.helper__get_node(node_svc) as node2: node1.init(allow_logical=True) node1.start() node2.init().start() @@ -971,15 +980,15 @@ def test_logical_replication(self, os_ops: OsOperations): res = node2.execute('select * from test2') assert (res == [('a', ), ('b', )]) - def test_logical_catchup(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_logical_catchup(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) """ Runs catchup for 100 times to be sure that it is consistent """ - current_version = get_pg_version2(os_ops) + current_version = get_pg_version2(node_svc.os_ops) __class__.helper__skip_test_if_pg_version_is_not_ge(current_version, "10") - with __class__.helper__get_node(os_ops) as node1, __class__.helper__get_node(os_ops) as node2: + with __class__.helper__get_node(node_svc) as node1, __class__.helper__get_node(node_svc) as node2: node1.init(allow_logical=True) node1.start() node2.init().start() @@ -999,20 +1008,20 @@ def test_logical_catchup(self, os_ops: OsOperations): assert (res == [(i, i, )]) node1.execute('delete from test') - def test_logical_replication_fail(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_logical_replication_fail(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) - current_version = get_pg_version2(os_ops) + current_version = get_pg_version2(node_svc.os_ops) __class__.helper__skip_test_if_pg_version_is_ge(current_version, "10") - with __class__.helper__get_node(os_ops) as node: + with __class__.helper__get_node(node_svc) as node: with pytest.raises(expected_exception=InitNodeException): node.init(allow_logical=True) - def test_replication_slots(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_replication_slots(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() with node.replicate(slot='slot1').start() as replica: @@ -1022,18 +1031,18 @@ def test_replication_slots(self, os_ops: OsOperations): with pytest.raises(expected_exception=testgres_TestgresException): node.replicate(slot='slot1') - def test_incorrect_catchup(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as node: + def test_incorrect_catchup(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as node: node.init(allow_streaming=True).start() # node has no master, can't catch up with pytest.raises(expected_exception=testgres_TestgresException): node.catchup() - def test_promotion(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) - with __class__.helper__get_node(os_ops) as master: + def test_promotion(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + with __class__.helper__get_node(node_svc) as master: master.init().start() master.safe_psql('create table abc(id serial)') @@ -1046,17 +1055,17 @@ def test_promotion(self, os_ops: OsOperations): res = replica.safe_psql('select * from abc') assert (__class__.helper__rm_carriage_returns(res) == b'1\n') - def test_dump(self, os_ops: OsOperations): - assert isinstance(os_ops, OsOperations) + def test_dump(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) query_create = 'create table test as select generate_series(1, 2) as val' query_select = 'select * from test order by val asc' - with __class__.helper__get_node(os_ops).init().start() as node1: + with __class__.helper__get_node(node_svc).init().start() as node1: node1.execute(query_create) for format in ['plain', 'custom', 'directory', 'tar']: - with removing(os_ops, node1.dump(format=format)) as dump: - with __class__.helper__get_node(os_ops).init().start() as node3: + with removing(node_svc.os_ops, node1.dump(format=format)) as dump: + with __class__.helper__get_node(node_svc).init().start() as node3: if format == 'directory': assert (os.path.isdir(dump)) else: @@ -1066,14 +1075,16 @@ def test_dump(self, os_ops: OsOperations): res = node3.execute(query_select) assert (res == [(1, ), (2, )]) - def test_get_pg_config2(self, os_ops: OsOperations): + def test_get_pg_config2(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + # check same instances - a = get_pg_config2(os_ops, None) - b = get_pg_config2(os_ops, None) + a = get_pg_config2(node_svc.os_ops, None) + b = get_pg_config2(node_svc.os_ops, None) assert (id(a) == id(b)) # save right before config change - c1 = get_pg_config2(os_ops, None) + c1 = get_pg_config2(node_svc.os_ops, None) # modify setting for this scope with scoped_config(cache_pg_config=False) as config: @@ -1081,20 +1092,315 @@ def test_get_pg_config2(self, os_ops: OsOperations): assert not (config.cache_pg_config) # save right after config change - c2 = get_pg_config2(os_ops, None) + c2 = get_pg_config2(node_svc.os_ops, None) # check different instances after config change assert (id(c1) != id(c2)) # check different instances - a = get_pg_config2(os_ops, None) - b = get_pg_config2(os_ops, None) + a = get_pg_config2(node_svc.os_ops, None) + b = get_pg_config2(node_svc.os_ops, None) assert (id(a) != id(b)) + def test_pgbench(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + __class__.helper__skip_test_if_util_not_exist(node_svc.os_ops, "pgbench") + + with __class__.helper__get_node(node_svc).init().start() as node: + # initialize pgbench DB and run benchmarks + node.pgbench_init( + scale=2, + foreign_keys=True, + options=['-q'] + ).pgbench_run(time=2) + + # run TPC-B benchmark + proc = node.pgbench(stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + options=['-T3']) + out = proc.communicate()[0] + assert (b'tps = ' in out) + + def test_unix_sockets(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + node.init(unix_sockets=False, allow_streaming=True) + node.start() + + res_exec = node.execute('select 1') + assert (res_exec == [(1,)]) + res_psql = node.safe_psql('select 1') + assert (res_psql == b'1\n') + + with node.replicate() as r: + assert type(r) == PostgresNode # noqa: E721 + r.start() + res_exec = r.execute('select 1') + assert (res_exec == [(1,)]) + res_psql = r.safe_psql('select 1') + assert (res_psql == b'1\n') + + def test_the_same_port(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + node.init().start() + assert (node._should_free_port) + assert (type(node.port) == int) # noqa: E721 + node_port_copy = node.port + r = node.safe_psql("SELECT 1;") + assert (__class__.helper__rm_carriage_returns(r) == b'1\n') + + with __class__.helper__get_node(node_svc, port=node.port) as node2: + assert (type(node2.port) == int) # noqa: E721 + assert (node2.port == node.port) + assert not (node2._should_free_port) + + with pytest.raises( + expected_exception=StartNodeException, + match=re.escape("Cannot start node") + ): + node2.init().start() + + # node is still working + assert (node.port == node_port_copy) + assert (node._should_free_port) + r = node.safe_psql("SELECT 3;") + assert (__class__.helper__rm_carriage_returns(r) == b'3\n') + + class tagPortManagerProxy(PortManager): + m_PrevPortManager: PortManager + + m_DummyPortNumber: int + m_DummyPortMaxUsage: int + + m_DummyPortCurrentUsage: int + m_DummyPortTotalUsage: int + + def __init__(self, prevPortManager: PortManager, dummyPortNumber: int, dummyPortMaxUsage: int): + assert isinstance(prevPortManager, PortManager) + assert type(dummyPortNumber) == int # noqa: E721 + assert type(dummyPortMaxUsage) == int # noqa: E721 + assert dummyPortNumber >= 0 + assert dummyPortMaxUsage >= 0 + + super().__init__() + + self.m_PrevPortManager = prevPortManager + + self.m_DummyPortNumber = dummyPortNumber + self.m_DummyPortMaxUsage = dummyPortMaxUsage + + self.m_DummyPortCurrentUsage = 0 + self.m_DummyPortTotalUsage = 0 + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + assert self.m_DummyPortCurrentUsage == 0 + + assert self.m_PrevPortManager is not None + + def reserve_port(self) -> int: + assert type(self.m_DummyPortMaxUsage) == int # noqa: E721 + assert type(self.m_DummyPortTotalUsage) == int # noqa: E721 + assert type(self.m_DummyPortCurrentUsage) == int # noqa: E721 + assert self.m_DummyPortTotalUsage >= 0 + assert self.m_DummyPortCurrentUsage >= 0 + + assert self.m_DummyPortTotalUsage <= self.m_DummyPortMaxUsage + assert self.m_DummyPortCurrentUsage <= self.m_DummyPortTotalUsage + + assert self.m_PrevPortManager is not None + assert isinstance(self.m_PrevPortManager, PortManager) + + if self.m_DummyPortTotalUsage == self.m_DummyPortMaxUsage: + return self.m_PrevPortManager.reserve_port() + + self.m_DummyPortTotalUsage += 1 + self.m_DummyPortCurrentUsage += 1 + return self.m_DummyPortNumber + + def release_port(self, dummyPortNumber: int): + assert type(dummyPortNumber) == int # noqa: E721 + + assert type(self.m_DummyPortMaxUsage) == int # noqa: E721 + assert type(self.m_DummyPortTotalUsage) == int # noqa: E721 + assert type(self.m_DummyPortCurrentUsage) == int # noqa: E721 + assert self.m_DummyPortTotalUsage >= 0 + assert self.m_DummyPortCurrentUsage >= 0 + + assert self.m_DummyPortTotalUsage <= self.m_DummyPortMaxUsage + assert self.m_DummyPortCurrentUsage <= self.m_DummyPortTotalUsage + + assert self.m_PrevPortManager is not None + assert isinstance(self.m_PrevPortManager, PortManager) + + if self.m_DummyPortCurrentUsage > 0 and dummyPortNumber == self.m_DummyPortNumber: + assert self.m_DummyPortTotalUsage > 0 + self.m_DummyPortCurrentUsage -= 1 + return + + return self.m_PrevPortManager.release_port(dummyPortNumber) + + def test_port_rereserve_during_node_start(self, node_svc: PostgresNodeService): + assert type(node_svc) == PostgresNodeService # noqa: E721 + assert PostgresNode._C_MAX_START_ATEMPTS == 5 + + C_COUNT_OF_BAD_PORT_USAGE = 3 + + with __class__.helper__get_node(node_svc) as node1: + node1.init().start() + assert node1._should_free_port + assert type(node1.port) == int # noqa: E721 + node1_port_copy = node1.port + assert __class__.helper__rm_carriage_returns(node1.safe_psql("SELECT 1;")) == b'1\n' + + with __class__.tagPortManagerProxy(node_svc.port_manager, node1.port, C_COUNT_OF_BAD_PORT_USAGE) as proxy: + assert proxy.m_DummyPortNumber == node1.port + with __class__.helper__get_node(node_svc, port_manager=proxy) as node2: + assert node2._should_free_port + assert node2.port == node1.port + + node2.init().start() + + assert node2.port != node1.port + assert node2._should_free_port + assert proxy.m_DummyPortCurrentUsage == 0 + assert proxy.m_DummyPortTotalUsage == C_COUNT_OF_BAD_PORT_USAGE + assert node2.is_started + r = node2.safe_psql("SELECT 2;") + assert __class__.helper__rm_carriage_returns(r) == b'2\n' + + # node1 is still working + assert node1.port == node1_port_copy + assert node1._should_free_port + r = node1.safe_psql("SELECT 3;") + assert __class__.helper__rm_carriage_returns(r) == b'3\n' + + def test_port_conflict(self, node_svc: PostgresNodeService): + assert type(node_svc) == PostgresNodeService # noqa: E721 + assert PostgresNode._C_MAX_START_ATEMPTS > 1 + + C_COUNT_OF_BAD_PORT_USAGE = PostgresNode._C_MAX_START_ATEMPTS + + with __class__.helper__get_node(node_svc) as node1: + node1.init().start() + assert node1._should_free_port + assert type(node1.port) == int # noqa: E721 + node1_port_copy = node1.port + assert __class__.helper__rm_carriage_returns(node1.safe_psql("SELECT 1;")) == b'1\n' + + with __class__.tagPortManagerProxy(node_svc.port_manager, node1.port, C_COUNT_OF_BAD_PORT_USAGE) as proxy: + assert proxy.m_DummyPortNumber == node1.port + with __class__.helper__get_node(node_svc, port_manager=proxy) as node2: + assert node2._should_free_port + assert node2.port == node1.port + + with pytest.raises( + expected_exception=StartNodeException, + match=re.escape("Cannot start node after multiple attempts.") + ): + node2.init().start() + + assert node2.port == node1.port + assert node2._should_free_port + assert proxy.m_DummyPortCurrentUsage == 1 + assert proxy.m_DummyPortTotalUsage == C_COUNT_OF_BAD_PORT_USAGE + assert not node2.is_started + + # node2 must release our dummyPort (node1.port) + assert (proxy.m_DummyPortCurrentUsage == 0) + + # node1 is still working + assert node1.port == node1_port_copy + assert node1._should_free_port + r = node1.safe_psql("SELECT 3;") + assert __class__.helper__rm_carriage_returns(r) == b'3\n' + + def test_try_to_get_port_after_free_manual_port(self, node_svc: PostgresNodeService): + assert type(node_svc) == PostgresNodeService # noqa: E721 + + assert node_svc.port_manager is not None + assert isinstance(node_svc.port_manager, PortManager) + + with __class__.helper__get_node(node_svc) as node1: + assert node1 is not None + assert type(node1) == PostgresNode # noqa: E721 + assert node1.port is not None + assert type(node1.port) == int # noqa: E721 + with __class__.helper__get_node(node_svc, port=node1.port, port_manager=None) as node2: + assert node2 is not None + assert type(node1) == PostgresNode # noqa: E721 + assert node2 is not node1 + assert node2.port is not None + assert type(node2.port) == int # noqa: E721 + assert node2.port == node1.port + + logging.info("Release node2 port") + node2.free_port() + + logging.info("try to get node2.port...") + with pytest.raises( + InvalidOperationException, + match="^" + re.escape("PostgresNode port is not defined.") + "$" + ): + p = node2.port + assert p is None + + def test_try_to_start_node_after_free_manual_port(self, node_svc: PostgresNodeService): + assert type(node_svc) == PostgresNodeService # noqa: E721 + + assert node_svc.port_manager is not None + assert isinstance(node_svc.port_manager, PortManager) + + with __class__.helper__get_node(node_svc) as node1: + assert node1 is not None + assert type(node1) == PostgresNode # noqa: E721 + assert node1.port is not None + assert type(node1.port) == int # noqa: E721 + with __class__.helper__get_node(node_svc, port=node1.port, port_manager=None) as node2: + assert node2 is not None + assert type(node1) == PostgresNode # noqa: E721 + assert node2 is not node1 + assert node2.port is not None + assert type(node2.port) == int # noqa: E721 + assert node2.port == node1.port + + logging.info("Release node2 port") + node2.free_port() + + logging.info("node2 is trying to start...") + with pytest.raises( + InvalidOperationException, + match="^" + re.escape("Can't start PostgresNode. Port is not defined.") + "$" + ): + node2.start() + @staticmethod - def helper__get_node(os_ops: OsOperations, name=None): - assert isinstance(os_ops, OsOperations) - return PostgresNode(name, conn_params=None, os_ops=os_ops) + def helper__get_node( + node_svc: PostgresNodeService, + name: typing.Optional[str] = None, + port: typing.Optional[int] = None, + port_manager: typing.Optional[PortManager] = None + ) -> PostgresNode: + assert isinstance(node_svc, PostgresNodeService) + assert isinstance(node_svc.os_ops, OsOperations) + assert isinstance(node_svc.port_manager, PortManager) + + if port_manager is None: + port_manager = node_svc.port_manager + + return PostgresNode( + name, + port=port, + conn_params=None, + os_ops=node_svc.os_ops, + port_manager=port_manager if port is None else None + ) @staticmethod def helper__skip_test_if_pg_version_is_not_ge(ver1: str, ver2: str): diff --git a/tests/test_testgres_local.py b/tests/test_testgres_local.py index 01f975a0..bef80d0f 100644 --- a/tests/test_testgres_local.py +++ b/tests/test_testgres_local.py @@ -100,27 +100,6 @@ def test_custom_init(self): # there should be no trust entries at all assert not (any('trust' in s for s in lines)) - def test_pgbench(self): - __class__.helper__skip_test_if_util_not_exist("pgbench") - - with get_new_node().init().start() as node: - - # initialize pgbench DB and run benchmarks - node.pgbench_init(scale=2, foreign_keys=True, - options=['-q']).pgbench_run(time=2) - - # run TPC-B benchmark - proc = node.pgbench(stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - options=['-T3']) - - out, _ = proc.communicate() - out = out.decode('utf-8') - - proc.stdout.close() - - assert ('tps' in out) - def test_pg_config(self): # check same instances a = get_pg_config() @@ -177,18 +156,6 @@ def test_config_stack(self): assert (TestgresConfig.cached_initdb_dir == d0) - def test_unix_sockets(self): - with get_new_node() as node: - node.init(unix_sockets=False, allow_streaming=True) - node.start() - - node.execute('select 1') - node.safe_psql('select 1') - - with node.replicate().start() as r: - r.execute('select 1') - r.safe_psql('select 1') - def test_ports_management(self): assert bound_ports is not None assert type(bound_ports) == set # noqa: E721 @@ -277,30 +244,6 @@ def test_parse_pg_version(self): # Macos assert parse_pg_version("postgres (PostgreSQL) 14.9 (Homebrew)") == "14.9" - def test_the_same_port(self): - with get_new_node() as node: - node.init().start() - assert (node._should_free_port) - assert (type(node.port) == int) # noqa: E721 - node_port_copy = node.port - assert (rm_carriage_returns(node.safe_psql("SELECT 1;")) == b'1\n') - - with get_new_node(port=node.port) as node2: - assert (type(node2.port) == int) # noqa: E721 - assert (node2.port == node.port) - assert not (node2._should_free_port) - - with pytest.raises( - expected_exception=StartNodeException, - match=re.escape("Cannot start node") - ): - node2.init().start() - - # node is still working - assert (node.port == node_port_copy) - assert (node._should_free_port) - assert (rm_carriage_returns(node.safe_psql("SELECT 3;")) == b'3\n') - class tagPortManagerProxy: sm_prev_testgres_reserve_port = None sm_prev_testgres_release_port = None diff --git a/tests/test_testgres_remote.py b/tests/test_testgres_remote.py index 2142e5ba..ef4bd0c8 100755 --- a/tests/test_testgres_remote.py +++ b/tests/test_testgres_remote.py @@ -1,14 +1,12 @@ # coding: utf-8 import os import re -import subprocess import pytest -import psutil import logging -from .helpers.os_ops_descrs import OsOpsDescrs -from .helpers.os_ops_descrs import OsOperations +from .helpers.global_data import PostgresNodeService +from .helpers.global_data import PostgresNodeServices from .. import testgres @@ -27,8 +25,6 @@ get_pg_config # NOTE: those are ugly imports -from ..testgres import bound_ports -from ..testgres.node import ProcessProxy def util_exists(util): @@ -48,17 +44,17 @@ def good_properties(f): class TestTestgresRemote: - sm_os_ops = OsOpsDescrs.sm_remote_os_ops - @pytest.fixture(autouse=True, scope="class") def implicit_fixture(self): + cur_os_ops = PostgresNodeServices.sm_remote.os_ops + assert cur_os_ops is not None + prev_ops = testgres_config.os_ops assert prev_ops is not None - assert __class__.sm_os_ops is not None - testgres_config.set_os_ops(os_ops=__class__.sm_os_ops) - assert testgres_config.os_ops is __class__.sm_os_ops + testgres_config.set_os_ops(os_ops=cur_os_ops) + assert testgres_config.os_ops is cur_os_ops yield - assert testgres_config.os_ops is __class__.sm_os_ops + assert testgres_config.os_ops is cur_os_ops testgres_config.set_os_ops(os_ops=prev_ops) assert testgres_config.os_ops is prev_ops @@ -172,21 +168,6 @@ def test_init__unk_LANG_and_LC_CTYPE(self): __class__.helper__restore_envvar("LC_CTYPE", prev_LC_CTYPE) __class__.helper__restore_envvar("LC_COLLATE", prev_LC_COLLATE) - def test_pgbench(self): - __class__.helper__skip_test_if_util_not_exist("pgbench") - - with __class__.helper__get_node().init().start() as node: - # initialize pgbench DB and run benchmarks - node.pgbench_init(scale=2, foreign_keys=True, - options=['-q']).pgbench_run(time=2) - - # run TPC-B benchmark - proc = node.pgbench(stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - options=['-T3']) - out = proc.communicate()[0] - assert (b'tps = ' in out) - def test_pg_config(self): # check same instances a = get_pg_config() @@ -243,90 +224,19 @@ def test_config_stack(self): assert (TestgresConfig.cached_initdb_dir == d0) - def test_unix_sockets(self): - with __class__.helper__get_node() as node: - node.init(unix_sockets=False, allow_streaming=True) - node.start() - - res_exec = node.execute('select 1') - res_psql = node.safe_psql('select 1') - assert (res_exec == [(1,)]) - assert (res_psql == b'1\n') - - with node.replicate().start() as r: - res_exec = r.execute('select 1') - res_psql = r.safe_psql('select 1') - assert (res_exec == [(1,)]) - assert (res_psql == b'1\n') - - def test_ports_management(self): - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - - if len(bound_ports) != 0: - logging.warning("bound_ports is not empty: {0}".format(bound_ports)) - - stage0__bound_ports = bound_ports.copy() - - with __class__.helper__get_node() as node: - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - - assert node.port is not None - assert type(node.port) == int # noqa: E721 - - logging.info("node port is {0}".format(node.port)) - - assert node.port in bound_ports - assert node.port not in stage0__bound_ports - - assert stage0__bound_ports <= bound_ports - assert len(stage0__bound_ports) + 1 == len(bound_ports) - - stage1__bound_ports = stage0__bound_ports.copy() - stage1__bound_ports.add(node.port) - - assert stage1__bound_ports == bound_ports - - # check that port has been freed successfully - assert bound_ports is not None - assert type(bound_ports) == set # noqa: E721 - assert bound_ports == stage0__bound_ports - - # TODO: Why does not this test work with remote host? - def test_child_process_dies(self): - nAttempt = 0 - - while True: - if nAttempt == 5: - raise Exception("Max attempt number is exceed.") - - nAttempt += 1 - - logging.info("Attempt #{0}".format(nAttempt)) - - # test for FileNotFound exception during child_processes() function - with subprocess.Popen(["sleep", "60"]) as process: - r = process.poll() - - if r is not None: - logging.warning("process.pool() returns an unexpected result: {0}.".format(r)) - continue - - assert r is None - # collect list of processes currently running - children = psutil.Process(os.getpid()).children() - # kill a process, so received children dictionary becomes invalid - process.kill() - process.wait() - # try to handle children list -- missing processes will have ptype "ProcessType.Unknown" - [ProcessProxy(p) for p in children] - break - @staticmethod def helper__get_node(name=None): - assert isinstance(__class__.sm_os_ops, OsOperations) - return testgres.PostgresNode(name, conn_params=None, os_ops=__class__.sm_os_ops) + svc = PostgresNodeServices.sm_remote + + assert isinstance(svc, PostgresNodeService) + assert isinstance(svc.os_ops, testgres.OsOperations) + assert isinstance(svc.port_manager, testgres.PortManager) + + return testgres.PostgresNode( + name, + conn_params=None, + os_ops=svc.os_ops, + port_manager=svc.port_manager) @staticmethod def helper__restore_envvar(name, prev_value):