Skip to content

feat(core): Wait strategies foundation #838

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 119 additions & 14 deletions core/testcontainers/compose/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
from os import PathLike
from platform import system
from re import split
from subprocess import CompletedProcess
from subprocess import CalledProcessError, CompletedProcess
from subprocess import run as subprocess_run
from typing import Any, Callable, Literal, Optional, TypeVar, Union, cast
from urllib.error import HTTPError, URLError
from urllib.request import urlopen

from testcontainers.core.exceptions import ContainerIsNotRunning, NoSuchPortExposed
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.core.utils import setup_logger
from testcontainers.core.waiting_utils import WaitStrategy

_IPT = TypeVar("_IPT")
_WARNINGS = {"DOCKER_COMPOSE_GET_CONFIG": "get_config is experimental, see testcontainers/testcontainers-python#669"}

logger = setup_logger(__name__)


def _ignore_properties(cls: type[_IPT], dict_: any) -> _IPT:
"""omits extra fields like @JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -77,6 +78,7 @@ class ComposeContainer:
Health: Optional[str] = None
ExitCode: Optional[str] = None
Publishers: list[PublishedPort] = field(default_factory=list)
_docker_compose: Optional["DockerCompose"] = field(default=None, init=False, repr=False)

def __post_init__(self):
if self.Publishers:
Expand Down Expand Up @@ -112,6 +114,41 @@ def get_publisher(
def _matches_protocol(prefer_ip_version, r):
return (":" in r.URL) is (prefer_ip_version == "IPv6")

# WaitStrategy compatibility methods
def get_container_host_ip(self) -> str:
"""Get the host IP for the container."""
# Simplified implementation - wait strategies don't use this yet
return "127.0.0.1"

def get_exposed_port(self, port: int) -> int:
"""Get the exposed port mapping for the given internal port."""
# Simplified implementation - wait strategies don't use this yet
return port

def get_logs(self) -> tuple[bytes, bytes]:
"""Get container logs."""
if not self._docker_compose:
raise RuntimeError("DockerCompose reference not set on ComposeContainer")
if not self.Service:
raise RuntimeError("Service name not set on ComposeContainer")
stdout, stderr = self._docker_compose.get_logs(self.Service)
return stdout.encode(), stderr.encode()

def get_wrapped_container(self) -> "ComposeContainer":
"""Get the underlying container object for compatibility."""
return self

def reload(self) -> None:
"""Reload container information for compatibility with wait strategies."""
# ComposeContainer doesn't need explicit reloading as it's fetched fresh
# each time through get_container(), but we need this method for compatibility
pass

@property
def status(self) -> str:
"""Get container status for compatibility with wait strategies."""
return self.State or "unknown"


@dataclass
class DockerCompose:
Expand Down Expand Up @@ -174,6 +211,7 @@ class DockerCompose:
services: Optional[list[str]] = None
docker_command_path: Optional[str] = None
profiles: Optional[list[str]] = None
_wait_strategies: Optional[dict[str, Any]] = field(default=None, init=False, repr=False)

def __post_init__(self):
if isinstance(self.compose_file_name, str):
Expand Down Expand Up @@ -207,6 +245,16 @@ def compose_command_property(self) -> list[str]:
docker_compose_cmd += ["--env-file", self.env_file]
return docker_compose_cmd

def waiting_for(self, strategies: dict[str, WaitStrategy]) -> "DockerCompose":
"""
Set wait strategies for specific services.

Args:
strategies: Dictionary mapping service names to wait strategies
"""
self._wait_strategies = strategies
return self

def start(self) -> None:
"""
Starts the docker compose environment.
Expand Down Expand Up @@ -235,6 +283,11 @@ def start(self) -> None:

self._run_command(cmd=up_cmd)

if self._wait_strategies:
for service, strategy in self._wait_strategies.items():
container = self.get_container(service_name=service)
strategy.wait_until_ready(container)

def stop(self, down=True) -> None:
"""
Stops the docker compose environment.
Expand Down Expand Up @@ -322,6 +375,10 @@ def get_containers(self, include_all=False) -> list[ComposeContainer]:
else:
containers.append(_ignore_properties(ComposeContainer, data))

# Set the docker_compose reference on each container
for container in containers:
container._docker_compose = self

return containers

def get_container(
Expand Down Expand Up @@ -369,7 +426,13 @@ def exec_in_container(
exit_code: The command's exit code.
"""
if not service_name:
service_name = self.get_container().Service
containers = self.get_containers()
if len(containers) != 1:
raise ContainerIsNotRunning(
f"exec_in_container failed because no service_name given "
f"and there is not exactly 1 container (but {len(containers)})"
)
service_name = containers[0].Service
exec_cmd = [*self.compose_command_property, "exec", "-T", service_name, *command]
result = self._run_command(cmd=exec_cmd)

Expand All @@ -381,12 +444,18 @@ def _run_command(
context: Optional[str] = None,
) -> CompletedProcess[bytes]:
context = context or self.context
return subprocess_run(
cmd,
capture_output=True,
check=True,
cwd=context,
)
try:
return subprocess_run(
cmd,
capture_output=True,
check=True,
cwd=context,
)
except CalledProcessError as e:
logger.error(f"Command '{e.cmd}' failed with exit code {e.returncode}")
logger.error(f"STDOUT:\n{e.stdout.decode(errors='ignore')}")
logger.error(f"STDERR:\n{e.stderr.decode(errors='ignore')}")
raise e from e

