Skip to content

Commit 1bc760a

Browse files
committed
this pr is the foundation of aligning testcontainers-python waiting utilities with other languges
1 parent ff6a32d commit 1bc760a

11 files changed

+972
-82
lines changed

core/testcontainers/compose/compose.py

Lines changed: 119 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
from os import PathLike
66
from platform import system
77
from re import split
8-
from subprocess import CompletedProcess
8+
from subprocess import CalledProcessError, CompletedProcess
99
from subprocess import run as subprocess_run
1010
from typing import Any, Callable, Literal, Optional, TypeVar, Union, cast
11-
from urllib.error import HTTPError, URLError
12-
from urllib.request import urlopen
1311

1412
from testcontainers.core.exceptions import ContainerIsNotRunning, NoSuchPortExposed
15-
from testcontainers.core.waiting_utils import wait_container_is_ready
13+
from testcontainers.core.utils import setup_logger
14+
from testcontainers.core.waiting_utils import WaitStrategy
1615

1716
_IPT = TypeVar("_IPT")
1817
_WARNINGS = {"DOCKER_COMPOSE_GET_CONFIG": "get_config is experimental, see testcontainers/testcontainers-python#669"}
1918

19+
logger = setup_logger(__name__)
20+
2021

2122
def _ignore_properties(cls: type[_IPT], dict_: any) -> _IPT:
2223
"""omits extra fields like @JsonIgnoreProperties(ignoreUnknown = true)
@@ -77,6 +78,7 @@ class ComposeContainer:
7778
Health: Optional[str] = None
7879
ExitCode: Optional[str] = None
7980
Publishers: list[PublishedPort] = field(default_factory=list)
81+
_docker_compose: Optional["DockerCompose"] = field(default=None, init=False, repr=False)
8082

8183
def __post_init__(self):
8284
if self.Publishers:
@@ -112,6 +114,41 @@ def get_publisher(
112114
def _matches_protocol(prefer_ip_version, r):
113115
return (":" in r.URL) is (prefer_ip_version == "IPv6")
114116

117+
# WaitStrategy compatibility methods
118+
def get_container_host_ip(self) -> str:
119+
"""Get the host IP for the container."""
120+
# Simplified implementation - wait strategies don't use this yet
121+
return "127.0.0.1"
122+
123+
def get_exposed_port(self, port: int) -> int:
124+
"""Get the exposed port mapping for the given internal port."""
125+
# Simplified implementation - wait strategies don't use this yet
126+
return port
127+
128+
def get_logs(self) -> tuple[bytes, bytes]:
129+
"""Get container logs."""
130+
if not self._docker_compose:
131+
raise RuntimeError("DockerCompose reference not set on ComposeContainer")
132+
if not self.Service:
133+
raise RuntimeError("Service name not set on ComposeContainer")
134+
stdout, stderr = self._docker_compose.get_logs(self.Service)
135+
return stdout.encode(), stderr.encode()
136+
137+
def get_wrapped_container(self) -> "ComposeContainer":
138+
"""Get the underlying container object for compatibility."""
139+
return self
140+
141+
def reload(self) -> None:
142+
"""Reload container information for compatibility with wait strategies."""
143+
# ComposeContainer doesn't need explicit reloading as it's fetched fresh
144+
# each time through get_container(), but we need this method for compatibility
145+
pass
146+
147+
@property
148+
def status(self) -> str:
149+
"""Get container status for compatibility with wait strategies."""
150+
return self.State or "unknown"
151+
115152

116153
@dataclass
117154
class DockerCompose:
@@ -174,6 +211,7 @@ class DockerCompose:
174211
services: Optional[list[str]] = None
175212
docker_command_path: Optional[str] = None
176213
profiles: Optional[list[str]] = None
214+
_wait_strategies: Optional[dict[str, Any]] = field(default=None, init=False, repr=False)
177215

178216
def __post_init__(self):
179217
if isinstance(self.compose_file_name, str):
@@ -207,6 +245,16 @@ def compose_command_property(self) -> list[str]:
207245
docker_compose_cmd += ["--env-file", self.env_file]
208246
return docker_compose_cmd
209247

248+
def waiting_for(self, strategies: dict[str, WaitStrategy]) -> "DockerCompose":
249+
"""
250+
Set wait strategies for specific services.
251+
252+
Args:
253+
strategies: Dictionary mapping service names to wait strategies
254+
"""
255+
self._wait_strategies = strategies
256+
return self
257+
210258
def start(self) -> None:
211259
"""
212260
Starts the docker compose environment.
@@ -235,6 +283,11 @@ def start(self) -> None:
235283

