This implements the flux abstraction layer as used in RP#448
andre-merzky merged 64 commits into devel
Summary of Changes
This pull request introduces a significant refactoring and enhancement of the Flux integration. The primary goal is to provide a robust and flexible abstraction layer that handles multiple Flux versions and allows multiple Flux instances to operate concurrently. This change lays the groundwork for improved job management and resource utilization, particularly in environments like Rhapsody, where this code is eventually intended to be used.
Codecov Report
Coverage Diff
| | devel | #448 | +/- |
|---|---|---|---|
| Coverage | 61.79% | 61.23% | -0.57% |
| Files | 62 | 66 | +4 |
| Lines | 7133 | 7359 | +226 |
| Hits | 4408 | 4506 | +98 |
| Misses | 2725 | 2853 | +128 |
Pull Request Overview
This PR implements a flux abstraction layer used in the Rhapsody project, introducing support for different flux versions and enabling multiple concurrent flux instances. The implementation includes a modular flux service architecture, version-specific helper classes, and various test and utility improvements.
Key Changes
- Replaced the old flux module with a new modular flux abstraction layer supporting version detection and concurrent instances
- Added comprehensive flux service and helper classes for both flux v0 and v1
- Improved test infrastructure with better process handling and error reporting
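The version-aware helper selection mentioned above could look roughly like the following sketch. It is an illustration only: `HelperV0`, `HelperV1`, `select_helper` and the integer keys are hypothetical names, not the API of radical.utils, and how the real module maps a detected Flux version to a helper is not shown in this summary.

```python
class HelperV0:
    """Hypothetical stand-in for a v0-specific helper."""
    api = 'v0'


class HelperV1:
    """Hypothetical stand-in for a v1-specific helper."""
    api = 'v1'


# Hypothetical registry: helper generation -> helper class
_HELPERS = {0: HelperV0, 1: HelperV1}


def select_helper(api_version):
    """Return the helper class registered for `api_version`."""
    try:
        return _HELPERS[api_version]
    except KeyError:
        # fail loudly instead of silently falling back to another version
        raise RuntimeError('unsupported flux helper version: %r'
                           % (api_version,)) from None
```

Keeping the mapping in one table means a further helper generation only needs a new class and one registry entry, assuming the caller has already detected the version.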
Reviewed Changes
Copilot reviewed 15 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/radical/utils/flux.py | Removed old monolithic flux implementation (642 lines deleted) |
| src/radical/utils/flux/__init__.py | New flux module entry point with version-aware helper selection |
| src/radical/utils/flux/flux_service.py | New flux service for managing flux instances |
| src/radical/utils/flux/flux_module.py | New flux module wrapper with version detection and spec utilities |
| src/radical/utils/flux/flux_helper_v0.py | New v0-specific flux helper implementation |
| src/radical/utils/flux/flux_helper_v1.py | New v1-specific flux helper implementation |
| tests/integration_tests/test_flux.py | Simplified flux integration tests using new API |
| src/radical/utils/typeddict.py | Performance improvements in TypedDict initialization |
| src/radical/utils/profile.py | Added Yappi profiling context manager |
| src/radical/utils/ids.py | Improved file counter handling with proper error recovery |
| tests/unittests/test_heartbeat.py | Added helper function for better process cleanup |
| src/radical/utils/env.py | Enhanced environment variable handling with extend support |
| tests/unittests/test_env.py | Added conditional check for env_proc |
| tests/unittests/test_typeddict.py | Added shebang and main execution block |
| src/radical/utils/__init__.py | Updated imports for new flux classes |
    finally:
        try:
            os.close(fd)
        except:
            pass
The file descriptor fd may not be defined if an exception occurs before line 257. The finally block should check if fd is defined before attempting to close it.
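One way to address this comment is to bind `fd` before the `try` block and close it only if it was actually opened. The sketch below is a minimal, self-contained illustration; `read_counter` and its file format are hypothetical and not the code in ids.py.

```python
import os


def read_counter(path):
    """Read an integer counter from `path`; an empty file counts as 0."""
    fd = None                        # bound before anything can raise
    try:
        fd = os.open(path, os.O_RDWR | os.O_CREAT)
        data = os.read(fd, 64).strip()
        return int(data) if data else 0
    finally:
        if fd is not None:           # close only what was really opened
            os.close(fd)
```

With this shape, a failure in `os.open` propagates as the original error instead of being masked by a `NameError` or `UnboundLocalError` in the cleanup code.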
    def _join(proc, timeout=0.1):
        proc.join(timeout=timeout)
        try:
            os.waitpid(test_proc.pid, os.WNOHANG)
The function uses test_proc.pid instead of proc.pid. This will cause a NameError if test_proc is not in scope, or use the wrong process ID.
    - os.waitpid(test_proc.pid, os.WNOHANG)
    + os.waitpid(proc.pid, os.WNOHANG)
    self._uri = None
    self._proc = None

    self._log.info('%s: found flux uri: %s', self._uid, self.uri)
This log message says 'found flux uri' but is actually logging when the service is stopped and uri is None. The message should be 'stopped flux service' or similar.
    - self._log.info('%s: found flux uri: %s', self._uid, self.uri)
    + self._log.info('%s: stopped flux service (uri was: %s)', self._uid, self.uri)
    return self.__dict__['_data']
The condition checks for '_data' but no longer handles the case where '_data' is not in self.__dict__. This could cause a KeyError since the initialization logic was moved.
    - return self.__dict__['_data']
    + if '_data' in self.__dict__:
    +     return self.__dict__['_data']
    + else:
    +     raise AttributeError("'_data' attribute not initialized")
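The reason an AttributeError is preferable to a KeyError here can be sketched as follows, assuming the lookup lives in a `__getattr__` hook (the snippet above does not show its enclosing method, so that is an assumption). `Record` is a hypothetical class, not the TypedDict implementation.

```python
class Record:
    """Hypothetical attribute-style wrapper around a `_data` dict."""

    def __init__(self, **kwargs):
        self.__dict__['_data'] = dict(kwargs)

    def __getattr__(self, name):
        # Only called when normal attribute lookup has already failed.
        if name == '_data':
            # `_data` was never set: report it the way attribute access expects
            raise AttributeError("'_data' attribute not initialized")
        try:
            return self.__dict__['_data'][name]
        except KeyError:
            raise AttributeError(name) from None
```

`hasattr()` and three-argument `getattr()` only swallow AttributeError, so a KeyError escaping from this hook would break both on a partially initialized object.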
Code Review
This pull request introduces a significant and well-structured refactoring of the Flux integration, creating an abstraction layer that supports multiple Flux versions and concurrent instances. The changes are extensive, replacing the old flux.py with a new flux package containing version-specific helpers and a service class. My review has identified a few critical and high-severity issues in the new helper classes, primarily related to thread management, potential deadlocks, and correctness in the shutdown logic, which should be addressed to ensure the stability of this new abstraction.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@mtitov: this should be ready now.
This now covers different flux versions and also allows for multiple flux instances to work concurrently.
This code should eventually move to Rhapsody.