Skip to content

Commit a2b9c3f

Browse files
committed
fix: avoid TSaslClientTransport reuse to eliminate server-side SASL noise
1 parent a9ad3a3 commit a2b9c3f

2 files changed

Lines changed: 62 additions & 18 deletions

File tree

pyiceberg/catalog/hive.py

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,17 @@
142142

143143

144144
class _HiveClient:
145-
"""Helper class to nicely open and close the transport."""
145+
"""Helper class to nicely open and close the transport.
146146
147-
_transport: TTransport
147+
``TSaslClientTransport`` instances are single-use: ``close()`` disposes
148+
the SASL session and the same instance cannot be reopened. The
149+
transport is therefore created lazily in ``__enter__`` rather than held
150+
on the class, which lets the ``_HiveClient`` itself live for the
151+
duration of a ``HiveCatalog`` while still honouring the SASL transport
152+
lifecycle.
153+
"""
154+
155+
_transport: "TTransport | None"
148156
_ugi: list[str] | None
149157

150158
def __init__(
@@ -158,7 +166,7 @@ def __init__(
158166
self._kerberos_auth = kerberos_auth
159167
self._kerberos_service_name = kerberos_service_name
160168
self._ugi = ugi.split(":") if ugi else None
161-
self._transport = self._init_thrift_transport()
169+
self._transport = None
162170

163171
def _init_thrift_transport(self) -> TTransport:
164172
url_parts = urlparse(self._uri)
@@ -169,31 +177,28 @@ def _init_thrift_transport(self) -> TTransport:
169177
return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service=self._kerberos_service_name)
170178

171179
def _client(self) -> Client:
180+
assert self._transport is not None # set by __enter__
172181
protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
173182
client = Client(protocol)
174183
if self._ugi:
175184
client.set_ugi(*self._ugi)
176185
return client
177186

178187
def __enter__(self) -> Client:
179-
"""Make sure the transport is initialized and open."""
180-
if not self._transport.isOpen():
181-
try:
182-
self._transport.open()
183-
except (TypeError, TTransport.TTransportException):
184-
# Close the old transport before reinitializing to prevent resource leaks
185-
try:
186-
self._transport.close()
187-
except Exception:
188-
pass
189-
# reinitialize _transport
190-
self._transport = self._init_thrift_transport()
191-
self._transport.open()
192-
return self._client() # recreate the client
188+
"""Initialize and open a fresh transport for this entry.
189+
190+
A new ``TSaslClientTransport`` is created on every entry because
191+
the underlying SASL session cannot be reused after ``close()``.
192+
Reusing a closed transport would leave a half-opened TCP socket
193+
and SASL handshake EOF noise on the server.
194+
"""
195+
self._transport = self._init_thrift_transport()
196+
self._transport.open()
197+
return self._client()
193198

194199
def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
195200
"""Close transport if it was opened."""
196-
if self._transport.isOpen():
201+
if self._transport is not None and self._transport.isOpen():
197202
self._transport.close()
198203

199204

tests/catalog/test_hive.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,3 +1410,42 @@ def test_create_hive_client_with_kerberos_using_context_manager(
14101410
# closing and re-opening work as expected.
14111411
with client as open_client:
14121412
assert open_client._iprot.trans.isOpen()
1413+
1414+
1415+
def test_kerberized_client_uses_fresh_transport_per_entry(
1416+
kerberized_hive_metastore_fake_url: str,
1417+
) -> None:
1418+
"""TSaslClientTransport is single-use — each entry must create a new one.
1419+
1420+
The underlying SASL session cannot be reused after close(). Each
1421+
context-manager entry must therefore produce a fresh transport
1422+
instance rather than reopening the previous one (which would leave
1423+
a half-opened TCP socket and SASL handshake EOF noise on the server).
1424+
"""
1425+
client = _HiveClient(
1426+
uri=kerberized_hive_metastore_fake_url,
1427+
kerberos_auth=True,
1428+
)
1429+
# No transport should exist until the context manager is entered.
1430+
assert client._transport is None
1431+
with (
1432+
patch(
1433+
"puresasl.mechanisms.kerberos.authGSSClientStep",
1434+
return_value=None,
1435+
),
1436+
patch(
1437+
"puresasl.mechanisms.kerberos.authGSSClientResponse",
1438+
return_value=base64.b64encode(b"Some Response"),
1439+
),
1440+
patch(
1441+
"puresasl.mechanisms.GSSAPIMechanism.complete",
1442+
return_value=True,
1443+
),
1444+
):
1445+
with client:
1446+
first_transport_id = id(client._transport)
1447+
1448+
with client:
1449+
second_transport_id = id(client._transport)
1450+
1451+
assert first_transport_id != second_transport_id

0 commit comments

Comments
 (0)