Pull request overview
This PR enhances the Spark Connect gRPC proxy by adding gRPC health-check support and making runtime changes intended to improve scalability: non-blocking authentication and LRU channel-pool eviction.
Changes:
- Register the gRPC Health Checking service and set SERVING / NOT_SERVING statuses in `serve()`.
- Refactor authentication to run inside async handler behaviors (using `asyncio.to_thread`) instead of in the synchronous `service()` method.
- Add LRU eviction to `ChannelPool` and extend tests to cover the new behaviors (health registration, async auth, LRU eviction).
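The deferred-auth refactor described above can be sketched as follows. This is a minimal, stdlib-only illustration, not the PR's actual code: `TokenValidator.validate` and `unary_unary_behavior` are hypothetical stand-ins for the proxy's blocking validator and async handler behavior.

```python
import asyncio


class TokenValidator:
    """Hypothetical blocking validator standing in for the proxy's real one."""

    def validate(self, token: str) -> bool:
        # Imagine a blocking HTTP call to an auth service here.
        return token == "valid-token"


async def unary_unary_behavior(request: str, token: str, validator: TokenValidator) -> str:
    # Run the blocking validation off the event loop with asyncio.to_thread,
    # so the async handler never stalls other in-flight RPCs.
    ok = await asyncio.to_thread(validator.validate, token)
    if not ok:
        raise PermissionError("UNAUTHENTICATED")
    return f"proxied:{request}"


result = asyncio.run(unary_unary_behavior("plan", "valid-token", TokenValidator()))
print(result)  # proxied:plan
```

The point of the design is that authentication moves from the synchronous `service()` dispatch path into each awaited handler behavior, keeping the event loop free while the token check runs in a worker thread.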
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `src/spark_connect_proxy/proxy.py` | Adds health-check registration; defers auth into async behaviors; introduces LRU channel-pool eviction and shutdown behavior changes. |
| `tests/test_proxy.py` | Updates tests for deferred auth, adds LRU tests, and adds coverage for health-check registration in `serve()`. |
| `pyproject.toml` | Adds the `grpcio-health-checking` dependency. |
| `uv.lock` | Locks the new dependency set, including `grpcio-health-checking` and its transitive deps (e.g., `protobuf`). |
```python
@pytest.mark.asyncio
async def test_serve_registers_health_check(self) -> None:
    """serve() registers gRPC health check service."""
    mock_server = AsyncMock()
    mock_server.wait_for_termination = AsyncMock(side_effect=asyncio.CancelledError)
    mock_server.add_generic_rpc_handlers = MagicMock()
    mock_server.add_insecure_port = MagicMock()

    with (
        patch("spark_connect_proxy.proxy.aio.server", return_value=mock_server),
        patch("spark_connect_proxy.proxy.TokenValidator"),
        patch("spark_connect_proxy.proxy.ChannelPool") as mock_pool_cls,
        patch("spark_connect_proxy.proxy.health.HealthServicer") as mock_health_cls,
        patch(
            "spark_connect_proxy.proxy.health_pb2_grpc.add_HealthServicer_to_server"
        ) as mock_add,
    ):
        mock_pool = MagicMock()
        mock_pool.close_all = AsyncMock()
        mock_pool_cls.return_value = mock_pool

        mock_health = AsyncMock()
        mock_health_cls.return_value = mock_health

        with pytest.raises(asyncio.CancelledError):
            await serve(ProxySettings())

        # Health servicer should be added to the server
        mock_add.assert_called_once_with(mock_health, mock_server)
        # Health status should be set to SERVING
        assert mock_health.set.await_count >= 2  # overall + service-specific
```
The health-check registration tests model HealthServicer as AsyncMock and assert awaits on set(). If HealthServicer.set() is synchronous (as in grpcio-health-checking), these tests will pass while production code fails. After switching to the correct aio HealthServicer, update the tests to treat set() as a normal method and assert it was called with the expected service names/status values.
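To illustrate the reviewer's point, here is a minimal, stdlib-only sketch (the service name is illustrative, not taken from the PR): if `set()` is a plain synchronous method, modeling the servicer with `MagicMock` and asserting concrete calls makes the test fail when production code misuses the API, whereas counting awaits on an `AsyncMock` can pass even though the real servicer is never awaitable.

```python
from unittest.mock import MagicMock

# Hypothetical stand-in for a health servicer whose set() is synchronous.
mock_health = MagicMock()

# What serve() would do after registering the servicer (names illustrative):
mock_health.set("", "SERVING")                                   # overall status
mock_health.set("spark.connect.SparkConnectService", "SERVING")  # per-service

# Assert plain (non-awaited) calls with explicit service names and statuses,
# rather than asserting set.await_count on an AsyncMock.
mock_health.set.assert_any_call("", "SERVING")
mock_health.set.assert_any_call("spark.connect.SparkConnectService", "SERVING")
assert mock_health.set.call_count == 2
```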
Pull request overview
Copilot reviewed 4 out of 5 changed files in this pull request and generated 5 comments.
```python
context.abort = AsyncMock()

with pytest.raises(TimeoutError):
```
test_authenticate_backend_unreachable currently expects a TimeoutError from _authenticate(), but in real grpc.aio execution context.abort() raises an RPC exception and the TimeoutError will not propagate. To better match production behavior (and be consistent with the other auth-failure tests), set context.abort to raise grpc.RpcError (or grpc.aio.AbortError) and assert that exception instead while still checking the abort arguments.
```diff
-context.abort = AsyncMock()
-with pytest.raises(TimeoutError):
+context.abort = AsyncMock(side_effect=grpc.RpcError("abort"))
+with pytest.raises(grpc.RpcError):
```
```python
# Authenticate and resolve backend target

Raises:
    Aborts the gRPC context with UNAUTHENTICATED on auth failure,
    or UNAVAILABLE if the backend is not reachable.
```
The _authenticate() docstring says the backend-unreachable case aborts with UNAVAILABLE, but the implementation currently aborts with FAILED_PRECONDITION. Please update either the docstring or the status code so behavior and documentation match.
```diff
-    or UNAVAILABLE if the backend is not reachable.
+    or FAILED_PRECONDITION if the backend is not reachable.
```
```python
await context.abort(
    grpc.StatusCode.FAILED_PRECONDITION,
    f"Spark Connect server at {target} is not reachable. "
    f"Please ensure you have logged in to BERDL JupyterHub "
    f"and your notebook's Spark Connect service is running.",
)
```
Backend connectivity timeout currently aborts with grpc.StatusCode.FAILED_PRECONDITION. For an unreachable backend, gRPC conventions typically use UNAVAILABLE (or DEADLINE_EXCEEDED if you want to surface the timeout) so clients can apply appropriate retry/backoff behavior. Consider switching the status code (and updating tests accordingly).
```python
pool = ChannelPool(max_size=settings.MAX_CHANNELS_PER_BACKEND)
handler = SparkConnectProxyHandler(settings, validator, pool)
```
serve() passes settings.MAX_CHANNELS_PER_BACKEND into ChannelPool(max_size=...). However ProxySettings describes MAX_CHANNELS_PER_BACKEND as a per-backend limit, while ChannelPool's max_size is a global cap on cached targets/channels. This mismatch can lead to unexpected aggressive eviction; consider renaming the setting and updating its description, or adjusting ChannelPool to enforce a true per-backend limit.
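A true per-backend limit, as the reviewer suggests, could look roughly like the sketch below. This is a hypothetical, stdlib-only illustration (class and method names are invented, not the PR's actual `ChannelPool` API): each backend target keeps its own LRU-ordered mapping, so heavy traffic to one backend evicts only that backend's entries.

```python
from collections import OrderedDict


class PerBackendChannelPool:
    """Hypothetical sketch: enforce an LRU cap per backend target,
    not a single global cap across all targets."""

    def __init__(self, max_per_backend: int) -> None:
        self.max_per_backend = max_per_backend
        self._pools: dict[str, OrderedDict] = {}

    def get_or_create(self, backend: str, key: str, factory):
        pool = self._pools.setdefault(backend, OrderedDict())
        if key in pool:
            pool.move_to_end(key)        # mark as most recently used
            return pool[key]
        if len(pool) >= self.max_per_backend:
            pool.popitem(last=False)     # evict this backend's LRU entry only
        channel = factory()
        pool[key] = channel
        return channel


pool = PerBackendChannelPool(max_per_backend=2)
pool.get_or_create("backend-a", "user1", lambda: "chan-a1")
pool.get_or_create("backend-a", "user2", lambda: "chan-a2")
pool.get_or_create("backend-a", "user3", lambda: "chan-a3")  # evicts user1 from backend-a
pool.get_or_create("backend-b", "user1", lambda: "chan-b1")  # backend-b is unaffected
print(sorted(pool._pools["backend-a"]))  # ['user2', 'user3']
```

Under this design, `MAX_CHANNELS_PER_BACKEND` would mean exactly what its name says; the alternative fix in the review, renaming the setting to describe a global cap, is equally valid and cheaper.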