Integrate dsub #195
base: master
Conversation
Merge Reed Comp Bio SPRAS master branch
…mporary fix to use dsub as framework with oi1, change to gcloud storage cp for larger file transfers
…dsub file paths are handled in container command
Some additional context: we started using dsub for All of Us. However, it's not yet clear what All of Us will do to support migration. I would still like to proceed with merging this code, but we may be okay keeping documentation light if our use case is already deprecated. See also #130.
As Tony pointed out in the PR review, Google Cloud Life Sciences is being deprecated, so we're not sure how much longer this will be around. Instead of giving the `dsub` framework verbose documentation, we explicitly state that its usage is experimental and warn users that they should already know what they're doing if they plan to use it.
This prints a warning for the user if they select the `dsub` framework. Since we're unsure whether we'll need to support dsub in the future (as our primary motivation for using it is being deprecated), this warning should prevent potential users from getting their hopes up that we have long-term plans for it.
@agitter, I believe @nisthapanda tested this integration with GCLS and verified that the merge didn't break anything for her. Since this isn't breaking tests in either direction, I think it's ready for a proper review.
The changes look pretty good overall. I mostly made small formatting suggestions.
@@ -114,9 +114,11 @@ def process_config(self, raw_config):
     # However, if we get a bad value, we raise an exception.
     if "container_framework" in raw_config:
         container_framework = raw_config["container_framework"].lower()
-        if container_framework not in ("docker", "singularity"):
+        if container_framework not in ("docker", "singularity", "dsub"):
             msg = "SPRAS was configured to run with an unknown container framework: '" + raw_config["container_framework"] + "'. Accepted values are 'docker' or 'singularity'."
Need to update the options in this line too
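For reference, a minimal standalone sketch of what the corrected validation could look like, combining the updated error message (now listing `dsub`) with the experimental-use warning discussed earlier. The names here mirror the PR but are hypothetical, not the actual SPRAS code:

```python
import warnings

# Hypothetical sketch; SUPPORTED_FRAMEWORKS and validate_framework are
# illustrative names, not part of the real SPRAS codebase.
SUPPORTED_FRAMEWORKS = ("docker", "singularity", "dsub")

def validate_framework(raw_value: str) -> str:
    framework = raw_value.lower()
    if framework not in SUPPORTED_FRAMEWORKS:
        # Error message updated to list every accepted value, including 'dsub'
        raise ValueError(
            "SPRAS was configured to run with an unknown container framework: "
            f"'{raw_value}'. Accepted values are 'docker', 'singularity', or 'dsub'."
        )
    if framework == "dsub":
        # dsub support is experimental; warn instead of failing
        warnings.warn("dsub support is experimental and may be removed in the future.")
    return framework
```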
@@ -41,6 +42,79 @@ def convert_docker_path(src_path: PurePath, dest_path: PurePath, file_path: Unio
     rel_path = file_path.relative_to(src_path)
     return PurePosixPath(dest_path, rel_path)

 def download_gcs(gcs_path: str, local_path: str, is_dir: bool):
My IDE shows a warning requesting 2 blank lines before these new functions.
     # run command
     subprocess.run(cmd, shell=True)

     if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists():
@nisthapanda why do we check `is_dir` here?
I don't think we need the `is_dir` here.
     subprocess.run(cmd, shell=True)

     if is_dir and Path(Path(local_path)/'gcs_temp.txt').exists():
         os.remove(Path(Path(local_path)/'gcs_temp.txt'))
Suggested change:
- os.remove(Path(Path(local_path)/'gcs_temp.txt'))
+ Path(Path(local_path)/'gcs_temp.txt').unlink()
Does this work? It's a bit simpler to me to stick with pathlib operations.
This works.
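A quick self-contained check of the equivalence being suggested, run in a temporary directory so nothing real is touched:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    marker = Path(tmp) / 'gcs_temp.txt'
    marker.write_text('placeholder')
    existed_before = marker.exists()
    marker.unlink()               # pathlib equivalent of os.remove(marker)
    removed = not marker.exists()
    marker.unlink(missing_ok=True)  # Python 3.8+: no error if already gone
```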
 def upload_gcs(local_path: str, gcs_path: str, is_dir: bool):
     # check if path exists in cloud storage
     exists = len(subprocess.run(f'gcloud storage ls {gcs_path}', shell=True, capture_output=True, text=True).stdout)
     # if path exists rsyc
Suggested change:
- # if path exists rsyc
+ # if path exists rsync
     current owner and group IDs.
     Does not modify the owner or group for existing files modified by the container.
     @param container: name of the container in the Google Cloud Container Registry
     @param command: command to run in the container
Suggested change:
- @param command: command to run in the container
+ @param command: command to run
@@ -258,3 +334,74 @@ def prepare_volume(filename: Union[str, PurePath], volume_base: Union[str, PureP
         src = parent

     return (src, dest), container_filename

 def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'):
Suggested change:
- def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True'):
+ def run_container_dsub(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: str = 'SPRAS=True') -> str:
     workspace_bucket = os.getenv('WORKSPACE_BUCKET')
     # Add path in the workspace bucket and label for dsub command for each volume
     dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
Suggested change (the extracted old and new lines are identical, so this appears to be a whitespace-only fix):
- dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
+ dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),) for i, (src, dst) in enumerate(volumes)]
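A worked example of what the `dsub_volumes` comprehension above produces, using a made-up workspace bucket and volume pair (in the PR the bucket comes from the `WORKSPACE_BUCKET` environment variable):

```python
from pathlib import PurePath, PurePosixPath

# Made-up inputs purely for illustration; src/dst mirror the
# (host, container) volume pairs passed to run_container_dsub.
workspace_bucket = 'gs://example-workspace'
volumes = [(PurePath('spras/out'), PurePosixPath('/spras'))]

# Each volume gains its GCS mirror path and a dsub input label
dsub_volumes = [(src, dst, workspace_bucket + str(dst), "INPUT_" + str(i),)
                for i, (src, dst) in enumerate(volumes)]
```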
     flags['image'] = container
     flags['env'] = environment
     flags['input-recursive'] = [vol[3]+'='+vol[2] for vol in dsub_volumes]
     flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
Suggested change (the extracted old and new lines are identical, so this appears to be a whitespace-only fix):
- flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
+ flags['output-recursive'] = "OUTPUT=" + workspace_bucket + working_dir
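To make the flags dict concrete, here is a hypothetical sketch of turning it into a dsub argument list; `flags_to_args` and the sample values are illustrative only, not the PR's code. List-valued flags (like `input-recursive`) are repeated on the command line:

```python
# Hypothetical helper: flatten a flags dict into CLI arguments.
def flags_to_args(flags: dict) -> list:
    args = ['dsub']
    for name, value in flags.items():
        values = value if isinstance(value, list) else [value]
        for v in values:
            # each entry becomes '--name value'; lists repeat the flag
            args += [f'--{name}', str(v)]
    return args

# Sample values shaped like the assignments above (made up for illustration)
flags = {'image': 'ubuntu:22.04',
         'env': 'SPRAS=True',
         'input-recursive': ['INPUT_0=gs://bucket/in0', 'INPUT_1=gs://bucket/in1'],
         'output-recursive': 'OUTPUT=gs://bucket/out'}
args = flags_to_args(flags)
```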
         download_gcs(local_path=str(src), gcs_path=gcs_path, is_dir=True)

     # return location of dsub logs in WORKSPACE_BUCKET
     return 'dsub logs: {logs}'.format(logs = flags['logging'])
Suggested change:
- return 'dsub logs: {logs}'.format(logs = flags['logging'])
+ return 'dsub logs: {logs}'.format(logs=flags['logging'])
Opening this PR in draft mode for now while @nisthapanda and I work through documentation updates and other testing.