diff --git a/core/testcontainers/compose/compose.py b/core/testcontainers/compose/compose.py index 35ca5b335..2a5b9b571 100644 --- a/core/testcontainers/compose/compose.py +++ b/core/testcontainers/compose/compose.py @@ -5,18 +5,19 @@ from os import PathLike from platform import system from re import split -from subprocess import CompletedProcess +from subprocess import CalledProcessError, CompletedProcess from subprocess import run as subprocess_run from typing import Any, Callable, Literal, Optional, TypeVar, Union, cast -from urllib.error import HTTPError, URLError -from urllib.request import urlopen from testcontainers.core.exceptions import ContainerIsNotRunning, NoSuchPortExposed -from testcontainers.core.waiting_utils import wait_container_is_ready +from testcontainers.core.utils import setup_logger +from testcontainers.core.waiting_utils import WaitStrategy _IPT = TypeVar("_IPT") _WARNINGS = {"DOCKER_COMPOSE_GET_CONFIG": "get_config is experimental, see testcontainers/testcontainers-python#669"} +logger = setup_logger(__name__) + def _ignore_properties(cls: type[_IPT], dict_: any) -> _IPT: """omits extra fields like @JsonIgnoreProperties(ignoreUnknown = true) @@ -77,6 +78,7 @@ class ComposeContainer: Health: Optional[str] = None ExitCode: Optional[str] = None Publishers: list[PublishedPort] = field(default_factory=list) + _docker_compose: Optional["DockerCompose"] = field(default=None, init=False, repr=False) def __post_init__(self): if self.Publishers: @@ -112,6 +114,41 @@ def get_publisher( def _matches_protocol(prefer_ip_version, r): return (":" in r.URL) is (prefer_ip_version == "IPv6") + # WaitStrategy compatibility methods + def get_container_host_ip(self) -> str: + """Get the host IP for the container.""" + # Simplified implementation - wait strategies don't use this yet + return "127.0.0.1" + + def get_exposed_port(self, port: int) -> int: + """Get the exposed port mapping for the given internal port.""" + # Simplified implementation - wait strategies don't use this yet + return port + + def get_logs(self) -> tuple[bytes, bytes]: + """Get container logs.""" + if not self._docker_compose: + raise RuntimeError("DockerCompose reference not set on ComposeContainer") + if not self.Service: + raise RuntimeError("Service name not set on ComposeContainer") + stdout, stderr = self._docker_compose.get_logs(self.Service) + return stdout.encode(), stderr.encode() + + def get_wrapped_container(self) -> "ComposeContainer": + """Get the underlying container object for compatibility.""" + return self + + def reload(self) -> None: + """Reload container information for compatibility with wait strategies.""" + # ComposeContainer doesn't need explicit reloading as it's fetched fresh + # each time through get_container(), but we need this method for compatibility + pass + + @property + def status(self) -> str: + """Get container status for compatibility with wait strategies.""" + return self.State or "unknown" + @dataclass class DockerCompose: @@ -174,6 +211,7 @@ class DockerCompose: services: Optional[list[str]] = None docker_command_path: Optional[str] = None profiles: Optional[list[str]] = None + _wait_strategies: Optional[dict[str, Any]] = field(default=None, init=False, repr=False) def __post_init__(self): if isinstance(self.compose_file_name, str): @@ -207,6 +245,16 @@ def compose_command_property(self) -> list[str]: docker_compose_cmd += ["--env-file", self.env_file] return docker_compose_cmd + def waiting_for(self, strategies: dict[str, WaitStrategy]) -> "DockerCompose": + """ + Set wait strategies for specific services. + + Args: + strategies: Dictionary mapping service names to wait strategies + """ + self._wait_strategies = strategies + return self + def start(self) -> None: """ Starts the docker compose environment. @@ -235,6 +283,11 @@ def start(self) -> None: self._run_command(cmd=up_cmd) + if self._wait_strategies: + for service, strategy in self._wait_strategies.items(): + container = self.get_container(service_name=service) + strategy.wait_until_ready(container) + def stop(self, down=True) -> None: """ Stops the docker compose environment. @@ -322,6 +375,10 @@ def get_containers(self, include_all=False) -> list[ComposeContainer]: else: containers.append(_ignore_properties(ComposeContainer, data)) + # Set the docker_compose reference on each container + for container in containers: + container._docker_compose = self + return containers def get_container( @@ -369,7 +426,13 @@ def exec_in_container( exit_code: The command's exit code. """ if not service_name: - service_name = self.get_container().Service + containers = self.get_containers() + if len(containers) != 1: + raise ContainerIsNotRunning( + f"exec_in_container failed because no service_name given " + f"and there is not exactly 1 container (but {len(containers)})" + ) + service_name = containers[0].Service exec_cmd = [*self.compose_command_property, "exec", "-T", service_name, *command] result = self._run_command(cmd=exec_cmd) @@ -381,12 +444,18 @@ def _run_command( context: Optional[str] = None, ) -> CompletedProcess[bytes]: context = context or self.context - return subprocess_run( - cmd, - capture_output=True, - check=True, - cwd=context, - ) + try: + return subprocess_run( + cmd, + capture_output=True, + check=True, + cwd=context, + ) + except CalledProcessError as e: + logger.error(f"Command '{e.cmd}' failed with exit code {e.returncode}") + logger.error(f"STDOUT:\n{e.stdout.decode(errors='ignore')}") + logger.error(f"STDERR:\n{e.stderr.decode(errors='ignore')}") + raise e from e def get_service_port( self, @@ -440,16 +509,52 @@ def get_service_host_and_port( publisher = self.get_container(service_name).get_publisher(by_port=port).normalize() return publisher.URL, publisher.PublishedPort - @wait_container_is_ready(HTTPError, URLError) def wait_for(self, url: str) -> "DockerCompose": """ Waits for a response from a given URL. This is typically used to block until a service in the environment has started and is responding. Note that it does not assert any sort of return code, only check that the connection was successful. + This is a convenience method that internally uses HttpWaitStrategy. For more complex + wait scenarios, consider using the structured wait strategies with `waiting_for()`. + Args: url: URL from one of the services in the environment to use to wait on. + + Example: + # Simple URL wait (legacy style) + compose.wait_for("http://localhost:8080") + + # For more complex scenarios, use structured wait strategies: + from testcontainers.core.waiting_utils import HttpWaitStrategy, LogMessageWaitStrategy + + compose.waiting_for({ + "web": HttpWaitStrategy(8080).for_status_code(200), + "db": LogMessageWaitStrategy("database system is ready to accept connections") + }) """ - with urlopen(url) as response: - response.read() + import time + from urllib.error import HTTPError, URLError + from urllib.request import Request, urlopen + + # For simple URL waiting when we have multiple containers, + # we'll do a direct HTTP check instead of using the container-based strategy + start_time = time.time() + timeout = 120 # Default timeout + + while True: + if time.time() - start_time > timeout: + raise TimeoutError(f"URL {url} not ready within {timeout} seconds") + + try: + request = Request(url, method="GET") + with urlopen(request, timeout=1) as response: + if 200 <= response.status < 400: + return self + except (URLError, HTTPError, ConnectionResetError, ConnectionRefusedError, BrokenPipeError, OSError): + # Any connection error means we should keep waiting + pass + + time.sleep(1) + return self diff --git a/core/testcontainers/core/container.py b/core/testcontainers/core/container.py index b7979a613..a4589937f 100644 --- a/core/testcontainers/core/container.py +++ b/core/testcontainers/core/container.py @@ -16,7 +16,8 @@ from testcontainers.core.labels import LABEL_SESSION_ID, SESSION_ID from testcontainers.core.network import Network from testcontainers.core.utils import is_arm, setup_logger -from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs +from testcontainers.core.wait_strategies import LogMessageWaitStrategy +from testcontainers.core.waiting_utils import WaitStrategy if TYPE_CHECKING: from docker.models.containers import Container @@ -90,6 +91,7 @@ def __init__( self.with_network_aliases(*network_aliases) self._kwargs = kwargs + self._wait_strategy: Optional[WaitStrategy] = None def with_env(self, key: str, value: str) -> Self: self.env[key] = value @@ -154,6 +156,11 @@ def maybe_emulate_amd64(self) -> Self: return self.with_kwargs(platform="linux/amd64") return self + def waiting_for(self, strategy: WaitStrategy) -> "DockerContainer": + """Set a wait strategy to be used after container start.""" + self._wait_strategy = strategy + return self + def start(self) -> Self: if not c.ryuk_disabled and self.image != c.ryuk_image: logger.debug("Creating Ryuk container") @@ -186,6 +193,9 @@ def start(self) -> Self: ) logger.info("Container started: %s", self._container.short_id) + + if self._wait_strategy is not None: + self._wait_strategy.wait_until_ready(self) return self def stop(self, force=True, delete_volume=True) -> None: @@ -212,7 +222,6 @@ def get_container_host_ip(self) -> str: # ensure that we covered all possible connection_modes assert_never(connection_mode) - @wait_container_is_ready() def get_exposed_port(self, port: int) -> int: if self.get_docker_client().get_connection_mode().use_mapped_port: return self.get_docker_client().port(self._container.id, port) @@ -242,6 +251,18 @@ def get_logs(self) -> tuple[bytes, bytes]: raise ContainerStartException("Container should be started before getting logs") return self._container.logs(stderr=False), self._container.logs(stdout=False) + def reload(self) -> None: + """Reload container information for compatibility with wait strategies.""" + if self._container: + self._container.reload() + + @property + def status(self) -> str: + """Get container status for compatibility with wait strategies.""" + if not self._container: + return "not_started" + return self._container.status + def exec(self, command: Union[str, list[str]]) -> tuple[int, bytes]: if not self._container: raise ContainerStartException("Container should be started before executing a command") @@ -291,7 +312,7 @@ def _create_instance(cls) -> "Reaper": .with_env("RYUK_RECONNECTION_TIMEOUT", c.ryuk_reconnection_timeout) .start() ) - wait_for_logs(Reaper._container, r".* Started!", timeout=20, raise_on_exit=True) + Reaper._container.waiting_for(LogMessageWaitStrategy(r".* Started!").with_startup_timeout(20)) container_host = Reaper._container.get_container_host_ip() container_port = int(Reaper._container.get_exposed_port(8080)) diff --git a/core/testcontainers/core/wait_strategies.py b/core/testcontainers/core/wait_strategies.py new file mode 100644 index 000000000..54f897d53 --- /dev/null +++ b/core/testcontainers/core/wait_strategies.py @@ -0,0 +1,154 @@ +""" +Structured wait strategies for containers. + +- LogMessageWaitStrategy: Wait for specific log messages +- HttpWaitStrategy: Wait for HTTP endpoints to be available +- HealthcheckWaitStrategy: Wait for Docker health checks to pass +- PortWaitStrategy: Wait for TCP ports to be available +- FileExistsWaitStrategy: Wait for files to exist on the filesystem +- CompositeWaitStrategy: Combine multiple wait strategies + +Example: + Basic usage with containers: + + from testcontainers.core.wait_strategies import HttpWaitStrategy, LogMessageWaitStrategy + + # Wait for HTTP endpoint + container.waiting_for(HttpWaitStrategy(8080).for_status_code(200)) + + # Wait for log message + container.waiting_for(LogMessageWaitStrategy("Server started")) + + # Combine multiple strategies + container.waiting_for(CompositeWaitStrategy( + LogMessageWaitStrategy("Database ready"), + HttpWaitStrategy(8080) + )) +""" + +import re +import time +from datetime import timedelta +from typing import Union + +from testcontainers.core.utils import setup_logger + +# Import base classes from waiting_utils to make them available for tests +from .waiting_utils import WaitStrategy + +logger = setup_logger(__name__) + + +class LogMessageWaitStrategy(WaitStrategy): + """ + Wait for a specific message to appear in the container logs. + + This strategy monitors the container's stdout and stderr streams for a specific + message or regex pattern. It can be configured to wait for the message to appear + multiple times or to require the message in both streams. + + Raises error if container exits before message is found. + + Args: + message: The message or regex pattern to search for in the logs + times: Number of times the message must appear (default: 1) + predicate_streams_and: If True, message must appear in both stdout and stderr (default: False) + + Example: + # Wait for a simple message + strategy = LogMessageWaitStrategy("ready for start") + + # Wait for a regex pattern + strategy = LogMessageWaitStrategy(re.compile(r"database.*ready")) + + # Wait for message in both streams + strategy = LogMessageWaitStrategy("ready", predicate_streams_and=True) + """ + + def __init__( + self, message: Union[str, re.Pattern[str]], times: int = 1, predicate_streams_and: bool = False + ) -> None: + super().__init__() + self._message = message if isinstance(message, re.Pattern) else re.compile(message, re.MULTILINE) + self._times = times + self._predicate_streams_and = predicate_streams_and + + def with_startup_timeout(self, timeout: Union[int, timedelta]) -> "LogMessageWaitStrategy": + """Set the maximum time to wait for the container to be ready.""" + if isinstance(timeout, timedelta): + self._startup_timeout = int(timeout.total_seconds()) + else: + self._startup_timeout = timeout + return self + + def with_poll_interval(self, interval: Union[float, timedelta]) -> "LogMessageWaitStrategy": + """Set how frequently to check if the container is ready.""" + if isinstance(interval, timedelta): + self._poll_interval = interval.total_seconds() + else: + self._poll_interval = interval + return self + + def wait_until_ready(self, container) -> None: + """ + Wait until the specified message appears in the container logs. + + Args: + container: The container to monitor + + Raises: + TimeoutError: If the message doesn't appear within the timeout period + RuntimeError: If the container exits before the message appears + """ + from .waiting_utils import _NOT_EXITED_STATUSES, _get_container_logs_for_debugging, _get_container_status_info + + # Implement our own wait logic to avoid recursive calls to wait_for_logs + wrapped = container.get_wrapped_container() + start_time = time.time() + + while True: + duration = time.time() - start_time + if duration > self._startup_timeout: + # Get current logs and status for debugging + stdout_str, stderr_str = _get_container_logs_for_debugging(container) + status_info = _get_container_status_info(container) + + message_pattern = self._message.pattern if hasattr(self._message, "pattern") else str(self._message) + + raise TimeoutError( + f"Container did not emit logs containing '{message_pattern}' within {self._startup_timeout:.3f} seconds. " + f"Container status: {status_info['status']}, health: {status_info['health_status']}. " + f"Recent stdout: {stdout_str}. " + f"Recent stderr: {stderr_str}. " + f"Hint: Check if the container is starting correctly, the expected message is being logged, " + f"and the log pattern matches what the application actually outputs." + ) + + stdout_bytes, stderr_bytes = container.get_logs() + stdout = stdout_bytes.decode() + stderr = stderr_bytes.decode() + + predicate_result = ( + self._message.search(stdout) or self._message.search(stderr) + if self._predicate_streams_and is False + else self._message.search(stdout) and self._message.search(stderr) + ) + + if predicate_result: + return + + # Check if container has exited + wrapped.reload() + if wrapped.status not in _NOT_EXITED_STATUSES: + # Get exit information for better debugging + status_info = _get_container_status_info(container) + + raise RuntimeError( + f"Container exited (status: {status_info['status']}, exit code: {status_info['exit_code']}) " + f"before emitting logs containing '{self._message.pattern if hasattr(self._message, 'pattern') else str(self._message)}'. " + f"Container error: {status_info['error']}. " + f"Hint: Check container logs and ensure the application is configured to start correctly. " + f"The application may be crashing or exiting early." + ) + + time.sleep(self._poll_interval) diff --git a/core/testcontainers/core/waiting_utils.py b/core/testcontainers/core/waiting_utils.py index 36e6a812f..f06d563fd 100644 --- a/core/testcontainers/core/waiting_utils.py +++ b/core/testcontainers/core/waiting_utils.py @@ -11,69 +11,184 @@ # License for the specific language governing permissions and limitations # under the License. - import re import time -import traceback -from typing import TYPE_CHECKING, Any, Callable, Union +import warnings +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Any, Callable, Protocol, TypeVar, Union, cast import wrapt from testcontainers.core.config import testcontainers_config as config from testcontainers.core.utils import setup_logger -if TYPE_CHECKING: - from testcontainers.core.container import DockerContainer - logger = setup_logger(__name__) # Get a tuple of transient exceptions for which we'll retry. Other exceptions will be raised. TRANSIENT_EXCEPTIONS = (TimeoutError, ConnectionError) +# Type variables for generic functions +F = TypeVar("F", bound=Callable[..., Any]) + -def wait_container_is_ready(*transient_exceptions) -> Callable: +class WaitStrategyTarget(Protocol): """ - Wait until container is ready. + Protocol defining the interface that containers must implement for wait strategies. - Function that spawn container should be decorated by this method Max wait is configured by - config. Default is 120 sec. Polling interval is 1 sec. + This allows wait strategies to work with both DockerContainer and ComposeContainer + without requiring inheritance or type ignores. - Args: - *transient_exceptions: Additional transient exceptions that should be retried if raised. Any - non-transient exceptions are fatal, and the exception is re-raised immediately. + Implementation requirement: + - DockerContainer: Implements this protocol (see core/tests/test_protocol_compliance.py) + - ComposeContainer: Implements this protocol (see core/tests/test_protocol_compliance.py) """ - transient_exceptions = TRANSIENT_EXCEPTIONS + tuple(transient_exceptions) - @wrapt.decorator - def wrapper(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any: - from testcontainers.core.container import DockerContainer + def get_container_host_ip(self) -> str: + """Get the host IP address for the container.""" + ... + + def get_exposed_port(self, port: int) -> int: + """Get the exposed port mapping for the given internal port.""" + ... + + def get_wrapped_container(self) -> Any: + """Get the underlying container object.""" + ... + + def get_logs(self) -> tuple[bytes, bytes]: + """Get container logs as (stdout, stderr) tuple.""" + ... + + def reload(self) -> None: + """Reload container information.""" + ... - if isinstance(instance, DockerContainer): - logger.info("Waiting for container %s with image %s to be ready ...", instance._container, instance.image) + @property + def status(self) -> str: + """Get container status.""" + ... + + +class WaitStrategy(ABC): + """Base class for all wait strategies.""" + + def __init__(self) -> None: + self._startup_timeout: int = config.timeout + self._poll_interval: float = config.sleep_time + + def with_startup_timeout(self, timeout: Union[int, timedelta]) -> "WaitStrategy": + """Set the maximum time to wait for the container to be ready.""" + if isinstance(timeout, timedelta): + self._startup_timeout = int(timeout.total_seconds()) else: - logger.info("Waiting for %s to be ready ...", instance) - - exception = None - for attempt_no in range(config.max_tries): - try: - return wrapped(*args, **kwargs) - except transient_exceptions as e: - logger.debug( - f"Connection attempt '{attempt_no + 1}' of '{config.max_tries + 1}' " - f"failed: {traceback.format_exc()}" - ) - time.sleep(config.sleep_time) - exception = e - raise TimeoutError( - f"Wait time ({config.timeout}s) exceeded for {wrapped.__name__}(args: {args}, kwargs: " - f"{kwargs}). Exception: {exception}" - ) + self._startup_timeout = timeout + return self + + def with_poll_interval(self, interval: Union[float, timedelta]) -> "WaitStrategy": + """Set how frequently to check if the container is ready.""" + if isinstance(interval, timedelta): + self._poll_interval = interval.total_seconds() + else: + self._poll_interval = interval + return self - return wrapper + @abstractmethod + def wait_until_ready(self, container: WaitStrategyTarget) -> None: + """Wait until the container is ready.""" + pass + + +# Keep existing wait_container_is_ready but make it use the new system internally +def wait_container_is_ready(*transient_exceptions: type[Exception]) -> Callable[[F], F]: + """ + Legacy wait decorator that uses the new wait strategy system internally. + Maintains backwards compatibility with existing code. + + This decorator can be used to wait for a function to succeed without raising + transient exceptions. It's useful for simple wait scenarios, but for more + complex cases, consider using structured wait strategies directly. + + Example: + @wait_container_is_ready(HTTPError, URLError) + def check_http(container): + with urlopen("http://localhost:8080") as response: + return response.status == 200 + + # For more complex scenarios, use structured wait strategies: + container.waiting_for(HttpWaitStrategy(8080).for_status_code(200)) + """ + warnings.warn( + "The @wait_container_is_ready decorator is deprecated and will be removed in a future version. " + "Use structured wait strategies instead: " + "container.waiting_for(HttpWaitStrategy(8080).for_status_code(200)) or " + "container.waiting_for(LogMessageWaitStrategy('ready'))", + DeprecationWarning, + stacklevel=2, + ) + + class LegacyWaitStrategy(WaitStrategy): + def __init__(self, func: Callable[..., Any], instance: Any, args: list[Any], kwargs: dict[str, Any]): + super().__init__() + self.func = func + self.instance = instance + self.args = args + self.kwargs = kwargs + self.transient_exceptions: tuple[type[Exception], ...] = TRANSIENT_EXCEPTIONS + tuple(transient_exceptions) + + def wait_until_ready(self, container: WaitStrategyTarget) -> Any: + start_time = time.time() + while True: + try: + # Handle different function call patterns: + # 1. Standalone functions (like wait_for): call with just args/kwargs + # 2. Methods: call with instance as first argument + if self.instance is None: + # Standalone function case + result = self.func(*self.args, **self.kwargs) + elif self.instance is container: + # Staticmethod case: self.instance is the container + result = self.func(*self.args, **self.kwargs) + else: + # Method case: self.instance is the instance (self) + result = self.func(self.instance, *self.args, **self.kwargs) + return result + except self.transient_exceptions as e: + if time.time() - start_time > self._startup_timeout: + raise TimeoutError( + f"Wait time ({self._startup_timeout}s) exceeded for {self.func.__name__}" + f"(args: {self.args}, kwargs: {self.kwargs}). Exception: {e}. " + f"Hint: Check if the container is ready, the function parameters are correct, " + f"and the expected conditions are met for the function to succeed." + ) from e + logger.debug(f"Connection attempt failed: {e!s}") + time.sleep(self._poll_interval) + + @wrapt.decorator + def wrapper(wrapped: Callable[..., Any], instance: Any, args: list[Any], kwargs: dict[str, Any]) -> Any: + # Use the LegacyWaitStrategy to handle retries with proper timeout + strategy = LegacyWaitStrategy(wrapped, instance, args, kwargs) + # For backwards compatibility, assume the instance is the container + container = instance if hasattr(instance, "get_container_host_ip") else args[0] if args else None + if container: + return strategy.wait_until_ready(container) + else: + # Fallback to direct call if we can't identify the container + return wrapped(*args, **kwargs) + + return cast("Callable[[F], F]", wrapper) @wait_container_is_ready() def wait_for(condition: Callable[..., bool]) -> bool: + warnings.warn( + "The wait_for function is deprecated and will be removed in a future version. " + "Use structured wait strategies instead: " + "container.waiting_for(LogMessageWaitStrategy('ready')) or " + "container.waiting_for(HttpWaitStrategy(8080).for_status_code(200))", + DeprecationWarning, + stacklevel=2, + ) return condition() @@ -81,52 +196,185 @@ def wait_for(condition: Callable[..., bool]) -> bool: def wait_for_logs( - container: "DockerContainer", - predicate: Union[Callable, str], - timeout: Union[float, None] = None, + container: WaitStrategyTarget, + predicate: Union[Callable[[str], bool], str, WaitStrategy], + timeout: float = config.timeout, interval: float = 1, predicate_streams_and: bool = False, raise_on_exit: bool = False, - # ) -> float: """ - Wait for the container to emit logs satisfying the predicate. + Enhanced version of wait_for_logs that supports both old and new interfaces. + + This function waits for container logs to satisfy a predicate. It supports + multiple input types for the predicate and maintains backwards compatibility + with existing code while adding support for the new WaitStrategy system. + + This is a convenience function that can be used for simple log-based waits. + For more complex scenarios, consider using structured wait strategies directly. Args: - container: Container whose logs to wait for. - predicate: Predicate that should be satisfied by the logs. If a string, then it is used as - the pattern for a multiline regular expression search. - timeout: Number of seconds to wait for the predicate to be satisfied. Defaults to wait - indefinitely. - interval: Interval at which to poll the logs. - predicate_streams_and: should the predicate be applied to both + container: The DockerContainer to monitor + predicate: The predicate to check against logs. Can be: + - A callable function that takes log text and returns bool + - A string that will be compiled to a regex pattern + - A WaitStrategy object + timeout: Maximum time to wait in seconds (default: config.timeout) + interval: How frequently to check in seconds (default: 1) + predicate_streams_and: If True, predicate must match both stdout and stderr (default: False) + raise_on_exit: If True, raise RuntimeError if container exits before predicate matches (default: False) Returns: - duration: Number of seconds until the predicate was satisfied. + The time in seconds that was spent waiting + + Raises: + TimeoutError: If the predicate is not satisfied within the timeout period + RuntimeError: If raise_on_exit is True and container exits before predicate matches + + Example: + # Wait for a simple string + wait_for_logs(container, "ready for start") + + # Wait with custom predicate + wait_for_logs(container, lambda logs: "database" in logs and "ready" in logs) + + # Wait with WaitStrategy + strategy = LogMessageWaitStrategy("ready") + wait_for_logs(container, strategy) + + # For more complex scenarios, use structured wait strategies directly: + container.waiting_for(LogMessageWaitStrategy("ready")) """ - if timeout is None: - timeout = config.timeout + # Only warn for legacy usage (string or callable predicates, not WaitStrategy objects) + if not isinstance(predicate, WaitStrategy): + warnings.warn( + "The wait_for_logs function with string or callable predicates is deprecated and will be removed in a future version. " + "Use structured wait strategies instead: " + "container.waiting_for(LogMessageWaitStrategy('ready')) or " + "container.waiting_for(LogMessageWaitStrategy(re.compile(r'pattern')))", + DeprecationWarning, + stacklevel=2, + ) + + if isinstance(predicate, WaitStrategy): + start = time.time() + predicate.with_startup_timeout(int(timeout)).with_poll_interval(interval) + predicate.wait_until_ready(container) + return time.time() - start + + # Original implementation for backwards compatibility if isinstance(predicate, str): - predicate = re.compile(predicate, re.MULTILINE).search + compiled_pattern = re.compile(predicate, re.MULTILINE) + + def predicate_func(text: str) -> bool: + return compiled_pattern.search(text) is not None + else: + # At this point we know predicate is callable since it's not str and not WaitStrategy + predicate_func = predicate wrapped = container.get_wrapped_container() start = time.time() while True: duration = time.time() - start - stdout, stderr = container.get_logs() - stdout = stdout.decode() - stderr = stderr.decode() + stdout_bytes, stderr_bytes = container.get_logs() + stdout = stdout_bytes.decode() + stderr = stderr_bytes.decode() predicate_result = ( - predicate(stdout) or predicate(stderr) + predicate_func(stdout) or predicate_func(stderr) if predicate_streams_and is False - else predicate(stdout) and predicate(stderr) - # + else predicate_func(stdout) and predicate_func(stderr) ) if predicate_result: return duration if duration > timeout: - raise TimeoutError(f"Container did not emit logs satisfying predicate in {timeout:.3f} seconds") + # Get current logs and status for debugging + stdout_str, stderr_str = _get_container_logs_for_debugging(container) + status_info = _get_container_status_info(container) + + raise TimeoutError( + f"Container did not emit logs satisfying predicate in {timeout:.3f} seconds. " + f"Container status: {status_info['status']}, health: {status_info['health_status']}. " + f"Recent stdout: {stdout_str}. " + f"Recent stderr: {stderr_str}. " + f"Hint: Check if the container is starting correctly and the expected log pattern is being generated. " + f"Verify the predicate function or pattern matches the actual log output." + ) if raise_on_exit: wrapped.reload() if wrapped.status not in _NOT_EXITED_STATUSES: - raise RuntimeError("Container exited before emitting logs satisfying predicate") + # Get exit information for better debugging + status_info = _get_container_status_info(container) + + raise RuntimeError( + f"Container exited (status: {status_info['status']}, exit code: {status_info['exit_code']}) " + f"before emitting logs satisfying predicate. " + f"Container error: {status_info['error']}. " + f"Hint: Check container logs and ensure the application is configured to start correctly. " + f"The application may be crashing or exiting early." + ) time.sleep(interval) + + +def _get_container_logs_for_debugging(container: WaitStrategyTarget, max_length: int = 200) -> tuple[str, str]: + """ + Get container logs for debugging purposes. + + Args: + container: The container to get logs from + max_length: Maximum length of log output to include in error messages + + Returns: + Tuple of (stdout, stderr) as strings + """ + try: + stdout_bytes, stderr_bytes = container.get_logs() + stdout_str = stdout_bytes.decode() if stdout_bytes else "" + stderr_str = stderr_bytes.decode() if stderr_bytes else "" + + # Truncate if too long + if len(stdout_str) > max_length: + stdout_str = "..." + stdout_str[-max_length:] + if len(stderr_str) > max_length: + stderr_str = "..." + stderr_str[-max_length:] + + return stdout_str, stderr_str + except Exception: + return "(failed to get logs)", "(failed to get logs)" + + +def _get_container_status_info(container: WaitStrategyTarget) -> dict[str, str]: + """ + Get container status information for debugging. + + Args: + container: The container to get status from + + Returns: + Dictionary with status information + """ + try: + wrapped = container.get_wrapped_container() + wrapped.reload() + + state = wrapped.attrs.get("State", {}) + return { + "status": wrapped.status, + "exit_code": str(state.get("ExitCode", "unknown")), + "error": state.get("Error", ""), + "health_status": state.get("Health", {}).get("Status", "no health check"), + } + except Exception: + return { + "status": "unknown", + "exit_code": "unknown", + "error": "failed to get status", + "health_status": "unknown", + } + + +__all__ = [ + "WaitStrategy", + "WaitStrategyTarget", + "wait_container_is_ready", + "wait_for", + "wait_for_logs", +] diff --git a/core/tests/test_protocol_compliance.py b/core/tests/test_protocol_compliance.py new file mode 100644 index 000000000..b3fb87bd1 --- /dev/null +++ b/core/tests/test_protocol_compliance.py @@ -0,0 +1,73 @@ +"""Test protocol compliance for wait strategy targets.""" + +import pytest +from typing import get_type_hints + +from testcontainers.core.waiting_utils import WaitStrategyTarget +from testcontainers.core.container import DockerContainer +from testcontainers.compose.compose import ComposeContainer + + +def test_docker_container_implements_wait_strategy_target(): + """Test that DockerContainer implements all WaitStrategyTarget protocol methods.""" + container = DockerContainer("hello-world") + + # Check all required methods exist + assert hasattr(container, "get_container_host_ip") + assert hasattr(container, "get_exposed_port") + assert hasattr(container, "get_wrapped_container") + assert hasattr(container, "get_logs") + assert hasattr(container, "reload") + assert hasattr(container, "status") + + # Check method signatures are callable + assert callable(container.get_container_host_ip) + assert callable(container.get_exposed_port) + assert callable(container.get_wrapped_container) + assert callable(container.get_logs) + assert callable(container.reload) + + # Status should be a property + assert isinstance(container.__class__.status, property) + + +def test_compose_container_implements_wait_strategy_target(): + """Test that ComposeContainer implements all WaitStrategyTarget protocol methods.""" + container = ComposeContainer() + + # Check all required methods exist + assert hasattr(container, "get_container_host_ip") + assert hasattr(container, "get_exposed_port") + assert hasattr(container, "get_wrapped_container") + assert hasattr(container, "get_logs") + assert hasattr(container, "reload") + assert hasattr(container, "status") + + # Check method signatures are callable + assert callable(container.get_container_host_ip) + assert callable(container.get_exposed_port) + assert callable(container.get_wrapped_container) + assert callable(container.get_logs) + assert callable(container.reload) + + # Status should be a property + assert isinstance(container.__class__.status, property) + + +def test_protocol_typing_compatibility(): + """Test that both classes can be used where WaitStrategyTarget is expected.""" + + def function_expecting_protocol(target: WaitStrategyTarget) -> str: + """A function that expects a WaitStrategyTarget.""" + return "accepted" + + # These should work without type errors (structural typing) + docker_container = DockerContainer("hello-world") + compose_container = ComposeContainer() + + # If the classes properly implement the protocol, these should work + result1 = function_expecting_protocol(docker_container) + result2 = function_expecting_protocol(compose_container) + + assert result1 == "accepted" + assert result2 == "accepted" diff --git a/core/tests/test_wait_strategies.py b/core/tests/test_wait_strategies.py new file mode 100644 index 000000000..280e84fe5 --- /dev/null +++ b/core/tests/test_wait_strategies.py @@ -0,0 +1,148 @@ +import re +import time +from datetime import timedelta +from unittest.mock import Mock, patch, MagicMock +import pytest +import itertools + +from testcontainers.core.container import DockerContainer +from testcontainers.core.wait_strategies import ( + LogMessageWaitStrategy, + WaitStrategy, +) + + +class ConcreteWaitStrategy(WaitStrategy): + """Concrete implementation for testing abstract base class.""" + + def wait_until_ready(self, container: "WaitStrategyTarget") -> None: + # Simple implementation that just waits a bit + time.sleep(0.1) + + +class TestWaitStrategy: + """Test the base WaitStrategy class.""" + + def test_wait_strategy_initialization(self): + strategy = ConcreteWaitStrategy() + assert strategy._startup_timeout > 0 + assert strategy._poll_interval > 0 + + @pytest.mark.parametrize( + "timeout_value,expected_seconds", + [ + (30, 30), + (timedelta(seconds=45), 45), + (60, 60), + (timedelta(minutes=2), 120), + ], + ids=[ + "timeout_int_30_seconds", + "timeout_timedelta_45_seconds", + "timeout_int_60_seconds", + "timeout_timedelta_2_minutes", + ], + ) + def test_with_startup_timeout(self, timeout_value, expected_seconds): + strategy = ConcreteWaitStrategy() + result = strategy.with_startup_timeout(timeout_value) + assert result is strategy + assert strategy._startup_timeout == expected_seconds + + @pytest.mark.parametrize( + "interval_value,expected_seconds", + [ + (2.5, 2.5), + (timedelta(seconds=3), 3.0), + (0.1, 0.1), + (timedelta(milliseconds=500), 0.5), + ], + ids=[ + "interval_float_2_5_seconds", + "interval_timedelta_3_seconds", + "interval_float_0_1_seconds", + "interval_timedelta_500_milliseconds", + ], + ) + def test_with_poll_interval(self, interval_value, expected_seconds): + strategy = ConcreteWaitStrategy() + result = strategy.with_poll_interval(interval_value) + assert result is strategy + assert strategy._poll_interval == expected_seconds + + def test_abstract_method(self): + # Test that abstract base class cannot be instantiated + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + WaitStrategy() # type: ignore[abstract] + + +class TestLogMessageWaitStrategy: + """Test the LogMessageWaitStrategy class.""" + + @pytest.mark.parametrize( + "message,times,predicate_streams_and", + [ + ("test message", 1, False), + (re.compile(r"test\d+"), 1, False), + ("test", 3, False), + ("test", 1, True), + ("ready", 2, True), + ], + ids=[ + "simple_string_message", + "regex_pattern_message", + "message_with_times_3", + "message_with_predicate_streams_and_true", + "ready_message_with_times_and_predicate", + ], + ) + def test_log_message_wait_strategy_initialization(self, message, times, predicate_streams_and): + strategy = LogMessageWaitStrategy(message, times=times, predicate_streams_and=predicate_streams_and) + + if isinstance(message, str): + assert strategy._message.pattern == message + else: + assert strategy._message is message + + assert strategy._times == times + assert strategy._predicate_streams_and is predicate_streams_and + + @pytest.mark.parametrize( + "container_logs,expected_message,should_succeed", + [ + ((b"test message", b""), "test message", True), + ((b"", b"test message"), "test message", True), + ((b"no match", b""), "test message", False), + ((b"test123", b""), re.compile(r"test\d+"), True), + ((b"test", b""), re.compile(r"test\d+"), False), + ], + ids=[ + "stdout_contains_message_success", + "stderr_contains_message_success", + "no_message_match_failure", + "regex_pattern_match_success", + "regex_pattern_no_match_failure", + ], + ) + @patch("time.time") + @patch("time.sleep") + def test_wait_until_ready(self, mock_sleep, mock_time, container_logs, expected_message, should_succeed): + strategy = LogMessageWaitStrategy(expected_message) + mock_container = Mock() + mock_container.get_logs.return_value = container_logs + # Mock the wrapped container to simulate a running container + mock_wrapped = Mock() + mock_wrapped.status = "running" + mock_wrapped.reload.return_value = None + mock_container.get_wrapped_container.return_value = mock_wrapped + # Configure time mock to simulate timeout for failure cases + if should_succeed: + mock_time.side_effect = [0, 1] + else: + mock_time.side_effect = itertools.count(start=0, step=1) + if should_succeed: + strategy.wait_until_ready(mock_container) + mock_container.get_logs.assert_called_once() + else: + with pytest.raises(TimeoutError): + strategy.wait_until_ready(mock_container) diff --git a/core/tests/test_wait_strategies_integration.py b/core/tests/test_wait_strategies_integration.py new file mode 100644 index 000000000..4e090ab80 --- /dev/null +++ b/core/tests/test_wait_strategies_integration.py @@ -0,0 +1,88 @@ +import tempfile +import time +from pathlib import Path + +import pytest + +from testcontainers.core.container import DockerContainer +from testcontainers.core.wait_strategies import LogMessageWaitStrategy + + +class TestRealDockerIntegration: + """Integration tests using real Docker containers.""" + + def test_log_message_wait_strategy_with_real_container(self): + """Test LogMessageWaitStrategy with a real container that outputs known logs.""" + strategy = LogMessageWaitStrategy("Hello from Docker!") + + with DockerContainer("hello-world").waiting_for(strategy) as container: + # If we get here, the strategy worked + assert container.get_wrapped_container() is not None + + def test_wait_strategy_timeout_with_real_container(self): + """Test that wait strategies properly timeout with real containers.""" + # Use a very short timeout with a condition that won't be met + strategy = LogMessageWaitStrategy("this_message_will_never_appear").with_startup_timeout(2) + + with pytest.raises(TimeoutError): + with DockerContainer("alpine:latest").with_command("sleep 10").waiting_for(strategy): + pass # Should not reach here + + +class TestDockerComposeIntegration: + """Integration tests for wait strategies with Docker Compose.""" + + def test_compose_service_wait_strategies(self): + """Test that wait strategies work with Docker Compose services.""" + from testcontainers.compose import DockerCompose + import tempfile + from pathlib import Path + + # Use basic_multiple fixture with two alpine services that output logs + compose = DockerCompose( + context=Path(__file__).parent / "compose_fixtures" / "basic_multiple", + compose_file_name="docker-compose.yaml", + ) + + # Configure wait strategies for both services + # Wait for the date output that these containers produce + compose.waiting_for( + { + "alpine1": LogMessageWaitStrategy("202").with_startup_timeout(30), # Date includes year 202X + "alpine2": LogMessageWaitStrategy("202").with_startup_timeout(30), # Date includes year 202X + } + ) + + with compose: + # Verify both services are running + container1 = compose.get_container("alpine1") + container2 = compose.get_container("alpine2") + + assert container1.State == "running" + assert container2.State == "running" + + # Verify logs contain expected patterns + logs1 = container1.get_logs() + logs2 = container2.get_logs() + + # Both containers should have date output (which contains "202" for year 202X) + assert any(b"202" in log for log in logs1) + assert any(b"202" in log for log in logs2) + + def test_compose_wait_strategy_timeout(self): + """Test that compose wait strategies properly timeout.""" + from testcontainers.compose import DockerCompose + from pathlib import Path + + compose = DockerCompose( + context=Path(__file__).parent / "compose_fixtures" / "basic", compose_file_name="docker-compose.yaml" + ) + + # Use a wait strategy that will never succeed with very short timeout + compose.waiting_for( + {"alpine": LogMessageWaitStrategy("this_message_will_never_appear").with_startup_timeout(2)} + ) + + with pytest.raises(TimeoutError): + with compose: + pass # Should not reach here diff --git a/core/tests/test_waiting_utils.py b/core/tests/test_waiting_utils.py index 1e684fc46..80b421c46 100644 --- a/core/tests/test_waiting_utils.py +++ b/core/tests/test_waiting_utils.py @@ -1,7 +1,7 @@ import pytest from testcontainers.core.container import DockerContainer -from testcontainers.core.waiting_utils import wait_for_logs +from testcontainers.core.waiting_utils import wait_for_logs, wait_for, wait_container_is_ready def test_wait_for_logs() -> None: @@ -12,3 +12,27 @@ def test_wait_for_logs() -> None: def test_timeout_is_raised_when_waiting_for_logs() -> None: with pytest.raises(TimeoutError), DockerContainer("alpine").with_command("sleep 2") as container: wait_for_logs(container, "Hello from Docker!", timeout=1e-3) + + +def test_wait_container_is_ready_decorator_basic() -> None: + """Test the basic wait_container_is_ready decorator functionality.""" + + @wait_container_is_ready() + def simple_check(): + return True + + result = simple_check() + assert result is True + + +def test_wait_container_is_ready_decorator_with_container() -> None: + """Test wait_container_is_ready decorator with a real container.""" + + @wait_container_is_ready() + def check_container_logs(container): + stdout, stderr = container.get_logs() + return b"Hello from Docker!" in stdout or b"Hello from Docker!" in stderr + + with DockerContainer("hello-world") as container: + result = check_container_logs(container) + assert result is True diff --git a/pyproject.toml b/pyproject.toml index e52116126..7d516af83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -209,6 +209,12 @@ log_cli_level = "INFO" markers = [ "inside_docker_check: mark test to be used to validate DinD/DooD is working as expected", ] +filterwarnings = [ + # Suppress expected deprecation warnings for backwards compatibility testing + "ignore:The @wait_container_is_ready decorator is deprecated.*:DeprecationWarning", + "ignore:The wait_for function is deprecated and will be removed in a future version.*:DeprecationWarning", + "ignore:The wait_for_logs function with string or callable predicates is deprecated.*:DeprecationWarning", +] [tool.coverage.run] branch = true diff --git a/requirements.txt b/requirements.txt index febf39d64..f77d057b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,3 @@ mkdocs==1.3.0 mkdocs-codeinclude-plugin==0.2.0 mkdocs-material==8.1.3 mkdocs-markdownextradata-plugin==0.2.5 -