46 changes: 46 additions & 0 deletions examples/protien_binding_usecase/README.md
@@ -0,0 +1,46 @@
# AlphaFold Pipeline Designs

This document describes two pipeline designs for running AlphaFold tasks across multiple structures with per-task GPU binding.

---

## 1. Single Pipeline with Parallel Structures (Current)

A single pipeline manages multiple structures.
AlphaFold tasks run concurrently within the same pipeline, each bound to a GPU.

```
Single Pipeline with Parallel Structures
----------------------------------------
[Pipeline]
|
+--> [AlphaFold A] --> [Output A] (GPU0)
+--> [AlphaFold B] --> [Output B] (GPU1)
+--> [AlphaFold C] --> [Output C] (GPU2)
+--> [AlphaFold D] --> [Output D] (GPU3)
```

- One pipeline orchestrates all structures.
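The single-pipeline design can be sketched with plain `asyncio` (an illustrative sketch only; `run_alphafold` and the structure names are hypothetical, not this repository's API):

```python
import asyncio

async def run_alphafold(structure: str, gpu_id: int) -> str:
    # Bind this task to one GPU; in a real run this env mapping would be
    # passed to the AlphaFold subprocess (hypothetical runner).
    env = {"CUDA_VISIBLE_DEVICES": str(gpu_id)}
    await asyncio.sleep(0)  # stand-in for the actual AlphaFold execution
    return f"{structure} -> output (GPU{env['CUDA_VISIBLE_DEVICES']})"

async def run_all() -> list:
    structures = ["A", "B", "C", "D"]
    # One pipeline orchestrates all structures concurrently.
    tasks = [run_alphafold(s, gpu) for gpu, s in enumerate(structures)]
    return await asyncio.gather(*tasks)

results = asyncio.run(run_all())
```

`asyncio.gather` preserves submission order, so outputs line up with the diagram above.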

## 2. Separate Pipelines Design (supported; can be enabled)

Each structure is processed by its own pipeline.

```
Separate Pipelines Design
-------------------------
[Pipeline 1] --> [AlphaFold A] --> [Output A]
GPU0

[Pipeline 2] --> [AlphaFold B] --> [Output B]
GPU1

[Pipeline 3] --> [AlphaFold C] --> [Output C]
GPU2
```

- Each pipeline launches independently.
- GPU allocation can be set per pipeline.
- Suitable when pipelines should remain isolated from one another.
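Per-pipeline GPU allocation in this design amounts to giving each launched pipeline its own environment (a minimal sketch; `run_pipeline.py` and the flag names are hypothetical):

```python
def pipeline_launch_spec(structure: str, gpu_id: int) -> dict:
    # Each pipeline is launched independently with its own command line
    # and its own GPU binding via CUDA_VISIBLE_DEVICES.
    return {
        "cmd": ["python", "run_pipeline.py", "--structure", structure],
        "env": {"CUDA_VISIBLE_DEVICES": str(gpu_id)},
    }

specs = [pipeline_launch_spec(s, gpu) for gpu, s in enumerate(["A", "B", "C"])]
```

Each spec could then be handed to `subprocess.Popen(cmd, env=...)` so the three pipelines run as isolated processes.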

---
9 changes: 8 additions & 1 deletion examples/protien_binding_usecase/run_protein_binding.py
@@ -45,7 +45,9 @@ async def adaptive_decision(pipeline: ProteinBindingPipeline) -> Optional[Dict[s

name, *_, score_str = line.split(',')
protein = name.split('.')[0]

score = float(score_str)
pipeline.logger.pipeline_log('Appending current scores to the list of scores')
if protein not in pipeline.score_history: # Appending scores
pipeline.score_history[protein] = []
pipeline.score_history[protein].append(score)
Expand All @@ -63,6 +65,7 @@ async def adaptive_decision(pipeline: ProteinBindingPipeline) -> Optional[Dict[s

try:
decision = await adaptive_criteria(protein, scores, pipeline)
pipeline.logger.pipeline_log(f'Adaptive decision: {decision}')
except Exception as e:
logger.error(e)
continue
Expand All @@ -88,6 +91,8 @@ async def adaptive_decision(pipeline: ProteinBindingPipeline) -> Optional[Dict[s
'type': type(pipeline),
'adaptive_fn': adaptive_decision,
'config': {
'is_child': True,
'start_pass': pipeline.passes,
'passes': pipeline.passes,
'iter_seqs': sub_iter_seqs,
'seq_rank': pipeline.seq_rank + 1,
Expand All @@ -99,10 +104,12 @@ async def adaptive_decision(pipeline: ProteinBindingPipeline) -> Optional[Dict[s
# Submit the request
pipeline.submit_child_pipeline_request(new_config)

pipeline.finalize()
pipeline.finalize(sub_iter_seqs)

if not pipeline.fasta_list_2:
pipeline.kill_parent = True
else:
pipeline.previous_scores = copy.deepcopy(pipeline.current_scores)
Comment on lines +111 to +112

high

There is a potential data loss bug here. If sub_iter_seqs is not empty but a child pipeline cannot be created (e.g., pipeline.sub_order >= MAX_SUB_PIPELINES), this else block is executed. The proteins in sub_iter_seqs have already been removed from pipeline.iter_seqs on line 76. Since this block only updates previous_scores, those proteins are effectively dropped from any further processing. They should be added back to the current pipeline's iter_seqs if they are not moved to a child.

Consider this implementation:

    else:
        if sub_iter_seqs:
            # If a child pipeline could not be created, add the sequences back to the parent.
            pipeline.iter_seqs.update(sub_iter_seqs)
        pipeline.previous_scores = copy.deepcopy(pipeline.current_scores)
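The restoration step this comment suggests can be exercised in isolation (a minimal sketch using plain dicts in place of the pipeline state; the helper name is hypothetical):

```python
def restore_unprocessed(iter_seqs: dict, sub_iter_seqs: dict, child_created: bool) -> None:
    # When no child pipeline was created, return the split-off sequences
    # to the parent's iter_seqs so they are not silently dropped.
    if not child_created and sub_iter_seqs:
        iter_seqs.update(sub_iter_seqs)

parent_seqs = {"protA": "MKT"}
split_off = {"protB": "GGS"}
restore_unprocessed(parent_seqs, split_off, child_created=False)
```

After the call, `protB` is back in the parent's working set instead of being lost.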



async def impress_protein_bind() -> None:
48 changes: 29 additions & 19 deletions src/impress/pipelines/protein_binding.py
Expand Up @@ -19,7 +19,10 @@ def __init__(self, name, flow, configs=None, **kwargs):
# Execution metadata
if configs is None:
configs = {}

self.is_child: bool = kwargs.get("is_child", False)
self.passes = kwargs.get("passes", 1)
self.start_pass: int = kwargs.get("start_pass", 1)
self.step_id = kwargs.get("step_id", 1)
self.seq_rank = kwargs.get("seq_rank", 0)
self.num_seqs = kwargs.get("num_seqs", 10)
Expand All @@ -44,7 +47,7 @@ def __init__(self, name, flow, configs=None, **kwargs):
)
self.output_path_mpnn = os.path.join(self.output_path, "mpnn")
self.output_path_af = os.path.join(
self.output_path, "/af/prediction/best_models"
self.output_path, "af/prediction/best_models"
)

# might have to do outside of initialization, so new pipelines
Expand All @@ -64,6 +67,7 @@ def set_up_new_pipeline_dirs(self, new_pipeline_name):
# all directories to create
subdirs = [
"af/fasta",
"af/prediction",
"af/prediction/best_models",
"af/prediction/best_ptm",
"af/prediction/dimer_models",
Expand All @@ -83,9 +87,7 @@ def register_pipeline_tasks(self):
"""Register all pipeline tasks"""

@self.auto_register_task() # MPNN
async def s1(task_description=None):
if task_description is None:
task_description = {"ranks": 1}
async def s1(task_description={"gpus_per_rank": 1}): # noqa: B006

medium

Using a mutable default argument like a dictionary is generally discouraged as it can lead to unexpected behavior if the object is modified. While it is not modified here, the previous pattern of using None as a default and creating the dictionary within the function is safer and improves maintainability. The noqa: B006 indicates awareness, but adhering to best practices would be better in the long run.

A safer implementation would be:

async def s1(task_description=None):
    if task_description is None:
        task_description = {"gpus_per_rank": 1}
    # ... rest of function
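The hazard this comment describes applies to dict defaults just as it does to lists; it is easy to reproduce in isolation (illustrative snippet, not code from this PR):

```python
def append_task(task, queue=[]):  # noqa: B006 -- the buggy pattern
    # The default list is created once at definition time and is shared
    # across every call that omits `queue`.
    queue.append(task)
    return queue

def append_task_safe(task, queue=None):
    # The safe pattern: create a fresh list per call.
    if queue is None:
        queue = []
    queue.append(task)
    return queue

first = append_task("a")
second = append_task("b")   # lands in the same shared list as "a"
safe_first = append_task_safe("a")
safe_second = append_task_safe("b")
```

`first` and `second` are the same object holding both tasks, while the safe variant returns an independent list on each call.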

mpnn_script = os.path.join(self.base_path, "mpnn_wrapper.py")
output_dir = os.path.join(self.output_path_mpnn, f"job_{self.passes}")

@@ -142,9 +144,7 @@ async def s3():

# alphafold, must be run separately for each structure one at a time!
@self.auto_register_task()
async def s4(target_fasta, task_description=None):
if task_description is None:
task_description = {"gpus_per_rank": 1}
async def s4(target_fasta, task_description={"gpus_per_rank": 1}): # noqa: B006

medium

Similar to the comment on s1, using a mutable default argument here is risky for future maintenance. It's safer to initialize the default to None and create the dictionary inside the function body to avoid potential side effects if the function logic changes.

A safer implementation would be:

async def s4(target_fasta, task_description=None):
    if task_description is None:
        task_description = {"gpus_per_rank": 1}
    # ... rest of function

cmd = (
f"/bin/bash {self.base_path}/af2_multimer_reduced.sh "
f"{self.output_path}/af/fasta/ "
Expand All @@ -154,10 +154,8 @@ async def s4(target_fasta, task_description=None):

return cmd

@self.auto_register_task() # plddt_extract
async def s5(task_description=None):
if task_description is None:
task_description = {}
@self.auto_register_task() # pLDDT_extract
async def s5(task_description={}): # noqa: B006

medium

As with s1 and s4, using a mutable default argument is not recommended. To improve code safety and maintainability, please consider using None as the default and creating the dictionary inside the function.

A safer implementation would be:

async def s5(task_description=None):
    if task_description is None:
        task_description = {}
    # ... rest of function

return (
f"python3 {self.base_path}/plddt_extract_pipeline.py "
f"--path={self.base_path} "
@@ -185,13 +183,22 @@ async def run(self):
while self.passes <= self.max_passes:
self.logger.pipeline_log(f"Starting pass {self.passes}")

self.logger.pipeline_log("Submitting MPNN task")
await self.s1(task_description={"pre_exec": TASK_PRE_EXEC})
self.logger.pipeline_log("MPNN task finished")
if self.is_child and self.passes == self.start_pass:
self.logger.pipeline_log(
"Skipping MPNN and Ranking steps for this child pipeline "
"in the current pass only."
)

pass

medium

The pass statement here is unnecessary because the if block already contains a logging statement and is not empty. It can be safely removed for cleaner code.


self.logger.pipeline_log("Submitting sequence ranking task")
await self.s2()
self.logger.pipeline_log("Sequence ranking task finished")
else:
self.logger.pipeline_log("Submitting MPNN task")
await self.s1(task_description={"pre_exec": TASK_PRE_EXEC})
self.logger.pipeline_log("MPNN task finished")

self.logger.pipeline_log("Submitting sequence ranking task")
await self.s2()
self.logger.pipeline_log("Sequence ranking task finished")

self.logger.pipeline_log("Submitting scoring task")
fasta_files = await self.s3()
@@ -245,7 +252,7 @@ async def run(self):
await asyncio.gather(*alphafold_tasks, return_exceptions=True)
self.logger.pipeline_log(f"{len(alphafold_tasks)} Alphafold tasks finished")

self.logger.pipeline_log("Submitting plddt extract")
self.logger.pipeline_log("Submitting pLDDT extraction task")

staged_file = f"af_stats_{self.name}_pass_{self.passes}.csv"

Expand All @@ -260,8 +267,11 @@ async def run(self):
],
}
)
self.logger.pipeline_log("Plddt extract finished")
self.logger.pipeline_log("pLDDT extract finished")

await self.run_adaptive_step(wait=True)

if self.kill_parent:
break

self.passes += 1