Source code for haddock.modules.analysis.rmsdmatrix

"""
RMSD matrix module.

This module calculates of the RMSD matrix between all the models
generated in the previous step.

As all the pairwise RMSD calculations are independent, the module distributes
them over all the available cores in an optimal way.

Once created, the RMSD matrix is saved in text form in the current `rmsdmatrix`
folder. The path to this file is then shared with the following step of the
workflow by means of the json file `rmsd_matrix.json`.

The module accepts two parameters in input, namely:

* `max_models` (default = 10000)
* `resdic_` : an expandable parameter to specify which residues must be
  considered for the alignment and the RMSD calculation. If there are
  two proteins denoted by chain IDs A and B, then the user can operate
  such selection in the following way inside the configuration file

>>> resdic_A = [1,2,3,4]
>>> resdic_B = [2,3,4]

thus telling the module to consider residues from 1 to 4 of chain A and from 2
to 4 of chain B for the alignment and RMSD calculation.
"""

import contextlib
import os
from pathlib import Path

from haddock import RMSD_path, log
from haddock.core.defaults import FAST_RMSDMATRIX_EXEC, MODULE_DEFAULT_YAML
from haddock.core.typing import Any, AtomsDict, FilePath
from haddock.libs.libalign import check_common_atoms, rearrange_xyz_files
from haddock.libs.libontology import ModuleIO, RMSDFile
from haddock.libs.libparallel import get_index_list
from haddock.libs.libutil import parse_ncores
from haddock.modules import BaseHaddockModule, get_engine
from haddock.modules.analysis import (
    confirm_resdic_chainid_length,
    get_analysis_exec_mode,
    )
from haddock.modules.analysis.rmsdmatrix.rmsd import (
    RMSDJob,
    XYZWriter,
    XYZWriterJob,
    rmsd_dispatcher,
    )


RECIPE_PATH = Path(__file__).resolve().parent
DEFAULT_CONFIG = Path(RECIPE_PATH, MODULE_DEFAULT_YAML)
EXEC_PATH = FAST_RMSDMATRIX_EXEC


[docs] class HaddockModule(BaseHaddockModule): """HADDOCK3 module for clustering with RMSD.""" name = RECIPE_PATH.name def __init__( self, order: int, path: Path, initial_params: FilePath = DEFAULT_CONFIG ) -> None: super().__init__(order, path, initial_params)
[docs] @classmethod def confirm_installation(cls) -> None: """Confirm if fast-rmsdmatrix is installed and available.""" if not os.access(EXEC_PATH, mode=os.F_OK): raise Exception( f"Required {str(EXEC_PATH)} file does not exist.{os.linesep}" "Old HADDOCK3 installation? Please follow the new installation instructions at https://github.com/haddocking/haddock3/blob/main/docs/INSTALL.md" ) if not os.access(EXEC_PATH, mode=os.X_OK): raise Exception(f"Required {str(EXEC_PATH)} file is not executable") return
def _rearrange_output( self, output_name: FilePath, path: FilePath, ncores: int ) -> None: """Combine different rmsd outputs in a single file.""" output_fname = Path(path, output_name) self.log(f"rearranging output files into {output_fname}") # Combine files with open(output_fname, "w") as out_file: for core in range(ncores): tmp_file = Path(path, "rmsd_" + str(core) + ".matrix") with open(tmp_file) as infile: out_file.write(infile.read()) log.debug(f"File number {core} written") tmp_file.unlink() log.info("Completed reconstruction of rmsd files.") log.info(f"{output_fname} created.")
[docs] def update_params(self, *args: Any, **kwargs: Any) -> None: """Update parameters.""" super().update_params(*args, **kwargs) with contextlib.suppress(KeyError): self.params.pop("resdic_") confirm_resdic_chainid_length(self._params)
def _run(self) -> None: """Execute module.""" # Get the models generated in previous step if type(self.previous_io) == iter: _e = "This module cannot come after one that produced an iterable." self.finish_with_error(_e) # Get the models generated in previous step models = self.previous_io.retrieve_models(individualize=True) nmodels = len(models) if nmodels > self.params["max_models"]: # too many input models : RMSD matrix would be too big => Abort! raise Exception("Too many models for RMSD matrix calculation") # index_list for the jobs with linear scaling ncores = parse_ncores(n=self.params["ncores"], njobs=len(models)) index_list = get_index_list(nmodels, ncores) traj_filename = Path("traj.xyz") filter_resdic = { key[-1]: value for key, value in self.params.items() if key.startswith("resdic") } # check common atoms n_atoms, common_keys = check_common_atoms( models, filter_resdic, self.params["allatoms"], self.params["atom_similarity"], ) xyzwriter_jobs: list[XYZWriterJob] = [] for core in range(ncores): output_name = Path("traj_" + str(core) + ".xyz") # init RMSDJobFast xyzwriter_obj = XYZWriter( model_list=models[index_list[core] : index_list[core + 1]], output_name=output_name, core=core, n_atoms=n_atoms, common_keys=common_keys, filter_resdic=filter_resdic, allatoms=self.params["allatoms"], ) # job_f = output_name job = XYZWriterJob( xyzwriter_obj, ) xyzwriter_jobs.append(job) # run jobs exec_mode = get_analysis_exec_mode(self.params["mode"]) Engine = get_engine(exec_mode, self.params) engine = Engine(xyzwriter_jobs) engine.run() rearrange_xyz_files(traj_filename, path=Path("."), ncores=ncores) # Parallelisation : optimal dispatching of models tot_npairs = nmodels * (nmodels - 1) // 2 log.info(f"total number of pairs {tot_npairs}") ncores = parse_ncores(n=self.params["ncores"], njobs=tot_npairs) npairs, ref_structs, mod_structs = rmsd_dispatcher(nmodels, tot_npairs, ncores) # Calculate the rmsd for each set of models rmsd_jobs: list[RMSDJob] = [] self.log(f"running RmsdFast Jobs with {ncores} cores") for core in range(ncores): output_name = Path("rmsd_" + str(core) + ".out") # init RMSDJobFast job = RMSDJob( traj_filename, output_name, EXEC_PATH, core, npairs[core], ref_structs[core], mod_structs[core], len(models), n_atoms, ) rmsd_jobs.append(job) engine = Engine(rmsd_jobs) engine.run() rmsd_file_l: list[str] = [] not_found: list[str] = [] for job in rmsd_jobs: if not job.output.exists(): # NOTE: If there is no output, most likely the RMSD calculation # timed out not_found.append(job.output.name) wrn = f"Rmsd results were not calculated for {job.output.name}" log.warning(wrn) else: rmsd_file_l.append(str(job.output)) if not_found: # Not all distances were calculated, cannot create the full matrix self.finish_with_error("Several files were not generated:" f" {not_found}") # Post-processing : single file final_output_name = "rmsd.matrix" self._rearrange_output(final_output_name, path=Path("."), ncores=ncores) # Delete the trajectory file if traj_filename.exists(): os.unlink(traj_filename) # Sending models to the next step of the workflow self.output_models = models self.export_io_models() # Sending matrix path to the next step of the workflow matrix_io = ModuleIO() rmsd_matrix_file = RMSDFile(final_output_name, npairs=tot_npairs) matrix_io.add(rmsd_matrix_file) matrix_io.save(filename="rmsd_matrix.json")