236284
self._run_command(cmd=up_cmd)
237285

286+
if self._wait_strategies:
287+
for service, strategy in self._wait_strategies.items():
288+
container = self.get_container(service_name=service)
289+
strategy.wait_until_ready(container)
290+
238291
def stop(self, down=True) -> None:
239292
"""
240293
Stops the docker compose environment.
@@ -322,6 +375,10 @@ def get_containers(self, include_all=False) -> list[ComposeContainer]:
322375
else:
323376
containers.append(_ignore_properties(ComposeContainer, data))
324377

378+
# Set the docker_compose reference on each container
379+
for container in containers:
380+
container._docker_compose = self
381+
325382
return containers
326383

327384
def get_container(
@@ -369,7 +426,13 @@ def exec_in_container(
369426
exit_code: The command's exit code.
370427
"""
371428
if not service_name:
372-
service_name = self.get_container().Service
429+
containers = self.get_containers()
430+
if len(containers) != 1:
431+
raise ContainerIsNotRunning(
432+
f"exec_in_container failed because no service_name given "
433+
f"and there is not exactly 1 container (but {len(containers)})"
434+
)
435+
service_name = containers[0].Service
373436
exec_cmd = [*self.compose_command_property, "exec", "-T", service_name, *command]
374437
result = self._run_command(cmd=exec_cmd)
375438

@@ -381,12 +444,18 @@ def _run_command(
381444
context: Optional[str] = None,
382445
) -> CompletedProcess[bytes]:
383446
context = context or self.context
384-
return subprocess_run(
385-
cmd,
386-
capture_output=True,
387-
check=True,
388-
cwd=context,
389-
)
447+
try:
448+
return subprocess_run(
449+
cmd,
450+
capture_output=True,
451+
check=True,
452+
cwd=context,
453+
)
454+
except CalledProcessError as e:
455+
logger.error(f"Command '{e.cmd}' failed with exit code {e.returncode}")
456+
logger.error(f"STDOUT:\n{e.stdout.decode(errors='ignore')}")
457+
logger.error(f"STDERR:\n{e.stderr.decode(errors='ignore')}")
458+
raise e from e
390459

