Source code for haddock.clis.cli_dmn

r"""
HADDOCK3 benchmark submission daemon.

For more information read our benchmark tutorial at `docs/benchmark.tut`
in HADDOCK3 repository site: https://github.com/haddocking/haddock3::

   (_) L|J
   (")  |
   /_\--|
 _/\ /  |
   _W_  |

Usage::

    haddock3-dmn -h
    haddock3-dmn <benchmark folder>  --job-limit <num> [OPTIONS]
"""
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path

from haddock.core.typing import ArgumentParser, Callable, Namespace, Optional


# Maps the name of each supported workload manager to the command used to
# submit a job file to it.
workload_manager_launch = dict(slurm="sbatch", torque="qsub")
"""options for the different job queue systems supported"""


# Command-line interface definition. The module docstring doubles as the
# description, hence the raw formatter that keeps its layout untouched.
ap = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=__doc__,
    prog="HADDOCK3 benchmark submission daemon.",
)

# positional: where the benchmark lives
ap.add_argument(
    "benchmark_path",
    type=Path,
    help="Path to the benchmark folder as prepared by `haddock3-bm` interface.",
)

# maximum number of jobs kept in the queue at once
ap.add_argument(
    "--job-limit",
    type=int,
    default=10,
    dest="job_limit",
    help="How many jobs should run at the same time. Default: 10",
)

# queue system; valid values are the keys of `workload_manager_launch`
ap.add_argument(
    "--job-sys",
    default="slurm",
    choices=tuple(workload_manager_launch),
    dest="manager",
    help="The system where the jobs will be run. Default `slurm`.",
)

# flag: reset jobs left in the RUNNING state
ap.add_argument(
    "--restart",
    action="store_true",
    help="Restart the RUNNING jobs. DONE jobs won't be touched.",
)

# flag: submit the smallest jobs first instead of the biggest
ap.add_argument(
    "--sort-first",
    action="store_true",
    dest="sort_first",
    help=(
        "Sort jobs by size in ascending order. If not given jobs are order by "
        "size in descending order: the biggest first."
    ),
)


def _ap() -> ArgumentParser:
    """Return the module-level argument parser `ap`."""
    return ap


class Job:
    """
    Job task.

    Controls the status of each job through the marker files found in
    the job's run folder.

    Parameters
    ----------
    job_f : pathlib.Path
        The path to the job file.

    launch_command : str
        The command to launch the job. For example `sbatch`.
    """

    def __init__(self, job_f: Path, launch_command: str) -> None:
        self.job_filename = job_f
        self.launch_command = launch_command

        # The run folder sits two levels above the job file (next to the
        # folder holding the job files) and is named `run-<job file stem>`.
        job_stem = job_f.stem
        self.job_run_folder = Path(job_f.parents[1], f"run-{job_stem}")

        # marker files whose presence defines the job status
        self.check_done = Path(self.job_run_folder, "DONE")
        self.check_running = Path(self.job_run_folder, "RUNNING")
        self.check_available = Path(self.job_run_folder, "AVAILABLE")
        self.check_fail = Path(self.job_run_folder, "FAIL")

        self.status: Optional[str] = None

        # Order matters: the first existing file in this list wins.
        self.status_files = [
            self.check_done,
            self.check_running,
            self.check_fail,
            self.check_available,
        ]

    def get_status(self) -> Optional[str]:
        """
        Get job status.

        The job status depends on the presence of the files `AVAILABLE`,
        `RUNNING`, `DONE`, and `FAIL` in the run folder. The files are
        checked in the order of `self.status_files` and the first one
        found defines the status.

        Status is assigned to `self.status` and returned.

        Returns
        -------
        str or None
            The name of the first status file found, or `None` when no
            status file exists.
        """
        # Recompute from scratch on every call so that a status seen in a
        # previous call is not reported once its file is gone.
        self.status = next(
            (_file.stem for _file in self.status_files if _file.exists()),
            None,
        )
        return self.status

    def submit(self) -> None:
        """
        Submit job.

        Run command `$launch_command $job_filename`.
        """
        cmds = [self.launch_command, str(self.job_filename)]
        subprocess.run(cmds)
        print("Job sent: ", cmds)

    def restart(self) -> None:
        """
        Restart the status of the job to `AVAILABLE`.

        Does this by removing all status files and creating
        the file `AVAILABLE`.
        """
        for _marker in (self.check_done, self.check_running, self.check_fail):
            _marker.unlink(missing_ok=True)
        self.check_available.touch(exist_ok=True)
