Add start_profile/stop_profile implementation #48
base: main
Conversation
force-pushed from cdf02b7 to df76a69
Pull request overview
This PR implements start_profile and stop_profile functionality to enable profiling of the inference engine. The implementation follows the existing request/response pattern used for other operations like heartbeat and metrics.
- Adds new protocol types (START_PROFILE, STOP_PROFILE) for request/response communication
- Implements profile handlers in the disagg worker to call engine profiling methods
- Adds proxy methods to send profile requests to specific workers and handle responses
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| lm_service/protocol/protocol.py | Adds START_PROFILE and STOP_PROFILE constants and ProfileRequest/ProfileResponse message structures |
| lm_service/workers/vllm/disagg_worker.py | Refactors request handling to use a decoder map and adds profile request handlers |
| lm_service/apis/vllm/proxy.py | Refactors response handling to use a decoder map, adds start_profile/stop_profile methods, and improves worker registration |
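For orientation, a minimal sketch of what the additions to lm_service/protocol/protocol.py plausibly look like. This assumes msgspec-style Structs, matching the `self.encoder.encode(...)` / `decoder.decode(...)` pattern visible in the diff; the wire constants and any fields beyond `request_id` and `proxy_addr` are assumptions, not the PR's actual code:

```python
import msgspec

class RequestType:
    # Assumed byte constants; req_type.decode() in the diff implies bytes.
    START_PROFILE = b"START_PROFILE"
    STOP_PROFILE = b"STOP_PROFILE"

class ProfileRequest(msgspec.Struct):
    request_id: str
    proxy_addr: str

class ProfileResponse(msgspec.Struct):
    request_id: str
```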
Comments suppressed due to low confidence (1)
lm_service/apis/vllm/proxy.py:590
- The check on lines 584-590 excludes RequestType.EXIT and RequestType.REGISTER from the 'request may have been aborted' warning, but not the new RequestType.START_PROFILE and RequestType.STOP_PROFILE. If profile responses arrive for requests that are no longer in the queue, they will generate spurious warnings. Add RequestType.START_PROFILE and RequestType.STOP_PROFILE to the exclusion tuple on lines 585-590 (or ResponseType.START_PROFILE and ResponseType.STOP_PROFILE, if those are the actual response types used).
```python
if resp.request_id not in self.queues:
    if resp_type not in (
        ResponseType.HEARTBEAT,
        ResponseType.METRICS,
        RequestType.EXIT,
        RequestType.REGISTER,
    ):
```
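A sketch of the suggested fix, assuming the worker echoes the same RequestType constants it was sent (swap in ResponseType.* if that is what actually arrives); the warning body is hypothetical:

```python
if resp.request_id not in self.queues:
    if resp_type not in (
        ResponseType.HEARTBEAT,
        ResponseType.METRICS,
        RequestType.EXIT,
        RequestType.REGISTER,
        RequestType.START_PROFILE,  # new: suppress spurious warnings for profile responses
        RequestType.STOP_PROFILE,
    ):
        # Hypothetical message, per the warning text quoted in the comment above.
        logger.warning(f"request {resp.request_id} may have been aborted")
```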
```diff
  if req_type in decoder_map:
      req = decoder_map[req_type]["decoder"].decode(req_data)
  else:
-     raise Exception(f"Unknown Request Type: {req_type.decode()}.")
+     raise Exception("Unknown Request Type.")
  if req_type == RequestType.ENCODE or req_type == RequestType.PREFILL:
      req.sampling_params.max_tokens = 1
  await decoder_map[req_type]["handler"](req)
```
Copilot AI · Nov 30, 2025
If req_type is not in decoder_map, the else branch raises on line 289, so execution never reaches the decoder_map[req_type] access on line 292 for an unknown type. The structural problem is that lines 290-292 still assume req_type is valid: they sit outside the membership check and rely on the raise alone for protection. Move lines 290-292 inside the if block (or return early on the error path) so they only execute when req_type is in decoder_map.
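A minimal sketch of the restructuring the comment asks for, using the names from the hunk above; a single dict .get() replaces the membership test so the lookup happens once:

```python
entry = decoder_map.get(req_type)
if entry is None:
    raise Exception("Unknown Request Type.")
req = entry["decoder"].decode(req_data)
if req_type in (RequestType.ENCODE, RequestType.PREFILL):
    req.sampling_params.max_tokens = 1
await entry["handler"](req)
```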
```python
cluster = self.instance_clusters.get(server_type, None)
if cluster is None:
    logger.error(
        f"_worker_register_handler fail, unknown server type {server_type}"
    )
    return
```
Copilot AI · Nov 30, 2025
Lines 488-494 check if server_type is in self.active_types, then immediately check if the cluster exists. However, if server_type is in self.active_types, the cluster should have been initialized in __init__ (line 189). This is defensive code that may hide initialization bugs. Consider either: (1) removing the redundant cluster None check, since active_types guarantees initialization, or (2) removing the active_types check and only checking cluster existence.
Suggested change:

```diff
- cluster = self.instance_clusters.get(server_type, None)
- if cluster is None:
-     logger.error(
-         f"_worker_register_handler fail, unknown server type {server_type}"
-     )
-     return
+ cluster = self.instance_clusters[server_type]
```
```python
):
    logger.info("Profiling stopped successfully")
else:
    logger.error(f"Failed to stop profiling: {response}")
```
Copilot AI · Nov 30, 2025
Lines 830-837 have the same issue as start_profile: errors are logged but not raised, and the method returns None in both success and failure cases. This makes it impossible for callers to determine whether profiling actually stopped. Consider raising an exception when the response is not a successful ProfileResponse, or returning a boolean to indicate success/failure.
Suggested change:

```diff
  logger.error(f"Failed to stop profiling: {response}")
+ raise RuntimeError(f"Failed to stop profiling: {response}")
```
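For the boolean-returning alternative the comment also mentions, a sketch of the method tail; the `success` attribute is a hypothetical field, since the diff does not show what ProfileResponse carries:

```python
if isinstance(response, ProfileResponse) and response.success:  # 'success' is assumed
    logger.info("Profiling stopped successfully")
    return True
logger.error(f"Failed to stop profiling: {response}")
return False
```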
```python
        )
        await self._handle_response(req, msg)

    async def _stop_profile_handler(self, req: ProfileRequest):
```
Copilot AI · Nov 30, 2025
The _stop_profile_handler method (lines 496-499) is missing a docstring while the _start_profile_handler method has one (line 466). Add a docstring to _stop_profile_handler for consistency: """Stop profiling on the engine."""
Suggested change:

```diff
  async def _stop_profile_handler(self, req: ProfileRequest):
+     """Stop profiling on the engine."""
```
```python
request = ProfileRequest(
    request_id=request_id, proxy_addr=self.proxy_addr
)
q: asyncio.Queue = asyncio.Queue()
```
Copilot AI · Nov 30, 2025
The queue created on line 791 has no timeout or size limit. If the worker never responds, the await q.get() on line 801 will hang indefinitely. Consider using asyncio.wait_for() with a timeout when getting from the queue, similar to how other async operations in the codebase might handle timeouts.
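A sketch of the suggested guard around the queue read, with a hypothetical PROFILE_TIMEOUT_S constant and cleanup of the pending queue on every exit path:

```python
PROFILE_TIMEOUT_S = 30.0  # hypothetical; tune to expected profiler start-up latency

try:
    # asyncio.wait_for cancels the pending get() and raises TimeoutError
    # if the worker never responds, instead of hanging indefinitely.
    response = await asyncio.wait_for(q.get(), timeout=PROFILE_TIMEOUT_S)
except asyncio.TimeoutError:
    logger.error(f"start_profile timed out after {PROFILE_TIMEOUT_S}s waiting for worker")
    raise
finally:
    self.queues.pop(request_id, None)  # never leak the per-request response queue
```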
```python
request = ProfileRequest(
    request_id=request_id, proxy_addr=self.proxy_addr
)
q: asyncio.Queue = asyncio.Queue()
```
Copilot AI · Nov 30, 2025
The queue created on line 819 has the same issue as in start_profile: no timeout or size limit. If the worker never responds, the await q.get() on line 830 will hang indefinitely. Consider using asyncio.wait_for() with a timeout when getting from the queue.
```python
q: asyncio.Queue = asyncio.Queue()
self.queues[request_id] = q

try:
    payload = self.encoder.encode(request)
    msg = (RequestType.START_PROFILE, payload)
    socket = await self._get_socket_and_server_types_from_addr(
        addr, server_type
    )
    await socket.send_multipart(msg, copy=False)
    response = await q.get()
```
Copilot AI · Nov 30, 2025
The new start_profile and stop_profile methods (lines 781-841) lack test coverage. Given that the repository has tests for other proxy functionality (tests/test_proxy.py), these new methods should also have tests to verify correct behavior, error handling, and response processing.
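As a starting point, a round-trip test of the new protocol message. This assumes the project encodes with msgspec's msgpack codec, which matches the encoder/decoder usage in the diff but is not confirmed by it:

```python
import msgspec
from lm_service.protocol.protocol import ProfileRequest

def test_profile_request_roundtrip():
    req = ProfileRequest(request_id="req-0", proxy_addr="tcp://127.0.0.1:5555")
    data = msgspec.msgpack.encode(req)
    decoded = msgspec.msgpack.decode(data, type=ProfileRequest)
    assert decoded.request_id == "req-0"
    assert decoded.proxy_addr == "tcp://127.0.0.1:5555"
```

Proxy-level tests for start_profile/stop_profile would additionally need a stubbed worker socket that resolves the entry in self.queues, mirroring whatever tests/test_proxy.py already does for heartbeat and metrics.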
force-pushed from df76a69 to a198f27
Signed-off-by: amy-why-3459 <[email protected]>
force-pushed from a198f27 to 2c7e2f7
Description
Type of Change
Related Issues
Changes Made
Testing
Test Coverage
INFO 11-29 11:22:45 [config/model.py:1510] Using max model len 128000
INFO 11-29 11:22:45 [proxy.py:304] Connected to worker tcp://[2071:192:168:2::181]:39000 success
INFO 11-29 11:22:45 [proxy.py:304] Connected to worker tcp://[2071:192:168:2::181]:40000 success
INFO 11-29 11:23:43 [proxy.py:806] Profiling started successfully
Request(0) generated_text: The
Request(0) generated_text: The text
Request(0) generated_text: The text in
Request(0) generated_text: The text in the
Request(0) generated_text: The text in the illustration
Request(0) generated_text: The text in the illustration reads
Request(0) generated_text: The text in the illustration reads:
Documentation
Checklist
Screenshots/Output
Additional Notes
Reviewer Checklist