"""General utilities."""
import collections.abc
import contextlib
import os
import re
import shutil
import subprocess
import sys
from copy import deepcopy
from functools import partial
from pathlib import Path
from haddock import EmptyPath, log
from haddock.core.exceptions import SetupError
from haddock.core.typing import (
PT,
Any,
AnyT,
Callable,
Container,
FilePath,
FilePathT,
Generator,
Iterable,
Optional,
ParamDict,
ParamMap,
ParamMapT,
Union,
)
from haddock.gear.greetings import get_goodbye_help
# Pre-configured `subprocess.run`: the command goes through the shell, a
# non-zero exit status raises `subprocess.CalledProcessError`, and the
# command's standard output is discarded.
check_subprocess = partial(
    subprocess.run,
    stdout=subprocess.DEVNULL,
    check=True,
    shell=True,
)
[docs]
def get_result_or_same_in_list(
    function: Callable[[PT], AnyT], value: PT
) -> Union[AnyT, list[PT]]:
    """
    Call `function` on `value`, falling back to ``[value]``.

    Parameters
    ----------
    function : callable
        A callable accepting a single argument, the `value`.

    value : anything
        The object handed over to `function`.

    Returns
    -------
    anything
        Whatever `function` returned, provided it is truthy. Otherwise,
        a one-element list holding `value`.
    """
    # `or` yields its first truthy operand, so the fallback list is only
    # used when the outcome of `function` is falsy.
    return function(value) or [value]
[docs]
def make_list_if_string(item: Union[str, list[str]]) -> list[str]:
    """Wrap `item` in a list if it is a string, else return it untouched."""
    return [item] if isinstance(item, str) else item
[docs]
def copy_files_to_dir(paths: Iterable[FilePath], directory: FilePath) -> None:
    """
    Copy every file in `paths` to `directory`.

    Parameters
    ----------
    paths : iterable of paths
        The files to copy.

    directory : path
        The destination given to :py:func:`shutil.copy` for each file.
    """
    for source_file in paths:
        shutil.copy(source_file, directory)
[docs]
def remove_folder(folder: FilePath) -> None:
    """
    Delete `folder` and all its content, if it exists.

    A warning is logged before the removal takes place. Nothing happens
    when `folder` does not exist.

    Parameters
    ----------
    folder : str or Path
        Path to the folder to remove.
    """
    if not Path(folder).exists():
        return

    log.warning(f"{folder} exists and it will be REMOVED!")
    shutil.rmtree(folder)
[docs]
def remove_dict_keys(d: ParamMap, keys: Container[str]) -> ParamDict:
    """
    Build a copy of `d` that leaves out the given `keys`.

    Parameters
    ----------
    d : dict
        The source dictionary. It is not modified.

    keys : container of str
        The keys to leave out.

    Returns
    -------
    dict
        A new dictionary with the remaining entries of `d`. Values are
        deep copies of the original ones.
    """
    trimmed = {}
    for key, value in d.items():
        if key in keys:
            continue
        trimmed[key] = deepcopy(value)
    return trimmed
[docs]
def cpu_count() -> int:
    """Count number of available CPU for the process.

    User suggestion, by https://github.com/EricDeveaud

    The following strategies are tried, in this order:

    1. ``os.process_cpu_count()``, which only exists from Python 3.13;
    2. ``os.sched_getaffinity(0)``, where pid 0 == current process;
    3. ``os.cpu_count()``, the number of cores detected in the machine.

    Both ``os.process_cpu_count()`` and ``os.cpu_count()`` may return
    `None` when the number of CPUs cannot be determined. This case is
    handled by moving on to the next strategy and, as a last resort, by
    assuming a single core.

    Returns
    -------
    process_ncores : int
        Number of cores allocated to the process. It is at least 1.
    """
    def _cpu_count() -> int:
        """Detect number of cores available.

        Returns
        -------
        ncores : int
            Maximum number of cores that can be used.
        """
        try:
            # Try to obtain the number of cpu allocated to the process
            affinity = os.sched_getaffinity(0)
        except AttributeError:
            # If unsuccessful, return the number cores detected in the machine
            # Note: this can happen on MacOS, where the os.sched_getaffinity
            # may not be defined/exist.
            # `os.cpu_count()` may return `None`: assume one core then.
            return os.cpu_count() or 1

        if isinstance(affinity, set):
            return len(affinity)
        return int(affinity)

    try:
        process_ncores = os.process_cpu_count()
    except AttributeError:
        # `os.process_cpu_count` does not exist before python3.13
        process_ncores = None

    if process_ncores is None:
        process_ncores = _cpu_count()

    return max(int(process_ncores), 1)
