
Support for Custom Modifications in Multiprocessing #303

Open

GeorgWa wants to merge 3 commits into main
Conversation


GeorgWa (Collaborator) commented Mar 10, 2025

This PR adds support for using custom modifications in multiprocessing workflows. The key changes include:

  • Added get_custom_mods() function to serialize user-defined modifications
  • Added init_custom_mods() function to initialize custom modifications in worker processes
  • Updated multiprocessing implementations in calc_precursor_isotope_info_mp and calc_precursor_isotope_intensity_mp to pass custom modifications to worker processes
  • Removed the warning and fallback to single-process mode when custom modifications are present
  • Added comprehensive unit tests for the new modification serialization functionality
  • Improved documentation for multiprocessing functions
  • Fixed calculation in _count_batchify_df for more accurate progress reporting

These changes allow users to define custom modifications and still benefit from multiprocessing performance when calculating isotope patterns, which was previously not possible.
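For illustration, a minimal end-to-end sketch using the helpers this diff touches (the `alphabase.peptide.precursor` import path and the placeholder modification values are assumptions, not confirmed by the diff):

from alphabase.constants.modification import _add_a_new_modification, update_all_by_MOD_DF
from alphabase.peptide.precursor import calc_precursor_isotope_intensity_mp  # assumed path

# Register a user-defined modification in the parent process (hypothetical values).
_add_a_new_modification(
    mod_name="MyLabel@K",
    composition="C(2)H(2)O(1)",
    modloss_composition="",
    smiles="",
)
update_all_by_MOD_DF()

# precursor_df: an existing precursor DataFrame with an "nAA" column.
# With this PR the custom mod is serialized into each spawned worker
# instead of falling back to single-process mode.
precursor_df = calc_precursor_isotope_intensity_mp(precursor_df, mp_process_num=4)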

GeorgWa requested a review from mschwoer March 10, 2025 10:38
@@ -463,6 +463,55 @@ def has_custom_mods():
    return len(MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]) > 0


Added two new functions: get_custom_mods() to extract custom modifications as a serializable dictionary, and init_custom_mods() to initialize these modifications in child processes. This enables transferring custom modification data to worker processes during multiprocessing.

def get_custom_mods():
    "
    Returns a dictionary of user-defined modifications that can be serialized and passed to child processes.

    Returns
    -------
    dict
        Dictionary with modification names as keys and modification details as values
    "
    if not has_custom_mods():
        return {}

    custom_mods = MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]
    result = {}

    for mod_name, row in custom_mods.iterrows():
        result[mod_name] = {
            "composition": row["composition"],
            "modloss_composition": row["modloss_composition"],
            "smiles": row["smiles"],
        }

    return result


def init_custom_mods(custom_mods_dict):
    "
    Initialize custom modifications in a child process from a dictionary.

    Parameters
    ----------
    custom_mods_dict : dict
        Dictionary of custom modifications as returned by get_custom_mods()
    "
    if not custom_mods_dict:
        return

    for mod_name, mod_info in custom_mods_dict.items():
        _add_a_new_modification(
            mod_name=mod_name,
            composition=mod_info["composition"],
            modloss_composition=mod_info["modloss_composition"],
            smiles=mod_info["smiles"],
        )

    # Update all dictionaries after adding modifications
    update_all_by_MOD_DF()
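In other words, get_custom_mods() produces a plain picklable dict and init_custom_mods() replays it on the other side of the process boundary. A minimal sketch of the round trip, which is roughly what mp.Pool does internally with initargs:

import pickle

# Parent process: snapshot user-added mods as a plain, picklable dict.
payload = pickle.dumps(get_custom_mods())

# Child process (spawn start method): module globals such as MOD_DF are rebuilt
# fresh from the module source, so user-added rows are absent until re-registered.
init_custom_mods(pickle.loads(payload))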

@@ -483,14 +483,27 @@ def _batchify_df(df_group, mp_batch_size):


Improved the _count_batchify_df function by using integer division to calculate the number of batches more efficiently. This avoids creating unnecessary range objects just to count iterations.

def _batchify_df(df_group, mp_batch_size):
    "Internal funciton for multiprocessing"
    for _, df in df_group:
        for i in range(0, len(df), mp_batch_size):
            yield df.iloc[i : i + mp_batch_size, :]


def _count_batchify_df(df_group, mp_batch_size):
    "Internal function"
    count = 0
    for _, group in df_group:
        count += (len(group) + mp_batch_size - 1) // mp_batch_size
    return count

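A quick sanity check of the new count: ceiling division agrees with counting range steps, without building a range object per group.

n, b = 10_500, 1_000
assert len(range(0, n, b)) == (n + b - 1) // b == 11  # 11 batches either way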



Added a new _init_worker function that serves as an initializer for worker processes, handling the initialization of custom modifications in each worker process.

def _init_worker(custom_mods_dict):
    "
    Initialize a worker process with custom modifications.

    Parameters
    ----------
    custom_mods_dict : dict
        Dictionary of custom modifications as returned by get_custom_mods()
    "
    from alphabase.constants.modification import init_custom_mods

    init_custom_mods(custom_mods_dict)
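The initializer is needed because the pools below use the spawn start method, where each worker imports modules fresh and therefore does not inherit parent-side mutations of module state such as MOD_DF. A self-contained sketch of the pattern (names are illustrative, not alphabase code):

import multiprocessing as mp

_STATE = {}  # stand-in for module-level state such as MOD_DF

def _init(state):
    # Runs once per worker at startup and replays the parent-side state.
    _STATE.update(state)

def _work(key):
    return _STATE.get(key, "missing")

if __name__ == "__main__":
    _STATE["mod"] = "registered in parent"
    with mp.get_context("spawn").Pool(2, initializer=_init, initargs=(_STATE,)) as p:
        # Prints ['registered in parent']; without the initializer a spawned
        # worker would see an empty _STATE and return 'missing'.
        print(p.map(_work, ["mod"]))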

@@ -501,56 +514,74 @@ def calc_precursor_isotope_info_mp(
    min_right_most_intensity: float = 0.2,
    min_precursor_num_to_run_mp: int = 10000,
) -> pd.DataFrame:


Completely refactored the calc_precursor_isotope_info_mp function to add support for custom modifications. Key improvements: 1) Added proper initialization of worker processes with custom mods, 2) Enhanced documentation with detailed parameter descriptions, 3) Better progress bar handling, and 4) Simplified the early return condition.

# `progress_bar` should be replaced by more advanced tqdm wrappers created by Sander
# I will leave it to alphabase.utils
def calc_precursor_isotope_info_mp(
    precursor_df: pd.DataFrame,
    processes: int = 8,
    mp_batch_size: int = 10000,
    progress_bar=None,
    min_right_most_intensity: float = 0.2,
    min_precursor_num_to_run_mp: int = 10000,
) -> pd.DataFrame:
    "Calculate isotope info for precursor_df using multiprocessing.

    Parameters
    ----------

    precursor_df : pd.DataFrame
        Precursor_df to calculate isotope info

    processes : int
        Process number. Optional, by default 8

    mp_batch_size : int
        Multiprocessing batch size. Optional, by default 10000.

    progress_bar : tqdm.tqdm
        Progress bar. Optional, by default None

    min_right_most_intensity : float
        The minimal intensity value of the right-most peak relative to apex peak.

    min_precursor_num_to_run_mp : int
        The minimal number of precursors to run multiprocessing. Optional, by default 10000.

    Returns
    -------

    pd.DataFrame
        precursor_df with additional columns:
        - isotope_apex_offset
        - isotope_apex_mz
        - isotope_apex_intensity
        - isotope_right_most_offset
        - isotope_right_most_mz
        - isotope_right_most_intensity
        - isotope_m1_mz
        - isotope_m1_intensity
        - mono_isotope_idx
    "
    if processes <= 1 or len(precursor_df) < min_precursor_num_to_run_mp:
        return calc_precursor_isotope_info(
            precursor_df, min_right_most_intensity=min_right_most_intensity
        )

    # Get custom modifications to pass to worker processes
    from alphabase.constants.modification import get_custom_mods

    custom_mods_dict = get_custom_mods()

    df_list = []
    df_group = precursor_df.groupby("nAA")

    with mp.get_context("spawn").Pool(
        processes, initializer=_init_worker, initargs=(custom_mods_dict,)
    ) as p:
        processing = p.imap(
            partial(
                calc_precursor_isotope_info,
                min_right_most_intensity=min_right_most_intensity,
            ),
            _batchify_df(df_group, mp_batch_size),
        )

        if progress_bar:
            for df in progress_bar(
                processing, _count_batchify_df(df_group, mp_batch_size)
            ):
                df_list.append(df)
        else:
            for df in processing:
                df_list.append(df)
    return pd.concat(df_list)
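Usage note: `progress_bar` is invoked above as `progress_bar(iterable, count)`, so passing `tqdm` itself would bind the count to tqdm's second positional parameter (`desc`). A small adapter keeps the intended semantics (a sketch, not part of this diff):

from tqdm import tqdm

df = calc_precursor_isotope_info_mp(
    precursor_df,  # an existing precursor DataFrame
    progress_bar=lambda it, total: tqdm(it, total=total),
)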

@@ -653,6 +684,12 @@ def calc_precursor_isotope_intensity_mp(

Enhanced the calc_precursor_isotope_intensity_mp function to support custom modifications in multiprocessing. Added worker initialization, improved documentation with better parameter descriptions (especially for the normalize parameter), and made the function's interface more consistent with other multiprocessing functions.

def calc_precursor_isotope_intensity_mp(
    precursor_df,
    max_isotope=6,
    min_right_most_intensity=0.001,
    normalize: typing.Literal["mono", "sum"] = "sum",
    mp_batch_size=1000,
    mp_process_num=8,
    progress_bar=True,
) -> pd.DataFrame:
    "Calculate isotope intensity values for precursor_df using multiprocessing.

    Parameters
    ----------

    precursor_df : pd.DataFrame
        Precursor_df to calculate isotope intensity

    max_isotope : int
        Max isotope number to calculate. Optional, by default 6

    min_right_most_intensity : float
        The minimal intensity value of the right-most peak relative to apex peak.

    normalize : typing.Literal["mono", "sum"]
        How to normalize the isotope intensities.
        "mono": normalize to monoisotopic peak
        "sum": normalize to sum of all peaks
        Optional, by default "sum"

    mp_batch_size : int
        Multiprocessing batch size. Optional, by default 1000.

    mp_process_num : int
        Process number. Optional, by default 8

    progress_bar : bool
        Whether to show progress bar. Optional, by default True

    Returns
    -------

    pd.DataFrame
        precursor_df with additional columns i_0, i_1, i_2, ... i_{max_isotope-1}

    "

    if mp_process_num <= 1:
        return calc_precursor_isotope_intensity(
            precursor_df=precursor_df,
            max_isotope=max_isotope,
            min_right_most_intensity=min_right_most_intensity,
            normalize=normalize,
        )

    # Get custom modifications to pass to worker processes
    from alphabase.constants.modification import get_custom_mods

    custom_mods_dict = get_custom_mods()

    df_list = []
    df_group = precursor_df.groupby("nAA")

    with mp.get_context("spawn").Pool(
        mp_process_num, initializer=_init_worker, initargs=(custom_mods_dict,)
    ) as p:
        processing = p.imap(
            partial(
                calc_precursor_isotope_intensity,
                max_isotope=max_isotope,
                min_right_most_intensity=min_right_most_intensity,
                normalize=normalize,
            ),
            _batchify_df(df_group, mp_batch_size),
        )

        if progress_bar:
            df_list = list(
                tqdm(processing, total=_count_batchify_df(df_group, mp_batch_size))
            )
        else:
            df_list = list(processing)

    return pd.concat(df_list)
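To make the two `normalize` conventions concrete, a toy numeric example (values are illustrative only):

import numpy as np

raw = np.array([0.8, 1.0, 0.4])  # hypothetical raw isotope intensities
print(raw / raw[0])    # "mono": [1.0, 1.25, 0.5], monoisotopic peak fixed at 1
print(raw / raw.sum()) # "sum":  [0.3636..., 0.4545..., 0.1818...], sums to 1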

@@ -417,9 +416,14 @@ def calc_precursor_isotope_intensity(
     mp_batch_size : int, optional
         The batch size for multiprocessing.

-    mp_processes : int, optional
+    mp_process_num : int, optional
         The number of processes for multiprocessing.


Improved documentation for the calc_precursor_isotope_intensity method, especially clarifying the normalize parameter. Removed the check against custom modifications since they are now supported in multiprocessing.

        """Calculate and append the isotope intensity columns into self.precursor_df.
        See `alphabase.peptide.calc_precursor_isotope_intensity` for details.

        Parameters
        ----------

        max_isotope : int, optional
            The maximum isotope to calculate.

        min_right_most_intensity : float, optional
            The minimum intensity of the right most isotope.

        mp_batch_size : int, optional
            The batch size for multiprocessing.

        mp_process_num : int, optional
            The number of processes for multiprocessing.

        normalize : typing.Literal["mono", "sum"], optional
            How to normalize the isotope intensities.
            "mono": normalize to monoisotopic peak
            "sum": normalize to sum of all peaks
            Defaults to "sum".
        """

        if "precursor_mz" not in self._precursor_df.columns:
            self.calc_and_clip_precursor_mz()

        do_multiprocessing = (
            mp_process_num > 1 and len(self.precursor_df) > mp_batch_size
        )

        # Custom modifications are now supported in multiprocessing

-        See `alphabase.peptide.calc_precursor_isotope` for details.
+        See `alphabase.peptide.calc_precursor_isotope_info` for details.

Parameters


Improved documentation for the calc_precursor_isotope_info method with better parameter descriptions. Removed the check against custom modifications since that limitation no longer exists.

    def calc_precursor_isotope_info(
        self,
        mp_process_num: int = 8,
        mp_process_bar=None,
        mp_batch_size=10000,
    ):
        """
        Append isotope columns into self.precursor_df.
        See `alphabase.peptide.calc_precursor_isotope_info` for details.

        Parameters
        ----------
        mp_process_num : int, optional
            The number of processes for multiprocessing. Defaults to 8.

        mp_process_bar : tqdm.tqdm, optional
            Progress bar. Defaults to None.

        mp_batch_size : int, optional
            The batch size for multiprocessing. Defaults to 10000.
        """
        if "precursor_mz" not in self._precursor_df.columns:
            self.calc_and_clip_precursor_mz()

        # Custom modifications are now supported in multiprocessing

    if not has_custom_mods():
        return {}

    custom_mods = MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]
Contributor


The main drawback I see here is that this code needs to be adapted whenever something is added to MOD_DF. This would be easily missed, and then the multiprocessing part of the code would behave differently.
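One possible mitigation (a sketch, not part of this PR): serialize whichever columns MOD_DF actually has instead of a hardcoded subset, so a newly added column survives the round trip without touching this function:

def get_custom_mods():
    """Serialize every column of the user-added rows (sketch)."""
    custom_mods = MOD_DF[MOD_DF["classification"] == _MOD_CLASSIFICATION_USER_ADDED]
    return custom_mods.to_dict(orient="index")  # {mod_name: {column: value, ...}}

The init side would then need an equally generic re-registration (for example, appending the rows back to MOD_DF before calling update_all_by_MOD_DF()), which is a larger change than the explicit mapping in this PR.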
