Source code for haddock.libs.libworkflow

"""HADDOCK3 workflow logic."""
import importlib
import sys
from pathlib import Path
from time import time

from haddock import log
from haddock.clis.cli_analyse import main as cli_analyse
from haddock.clis.cli_traceback import main as cli_traceback
from haddock.core.exceptions import HaddockError, HaddockTermination, StepError
from haddock.core.typing import Any, ModuleParams, Optional
from haddock.gear.clean_steps import clean_output
from haddock.gear.config import get_module_name
from haddock.gear.zerofill import zero_fill
from haddock.libs.libtimer import convert_seconds_to_min_sec, log_time
from haddock.libs.libutil import recursive_dict_update
from haddock.modules import (
    modules_category,
    non_mandatory_general_parameters_defaults,
)


class WorkflowManager:
    """Build a workflow recipe and drive its run, clean and postprocess phases."""

    def __init__(
        self,
        workflow_params: ModuleParams,
        start: Optional[int] = 0,
        **other_params: Any,
    ) -> None:
        # A missing start index means "begin at the first step".
        if start is None:
            start = 0
        self.start = start
        # The recipe itself is always numbered from zero; ``self.start``
        # only selects where ``run`` begins.
        self.recipe = Workflow(workflow_params, start=0, **other_params)
        # `_terminated` keeps the `clean` option in sync with the `exit`
        # module. Should the `exit` module be removed in the future, the
        # whole `terminate` machinery here can be removed as well.
        self._terminated = None

    def run(self) -> None:
        """
        Execute the workflow steps in order.

        Execution begins at index ``self.start``. If a step raises
        ``HaddockTermination``, its index is stored and no further step
        is executed.
        """
        pending = self.recipe.steps[self.start:]
        for index, step in enumerate(pending, start=self.start):
            try:
                step.execute()
            except HaddockTermination:
                self._terminated = index  # type: ignore
                break

    def clean(self, terminated: Optional[int] = None) -> None:
        """
        Clean the workflow steps.

        Parameters
        ----------
        terminated : int, None
            Index of the workflow at which cleaning stops (exclusive).
            When ``None``, the index recorded internally by the manager
            is used instead.
        """
        if terminated is None:
            terminated = self._terminated
        # Slicing with ``None`` selects every step.
        for step in self.recipe.steps[:terminated]:
            step.clean()

    def postprocess(self) -> None:
        """
        Run the analysis and traceback command-line tools on the run.

        The general options are read from the configuration of the first
        step of the recipe.
        """
        first_config = self.recipe.steps[0].config
        # will the workflow be cleaned?
        is_cleaned = first_config['clean']
        # should the workflow run offline?
        offline = first_config['offline']
        # execution mode
        mode = first_config['mode']
        # number of cores
        ncores = first_config['ncores']

        # order of every `caprieval` step found in the recipe
        capri_steps: list[int] = [
            step.order  # type: ignore
            for step in self.recipe.steps
            if step.module_name == "caprieval"
        ]

        # analysis needs no capri_dicts: everything is precalculated
        cli_analyse(
            "./",
            capri_steps,
            top_cluster=10,
            format=None,
            scale=None,
            inter=False,
            is_cleaned=is_cleaned,
            offline=offline,
            mode=mode,
            ncores=ncores,
        )

        # the traceback is best-effort: a failure is only reported
        try:
            cli_traceback("./", offline=offline)
        except Exception as err:
            log.warning(f"Error running traceback: {err}")
class Workflow:
    """Ordered collection of the steps HADDOCK has to execute."""

    def __init__(
        self,
        modules_parameters: ModuleParams,
        start: Optional[int] = 0,
        **other_params: Any,
    ) -> None:
        """
        Create one ``Step`` per entry of ``modules_parameters``.

        Parameters
        ----------
        modules_parameters : ModuleParams
            Mapping of stage names to their parameters, iterated in
            order.
        start : int, None
            Number given to the first step; ``None`` is treated as 0.
        **other_params
            Extra parameters; only those present in
            ``non_mandatory_general_parameters_defaults`` are kept.

        Raises
        ------
        HaddockError
            If creating a step raises ``StepError``.
        """
        first_index = 0 if start is None else start

        # keep only the general parameters that apply to modules
        general_modules = {}
        for key, value in other_params.items():
            if key in non_mandatory_general_parameters_defaults:
                general_modules[key] = value

        # the steps composing this workflow, in execution order
        self.steps: list[Step] = []

        numbered_stages = enumerate(modules_parameters.items(), start=first_index)
        for num_stage, (stage_name, params) in numbered_stages:
            stage_name = get_module_name(stage_name)
            log.info(f"Reading instructions step {num_stage}_{stage_name}")

            # merge the applicable global parameters with the module's
            # own parameters; the local level keeps priority
            params_up = recursive_dict_update(general_modules, params)

            try:
                new_step = Step(
                    stage_name,
                    order=num_stage,
                    **params_up,
                )
            except StepError as step_err:
                log.error(f"Error found while parsing course {stage_name}")
                raise HaddockError from step_err
            else:
                self.steps.append(new_step)
class Step:
    """A single step of the workflow, bound to one module."""

    def __init__(
        self,
        module_name: str,
        order: Optional[int] = None,
        **config_params: Any,
    ) -> None:
        """
        Store the step definition; the module itself is loaded on execute.

        Parameters
        ----------
        module_name : str
            Name of the module this step runs.
        order : int, None
            Position of the step in the workflow.
        **config_params
            Configuration parameters kept for the module.
        """
        self.module_name = module_name
        self.order = order
        self.config = config_params
        # directory name derived from the module name and step order
        step_dirname = zero_fill.fill(self.module_name, self.order)  # type: ignore
        self.working_path = Path(step_dirname)
        # stays None until `execute` instantiates the module
        self.module = None

    def execute(self) -> None:
        """
        Execute simulation step.

        Creates the step working directory (it must not exist yet and
        its parent must exist), imports the module, then updates its
        parameters, saves its configuration and runs it. A
        ``KeyboardInterrupt`` during that phase ends the process with
        exit code 1.
        """
        self.working_path.resolve().mkdir(parents=False, exist_ok=False)

        # resolve the dotted import path of the requested module
        category = modules_category[self.module_name]
        import_path = ".".join(("haddock", "modules", category, self.module_name))
        module_lib = importlib.import_module(import_path)
        self.module = module_lib.HaddockModule(
            order=self.order,
            path=self.working_path,
        )

        # run the module, timing the whole operation
        start = time()
        try:
            self.module.update_params(**self.config)  # type: ignore
            params_file = Path(self.working_path, "params.cfg")
            self.module.save_config(params_file)  # type: ignore
            self.module.run()  # type: ignore
        except KeyboardInterrupt:
            log.info("You have halted subprocess execution by hitting Ctrl+c")
            log.info("Exiting...")
            sys.exit(1)

        elapsed = convert_seconds_to_min_sec(time() - start)
        self.module.log(f"took {elapsed}")  # type: ignore

    def clean(self) -> None:
        """
        Clean step output.

        When no module was instantiated, the ``clean`` entry of the
        step configuration decides whether the working directory is
        cleaned; otherwise the ``clean`` entry of the module parameters
        decides whether the module cleans its own output.
        """
        if self.module is None:
            if self.config["clean"]:
                with log_time("cleaning output files took"):
                    clean_output(self.working_path, self.config["ncores"])
            return

        if self.module.params["clean"]:
            self.module.clean_output()