[docs]
def parse_ncores(
    n: Optional[Union[int, str]] = None,
    njobs: Optional[int] = None,
    max_cpus: Optional[bool] = None,
) -> int:
    """
    Decide how many cores to use according to HADDOCK3 architecture.

    Parameters
    ----------
    n : int or str
        The desired number of cores. If `None` is given, returns the
        maximum number of cores allowed, see `max_cpus`.

    njobs : int
        The number of jobs to execute. Optional. When given, the
        result never exceeds `njobs`.

    max_cpus : bool or int
        The maximum number of CPUs allowed. With `None` or `False`,
        it is the number of available CPUs minus one (and at least
        one). With `True`, it is the number of available CPUs. An
        integer is taken as is.

    Raises
    ------
    SetupError
        If `n` is not positive or not convertable to `int`.

    TypeError
        If `max_cpus` is not `None`, a boolean, or an integer.

    Returns
    -------
    int
        A correct number of cores according to specifications.
    """
    # settle the upper limit first, everything else is capped by it
    if max_cpus is None or max_cpus is False:
        cpu_limit = max(cpu_count() - 1, 1)
    elif max_cpus is True:
        cpu_limit = cpu_count()
    elif isinstance(max_cpus, int):
        cpu_limit = max_cpus
    else:
        raise TypeError(f"`max_cpus` not of valid type: {type(max_cpus)}")

    if n is None:
        return cpu_limit

    try:
        requested = int(n)
    except (TypeError, ValueError) as err:
        _msg = f"`n` must be `int` or `int`-convertable `str`: {n!r} given."
        raise SetupError(_msg) from err

    if requested < 1:
        _msg = f"`n` is not positive, this is not possible: {requested!r}"
        raise SetupError(_msg)

    if njobs is None:
        log.debug(
            f"`njobs` not specified, evaluating initial value {requested}..."
        )
        ncores = min(requested, cpu_limit)
        log.debug(f"Selected {ncores} for a maximum of {cpu_limit} CPUs")
        return ncores

    ncores = min(requested, njobs, cpu_limit)
    log.info(
        f"Selected {ncores} cores to process {njobs} jobs, with {cpu_limit} "
        "maximum available cores."
    )
    return ncores
[docs]
def non_negative_int(
    n: Any,
    exception: type[Exception] = ValueError,
    emsg: str = "`n` do not satisfies",
) -> int:
    """
    Transform `n` in int and return it if it is zero or positive.

    Parameters
    ----------
    n : int-convertable
        Something that can be converted to int.

    exception : Exception
        The Exception to raise in case `n` converts to a negative
        integer.

    emsg : str
        The error message to give to `exception`. May accept formatting
        to pass `n`.

    Returns
    -------
    int
        The value of `n` converted to `int`.

    Raises
    ------
    ValueError, TypeError
        If `n` cannot be converted to `int`.

    `exception`
        If `n` converts to a negative integer.
    """
    # errors from the conversion propagate to the caller as they are
    n1 = int(n)

    if n1 < 0:
        # don't change to f-strings, .format has a purpose
        raise exception(emsg.format(n))

    return n1
[docs]
def recursive_dict_update(d: ParamMapT, u: ParamMap) -> ParamMapT:
    """
    Update dictionary `d` according to `u` recursively.

    https://stackoverflow.com/questions/3232943

    Mappings in `u` are merged, key by key, into the mappings found
    under the same key in `d`. Any other value in `u` replaces the
    value in `d`. If `u` has a mapping for a key that is absent from
    `d`, or whose value in `d` is not a mapping, the result holds a new
    dictionary built from the mapping in `u`.

    Parameters
    ----------
    d : dict
        The dictionary serving as base.

    u : dict
        The dictionary with the updates.

    Returns
    -------
    dict
        A new dict object with updated key: values. The original dictionaries
        are not modified.
    """
    def _recurse(d_: ParamMapT, u_: ParamMap) -> ParamMapT:
        for k, v in u_.items():
            if isinstance(v, collections.abc.Mapping):
                current = d_.get(k)
                if not isinstance(current, collections.abc.Mapping):
                    # nothing to merge into: a missing key or a value
                    # that is not a mapping is replaced by a fresh dict
                    current = {}
                d_[k] = _recurse(current, v)
            else:
                d_[k] = deepcopy(v)  # in case these are also lists
        return d_

    new = deepcopy(d)
    _recurse(new, u)
    return new
