# Source code for snakemake.shell

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

from pathlib import Path
import _io
import sys
import os
import subprocess as sp
import inspect
import shutil
import stat
import tempfile
import threading

from snakemake.utils import format, argvquote, cmd_exe_quote
from snakemake.common import ON_WINDOWS, RULEFUNC_CONTEXT_MARKER
from snakemake.logging import logger
from snakemake.deployment import singularity
from snakemake.deployment.conda import Conda
from snakemake.exceptions import WorkflowError


# Stream handed to subprocess.Popen as ``stdout`` when a shell command's
# output is neither read nor iterated. ``None`` means no redirection, so the
# child inherits this process's stdout.
STDOUT = sys.stdout
if not isinstance(sys.stdout, _io.TextIOWrapper):
    # workaround for nosetest since it overwrites sys.stdout
    # in a strange way that does not work with Popen
    STDOUT = None


# Upper bound on the length of a single command-line argument.
# Besides the limit on the total size of an executed command, operating
# systems cap the length of each individual argument. This matters most for
# `sh -c 'long script from user'`, where the whole user script is one
# argument. On Linux the per-argument cap is hardcoded in the kernel as
# 32 pages (128 KiB); on macOS it appears to be close to `getconf ARG_MAX`,
# about 253 KB.
# The value is 16 pages of 4096 bytes minus one (65535), i.e. half of the
# Linux cap -- presumably a safety margin; confirm before raising it.
MAX_ARG_LEN = 65_535


