"""Describe the Haddock3 ontology used for communicating between modules."""
import datetime
import itertools
from enum import Enum
from os import linesep
from pathlib import Path
import jsonpickle
from haddock.core.defaults import MODULE_IO_FILE
from haddock.core.typing import FilePath, Literal, Optional, TypeVar, Union
from typing import List, Any
# Not-a-number float constant; used in this module as the default `score`
# of a PDBFile.
NaN = float("nan")
[docs]
class Persistent:
    """A file produced by the workflow, stored together with its metadata."""

    def __init__(
        self,
        file_name: FilePath,
        file_type: Format,
        path: FilePath = ".",
        md5: Optional[str] = None,
        restr_fname: Optional[FilePath] = None,
    ) -> None:
        """Record where the file lives and what kind of file it is.

        Parameters
        ----------
        file_name : FilePath
            Name of (or path to) the file. Only its final component is kept
            in ``file_name``; the value as given is used for ``rel_path``.
        file_type : Format
            Kind of file; stored unchanged.
        path : FilePath
            Directory holding the file. Stored resolved (absolute) in
            ``path``.
        md5 : Optional[str]
            Stored unchanged.
        restr_fname : Optional[FilePath]
            Stored unchanged.
        """
        # Moment this object was built (not the file's mtime), to the second.
        self.created = datetime.datetime.now().isoformat(" ", "seconds")

        base_name = Path(file_name).name
        parent_dir = Path(path).resolve()

        self.file_name = base_name
        self.file_type = file_type
        self.path = str(parent_dir)
        # Built from `path` as given, so it is only absolute if `path` is.
        self.full_name = str(Path(path, base_name))
        # "../<directory name>/<file_name as given>".
        self.rel_path = Path("..", parent_dir.name, file_name)
        self.md5 = md5
        self.restr_fname = restr_fname

    def __repr__(self) -> str:
        location = Path(self.path) / self.file_name
        return f"[{self.file_type}|{self.created}] {location}"

    def is_present(self) -> bool:
        """Check if the persistent file exists on disk.

        ``rel_path`` is resolved first, so a relative ``rel_path`` is
        interpreted against the current working directory.
        """
        target = self.rel_path.resolve()
        return target.exists()
[docs]
class PDBFile(Persistent):
    """A PDB structure file plus the scoring/clustering data attached to it."""

    def __init__(
        self,
        file_name: Union[Path, str],
        topology: Optional[Any] = None,
        path: Union[Path, str] = ".",
        score: float = NaN,
        md5: Optional[str] = None,
        restr_fname: Optional[Union[Path, str]] = None,
        unw_energies: Optional[dict[str, float]] = None,
    ) -> None:
        """Register the file as ``Format.PDB`` and initialise its metadata.

        Parameters
        ----------
        file_name : Union[Path, str]
            Name of (or path to) the PDB file.
        topology : Optional[Any]
            Topology associated with the model; stored unchanged.
        path : Union[Path, str]
            Directory holding the file.
        score : float
            Score of the model; NaN when not provided.
        md5 : Optional[str]
            Stored unchanged.
        restr_fname : Optional[Union[Path, str]]
            Stored unchanged.
        unw_energies : Optional[dict[str, float]]
            Stored unchanged.
        """
        super().__init__(
            file_name=file_name,
            file_type=Format.PDB,
            path=path,
            md5=md5,
            restr_fname=restr_fname,
        )
        self.topology = topology
        self.unw_energies = unw_energies
        self.score = score
        # NOTE(review): `len` is initialised with the score value, exactly
        # as before; confirm with the callers that this is intended.
        self.len = score

        # Filled in later by other code; nothing is known at creation time.
        self.ori_name: Optional[str] = None
        self.seed = None
        self.clt_id: Union[str, int, None] = None
        self.clt_rank: Optional[int] = None
        self.clt_model_rank: Optional[int] = None

    # Ordering and equality are defined purely on `score`.
    def __lt__(self, other: "PDBFile") -> bool:
        return self.score < other.score

    def __gt__(self, other: "PDBFile") -> bool:
        return self.score > other.score

    def __eq__(self, other: "PDBFile") -> bool:  # type: ignore
        return self.score == other.score

    # NOTE(review): hashing is identity-based while equality is score-based,
    # so two models with equal scores compare equal but hash differently.
    def __hash__(self) -> int:
        return id(self)