[docs]
def get_number_from_path_stem(path: FilePath) -> int:
    """
    Extract the last number found in the stem of a path.

    Examples
    --------
    >>> get_number_from_path_stem('src/file_1.pdb')
    1
    >>> get_number_from_path_stem('src/file_3.pdb')
    3
    >>> get_number_from_path_stem('file_1231.pdb')
    1231
    >>> get_number_from_path_stem('src/file11')
    11
    >>> get_number_from_path_stem('src/file_1234_1.pdb')
    1

    Parameters
    ----------
    path : str or Path obj
        The path to evaluate.

    Returns
    -------
    int
        The last run of digits in the stem of the path, as an integer.

    Raises
    ------
    IndexError
        If the stem of the path has no digits.
    """
    digit_runs = re.findall(r"\d+", Path(path).stem)
    # indexing an empty list raises the IndexError for digit-less stems
    return int(digit_runs[-1])
[docs]
def sort_numbered_paths(*paths: FilePathT) -> list[FilePathT]:
    """
    Sort input paths to tail number.

    If possible, sort criteria is provided by
    :py:func:`get_number_from_path_stem`.

    If any of the paths does not have a numbered tag, all paths are
    sorted alphabetically according to their stem.

    Parameters
    ----------
    *paths : str or pathlib.Path
        Paths to files.

    Returns
    -------
    list
        The sorted pathlist. The original types are not modified. If
        strings are given, strings are returned, if Paths are given
        paths are returned.

    Raises
    ------
    TypeError
        If an input is not a path-like object, for example, when a
        list is given instead of unpacked arguments.
    """
    try:
        return sorted(paths, key=get_number_from_path_stem)
    except TypeError as err:
        log.exception(err)
        emsg = (
            "Mind the packing *argument, input should be strings or Paths, "
            "not a list."
        )
        # chain explicitly so the original error is reported as the cause
        raise TypeError(emsg) from err
    except IndexError:
        # at least one stem has no number: fall back to alphabetic order
        return sorted(paths, key=lambda x: Path(x).stem)
[docs]
@contextlib.contextmanager
def log_error_and_exit() -> Generator[None, None, None]:
    """
    Log any exception raised in the managed block and exit with status 1.

    Only `Exception` and its subclasses are handled. Nothing happens if
    the managed block completes without errors.
    """
    try:
        yield
    except Exception as caught:
        log.exception(caught)
        log.error(caught)
        notice = (
            "An error has occurred, see log file. "
            "And contact the developers if needed."
        )
        log.error(notice)
        log.info(get_goodbye_help())
        sys.exit(1)
[docs]
def recursive_convert_paths_to_strings(params: ParamMapT) -> ParamMapT:
    """
    Convert paths to strings recursively over a dictionary.

    Paths are converted when they are values of the dictionary, values
    of nested dictionaries, or direct items of lists and tuples found
    as values. Containers nested inside lists and tuples are not
    inspected.

    Parameters
    ----------
    params : dictionary
        The dictionary to process. It is not modified.

    Returns
    -------
    dictionary
        A copy of the original dictionary with paths converted to strings.
    """
    path_types = (Path, EmptyPath)

    def _as_str(item: Any) -> Any:
        """Convert `item` to string if it is a path, else return it."""
        return str(item) if isinstance(item, path_types) else item

    params = deepcopy(params)
    for param, value in params.items():
        if isinstance(value, path_types):
            params[param] = str(value)
        elif isinstance(value, collections.abc.Mapping):
            params[param] = recursive_convert_paths_to_strings(value)
        elif isinstance(value, list):
            # `value` already belongs to the copy, update it in place
            value[:] = [_as_str(v) for v in value]
        elif isinstance(value, tuple):
            # tuples are immutable, so a new one is needed; tuples
            # without paths are kept as they are
            if any(isinstance(v, path_types) for v in value):
                params[param] = tuple(_as_str(v) for v in value)
    return params