Skip to content

Commit 207d84b

Browse files
mickmisthe-glu
authored andcommitted
[datastore_access] split crdb and ybdb logic; correct ybdb implementation
1 parent 4d2a686 commit 207d84b

File tree

8 files changed

+288
-75
lines changed

8 files changed

+288
-75
lines changed

monitoring/uss_qualifier/resources/interuss/datastore/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from .datastore import (
2+
CockroachDBNode as CockroachDBNode,
3+
)
14
from .datastore import (
25
DatastoreDBClusterResource as DatastoreDBClusterResource,
36
)
@@ -7,3 +10,6 @@
710
from .datastore import (
811
DatastoreDBNodeResource as DatastoreDBNodeResource,
912
)
13+
from .datastore import (
14+
YugabyteDBNode as YugabyteDBNode,
15+
)

monitoring/uss_qualifier/resources/interuss/datastore/datastore.py

Lines changed: 203 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

33
import socket
4+
import ssl
5+
from abc import ABC, abstractmethod
46

57
import psycopg
68
from implicitdict import ImplicitDict
@@ -18,6 +20,9 @@ class DatastoreDBNodeSpecification(ImplicitDict):
1820
port: int
1921
"""Port to which DatastoreDB node is listening to."""
2022

23+
is_yugabyte: bool = False
24+
"""True if DatastoreDB node is a YugabyteDB node."""
25+
2126
def __init__(self, *args, **kwargs):
2227
super().__init__(**kwargs)
2328