def get_service_port(
self,
Expand Down Expand Up @@ -440,16 +509,52 @@ def get_service_host_and_port(
publisher = self.get_container(service_name).get_publisher(by_port=port).normalize()
return publisher.URL, publisher.PublishedPort

@wait_container_is_ready(HTTPError, URLError)
def wait_for(self, url: str) -> "DockerCompose":
"""
Waits for a response from a given URL. This is typically used to block until a service in
the environment has started and is responding. Note that it does not assert any sort of
return code, only check that the connection was successful.

This is a convenience method that internally uses HttpWaitStrategy. For more complex
wait scenarios, consider using the structured wait strategies with `waiting_for()`.

Args:
url: URL from one of the services in the environment to use to wait on.

Example:
# Simple URL wait (legacy style)
compose.wait_for("http://localhost:8080")

# For more complex scenarios, use structured wait strategies:
from testcontainers.core.waiting_utils import HttpWaitStrategy, LogMessageWaitStrategy

compose.waiting_for({
"web": HttpWaitStrategy(8080).for_status_code(200),
"db": LogMessageWaitStrategy("database system is ready to accept connections")
})
"""
with urlopen(url) as response:
response.read()
import time
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# For simple URL waiting when we have multiple containers,
# we'll do a direct HTTP check instead of using the container-based strategy
start_time = time.time()
timeout = 120 # Default timeout

while True:
if time.time() - start_time > timeout:
raise TimeoutError(f"URL {url} not ready within {timeout} seconds")

try:
request = Request(url, method="GET")
with urlopen(request, timeout=1) as response:
if 200 <= response.status < 400:
return self
except (URLError, HTTPError, ConnectionResetError, ConnectionRefusedError, BrokenPipeError, OSError):
# Any connection error means we should keep waiting
pass

time.sleep(1)

return self
27 changes: 24 additions & 3 deletions core/testcontainers/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from testcontainers.core.labels import LABEL_SESSION_ID, SESSION_ID
from testcontainers.core.network import Network
from testcontainers.core.utils import is_arm, setup_logger
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
from testcontainers.core.waiting_utils import WaitStrategy

if TYPE_CHECKING:
from docker.models.containers import Container
Expand Down Expand Up @@ -90,6 +91,7 @@
self.with_network_aliases(*network_aliases)

self._kwargs = kwargs
self._wait_strategy: Optional[WaitStrategy] = None

def with_env(self, key: str, value: str) -> Self:
self.env[key] = value
Expand Down Expand Up @@ -154,6 +156,11 @@
return self.with_kwargs(platform="linux/amd64")
return self

def waiting_for(self, strategy: WaitStrategy) -> "DockerContainer":
"""Set a wait strategy to be used after container start."""
self._wait_strategy = strategy
return self

def start(self) -> Self:
if not c.ryuk_disabled and self.image != c.ryuk_image:
logger.debug("Creating Ryuk container")
Expand Down Expand Up @@ -186,6 +193,9 @@
)

logger.info("Container started: %s", self._container.short_id)

if self._wait_strategy is not None:
self._wait_strategy.wait_until_ready(self)
return self

def stop(self, force=True, delete_volume=True) -> None:
Expand All @@ -212,7 +222,6 @@
# ensure that we covered all possible connection_modes
assert_never(connection_mode)

@wait_container_is_ready()
def get_exposed_port(self, port: int) -> int:
if self.get_docker_client().get_connection_mode().use_mapped_port:
return self.get_docker_client().port(self._container.id, port)
Expand Down Expand Up @@ -242,6 +251,18 @@
raise ContainerStartException("Container should be started before getting logs")
return self._container.logs(stderr=False), self._container.logs(stdout=False)

def reload(self) -> None:
"""Reload container information for compatibility with wait strategies."""
if self._container:
self._container.reload()

Check warning on line 257 in core/testcontainers/core/container.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/container.py#L257

Added line #L257 was not covered by tests

@property
def status(self) -> str:
"""Get container status for compatibility with wait strategies."""
if not self._container:
return "not_started"
return self._container.status

Check warning on line 264 in core/testcontainers/core/container.py

View check run for this annotation

Codecov / codecov/patch

core/testcontainers/core/container.py#L264

Added line #L264 was not covered by tests

def exec(self, command: Union[str, list[str]]) -> tuple[int, bytes]:
if not self._container:
raise ContainerStartException("Container should be started before executing a command")
Expand Down Expand Up @@ -291,7 +312,7 @@
.with_env("RYUK_RECONNECTION_TIMEOUT", c.ryuk_reconnection_timeout)
.start()
)
wait_for_logs(Reaper._container, r".* Started!", timeout=20, raise_on_exit=True)
Reaper._container.waiting_for(LogMessageWaitStrategy(r".* Started!").with_startup_timeout(20))

container_host = Reaper._container.get_container_host_ip()
container_port = int(Reaper._container.get_exposed_port(8080))
Expand Down
Loading