def get_current_jobs(grep: str = "BM5") -> int:
    """
    Count the queued jobs whose job-name contains the word `grep`.

    The job list is obtained from the `qstat -a` command; its fourth
    column is filtered with `grep` and the matching lines are counted
    with `wc -l`, all within a single shell pipeline.

    Parameters
    ----------
    grep : str
        The string to search job-names for.

    Returns
    -------
    int
        The number of jobs with the word `grep` in their name.
    """
    concurrent_cmd = "qstat -a | awk '{print $4}' | grep " + grep + " | wc -l"

    # stderr is merged into stdout, so the captured text is whatever the
    # pipeline printed on either stream.
    finished = subprocess.run(
        concurrent_cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    raw_count = finished.stdout.decode("utf-8")

    njobs = int(raw_count)
    print(f"Found {njobs} in the queue.")
    return njobs
def calc_size(job_path: Path) -> int:
    """
    Calculate the size of the job.

    Expects the job file to be in a folder structure as defined by
    `haddock3-bm`: the file `input/target.pdb` is looked up two levels
    above the job file.

    The size of the job is defined by the number of carbon-alpha lines
    in the `target.pdb` file, that is, the lines whose characters 11 to
    15 (zero-based) read `CA` once surrounding blanks are removed. The
    record type of the line is not inspected.

    Parameters
    ----------
    job_path : pathlib.Path
        The path to the job file.

    Returns
    -------
    int
        The number of matching lines.
    """
    target_pdb = Path(job_path.parents[1], "input", "target.pdb")

    # Iterate the file in text mode: line endings are already normalized
    # by Python, so no platform-specific separator (`os.linesep`) is needed
    # and the whole file never has to be held in memory.
    with target_pdb.open() as pdb_lines:
        return sum(1 for line in pdb_lines if line[11:16].strip() == "CA")
def filter_by_status(job_list: list[Job], status: str = "AVAILABLE") -> list[Job]:
    """
    Select the jobs that currently have a given status.

    The status of every job is refreshed through `Job.get_status` and
    only the jobs reporting `status` are kept. The number of selected
    jobs is printed.

    Parameters
    ----------
    job_list : list of Job objects.

    status : str
        The status to select. Defaults to `AVAILABLE`.

    Returns
    -------
    list
        The list with the `Job`s with matching `status`.
    """
    jobs = []
    for candidate in job_list:
        if candidate.get_status() == status:
            jobs.append(candidate)

    print(f"Number of {status} jobs: {len(jobs)}.")
    return jobs
# command-line client helper functions # load_args, cli, maincli
def load_args(ap: ArgumentParser) -> Namespace:
    """Parse the command-line arguments with the given parser."""
    parsed = ap.parse_args()
    return parsed
def cli(ap: ArgumentParser, main: Callable[..., None]) -> None:
    """
    Command-line interface entry point.

    Parses the command-line with `ap` and calls `main` passing every
    parsed argument as a keyword argument.
    """
    keyword_args = vars(load_args(ap))
    main(**keyword_args)
def maincli() -> None:
    """
    Execute main client.

    Hands the module-level parser `ap` and the `main` function to `cli`.
    """
    cli(ap, main)
def main(
    benchmark_path: Path,
    job_limit: int = 10,
    manager: str = "slurm",
    restart: bool = False,
    sort_first: bool = False,
) -> None:
    """
    Execute the benchmark daemon.

    The parameters defined here are the same as defined in the client
    arguments.

    This is the main function of the client. If you want to run the
    daemon without using the command line and instead importing its
    functionalities and setting it up from another python script, you
    should import this function.

    >>> from haddock.clis.cli_dmn import main

    Parameters
    ----------
    benchmark_path : pathlib.Path
        The path of the benchmark folder as created by the `haddock3-bm`
        interface.

    job_limit : int
        The max number of jobs to send to the queue.

    manager : str
        A key to the `workload_manager_launch` dictionary. Selects the
        queue management system.

    restart : bool
        Whether to restart the `RUNNING` jobs that might have been halted
        in previous daemon runs. Defaults to False.

    sort_first : bool
        Whether to sort jobs by their size in ascending manner. That is,
        the smaller jobs first. Defaults to False, the bigger first.

    Raises
    ------
    SystemExit
        If no job file is found in `benchmark_path`.
    """
    # lists all the job files in the benchmark_path folder
    job_list = list(benchmark_path.glob("*/jobs/*.job"))

    # breaks if no jobs are found
    if not job_list:
        sys.exit(f"+ ERROR! No jobs found in folder: {str(benchmark_path)!r}")

    # sorts the job list: biggest first unless `sort_first` is given
    job_list.sort(key=calc_size, reverse=not sort_first)

    # creates the job objects according to the queue managing system
    _jobsys = workload_manager_launch[manager]
    jobs = [Job(j, _jobsys) for j in job_list]

    # restarts previously (halted) `RUNNING` jobs - if selected.
    if restart:
        for _job in filter_by_status(jobs, status="RUNNING"):
            _job.restart()

    # lists the available jobs (those with status `AVAILABLE`)
    available_jobs = filter_by_status(jobs)

    # runs the daemon loop, only if there are available jobs :-)
    # NOTE(review): `get_current_jobs` always queries `qstat`, whatever the
    # selected `manager` - confirm this is intended for `slurm`.
    while available_jobs:
        # get the number of available queue slots according to the
        # job limit parameter
        #
        # `max(0, ...)` avoids negative values in case a job had been
        # manually submitted.
        empty_slots = max(0, job_limit - get_current_jobs())
        print("empty slots: ", empty_slots)

        # send jobs if there are empty slots; slicing never goes past the
        # end of the list when there are more slots than available jobs.
        for _job in available_jobs[:empty_slots]:
            _job.submit()
            time.sleep(5)

        # chill before repeating the process.
        print("chilling for 120 seconds...")
        time.sleep(120)

        # refreshes the available_jobs list
        available_jobs = filter_by_status(jobs)

    # done
    return
# Run the command-line client when the module is executed as a script.
if __name__ == "__main__":
    sys.exit(maincli())  # type: ignore