Source code for haddock.libs.libontology

"""Describe the Haddock3 ontology used for communicating between modules."""

import datetime
import itertools
from enum import Enum
from os import linesep
from pathlib import Path


import jsonpickle

from haddock.core.defaults import MODULE_IO_FILE
from haddock.core.typing import FilePath, Literal, Optional, TypeVar, Union
from typing import List, Any


# Not-a-number sentinel; used in this module as the default `score` of a
# `PDBFile` that has not been scored.
NaN = float("nan")


class Format(Enum):
    """Input and Output possible formats.

    The value of each member is the short tag of the format. Note that
    ``PDB_ENSEMBLE`` carries the same value as ``PDB``; following the
    standard ``enum`` rules it is therefore an *alias* of ``PDB`` and not
    a distinct member.
    """

    PDB = "pdb"
    PDB_ENSEMBLE = "pdb"
    CNS_INPUT = "inp"
    CNS_OUTPUT = "out"
    TOPOLOGY = "psf"
    MATRIX = "matrix"

    def __str__(self) -> str:
        """Return the raw value of the member instead of ``Format.NAME``."""
        return f"{self.value}"
class Persistent:
    """Any persistent file generated by this framework."""

    def __init__(
        self,
        file_name: FilePath,
        file_type: Format,
        path: FilePath = ".",
        md5: Optional[str] = None,
        restr_fname: Optional[FilePath] = None,
    ) -> None:
        """Record where a generated file lives and what kind of file it is.

        Parameters
        ----------
        file_name : FilePath
            Name of the file. Only its final component is kept in
            ``self.file_name``; the value as given is used to build
            ``self.rel_path``.
        file_type : Format
            Format of the file.
        path : FilePath
            Directory holding the file. Stored resolved (absolute) in
            ``self.path``.
        md5 : Optional[str]
            Stored as given.
        restr_fname : Optional[FilePath]
            Stored as given.
        """
        resolved_dir = Path(path).resolve()

        # creation timestamp, second precision, space between date and time
        self.created = datetime.datetime.now().isoformat(
            sep=" ",
            timespec="seconds",
        )
        self.file_name = Path(file_name).name
        self.file_type = file_type
        self.path = str(resolved_dir)
        # `full_name` is built from `path` as given (not resolved)
        self.full_name = str(Path(path) / self.file_name)
        # location expressed from a sibling directory:
        # ../<name of the resolved directory>/<file_name as given>
        self.rel_path = Path("..", resolved_dir.name, file_name)
        self.md5 = md5
        self.restr_fname = restr_fname

    def __repr__(self) -> str:
        """Return ``[<type>|<creation time>] <absolute file location>``."""
        location = Path(self.path, self.file_name)
        return f"[{self.file_type}|{self.created}] {location}"

    def is_present(self) -> bool:
        """Check if the persistent file exists on disk.

        The check is made on ``self.rel_path``; when that path is relative
        it is resolved against the current working directory at call time.
        """
        on_disk = self.rel_path.resolve()
        return on_disk.exists()
