2
2
import socket
3
3
import ssl
4
4
from collections .abc import Iterable
5
+ from distutils .util import strtobool
6
+ from urllib .error import HTTPError , URLError
7
+ from urllib .request import urlopen
8
+
5
9
from typing_extensions import Self
10
+
6
11
from testcontainers .core .container import DockerContainer
7
12
from testcontainers .core .waiting_utils import wait_container_is_ready , wait_for_logs
13
+
8
14
from . import _grab as grab
9
- from distutils .util import strtobool
10
- from urllib .error import HTTPError , URLError
11
- from urllib .request import urlopen
12
15
13
16
__all__ = ["CosmosDBEmulatorContainer" ]
14
17
15
18
EMULATOR_PORT = 8081
16
19
20
+
17
21
class CosmosDBEmulatorContainer (DockerContainer ):
18
22
"""
19
23
Abstract class for CosmosDB Emulator endpoints.
@@ -28,9 +32,7 @@ def __init__(
28
32
"AZURE_COSMOS_EMULATOR_IMAGE" , "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
29
33
),
30
34
partition_count : int = os .getenv ("AZURE_COSMOS_EMULATOR_PARTITION_COUNT" , None ),
31
- enable_data_persistence : bool = strtobool (
32
- os .getenv ("AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE" , "false" )
33
- ),
35
+ enable_data_persistence : bool = strtobool (os .getenv ("AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE" , "false" )),
34
36
key : str = os .getenv (
35
37
"AZURE_COSMOS_EMULATOR_KEY" ,
36
38
"C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==" ,
@@ -52,7 +54,7 @@ def host(self) -> str:
52
54
Emulator host
53
55
"""
54
56
return self .get_container_host_ip ()
55
-
57
+
56
58
@property
57
59
def server_certificate_pem (self ) -> bytes :
58
60
"""
@@ -66,18 +68,17 @@ def start(self) -> Self:
66
68
self ._wait_until_ready ()
67
69
self ._cert_pem_bytes = self ._download_cert ()
68
70
return self
69
-
71
+
70
72
def _configure (self ) -> None :
71
- all_ports = set ([ EMULATOR_PORT ] + self .endpoint_ports )
73
+ all_ports = { EMULATOR_PORT , * self .endpoint_ports }
72
74
if self .bind_ports :
73
75
for port in all_ports :
74
76
self .with_bind_ports (port , port )
75
77
else :
76
78
self .with_exposed_ports (* all_ports )
77
79
78
80
(
79
- self
80
- .with_env ("AZURE_COSMOS_EMULATOR_PARTITION_COUNT" , str (self .partition_count ))
81
+ self .with_env ("AZURE_COSMOS_EMULATOR_PARTITION_COUNT" , str (self .partition_count ))
81
82
.with_env ("AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE" , socket .gethostbyname (socket .gethostname ()))
82
83
.with_env ("AZURE_COSMOS_EMULATOR_ENABLE_DATA_PERSISTENCE" , str (self .enable_data_persistence ))
83
84
.with_env ("AZURE_COSMOS_EMULATOR_KEY" , str (self .key ))
@@ -89,12 +90,13 @@ def _wait_until_ready(self) -> Self:
89
90
if self .bind_ports :
90
91
self ._wait_for_url (f"https://{ self .host } :{ EMULATOR_PORT } /_explorer/index.html" )
91
92
self ._wait_for_query_success ()
92
-
93
+
93
94
return self
94
95
95
96
def _download_cert (self ) -> bytes :
96
97
with grab .file (
97
- self .get_wrapped_container (), "/tmp/cosmos/appdata/.system/profiles/Client/AppData/Local/CosmosDBEmulator/emulator.pem"
98
+ self .get_wrapped_container (),
99
+ "/tmp/cosmos/appdata/.system/profiles/Client/AppData/Local/CosmosDBEmulator/emulator.pem" ,
98
100
) as cert :
99
101
return cert .read ()
100
102
@@ -103,6 +105,6 @@ def _wait_for_url(self, url: str) -> Self:
103
105
with urlopen (url , context = ssl ._create_unverified_context ()) as response :
104
106
response .read ()
105
107
return self
106
-
108
+
107
109
def _wait_for_query_success (self ) -> None :
108
110
pass
0 commit comments