class shell:
    """Run shell commands for snakemake rules and utilities.

    ``shell(cmd, ...)`` formats ``cmd`` with snakemake's ``format``, wraps it
    with the configured prefix/suffix and any requested deployment
    (environment modules, conda, singularity), and runs it via
    ``subprocess.Popen``. Class-level attributes hold the defaults shared by
    all calls.
    """

    # Extra keyword arguments for subprocess.Popen (e.g. "executable").
    _process_args = {}
    # Command prefix; None means "use the default for the current shell".
    _process_prefix = None
    # Suffix appended to every command.
    _process_suffix = ""
    # Windows-only argument placed between an explicit executable and the
    # command (e.g. "-c" for bash).
    _win_command_prefix = ""
    # Guards _processes, which maps job ids to their running Popen objects.
    _lock = threading.Lock()
    _processes = {}
    # When a conda env is active, drop R_LIBS/PYTHONPATH/PERLLIB/PERL5LIB.
    conda_block_conflicting_envvars = True

    @classmethod
    def get_executable(cls):
        """Return the configured shell executable, or None if unset."""
        return cls._process_args.get("executable", None)

    @classmethod
    def check_output(cls, cmd, **kwargs):
        """Run ``cmd`` with the configured shell and return its output.

        Thin wrapper around ``subprocess.check_output``; ``kwargs`` are passed
        through. On Windows with an explicit executable, the executable and its
        command prefix are prepended and ``shell=False`` is used.
        """
        executable = cls.get_executable()
        if ON_WINDOWS and executable:
            win_prefix = cls._get_win_command_prefix()
            cmd = f'"{executable}" {win_prefix} {argvquote(cmd)}'
            logger.debug(f"Executing: {cmd}")
            return sp.check_output(cmd, shell=False, executable=executable, **kwargs)
        else:
            return sp.check_output(cmd, shell=True, executable=executable, **kwargs)

    @classmethod
    def executable(cls, cmd):
        """Set the default shell executable.

        Relative names are resolved to an absolute path via ``shutil.which``.
        ``None``/empty is stored as-is.

        Raises:
            WorkflowError: if a relative name cannot be found in PATH.
        """
        if isinstance(cmd, Path):
            cmd = str(cmd)
        if cmd and not os.path.isabs(cmd):
            # always enforce absolute path
            resolved = shutil.which(cmd)
            if not resolved:
                # Report the requested name, not the failed lookup result (None).
                raise WorkflowError(
                    f"Cannot set default shell {cmd} because it is not available in your PATH."
                )
            cmd = resolved
        cls._process_args["executable"] = cmd
        logger.debug(f"Setting shell executable to {cmd}.")

    @classmethod
    def _get_process_prefix(cls, shell_exec=None):
        """Return the prefix prepended to every command.

        For bash (or bash.exe on Windows) with no explicit prefix configured,
        this is the strict-mode preamble ``"set -euo pipefail; "``. Otherwise
        it is the configured prefix, or ``""`` if none is set.
        """
        shell_exec = cls._get_executable_name(shell_exec)
        if (
            shell_exec == "bash" or (ON_WINDOWS and shell_exec == "bash.exe")
        ) and cls._process_prefix is None:
            return "set -euo pipefail; "
        else:
            return cls._process_prefix or ""

    @classmethod
    def _get_win_command_prefix(cls, use_default=False, shell_exec=None):
        """Return the argument placed between a Windows shell executable and the command.

        Only valid on Windows. Returns the configured prefix if requested (or
        if one is set and no specific shell is given); otherwise "-c" for bash
        and "" for anything else.
        """
        assert ON_WINDOWS
        if use_default or (cls._win_command_prefix and shell_exec is None):
            # use whatever is the default
            return cls._win_command_prefix
        shell_exec = cls._get_executable_name(shell_exec)
        if shell_exec == "bash" or shell_exec == "bash.exe":
            return "-c"
        else:
            return ""

    @classmethod
    def _check_executable(cls, shell_exec=None):
        """Validate that ``shell_exec`` (or the default executable) is usable.

        Raises:
            WorkflowError: for the WSL bash.exe on Windows, a relative name not
                found in PATH, or an absolute path that does not exist.
        """
        shell_exec = shell_exec or cls.get_executable()
        if shell_exec is not None:
            if ON_WINDOWS and shell_exec == r"C:\Windows\System32\bash.exe":
                raise WorkflowError(
                    "Cannot use WSL bash.exe on Windows. Ensure that you have "
                    "a usable bash.exe available on your path."
                )
            if not os.path.isabs(shell_exec):
                path = shutil.which(shell_exec)
                if not path:
                    raise WorkflowError(
                        f"Cannot set shell to {shell_exec} because it is not "
                        "available in your PATH."
                    )
            elif not os.path.exists(shell_exec):
                raise WorkflowError(
                    f"Cannot set shell to {shell_exec} because it does not exist."
                )

    @classmethod
    def _get_executable_name(cls, shell_exec=None):
        """Return the lower-cased basename of the shell executable, or None."""
        shell_exec = shell_exec or cls.get_executable()
        if shell_exec:
            return os.path.split(shell_exec)[-1].lower()
        else:
            return None

    @staticmethod
    def _called_from_slurm_executor():
        """Return True if a module whose name contains "slurm" is on the call stack.

        Each frame's module name (``__name__`` in its globals) is inspected
        rather than its file path, so that "slurm" appearing in a user's
        directory names does not trigger a match.
        """
        frame = inspect.currentframe()
        try:
            while frame is not None:
                module_name = frame.f_globals.get("__name__") or ""
                if "slurm" in str(module_name):
                    return True
                frame = frame.f_back
            return False
        finally:
            # Drop the frame reference promptly (see the inspect docs on
            # reference cycles involving frame objects).
            del frame

    @classmethod
    def prefix(cls, prefix):
        """Set the command prefix (formatted in the caller's namespace)."""
        cls._process_prefix = format(prefix, stepout=2)

    @classmethod
    def suffix(cls, suffix):
        """Set the command suffix (formatted in the caller's namespace)."""
        cls._process_suffix = format(suffix, stepout=2)

    @classmethod
    def win_command_prefix(cls, cmd):
        """The command prefix used on windows when specifying an explicit shell executable.

        This would be "-c" for bash.
        Note: that if no explicit executable is set commands are executed with
        Popen(..., shell=True) which uses COMSPEC on windows where this is not
        needed.
        """
        cls._win_command_prefix = cmd

    @classmethod
    def kill(cls, jobid):
        """Kill the process registered for ``jobid`` (if any) and forget it."""
        with cls._lock:
            if jobid in cls._processes:
                cls._processes[jobid].kill()
                del cls._processes[jobid]

    @classmethod
    def terminate(cls, jobid):
        """Terminate the process registered for ``jobid`` (if any) and forget it."""
        with cls._lock:
            if jobid in cls._processes:
                cls._processes[jobid].terminate()
                del cls._processes[jobid]

    @classmethod
    def cleanup(cls):
        """Forget all registered processes (without signalling them)."""
        with cls._lock:
            cls._processes.clear()

    def __new__(
        cls, cmd, *args, iterable=False, read=False, bench_record=None, **kwargs
    ):
        """Format and execute ``cmd`` in a shell.

        Args:
            cmd: command template, formatted with snakemake's ``format`` using
                ``args``/``kwargs``.
            iterable: if true, return a generator over stdout lines (trailing
                newline removed).
            read: if true, return stdout as a single string.
            bench_record: if not None, the process is benchmarked into it via
                ``snakemake.benchmark.benchmarked``.
            **kwargs: formatting values. They also take precedence over context
                values (e.g. ``conda_env``, ``threads``, ``resources``) that are
                otherwise read from a calling rule function's locals.
                ``additional_envvars`` (dict of str) is merged into the
                environment.

        Returns:
            None by default, the stdout string if ``read``, or a generator if
            ``iterable``.

        Raises:
            KeyError: if ``stepout`` is passed.
            WorkflowError: on an unusable shell executable or malformed
                ``additional_envvars``.
            subprocess.CalledProcessError: on a non-zero exit status.
        """
        if "stepout" in kwargs:
            raise KeyError("Argument stepout is not allowed in shell command.")
        if ON_WINDOWS and not cls.get_executable():
            # If bash is not used on Windows quoting must be handled in a special way
            kwargs["quote_func"] = cmd_exe_quote
        # stepout counts frames relative to this one (presumably to reach the
        # caller of shell()), so this call must not be moved into a helper.
        cmd = format(cmd, *args, stepout=2, **kwargs)
        stdout = sp.PIPE if iterable or read else STDOUT

        close_fds = sys.platform != "win32"

        func_context = inspect.currentframe().f_back.f_locals
        if func_context.get(RULEFUNC_CONTEXT_MARKER):
            # If this comes from a rule, we expect certain information to be passed
            # implicitly via the rule func context, which is added here.
            # Copy it so the kwargs merged below never write back into the
            # caller's frame locals (which would leak into later calls).
            context = dict(func_context)
        else:
            # Otherwise, context is just filled via kwargs.
            context = dict()
        # add kwargs to context (taking precedence over the locals of the caller)
        context.update(kwargs)

        jobid = context.get("jobid")
        if not context.get("is_shell") and jobid is not None:
            logger.shellcmd(cmd)

        conda_env = context.get("conda_env", None)
        conda_base_path = context.get("conda_base_path", None)
        container_img = context.get("container_img", None)
        env_modules = context.get("env_modules", None)
        shadow_dir = context.get("shadow_dir", None)
        resources = context.get("resources", {})
        singularity_args = context.get("singularity_args", "")
        threads = context.get("threads", 1)

        shell_executable = resources.get("shell_exec")
        if shell_executable is not None:
            # Per-call override: copy so the class-wide defaults stay untouched.
            process_args = dict(cls._process_args)
            process_args["executable"] = shell_executable
        else:
            shell_executable = cls.get_executable()
            process_args = cls._process_args

        cls._check_executable(shell_executable)

        cmd = " ".join(
            (cls._get_process_prefix(shell_executable), cmd, cls._process_suffix)
        ).strip()

        # If the executor is the submit executor or the jobstep executor for the SLURM
        # backend, we do not want the environment modules to be activated:
        # if the rule requires a Python module, snakemake's environment might be
        # incompatible with the module's environment.
        if env_modules and not cls._called_from_slurm_executor():
            cmd = env_modules.shellcmd(cmd)
            logger.info(f"Activating environment modules: {env_modules}")

        if conda_env:
            conda = Conda(container_img=container_img, prefix_path=conda_base_path)
            if ON_WINDOWS and not cls.get_executable():
                # If we use cmd.exe directly on windows we need to prepend batch activation script.
                cmd = conda.shellcmd_win(conda_env, cmd)
            else:
                cmd = conda.shellcmd(conda_env, cmd)

        tmpdir = None
        # Length of the command once single-quoted for `sh -c '...'`. It is
        # measured in UTF-8 bytes because the OS argument limit is a byte limit.
        quoted_len = (
            len(cmd.replace("'", r"'\''").encode("utf-8", errors="replace")) + 2
        )
        if quoted_len > MAX_ARG_LEN:
            # Too long for a single argument: run it from a script file instead.
            tmpdir = tempfile.mkdtemp(dir=".snakemake", prefix="shell_tmp.")
            script = os.path.join(os.path.abspath(tmpdir), "script.sh")
            with open(script, "w") as script_fd:
                print(cmd, file=script_fd)
            os.chmod(script, os.stat(script).st_mode | stat.S_IXUSR | stat.S_IRUSR)
            # Use the shell resolved for this call, honoring a per-call override.
            cmd = '"{}" "{}"'.format(shell_executable or "/bin/sh", script)

        if container_img:
            cmd = singularity.shellcmd(
                container_img,
                cmd,
                singularity_args,
                envvars=None,
                shell_executable=shell_executable,
                container_workdir=shadow_dir,
                is_python_script=context.get("is_python_script", False),
            )
            logger.info(f"Activating singularity image {container_img}")
        if conda_env:
            logger.info(f"Activating conda environment: {os.path.relpath(conda_env)}")

        tmpdir_resource = resources.get("tmpdir", None)
        # environment variable lists for linear algebra libraries taken from:
        # https://stackoverflow.com/a/53224849/2352071
        # https://github.com/xianyi/OpenBLAS/tree/59243d49ab8e958bb3872f16a7c0ef8c04067c0a#setting-the-number-of-threads-using-environment-variables
        envvars = dict(os.environ)
        threads = str(threads)
        envvars["OMP_NUM_THREADS"] = threads
        envvars["GOTO_NUM_THREADS"] = threads
        envvars["OPENBLAS_NUM_THREADS"] = threads
        envvars["MKL_NUM_THREADS"] = threads
        envvars["VECLIB_MAXIMUM_THREADS"] = threads
        envvars["NUMEXPR_NUM_THREADS"] = threads

        if tmpdir_resource:
            envvars["TMPDIR"] = tmpdir_resource
            envvars["TMP"] = tmpdir_resource
            envvars["TEMPDIR"] = tmpdir_resource
            envvars["TEMP"] = tmpdir_resource

        if "additional_envvars" in kwargs:
            env = kwargs["additional_envvars"]
            if not isinstance(env, dict) or not all(
                isinstance(v, str) for v in env.values()
            ):
                raise WorkflowError(
                    "Given environment variables for shell command have to be a dict of strings, "
                    "but the following was provided instead:\n{}".format(env)
                )
            envvars.update(env)

        if conda_env and cls.conda_block_conflicting_envvars:
            # remove envvars that conflict with conda
            for var in ["R_LIBS", "PYTHONPATH", "PERLLIB", "PERL5LIB"]:
                envvars.pop(var, None)

        use_shell = True
        if ON_WINDOWS and shell_executable:
            # If executable is set on Windows shell mode can not be used
            # and the executable should be prepended the command together
            # with a command prefix (e.g. -c for bash).
            use_shell = False
            win_prefix = cls._get_win_command_prefix(
                use_default=False, shell_exec=shell_executable
            )
            cmd = '"{}" {} {}'.format(
                shell_executable,
                win_prefix,
                argvquote(cmd),
            )

        try:
            proc = sp.Popen(
                cmd,
                bufsize=-1,
                shell=use_shell,
                stdout=stdout,
                universal_newlines=iterable or read or None,
                close_fds=close_fds,
                **process_args,
                env=envvars,
            )
        except BaseException:
            # No process was started, so nothing else would remove the script dir.
            if tmpdir:
                shutil.rmtree(tmpdir, ignore_errors=True)
            raise

        if jobid is not None:
            with cls._lock:
                cls._processes[jobid] = proc

        ret = None
        if iterable:
            return cls.iter_stdout(proc, cmd, tmpdir, jobid=jobid)
        if read:
            ret = proc.stdout.read()
            proc.stdout.close()
        if bench_record is not None:
            from snakemake.benchmark import benchmarked

            with benchmarked(proc.pid, bench_record):
                retcode = proc.wait()
        else:
            retcode = proc.wait()

        if tmpdir:
            shutil.rmtree(tmpdir)

        if jobid is not None:
            with cls._lock:
                cls._processes.pop(jobid, None)

        if retcode:
            raise sp.CalledProcessError(retcode, cmd)

        return ret

    @classmethod
    def iter_stdout(cls, proc, cmd, tmpdir, jobid=None):
        """Yield the lines of ``proc.stdout`` without their trailing newline.

        After the stream is exhausted, waits for the process and removes
        ``tmpdir`` (if given). If ``jobid`` is given, the process is also
        removed from the registry used by ``kill``/``terminate``.

        Raises:
            subprocess.CalledProcessError: on a non-zero exit status.
        """
        for line in proc.stdout:
            # Only strip an actual newline: the final line may lack one.
            yield line[:-1] if line.endswith("\n") else line
        retcode = proc.wait()
        if tmpdir:
            shutil.rmtree(tmpdir)
        if jobid is not None:
            with cls._lock:
                cls._processes.pop(jobid, None)
        if retcode:
            raise sp.CalledProcessError(retcode, cmd)
# Pick a default shell at import time: bash on POSIX systems, falling back to
# sh when bash is missing (and to the interpreter default when neither is
# found). On Windows, no explicit executable is configured.
if os.name == "posix":
    if shutil.which("bash"):
        shell.executable("bash")
    else:
        logger.warning(
            "Cannot set bash as default shell because it is not "
            "available in your PATH. Falling back to sh."
        )
        if shutil.which("sh"):
            shell.executable("sh")
        else:
            logger.warning(
                "Cannot fall back to sh since it seems to be not "
                "available on this system. Using whatever is "
                "defined as default."
            )
elif ON_WINDOWS:
    shell.executable(None)