class PDBFile(Persistent):
    """Represent a PDB file.

    Instances are compared and ordered by their ``score`` only. Hashing is
    identity-based (``id(self)``), so two instances with the same score
    compare equal yet hash differently. A ``NaN`` score (the default) never
    compares equal, not even to itself.
    """

    def __init__(
        self,
        file_name: Union[Path, str],
        topology: Optional[Any] = None,
        path: Union[Path, str] = ".",
        score: float = NaN,
        md5: Optional[str] = None,
        restr_fname: Optional[Union[Path, str]] = None,
        unw_energies: Optional[dict[str, float]] = None,
    ) -> None:
        """Create a PDB file record.

        Parameters
        ----------
        file_name : Union[Path, str]
            Name of the PDB file.
        topology : Optional[Any]
            Topology associated with the model; stored as given.
        path : Union[Path, str]
            Directory holding the file.
        score : float
            Score of the model. Defaults to ``NaN``.
        md5 : Optional[str]
            Stored as given.
        restr_fname : Optional[Union[Path, str]]
            Stored as given.
        unw_energies : Optional[dict[str, float]]
            Stored as given.
        """
        super().__init__(file_name, Format.PDB, path, md5, restr_fname)
        self.topology = topology
        self.score = score
        # The attributes below start unset and are filled in elsewhere.
        self.ori_name: Optional[str] = None
        self.clt_id: Union[str, int, None] = None
        self.clt_rank: Optional[int] = None
        self.clt_model_rank: Optional[int] = None
        # NOTE(review): `len` is initialised with the score value; the
        # reason is not visible in this module - confirm with callers.
        self.len = score
        self.unw_energies = unw_energies
        self.seed = None

    def __lt__(self, other: object) -> bool:
        """Return True if this model's score is lower than `other`'s."""
        if not isinstance(other, PDBFile):
            # let Python try the reflected operation / raise TypeError
            return NotImplemented
        return self.score < other.score

    def __gt__(self, other: object) -> bool:
        """Return True if this model's score is higher than `other`'s."""
        if not isinstance(other, PDBFile):
            return NotImplemented
        return self.score > other.score

    def __eq__(self, other: object) -> bool:
        """Return True if both models have the same score.

        Comparing with anything that is not a ``PDBFile`` yields
        ``NotImplemented`` (hence ``False`` for ``==``) instead of raising
        ``AttributeError`` on the missing ``score`` attribute.
        """
        if not isinstance(other, PDBFile):
            return NotImplemented
        return self.score == other.score

    def __hash__(self) -> int:
        """Hash on object identity."""
        return id(self)
class RMSDFile(Persistent):
    """Represents a RMSD matrix file."""

    def __init__(
        self,
        file_name: FilePath,
        npairs: int,
        path: FilePath = ".",
    ) -> None:
        """Create a matrix file record.

        Parameters
        ----------
        file_name : FilePath
            Name of the matrix file.
        npairs : int
            Stored as given on the instance.
            NOTE(review): presumably the number of pairs held in the
            matrix - confirm with callers.
        path : FilePath
            Directory holding the file.
        """
        super().__init__(
            file_name=file_name,
            file_type=Format.MATRIX,
            path=path,
        )
        self.npairs = npairs

    def __hash__(self) -> int:
        """Hash on object identity."""
        return id(self)
class TopologyFile(Persistent):
    """Represent a CNS-generated topology file."""

    def __init__(self, file_name: FilePath, path: FilePath = ".") -> None:
        """Create a topology file record of type ``Format.TOPOLOGY``.

        Parameters
        ----------
        file_name : FilePath
            Name of the topology file.
        path : FilePath
            Directory holding the file.
        """
        super().__init__(
            file_name=file_name,
            file_type=Format.TOPOLOGY,
            path=path,
        )