391460
def get_service_port(
392461
self,
@@ -440,16 +509,52 @@ def get_service_host_and_port(
440509
publisher = self.get_container(service_name).get_publisher(by_port=port).normalize()
441510
return publisher.URL, publisher.PublishedPort
442511

443-
@wait_container_is_ready(HTTPError, URLError)
444512
def wait_for(self, url: str) -> "DockerCompose":
445513
"""
446514
Waits for a response from a given URL. This is typically used to block until a service in
447515
the environment has started and is responding. Note that it does not assert any sort of
448516
return code, only check that the connection was successful.
449517
518+
This is a convenience method that internally uses HttpWaitStrategy. For more complex
519+
wait scenarios, consider using the structured wait strategies with `waiting_for()`.
520+
450521
Args:
451522
url: URL from one of the services in the environment to use to wait on.
523+
524+
Example:
525+
# Simple URL wait (legacy style)
526+
compose.wait_for("http://localhost:8080")
527+
528+
# For more complex scenarios, use structured wait strategies:
529+
from testcontainers.core.waiting_utils import HttpWaitStrategy, LogMessageWaitStrategy
530+
531+
compose.waiting_for({
532+
"web": HttpWaitStrategy(8080).for_status_code(200),
533+
"db": LogMessageWaitStrategy("database system is ready to accept connections")
534+
})
452535
"""
453-
with urlopen(url) as response:
454-
response.read()
536+
import time
537+
from urllib.error import HTTPError, URLError
538+
from urllib.request import Request, urlopen
539+
540+
# For simple URL waiting when we have multiple containers,
541+
# we'll do a direct HTTP check instead of using the container-based strategy
542+
start_time = time.time()
543+
timeout = 120 # Default timeout
544+
545+
while True:
546+
if time.time() - start_time > timeout:
547+
raise TimeoutError(f"URL {url} not ready within {timeout} seconds")
548+
549+
try:
550+
request = Request(url, method="GET")
551+
with urlopen(request, timeout=1) as response:
552+
if 200 <= response.status < 400:
553+
return self
554+
except (URLError, HTTPError, ConnectionResetError, ConnectionRefusedError, BrokenPipeError, OSError):
555+
# Any connection error means we should keep waiting
556+
pass
557+
558+
time.sleep(1)
559+
455560
return self

core/testcontainers/core/container.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
from testcontainers.core.labels import LABEL_SESSION_ID, SESSION_ID
1717
from testcontainers.core.network import Network
1818
from testcontainers.core.utils import is_arm, setup_logger
19-
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs
19+
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
20+
from testcontainers.core.waiting_utils import WaitStrategy
2021

2122
if TYPE_CHECKING:
2223
from docker.models.containers import Container
@@ -90,6 +91,7 @@ def __init__(
9091
self.with_network_aliases(*network_aliases)
9192

9293
self._kwargs = kwargs
94+
self._wait_strategy: Optional[WaitStrategy] = None
9395

9496
def with_env(self, key: str, value: str) -> Self:
9597
self.env[key] = value
@@ -154,6 +156,11 @@ def maybe_emulate_amd64(self) -> Self:
154156
return self.with_kwargs(platform="linux/amd64")
155157
return self
156158

159+
def waiting_for(self, strategy: WaitStrategy) -> "DockerContainer":
160+
"""Set a wait strategy to be used after container start."""
161+
self._wait_strategy = strategy
162+
return self
163+
157164
def start(self) -> Self:
158165
if not c.ryuk_disabled and self.image != c.ryuk_image:
159166
logger.debug("Creating Ryuk container")
@@ -186,6 +193,9 @@ def start(self) -> Self:
186193
)
187194

188195
logger.info("Container started: %s", self._container.short_id)
196+
197+
if self._wait_strategy is not None:
198+
self._wait_strategy.wait_until_ready(self)
189199
return self
190200

191201
def stop(self, force=True, delete_volume=True) -> None:
@@ -212,7 +222,6 @@ def get_container_host_ip(self) -> str:
212222
# ensure that we covered all possible connection_modes
213223
assert_never(connection_mode)
214224

215-
@wait_container_is_ready()
216225
def get_exposed_port(self, port: int) -> int:
217226
if self.get_docker_client().get_connection_mode().use_mapped_port:
218227
return self.get_docker_client().port(self._container.id, port)
@@ -242,6 +251,18 @@ def get_logs(self) -> tuple[bytes, bytes]:
242251
raise ContainerStartException("Container should be started before getting logs")
243252
return self._container.logs(stderr=False), self._container.logs(stdout=False)
244253

254+
def reload(self) -> None:
255+
"""Reload container information for compatibility with wait strategies."""
256+
if self._container:
257+
self._container.reload()
258+
259+
@property
260+
def status(self) -> str:
261+
"""Get container status for compatibility with wait strategies."""
262+
if not self._container:
263+
return "not_started"
264+
return self._container.status
265+
245266
def exec(self, command: Union[str, list[str]]) -> tuple[int, bytes]:
246267
if not self._container:
247268
raise ContainerStartException("Container should be started before executing a command")
@@ -291,7 +312,7 @@ def _create_instance(cls) -> "Reaper":
291312
.with_env("RYUK_RECONNECTION_TIMEOUT", c.ryuk_reconnection_timeout)
292313
.start()
293314
)
294-
wait_for_logs(Reaper._container, r".* Started!", timeout=20, raise_on_exit=True)
315+
Reaper._container.waiting_for(LogMessageWaitStrategy(r".* Started!").with_startup_timeout(20))
295316

296317
container_host = Reaper._container.get_container_host_ip()
297318
container_port = int(Reaper._container.get_exposed_port(8080))

0 commit comments

Comments
 (0)