Source code for snakemake.deployment.singularity

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = ""
__license__ = "MIT"

from pathlib import Path
import subprocess
import shutil
import os
import hashlib

from snakemake.common import (
from snakemake.exceptions import WorkflowError
from snakemake.logging import logger
from snakemake_interface_common.utils import lazy_property

SNAKEMAKE_MOUNTPOINT = "/mnt/snakemake"

def get_snakemake_searchpath_mountpoints():
    """Return the in-container mount points for the snakemake search paths.

    One mount point is produced per entry returned by
    ``get_snakemake_searchpaths()``; the i-th entry maps to
    ``/mnt/snakemake_searchpaths/item_<i>``.

    Returns:
        list[str]: mount point paths, in the same order as the search paths.
    """
    mount_root = Path("/mnt/snakemake_searchpaths")
    return [
        str(mount_root / f"item_{index}")
        for index, _ in enumerate(get_snakemake_searchpaths())
    ]
class Image:
    """A singularity/apptainer container image identified by its URL.

    Remote images are pulled into the workflow's container image directory
    and stored as ``<md5-of-url>.simg``; local images are used in place.
    """

    def __init__(self, url, dag, is_containerized):
        """Create an image handle.

        Args:
            url: image URL or local file reference; must not contain spaces.
            dag: object providing
                ``dag.workflow.persistence.container_img_path``, the directory
                pulled images are stored in.
            is_containerized: stored unchanged on the instance.

        Raises:
            WorkflowError: if ``url`` contains a space character.
        """
        if " " in url:
            raise WorkflowError("Invalid singularity image URL containing whitespace.")
        self.singularity = Singularity()
        self.url = url
        self._img_dir = dag.workflow.persistence.container_img_path
        self.is_containerized = is_containerized

    @property
    def is_local(self):
        """Whether ``is_local_file`` classifies the URL as a local file."""
        return is_local_file(self.url)

    @lazy_property
    def hash(self):
        """Hex MD5 digest of the URL, used as the image's file name stem."""
        md5hash = hashlib.md5()
        md5hash.update(self.url.encode())
        return md5hash.hexdigest()

    def pull(self, dryrun=False):
        """Pull the image into the image directory unless already present.

        Local images are never pulled. With ``dryrun=True`` the pull is only
        announced via the logger.

        Raises:
            WorkflowError: if singularity is unavailable/too old (raised by
                ``Singularity.check``) or if ``singularity pull`` fails.
        """
        self.singularity.check()
        if self.is_local:
            return
        if dryrun:
            logger.info(f"Singularity image {self.url} will be pulled.")
            return
        logger.debug(f"Singularity image location: {self.path}")
        if os.path.exists(self.path):
            return
        logger.info(f"Pulling singularity image {self.url}.")
        try:
            # stderr is merged into stdout so that the error message below
            # contains the complete output of the failed command.
            subprocess.check_output(
                [
                    "singularity",
                    "pull",
                    "--name",
                    f"{self.hash}.simg",
                    self.url,
                ],
                cwd=self._img_dir,
                stderr=subprocess.STDOUT,
            )
        except subprocess.CalledProcessError as e:
            raise WorkflowError(
                "Failed to pull singularity image "
                "from {}:\n{}".format(self.url, e.stdout.decode())
            ) from e

    @property
    def path(self):
        """Filesystem path of the image.

        For local images this is the ``uri_path`` parsed from the URL,
        otherwise ``<image dir>/<hash>.simg``.
        """
        if self.is_local:
            return parse_uri(self.url).uri_path
        return os.path.join(self._img_dir, self.hash) + ".simg"

    def __hash__(self):
        return hash(self.hash)

    def __eq__(self, other):
        """Images are equal when their URLs are equal."""
        if not isinstance(other, Image):
            # Let Python fall back to the other operand / identity comparison
            # instead of failing with AttributeError on ``other.url``.
            return NotImplemented
        return self.url == other.url
def shellcmd(
    img_path,
    cmd,
    args="",
    quiet=False,
    envvars=None,
    shell_executable=None,
    container_workdir=None,
    is_python_script=False,
):
    """Build the shell command line that runs ``cmd`` inside a container.

    Args:
        img_path: image to execute; inserted verbatim into the command line.
        cmd: command passed to the shell via ``-c``; single quotes are escaped.
        args: extra arguments placed after ``exec --home <cwd>``.
        quiet: if true, add ``--quiet --silent`` to the singularity call.
        envvars: optional mapping; each item is emitted as a
            ``SINGULARITYENV_<key>=<value>`` prefix.
        shell_executable: shell to use inside the container (default ``sh``).
        container_workdir: if given, passed via ``--pwd``.
        is_python_script: if true, bind every snakemake search path to its
            mount point inside the container.

    Returns:
        str: the assembled command line (also logged at debug level).
    """
    env_prefix = (
        " ".join(f"SINGULARITYENV_{key}={value}" for key, value in envvars.items())
        if envvars
        else ""
    )

    # Only the executable's base name is used, because its location inside
    # the container cannot be known from the host path.
    shell_name = (
        "sh" if shell_executable is None else os.path.split(shell_executable)[-1]
    )

    if is_python_script:
        # mount host snakemake module into container
        bind_flags = [
            f" --bind {host_dir!r}:{target!r}"
            for host_dir, target in zip(
                get_snakemake_searchpaths(), get_snakemake_searchpath_mountpoints()
            )
        ]
        args += " ".join(bind_flags)

    if container_workdir:
        args += f" --pwd {container_workdir!r}"

    verbosity = "--quiet --silent" if quiet else ""
    escaped_cmd = cmd.replace("'", r"'\''")

    cmd = "{} singularity {} exec --home {} {} {} {} -c '{}'".format(
        env_prefix,
        verbosity,
        repr(os.getcwd()),
        args,
        img_path,
        shell_name,
        escaped_cmd,
    )
    logger.debug(cmd)
    return cmd
class Singularity:
    """Singleton that validates the installed singularity/apptainer once.

    Every ``Singularity()`` call returns the same object, so the result of
    ``check()`` is shared by all users.
    """

    instance = None

    def __new__(cls):
        if cls.instance is None:
            inst = super().__new__(cls)
            # State is initialised here rather than in __init__: __init__ runs
            # on every Singularity() call and would otherwise wipe the cached
            # check result of the shared instance.
            inst.checked = False
            inst._version = None
            cls.instance = inst
        return cls.instance

    def __init__(self):
        """Intentionally empty; state is set up once in ``__new__``."""

    @property
    def version(self):
        """Raw output of ``singularity --version`` recorded by ``check()``."""
        assert (
            self._version is not None
        ), "bug: singularity version accessed before check() has been called"
        return self._version

    def parseversion(self, raw_version):
        """Parse the version number out of ``singularity --version`` output.

        The last space-separated token is used, an optional leading ``v`` is
        dropped, and trailing characters are trimmed one at a time until the
        remainder is a valid version.

        Args:
            raw_version: version string, e.g. ``"apptainer version 1.1.3"``.

        Returns:
            packaging.version.Version: the parsed version.

        Raises:
            WorkflowError: if no prefix of the token is a valid version.
        """
        # Import the submodule explicitly; a bare ``import packaging`` does
        # not guarantee that ``packaging.version`` has been loaded.
        from packaging.version import InvalidVersion, Version

        # Strip first so a trailing newline/space cannot become the "token".
        raw_version = raw_version.strip().rsplit(" ", 1)[-1]
        if raw_version.startswith("v"):
            raw_version = raw_version[1:]
        # Try ever shorter prefixes; an empty token yields no candidates and
        # falls straight through to the error instead of looping forever.
        for trimend in range(len(raw_version), 0, -1):
            try:
                return Version(raw_version[:trimend])
            except InvalidVersion:
                continue
        raise WorkflowError(
            f"Apptainer/Singularity version cannot be parsed: {raw_version}"
        )

    def check(self):
        """Verify that a sufficiently recent singularity/apptainer exists.

        The check runs only until it succeeds once; afterwards the recorded
        version is reused.

        Raises:
            WorkflowError: if the ``singularity`` command is missing, its
                version cannot be determined or parsed, or the version is
                below 1.0.0 (apptainer) / 2.4.1 (singularity).
        """
        if self.checked:
            return

        from packaging.version import parse

        if not shutil.which("singularity"):
            raise WorkflowError(
                "The apptainer or singularity command has to be "
                "available in order to use apptainer/singularity "
                "integration."
            )
        try:
            v = subprocess.check_output(
                ["singularity", "--version"], stderr=subprocess.PIPE
            ).decode()
        except subprocess.CalledProcessError as e:
            raise WorkflowError(
                f"Failed to get singularity version:\n{e.stderr.decode()}"
            ) from e
        if v.startswith("apptainer"):
            if self.parseversion(v) < parse("1.0.0"):
                raise WorkflowError("Minimum apptainer version is 1.0.0.")
        elif self.parseversion(v) < parse("2.4.1"):
            raise WorkflowError("Minimum singularity version is 2.4.1.")
        self._version = v
        # Remember the successful check so later calls skip the subprocess.
        self.checked = True