class ModuleIO:
    """Intercommunicating modules and exchange input/output information.

    ``input`` and ``output`` are flat lists whose elements are either
    persistent-file objects or dictionaries mapping a key to such objects.
    """

    def __init__(self) -> None:
        self.input: List[Any] = []
        self.output: List[Any] = []

    def add(self, persistent: Any, mode: str = "i") -> None:
        """Add a given filename as input or output.

        Parameters
        ----------
        persistent : Any
            A single element, or a list of elements. A list is merged into
            the target list; anything else is appended as one element.
        mode : str
            ``"i"`` stores into ``input``; any other value stores into
            ``output``.
        """
        target = self.input if mode == "i" else self.output
        if isinstance(persistent, list):
            target.extend(persistent)
        else:
            target.append(persistent)

    def save(
        self,
        path: FilePath = ".",
        filename: FilePath = MODULE_IO_FILE,
    ) -> Path:
        """Save Input/Output needed files by this module to disk.

        Parameters
        ----------
        path : FilePath
            Directory in which the file is written.
        filename : FilePath
            Name of the file to write.

        Returns
        -------
        Path
            Location of the written file.
        """
        fpath = Path(path, filename)
        to_save = {"input": self.input, "output": self.output}
        # NOTE: this changes jsonpickle's encoder options process-wide.
        jsonpickle.set_encoder_options("json", sort_keys=True, indent=4)
        # Encode before opening the file so that an encoding failure does
        # not leave a truncated/empty file behind.
        encoded = jsonpickle.encode(to_save)
        with open(fpath, "w") as output_handler:
            output_handler.write(encoded)  # type: ignore
        return fpath

    def load(self, filename: FilePath) -> None:
        """Load the content of a given IO filename.

        Replaces ``input`` and ``output`` with the content of the file.

        Parameters
        ----------
        filename : FilePath
            File previously written by :meth:`save`.
        """
        # SECURITY NOTE: jsonpickle.decode can instantiate arbitrary
        # classes named in the file; only load files from trusted sources.
        with open(filename) as json_file:
            content = jsonpickle.decode(json_file.read())
        self.input = content["input"]  # type: ignore
        self.output = content["output"]  # type: ignore

    def retrieve_models(
        self, crossdock: bool = False, individualize: bool = False
    ) -> list[Union[PDBFile, list[PDBFile]]]:
        """Retrieve the PDBobjects to be used in the module.

        Models are read from ``self.output``. Dictionary elements are
        treated as groups of models (one group per position in the list);
        other elements are kept only if their ``file_type`` is
        ``Format.PDB``.

        Parameters
        ----------
        crossdock : bool
            When groups are present, combine them as a cartesian product
            instead of position-wise.
        individualize : bool
            When groups are present, return all grouped models as one flat
            list. Takes precedence over ``crossdock``.

        Returns
        -------
        list
            Without any dictionary element: the list of PDB models.
            Otherwise the plain models are ignored and the result is built
            from the groups only: tuples of models (position-wise or
            cartesian product) or a flat list when ``individualize``.

        Raises
        ------
        Exception
            If position-wise pairing is requested and the groups do not
            all hold the same number of models.
        """
        # Get the models generated in previous step
        model_list: list[PDBFile] = []
        input_dic: dict[int, list[PDBFile]] = {}

        for i, element in enumerate(self.output):
            if isinstance(element, dict):
                input_dic[i] = list(element.values())
            elif element.file_type == Format.PDB:  # type: ignore
                model_list.append(element)  # type: ignore

        if not input_dic:
            return model_list  # type: ignore

        groups = list(input_dic.values())

        if individualize:
            return list(itertools.chain(*groups))  # type: ignore

        if crossdock:
            return list(itertools.product(*groups))  # type: ignore

        # check if all ensembles contain the same number of models
        expected_len = len(groups[0])
        if any(len(group) != expected_len for group in groups[1:]):
            _msg = (
                "Different number of models in molecules,"
                " cannot prepare pairwise complexes."
            )
            raise Exception(_msg)

        # prepare pairwise combinations
        return list(zip(*groups))  # type: ignore

    def check_faulty(self) -> float:
        """Check how many of the output exists.

        As a side effect, elements whose file is missing are removed from
        ``output`` (see :meth:`remove_missing`).

        Returns
        -------
        float
            Percentage (0-100) of expected output files that are missing.

        Raises
        ------
        Exception
            If ``output`` holds no expected file at all.
        """
        total = 0.0
        present = 0.0
        for element in self.output:
            if isinstance(element, dict):
                total += len(element)
                present += sum(j.is_present() for j in element.values())
            else:
                total += 1
                if element.is_present():
                    present += 1

        if total == 0:
            _msg = "No expected output was passed to ModuleIO"
            raise Exception(_msg)

        faulty_per = (1 - (present / total)) * 100

        # added this method here to avoid modifying all calls in the
        # modules' run method. We can think about restructure this part
        # in the future.
        self.remove_missing()

        return faulty_per

    def remove_missing(self) -> None:
        """Remove missing structure from `output`.

        Dictionary elements are pruned in place and always kept, even when
        they end up empty; other elements are dropped when their file is
        not present.
        """
        kept: List[Any] = []
        for element in self.output:
            if isinstance(element, dict):
                # collect first: a dict can't be modified while iterated
                missing_keys = [
                    key
                    for key, model in element.items()
                    if not model.is_present()
                ]
                for key in missing_keys:
                    element.pop(key)
                kept.append(element)
            elif element.is_present():
                kept.append(element)
        self.output = kept

    def __repr__(self) -> str:
        return f"Input: {self.input}{linesep}Output: {self.output}"
# Either a `PDBFile` object or a plain filesystem path.
PDBPath = Union[PDBFile, Path]

PDBPathT = TypeVar("PDBPathT", bound=PDBPath)
"""
Generic type variable for PDBFile or Path.

If the first annotated variable is PDBFile, the second annotated variable will
be PDBFile instead of Path, and vice versa.
"""