@@ -34,7 +39,13 @@ def __init__(
3439
self._specification = specification
3540

3641
def get_client(self) -> DatastoreDBNode:
37-
return DatastoreDBNode(
42+
if self._specification.is_yugabyte:
43+
return YugabyteDBNode(
44+
self._specification.participant_id,
45+
self._specification.host,
46+
self._specification.port,
47+
)
48+
return CockroachDBNode(
3849
self._specification.participant_id,
3950
self._specification.host,
4051
self._specification.port,
@@ -69,7 +80,9 @@ def __init__(
6980
]
7081

7182

72-
class DatastoreDBNode:
83+
class DatastoreDBNode(ABC):
84+
_NOT_IMPLEMENTED_MSG = "All methods of base DatastoreDBNode class must be implemented by each specific subclass"
85+
7386
participant_id: str
7487
host: str
7588
port: int
@@ -84,6 +97,28 @@ def __init__(
8497
self.host = host
8598
self.port = port
8699

100+
@abstractmethod
101+
def is_reachable(self) -> tuple[bool, Exception | None]:
102+
"""Returns True if the node is reachable."""
103+
raise NotImplementedError(DatastoreDBNode._NOT_IMPLEMENTED_MSG)
104+
105+
@abstractmethod
106+
def no_ssl_rejected(self) -> tuple[bool, Exception | None]:
107+
"""Returns True if the node rejects cleartext communications."""
108+
raise NotImplementedError(DatastoreDBNode._NOT_IMPLEMENTED_MSG)
109+
110+
@abstractmethod
111+
def unauthenticated_rejected(self) -> tuple[bool, Exception | None]:
112+
"""Returns True if the node rejects unauthenticated communications."""
113+
raise NotImplementedError(DatastoreDBNode._NOT_IMPLEMENTED_MSG)
114+
115+
@abstractmethod
116+
def legacy_ssl_version_rejected(self) -> tuple[bool, Exception | None]:
117+
"""Returns True if the node rejects the usage of the legacy cryptographic protocols TLSv1 and TLSv1.1."""
118+
raise NotImplementedError(DatastoreDBNode._NOT_IMPLEMENTED_MSG)
119+
120+
121+
class CockroachDBNode(DatastoreDBNode):
87122
def connect(self, **kwargs) -> psycopg.Connection:
88123
return psycopg.connect(
89124
host=self.host,
@@ -92,9 +127,8 @@ def connect(self, **kwargs) -> psycopg.Connection:
92127
**kwargs,
93128
)
94129

95-
def is_reachable(self) -> tuple[bool, psycopg.Error | None]:
130+
def is_reachable(self) -> tuple[bool, Exception | None]:
96131
"""
97-
Returns True if the node is reachable.
98132
This is detected by attempting to establish a connection with the node
99133
not requiring encryption and validating either 1) that the connection
100134
fails with the error message reporting that the authentication failed;
@@ -117,9 +151,8 @@ def is_reachable(self) -> tuple[bool, psycopg.Error | None]:
117151
return is_reachable, e
118152
return True, None
119153

120-
def runs_in_secure_mode(self) -> tuple[bool, psycopg.Error | None]:
154+
def runs_in_secure_mode(self) -> tuple[bool, Exception | None]:
121155
"""
122-
Returns True if the node is running in secure mode.
123156
This is detected by attempting to establish a connection with the node
124157
in insecure mode and validating that the connection fails with the error
125158
message reporting that the node is running in secure mode.
@@ -138,10 +171,14 @@ def runs_in_secure_mode(self) -> tuple[bool, psycopg.Error | None]:
138171
return secure_mode, e
139172
return False, None
140173

141-
def legacy_ssl_version_rejected(self) -> tuple[bool, psycopg.Error | None]:
174+
def no_ssl_rejected(self) -> tuple[bool, Exception | None]:
175+
return self.runs_in_secure_mode()
176+
177+
def unauthenticated_rejected(self) -> tuple[bool, Exception | None]:
178+
return self.runs_in_secure_mode()
179+
180+
def legacy_ssl_version_rejected(self) -> tuple[bool, Exception | None]:
142181
"""
143-
Returns True if the node rejects the usage of the legacy cryptographic
144-
protocols TLSv1 and TLSv1.1.
145182
This is detected by attempting to establish a connection with the node
146183
forcing the client to use a TLS version < 1.2 and validating that the
147184
connection fails with the expected error message.
@@ -199,3 +236,160 @@ def _is_protocol_failure(data):
199236
return _is_protocol_failure(data), None
200237
except Exception as e:
201238
return False, str(e)
239+
240+
241+
class YugabyteDBNode(DatastoreDBNode):
242+
def _connect(self) -> socket.socket:
243+
return socket.create_connection(
244+
(self.host, self.port),
245+
timeout=5,
246+
)
247+
248+
def is_reachable(self) -> tuple[bool, Exception | None]:
249+
"""This is detected by attempting to open a socket with the node."""
250+
try:
251+
sock = self._connect()
252+
sock.close()
253+
return True, None
254+
except (TimeoutError, ConnectionError) as e:
255+
return False, e
256+
257+
def runs_in_secure_mode(self) -> tuple[bool, Exception | None]:
258+
"""
259+
Returns True is node runs in secure mode. If False, returns an exception describing the issue.
260+
Expects node to be reachable (see is_reachable).
261+
Secure mode is detected by:
262+
1) attempting to establish a connection with the node in cleartext; and then
263+
2) attempting to establish a connection with the node using TLS but without authentication using a client certificate.
264+
"""
265+
secure_without_ssl, e_without_ssl = self._attempt_insecure_request(
266+
with_ssl=False
267+
)
268+
# secure_without_ssl, e_without_ssl = True, None TODO: handle ConnectionResetByPeer when doing recv? or not?
269+
secure_with_ssl, e_with_ssl = self._attempt_insecure_request(with_ssl=True)
270+
if secure_without_ssl and secure_with_ssl:
271+
return True, None
272+
273+
err_msg = "Node is not in secure mode:\n"
274+
if e_without_ssl:
275+
err_msg += f"Request without SSL: {e_without_ssl}\n"
276+
if e_with_ssl:
277+
err_msg += f"Request with SSL: {e_with_ssl}\n"
278+
return False, Exception(err_msg)
279+
280+
@classmethod
281+
def _build_dummy_rpc_request(cls):
282+
"""Builds an RPC request for service 'yb.master.MasterService', method 'GetMasterRegistration' with call ID '5'.
283+
Reference: https://gruchalski.com/posts/2022-02-12-a-brief-look-at-yugabytedb-rpc-api/"""
284+
285+
return bytes.fromhex(
286+
"594201" # 'YB1' preamble
287+
"0000003a" # message byte length
288+
"38" # request header protobuf message length
289+
"0805" # field 1: call_id
290+
"12300a1779622e6d61737465722e4d61737465725365727669636512154765744d6173746572526567697374726174696f6e" # field 2: remote_method (service_name = 'yb.master.MasterService', method_name = 'GetMasterRegistration')
291+
"18e0d403" # field 3: timeout_millis
292+
"00" # request payload protobuf message length (no payload)
293+
)
294+
295+
def _attempt_insecure_request(
296+
self, with_ssl: bool
297+
) -> tuple[bool, Exception | None]:
298+
sock = self._connect() # we expect node to be reachable
299+
if with_ssl:
300+
ctx = ssl.create_default_context()
301+
ctx.check_hostname = False
302+
ctx.verify_mode = ssl.CERT_NONE
303+
try:
304+
sock = ctx.wrap_socket(sock)
305+
sock.do_handshake(block=True)
306+
except ssl.SSLError as e:
307+
# if we fail to create the SSL context: insecure
308+
sock.close()
309+
return False, e
310+
311+
sock.sendall(self._build_dummy_rpc_request())
312+
msg_size_bytes = sock.recv(4) # message byte length (32-bits integer)
313+
if len(msg_size_bytes) == 0:
314+
# server closed connection upon receiving a valid RPC through insecure channel: secure
315+
sock.close()
316+
return True, None
317+
318+
# server did not close connection: read response, msg_bytes is expected to look like:
319+
# 04 # protobuf message length (response header)
320+
# 0805 # field 1: call_id
321+
# 1000 # field 2: is_error
322+
# 8701 # protobuf message length (response payload)
323+
# 0a34... # protobuf response payload
324+
msg_size = int.from_bytes(msg_size_bytes, byteorder="big")
325+
msg_bytes = sock.recv(msg_size)
326+
sock.close()
327+
return False, Exception(
328+
f"Received RPC response from node, hex: {bytes.hex(msg_bytes)}"
329+
)
330+
331+
def no_ssl_rejected(self) -> tuple[bool, Exception | None]:
332+
return self._attempt_insecure_request(with_ssl=False)
333+
334+
def unauthenticated_rejected(self) -> tuple[bool, Exception | None]:
335+
return self._attempt_insecure_request(with_ssl=True)
336+
337+
def legacy_ssl_version_rejected(self) -> tuple[bool, Exception | None]:
338+
"""
339+
Returns True if the node rejects the usage of the legacy cryptographic
340+
protocols TLSv1 and TLSv1.1.
341+
This is detected by attempting to establish a connection with the node
342+
forcing the client to use a TLS version < 1.2 and validating that the
343+
connection fails with the expected error message.
344+
345+
Modern libraries and Python have dropped support for TLS versions older than 1.2, as these are now considered legacy.
346+
347+
To be able to test those old protocols, we manually send TLS packets (captured from legacy code) and parse the result.
348+
Parsing is limited, but should be good enough for our cases.
349+
"""
350+
351+
def _build_client_hello():
352+
"""Builds a client hello"""
353+
354+
return bytes.fromhex(
355+
"16" # Handshake
356+
"0301" # TLS Version: 1.0
357+
"0063" # Length
358+
"01" # Handshake type: Client hello
359+
"00005f" # Length
360+
"0302" # TLS Version: 1.1
361+
"4895335bae2d2d929e34bdd5ccc89d800807bb01bbaaa7bf86efbb83a9249206" # Random value
362+
"00" # Session ID Length
363+
"0012" # Cipher suite Length
364+
"c00ac0140039c009c01300330035002f00ff" # Cipher suites
365+
"01" # Compression method length
366+
"00" # No compression
367+
"0024" # Extentions length
368+
"000b000403000102000a000c000a001d0017001e00190018002300000016000000170000" # Extensions
369+
)
370+
371+
def _is_protocol_failure(data):
372+
"""Tests whether the server sends a protocol failure."""
373+
# Format:
374+
# 15 TLS Alert
375+
# 03 01 TLS Version (Ignored)
376+
# 00 02 Length (Ignored)
377+
# 02 Level: Fatal (Ignored)
378+
# 46 Description: Protocol version
379+
380+
content_type = data[0]
381+
alert_description = data[6]
382+
383+
return content_type == 0x15 and alert_description == 0x46
384+
385+
try:
386+
with socket.create_connection((self.host, self.port), timeout=5) as sock:
387+
sock.sendall(_build_client_hello())
388+
data = sock.recv(1024)
389+
390+
if not data:
391+
return True, None # Server will close the connection without reply
392+
393+
return _is_protocol_failure(data), None
394+
except Exception as e:
395+
return False, e

0 commit comments

Comments
 (0)