"""General utilities."""
import collections.abc
import contextlib
import re
import shutil
import subprocess
import sys
from copy import deepcopy
from functools import partial
from os import cpu_count
from pathlib import Path
from haddock import EmptyPath, log
from haddock.core.exceptions import SetupError
from haddock.core.typing import (
PT,
Any,
AnyT,
Callable,
Container,
FilePath,
FilePathT,
Generator,
Iterable,
Optional,
ParamDict,
ParamMap,
ParamMapT,
Union,
)
from haddock.gear.greetings import get_goodbye_help
# Pre-configured `subprocess.run`: the command is executed through the
# shell, a non-zero exit status raises `subprocess.CalledProcessError`
# (``check=True``), and the standard output is discarded.
check_subprocess = partial(
    subprocess.run,
    stdout=subprocess.DEVNULL,
    check=True,
    shell=True,
)
[docs]
def get_result_or_same_in_list(
    function: Callable[[PT], AnyT], value: PT
) -> Union[AnyT, list[PT]]:
    """
    Apply `function` to `value`, falling back to ``[value]``.

    `function` is called with `value` as its single argument. When the
    outcome is truthy it is returned unchanged; otherwise `value` is
    returned wrapped in a one-element list.
    """
    outcome = function(value)
    if outcome:
        return outcome
    return [value]
[docs]
def make_list_if_string(item: Union[str, list[str]]) -> list[str]:
    """Return ``[item]`` for a string, otherwise `item` unchanged."""
    return [item] if isinstance(item, str) else item
[docs]
def copy_files_to_dir(paths: Iterable[FilePath], directory: FilePath) -> None:
    """
    Copy every file in `paths` into `directory`.

    Each file is copied with :py:func:`shutil.copy`, one after the
    other, in iteration order.

    Parameters
    ----------
    paths : iterable of paths
        Source files.

    directory : path
        Where to copy files to.
    """
    for source in paths:
        shutil.copy(source, directory)
[docs]
def remove_folder(folder: FilePath) -> None:
    """
    Delete `folder` and all of its contents, if it exists.

    A warning is logged right before the removal. Nothing is done when
    the path does not exist.

    Parameters
    ----------
    folder : str or Path
        Path to folder to remove.
    """
    if not Path(folder).exists():
        return

    log.warning(f"{folder} exists and it will be REMOVED!")
    shutil.rmtree(folder)
[docs]
def remove_dict_keys(d: ParamMap, keys: Container[str]) -> ParamDict:
    """
    Build a copy of `d` that leaves out the given `keys`.

    Parameters
    ----------
    d : mapping
        The source dictionary. It is not modified.

    keys : container of str
        The keys to drop; tested with the ``in`` operator.

    Return
    ------
    dict
        A new dictionary holding deep copies of the values of `d`
        whose keys are not in `keys`.
    """
    kept = {}
    for key, value in d.items():
        if key in keys:
            continue
        kept[key] = deepcopy(value)
    return kept
[docs]
def parse_ncores(
    n: Optional[Union[int, str]] = None,
    njobs: Optional[int] = None,
    max_cpus: Optional[Union[bool, int]] = None,
) -> int:
    """
    Check the number of cores according to HADDOCK3 architecture.

    Parameters
    ----------
    n : int or str
        The desired number of cores. If `None` is given, returns the
        maximum number of cores allowed, see `max_cpus`.

    njobs : int
        The number of jobs to execute. Optional. When given, the
        returned number of cores never exceeds `njobs`.

    max_cpus : bool or int
        The maximum number of CPUs allowed. `None` or `False` resolve
        to the available CPUs minus one (but at least one); `True`
        resolves to all available CPUs; an integer is used as given.

    Raises
    ------
    SetupError
        If `n` is not positive or not convertable to `int`.

    TypeError
        If `max_cpus` is not `None`, a `bool`, or an `int`.

    Returns
    -------
    int
        A correct number of cores according to specifications.
    """
    # `os.cpu_count()` returns None when the number of CPUs cannot be
    # determined; assume a single CPU instead of failing on `None - 1`.
    available_cpus = cpu_count() or 1

    if max_cpus is None or max_cpus is False:
        max_cpus = max(available_cpus - 1, 1)
    elif max_cpus is True:
        max_cpus = available_cpus
    elif not isinstance(max_cpus, int):
        raise TypeError(f"`max_cpus` not of valid type: {type(max_cpus)}")

    if n is None:
        return max_cpus

    try:
        n = int(n)
    except (TypeError, ValueError) as err:
        _msg = f"`n` must be `int` or `int`-convertable `str`: {n!r} given."
        raise SetupError(_msg) from err

    if n < 1:
        _msg = f"`n` is not positive, this is not possible: {n!r}"
        raise SetupError(_msg)

    if njobs is not None:
        ncores = min(n, njobs, max_cpus)
        log.info(
            f"Selected {ncores} cores to process {njobs} jobs, with {max_cpus} "
            "maximum available cores."
        )
        return ncores

    log.debug(f"`njobs` not specified, evaluating initial value {n}...")
    ncores = min(n, max_cpus)
    log.debug(f"Selected {ncores} for a maximum of {max_cpus} CPUs")
    return ncores