[docs]
class RMSDFile(Persistent):
    """An RMSD matrix file, registered with ``Format.MATRIX``."""

    def __init__(self, file_name: FilePath, npairs: int, path: FilePath = ".") -> None:
        """Register the matrix file and remember its ``npairs`` value.

        Parameters
        ----------
        file_name : FilePath
            Name of (or path to) the matrix file.
        npairs : int
            Stored unchanged in ``npairs``.
        path : FilePath
            Directory holding the file.
        """
        super().__init__(file_name=file_name, file_type=Format.MATRIX, path=path)
        self.npairs = npairs

    def __hash__(self) -> int:
        # Identity-based hash.
        return id(self)
[docs]
class TopologyFile(Persistent):
    """A CNS-generated topology file, registered with ``Format.TOPOLOGY``."""

    def __init__(self, file_name: FilePath, path: FilePath = ".") -> None:
        """Register the topology file.

        Parameters
        ----------
        file_name : FilePath
            Name of (or path to) the topology file.
        path : FilePath
            Directory holding the file.
        """
        super().__init__(file_name=file_name, file_type=Format.TOPOLOGY, path=path)
[docs]
class ModuleIO:
    """Intercommunicating modules and exchange input/output information.

    ``input`` and ``output`` are plain lists. Judging from the methods
    below, entries of ``output`` are either objects exposing ``file_type``
    and ``is_present()`` or dictionaries whose values are such objects.
    """

    def __init__(self) -> None:
        """Start with empty input and output lists."""
        self.input: List[Any] = []
        self.output: List[Any] = []

    def add(self, persistent: Any, mode: str = "i") -> None:
        """Add a given filename as input or output.

        Parameters
        ----------
        persistent : Any
            A single entry, or a list of entries. A list is merged into the
            target list element by element; anything else is appended as is.
        mode : str
            ``"i"`` stores into ``input``; any other value stores into
            ``output``.
        """
        target = self.input if mode == "i" else self.output
        if isinstance(persistent, list):
            target.extend(persistent)
        else:
            target.append(persistent)

    def save(self, path: FilePath = ".", filename: FilePath = MODULE_IO_FILE) -> Path:
        """Save Input/Output needed files by this module to disk.

        Parameters
        ----------
        path : FilePath
            Directory in which the file is written.
        filename : FilePath
            Name of the file to write.

        Returns
        -------
        Path
            Location of the written file (``path`` joined with ``filename``).
        """
        fpath = Path(path, filename)
        to_save = {"input": self.input, "output": self.output}
        jsonpickle.set_encoder_options("json", sort_keys=True, indent=4)
        # Encode before opening the file: opening in "w" mode truncates it,
        # so a failure while encoding must not leave an empty file behind.
        encoded = jsonpickle.encode(to_save)
        with open(fpath, "w") as output_handler:
            output_handler.write(encoded)  # type: ignore
        return fpath

    def load(self, filename: FilePath) -> None:
        """Load the content of a given IO filename.

        Replaces ``input`` and ``output`` with the ``"input"`` and
        ``"output"`` entries decoded from the file.

        Parameters
        ----------
        filename : FilePath
            File to read.
        """
        with open(filename) as json_file:
            # SECURITY NOTE: jsonpickle.decode rebuilds arbitrary Python
            # objects; only load files that come from a trusted source.
            content = jsonpickle.decode(json_file.read())
        self.input = content["input"]  # type: ignore
        self.output = content["output"]  # type: ignore

    def retrieve_models(
        self, crossdock: bool = False, individualize: bool = False
    ) -> list[Union[PDBFile, list[PDBFile]]]:
        """Retrieve the PDBobjects to be used in the module.

        Dictionary entries of ``output`` are treated as ensembles (one list
        of values per dictionary). When at least one ensemble exists, only
        the ensembles contribute to the result; otherwise the non-dictionary
        entries whose ``file_type`` is ``Format.PDB`` are returned.

        Parameters
        ----------
        crossdock : bool
            Combine every model of each ensemble with every model of the
            other ensembles, instead of pairing them position by position.
        individualize : bool
            Return all models of all ensembles as one flat list. Takes
            precedence over ``crossdock``.

        Returns
        -------
        list
            Flat list of models, or list of tuples of models (one model per
            ensemble) for the paired and cross-docking modes.

        Raises
        ------
        Exception
            In the default (paired) mode, when the ensembles do not all
            hold the same number of models.
        """
        # Get the models generated in previous step
        model_list: list[PDBFile] = []
        ensembles: list[list[PDBFile]] = []
        for element in self.output:
            if isinstance(element, dict):
                ensembles.append(list(element.values()))
            elif element.file_type == Format.PDB:  # type: ignore
                model_list.append(element)  # type: ignore

        if not ensembles:
            return model_list  # type: ignore

        if individualize:
            return list(itertools.chain.from_iterable(ensembles))  # type: ignore

        if crossdock:
            return list(itertools.product(*ensembles))  # type: ignore

        # Position-by-position pairing needs ensembles of identical size.
        if len({len(ensemble) for ensemble in ensembles}) > 1:
            _msg = (
                "Different number of models in molecules,"
                " cannot prepare pairwise complexes."
            )
            raise Exception(_msg)
        return list(zip(*ensembles))  # type: ignore

    def check_faulty(self) -> float:
        """Check how many of the output exists.

        Counts every non-dictionary entry and every dictionary value of
        ``output``, then drops the missing ones through `remove_missing`.

        Returns
        -------
        float
            Percentage (0-100) of expected files for which ``is_present()``
            was false.

        Raises
        ------
        Exception
            When nothing was expected: ``output`` is empty or only holds
            empty dictionaries.
        """
        total = 0.0
        present = 0.0
        for element in self.output:
            if isinstance(element, dict):
                total += len(element)
                present += sum(j.is_present() for j in element.values())
            else:
                total += 1
                if element.is_present():
                    present += 1

        if total == 0:
            _msg = "No expected output was passed to ModuleIO"
            raise Exception(_msg)

        faulty_per = (1 - (present / total)) * 100

        # added this method here to avoid modifying all calls in the
        # modules' run method. We can think about restructure this part
        # in the future.
        self.remove_missing()

        return faulty_per

    def remove_missing(self) -> None:
        """Remove missing structure from `output`.

        Dictionary entries are pruned in place (missing values are deleted,
        the dictionary itself always stays in ``output``); other entries are
        dropped from ``output`` when ``is_present()`` is false.
        """
        kept: List[Any] = []
        for element in self.output:
            if isinstance(element, dict):
                # Collect first: a dict cannot change size while iterated.
                missing_keys = [
                    key for key, model in element.items() if not model.is_present()
                ]
                for key in missing_keys:
                    del element[key]
                kept.append(element)
            elif element.is_present():
                kept.append(element)
        # Single pass instead of filtering by index against a list of
        # indices, which was quadratic in the number of missing entries.
        self.output = kept

    def __repr__(self) -> str:
        return f"Input: {self.input}{linesep}Output: {self.output}"
# Alias for values that may be either a PDBFile object or a pathlib.Path.
PDBPath = Union[PDBFile, Path]
# Type variable bounded to the same two types (PDBFile or Path).
PDBPathT = TypeVar("PDBPathT", bound=Union[PDBFile, Path])
"""
Generic type variable for PDBFile or Path.
If the first annotated variable is PDBFile,
the second annotated variable will be PDBFile instead of Path, and vice versa.
"""