[docs]
def non_negative_int(
    n: Any,
    exception: type[Exception] = ValueError,
    emsg: str = "`n` do not satisfies",
) -> int:
    """
    Convert `n` to `int` and return it if it is zero or greater.

    Parameters
    ----------
    n : int-convertable
        Something that can be converted to int.

    exception : Exception
        The Exception to raise in case the converted `n` is negative.

    emsg : str
        The error message to give to `exception`. May contain a
        ``str.format`` placeholder, which receives the original `n`.

    Returns
    -------
    int
        The converted, non-negative value.

    Raises
    ------
    ValueError, TypeError
        If `n` cannot be converted to `int`; the error raised by
        ``int(n)`` is propagated as is.

    exception
        If the converted value is negative.
    """
    converted = int(n)
    if converted < 0:
        # don't change to f-strings, .format has a purpose
        raise exception(emsg.format(n))
    return converted
[docs]
def recursive_dict_update(d: ParamMapT, u: ParamMap) -> ParamMapT:
    """
    Update dictionary `d` according to `u` recursively.

    Mappings found in `u` are merged, key by key, into the mapping
    stored under the same key in `d`. Any other value in `u` replaces
    the value in `d`. When `u` holds a mapping for a key whose value in
    `d` is missing or is not a mapping, the mapping from `u` replaces
    that value.

    https://stackoverflow.com/questions/3232943

    Parameters
    ----------
    d : mapping
        The base dictionary.

    u : mapping
        The updates to apply on top of `d`.

    Returns
    -------
    dict
        A new dict object with updated key: values. The original dictionaries
        are not modified.
    """
    def _recurse(d_: ParamMapT, u_: ParamMap) -> ParamMapT:
        for k, v in u_.items():
            if isinstance(v, collections.abc.Mapping):
                current = d_.get(k)
                # a non-mapping value (scalar, list, None...) cannot be
                # merged into: start from an empty dict so the mapping
                # in `u` replaces it instead of raising
                if not isinstance(current, collections.abc.Mapping):
                    current = {}
                d_[k] = _recurse(current, v)
            else:
                d_[k] = deepcopy(v)  # in case these are also lists
        return d_

    # work on a deep copy so neither `d` nor `u` is modified
    new = deepcopy(d)
    _recurse(new, u)
    return new
[docs]
def get_number_from_path_stem(path: FilePath) -> int:
    """
    Return the last run of digits found in the stem of `path`.

    Examples
    --------
    >>> get_number_from_path_stem('src/file_1.pdb')
    1
    >>> get_number_from_path_stem('src/file_3.pdb')
    3
    >>> get_number_from_path_stem('file_1231.pdb')
    1231
    >>> get_number_from_path_stem('src/file11')
    11
    >>> get_number_from_path_stem('src/file_1234_1.pdb')
    1

    Parameters
    ----------
    path : str or Path obj
        The path to evaluate.

    Returns
    -------
    int
        The last integer in the stem of the path.

    Raises
    ------
    IndexError
        If the stem of `path` contains no digits.
    """
    digit_runs = re.findall(r"\d+", Path(path).stem)
    # indexing an empty list raises IndexError when there are no digits
    return int(digit_runs[-1])
[docs]
def sort_numbered_paths(*paths: FilePathT) -> list[FilePathT]:
"""
Sort input paths to tail number.
If possible, sort criteria is provided by
:py:func:`get_number_from_path_stem`.
If paths do not have a numbered tag, sort paths alphabetically.
Parameters
----------
*inputs : str or pathlib.Path
Paths to files.
Returns
-------
list
The sorted pathlist. The original types are not modified. If
strings are given, strings are returns, if Paths are given
paths are returned.
"""
try:
return sorted(paths, key=get_number_from_path_stem)
except TypeError as err:
log.exception(err)
emsg = (
"Mind the packing *argument, input should be strings or Paths, "
"not a list."
)
raise TypeError(emsg)
except IndexError:
return sorted(paths, key=lambda x: Path(x).stem)
[docs]
@contextlib.contextmanager
def log_error_and_exit() -> Generator[None, None, None]:
    """
    Log any exception raised in the managed block and exit.

    When the body of the ``with`` statement raises an `Exception`, the
    error is logged (with and without traceback), followed by a hint
    for the user and the goodbye help message; the interpreter is then
    asked to exit with status 1. Exceptions that do not derive from
    `Exception` are not intercepted.
    """
    try:
        yield
    except Exception as err:
        advice = (
            "An error has occurred, see log file. "
            "And contact the developers if needed."
        )
        log.exception(err)
        log.error(err)
        log.error(advice)
        log.info(get_goodbye_help())
        sys.exit(1)
[docs]
def recursive_convert_paths_to_strings(params: ParamMapT) -> ParamMapT:
    """
    Convert paths to strings recursively over a dictionary.

    `Path` and `EmptyPath` values are converted with `str`. Nested
    mappings are processed recursively. Lists and tuples have their
    direct path elements converted; containers nested inside a list or
    a tuple are not descended into.

    Parameters
    ----------
    params : dictionary
        The parameters to convert. It is not modified.

    Returns
    -------
    dictionary
        A copy of the original dictionary with paths converted to strings.
    """
    def _is_path(item: Any) -> bool:
        return isinstance(item, (Path, EmptyPath))

    # work on a deep copy, so in-place edits below never reach the input
    params = deepcopy(params)

    for param, value in params.items():
        if _is_path(value):
            params[param] = str(value)

        elif isinstance(value, collections.abc.Mapping):
            params[param] = recursive_convert_paths_to_strings(value)

        elif isinstance(value, list):
            for i, v in enumerate(value):
                if _is_path(v):
                    value[i] = str(v)

        elif isinstance(value, tuple):
            # tuples are immutable: item assignment would raise
            # TypeError, so rebuild the tuple; only when needed, so
            # tuples without paths keep their exact type
            if any(_is_path(v) for v in value):
                params[param] = tuple(
                    str(v) if _is_path(v) else v for v in value
                )

    return params