Source code for snakemake.deployment.conda

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import os
from pathlib import Path
import re
from typing import Union
from snakemake.sourcecache import (
    LocalGitFile,
    LocalSourceFile,
    SourceFile,
    infer_source_file,
)
import subprocess
import tempfile
import hashlib
import shutil
import json
from glob import glob
import tarfile
import zipfile
import uuid
from enum import Enum
import threading
import shutil
from abc import ABC, abstractmethod


from snakemake.exceptions import CreateCondaEnvironmentException, WorkflowError
from snakemake.logging import logger
from snakemake.common import (
    copy_permission_safe,
    is_local_file,
    parse_uri,
    ON_WINDOWS,
)
from snakemake.deployment import singularity, containerize
from snakemake.io import (
    IOFile,
    apply_wildcards,
    contains_wildcard,
    _IOFile,
)
from snakemake_interface_common.utils import lazy_property


MIN_CONDA_VER = "24.7.1"


[docs] def get_env_setup_done_flag_file(env_path: Path) -> Path: return env_path.with_suffix(".env_setup_done")
[docs] class CondaCleanupMode(Enum): tarballs = "tarballs" cache = "cache" def __str__(self): return self.value
[docs] class Env: """Conda environment from a given specification file.""" def __init__( self, workflow, env_file=None, env_name=None, env_dir=None, envs_dir=None, container_img=None, cleanup=None, ): self.file = env_file if env_file is not None: self.file = infer_source_file(env_file) assert env_name is None assert env_dir is None self.name = env_name if env_name is not None: assert env_file is None assert env_dir is None self.dir = env_dir if env_dir is not None: assert env_file is None assert env_name is None self.workflow = workflow self._container_img = container_img self._envs_dir = envs_dir or ( containerize.CONDA_ENV_PATH if self.is_containerized else workflow.persistence.conda_env_path ) self._hash = None self._content_hash = None self._content = None self._content_deploy = None self._content_pin = None self._path = None self._archive_file = None self._cleanup = cleanup self._singularity_args = workflow.deployment_settings.apptainer_args @property def is_externally_managed(self): """Return True if the environment is managed by the user.""" return self.name is not None or self.dir is not None @lazy_property def conda(self): return Conda(container_img=self._container_img, check=True) def _path_or_uri_prefix(self): prefix = self.file.get_path_or_uri() if prefix.endswith(".yaml") or prefix.endswith(".yml"): prefix = prefix.rsplit(".", 1)[0] return prefix else: return None @lazy_property def pin_file(self): return self._get_aux_file( f".{self.conda.platform}.pin.txt", "Omitting search for pin file " "(https://snakemake.readthedocs.io/en/stable/snakefiles/" "deployment.html#freezing-environments-to-exactly-pinned-packages).", ) @lazy_property def post_deploy_file(self): return self._get_aux_file( ".post-deploy.sh", "Omitting search for post-deploy script " "(https://snakemake.readthedocs.io/en/stable/snakefiles/" "deployment.html#providing-post-deployment-scripts).", ) def _get_aux_file(self, suffix: str, omit_msg: str): if self.file: prefix = self._path_or_uri_prefix() # TODO handle LocalGitFile properly if prefix is None: logger.warning( f"Conda environment file {self.file.get_path_or_uri()} does not end " f"on .yaml or .yml. {omit_msg}" ) return None aux_file = f"{prefix}{suffix}" aux_file = infer_source_file(aux_file) if self.workflow.sourcecache.exists(aux_file): return aux_file else: return None else: return None def _get_content(self): if self.name or self.dir: from snakemake.shell import shell try: content = shell.check_output( f"conda env export {self.address_argument}", stderr=subprocess.STDOUT, text=True, ) except subprocess.CalledProcessError as e: raise WorkflowError( f"Error exporting conda environment {self.address_argument}:\n{e.output}" ) return content.encode() else: return self.workflow.sourcecache.open(self.file, "rb").read() def _get_content_deploy(self): self.check_is_file_based() if self.post_deploy_file: return self.workflow.sourcecache.open(self.post_deploy_file, "rb").read() return None def _get_content_pin(self): self.check_is_file_based() if self.pin_file: return self.workflow.sourcecache.open(self.pin_file, "rb").read() return None @property def _env_archive_dir(self): return self.workflow.persistence.conda_env_archive_path @property def container_img_url(self): return self._container_img.url if self._container_img else None @property def content(self): if self._content is None: self._content = self._get_content() return self._content @property def content_deploy(self): if self._content_deploy is None: self._content_deploy = self._get_content_deploy() return self._content_deploy @property def content_pin(self): if self._content_pin is None: self._content_pin = self._get_content_pin() return self._content_pin @property def hash(self): if self._hash is None: if self.is_containerized: self._hash = self.content_hash else: self._hash = self._get_hash( include_location=True, include_container_img=True ) return self._hash @property def content_hash(self): if self._content_hash is None: self._content_hash = self._get_hash( include_location=False, include_container_img=False ) return self._content_hash def _get_hash(self, include_location: bool, include_container_img: bool) -> str: md5hash = hashlib.md5() if self.name: md5hash.update(self.name.encode()) elif self.dir: md5hash.update(self.dir.encode()) else: if include_location: # Include the absolute path of the target env dir into the hash. # By this, moving the working directory around automatically # invalidates all environments. This is necessary, because binaries # in conda environments can contain hardcoded absolute RPATHs. env_dir = os.path.realpath(self._envs_dir) md5hash.update(env_dir.encode()) if include_container_img and self._container_img: md5hash.update(self._container_img.url.encode()) content_deploy = self.content_deploy if content_deploy: md5hash.update(content_deploy) content_pin = self.content_pin if content_pin: md5hash.update(content_pin) md5hash.update(self.content) return md5hash.hexdigest() @property def is_containerized(self): if not self._container_img: return False return self._container_img.is_containerized
[docs] def check_is_file_based(self): assert ( self.file is not None ), "bug: trying to access conda env file based functionality for named environment"
@property def address(self): """Path to directory of the conda environment. First tries full hash, if it does not exist, (8-prefix) is used as default. """ if self.name: return self.name elif self.dir: return self.dir else: env_dir = self._envs_dir get_path = lambda h: os.path.join(env_dir, h) hash_candidates = [ self.hash[:8], self.hash, self.hash + "_", # activate no-shortcuts behavior (so that no admin rights are needed on win) ] # [0] is the old fallback hash (shortened) exists = [os.path.exists(get_path(h)) for h in hash_candidates] if self.is_containerized: return get_path(hash_candidates[1]) for candidate, candidate_exists in zip(hash_candidates, exists): if candidate_exists or candidate == hash_candidates[-1]: # exists or it is the last (i.e. the desired one) return get_path(candidate) @property def address_argument(self): if self.name: return f"--name '{self.address}'" else: return f"--prefix '{self.address}'" @property def archive_file(self): """Path to archive of the conda environment, which may or may not exist.""" if self._archive_file is None: self._archive_file = os.path.join(self._env_archive_dir, self.content_hash) return self._archive_file
[docs] def create_archive(self): """Create self-contained archive of environment.""" from snakemake.shell import shell # importing requests locally because it interferes with instantiating conda environments import requests self.check_is_file_based() env_archive = self.archive_file if os.path.exists(env_archive): return env_archive try: # Download logger.info( "Downloading packages for conda environment {}...".format( self.file.get_path_or_uri() ) ) os.makedirs(env_archive, exist_ok=True) try: out = shell.check_output( f"conda list --explicit {self.address_argument}", stderr=subprocess.STDOUT, text=True, ) logger.debug(out) except subprocess.CalledProcessError as e: raise WorkflowError("Error exporting conda packages:\n" + e.output) with open(os.path.join(env_archive, "packages.txt"), "w") as pkg_list: for l in out.split("\n"): if l and not l.startswith("#") and not l.startswith("@"): pkg_url = l logger.info(pkg_url) parsed = parse_uri(pkg_url) pkg_name = os.path.basename(parsed.uri_path) # write package name to list print(pkg_name, file=pkg_list) # download package pkg_path = os.path.join(env_archive, pkg_name) with open(pkg_path, "wb") as copy: r = requests.get(pkg_url) r.raise_for_status() copy.write(r.content) try: if pkg_path.endswith(".conda"): assert zipfile.ZipFile(pkg_path).testzip() is None else: tarfile.open(pkg_path) except: raise WorkflowError( f"Package is invalid tar/zip archive: {pkg_url}" ) except ( requests.exceptions.ChunkedEncodingError, requests.exceptions.HTTPError, ) as e: shutil.rmtree(env_archive) raise WorkflowError(f"Error downloading conda package {pkg_url}.") except BaseException as e: shutil.rmtree(env_archive) raise e return env_archive
[docs] def execute_deployment_script(self, env_file, deploy_file): """Execute post-deployment script if present""" from snakemake.shell import shell if ON_WINDOWS: raise WorkflowError( "Post deploy script {} provided for conda env {} but unsupported on windows.".format( deploy_file, env_file ) ) logger.info( "Running post-deploy script {}...".format( os.path.relpath(path=deploy_file, start=os.getcwd()) ) ) # Determine interpreter from shebang or use sh as default. interpreter = "sh" with open(deploy_file, "r") as f: first_line = next(iter(f)) if first_line.startswith("#!"): interpreter = first_line[2:].strip() shell.check_output( self.conda.shellcmd(self.address, f"{interpreter} {deploy_file}"), stderr=subprocess.STDOUT, text=True, )
[docs] def create(self, dryrun=False): """Create the conda environment.""" from snakemake.shell import shell self.check_is_file_based() # Read env file and create hash. env_file = self.file deploy_file = None pin_file = None tmp_env_file = None tmp_deploy_file = None tmp_pin_file = None if not isinstance(env_file, LocalSourceFile) or isinstance( env_file, LocalGitFile ): with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml") as tmp: # write to temp file such that conda can open it tmp.write(self.content) env_file = tmp.name tmp_env_file = tmp.name if self.post_deploy_file: with tempfile.NamedTemporaryFile( delete=False, suffix=".post-deploy.sh" ) as tmp: # write to temp file such that conda can open it tmp.write(self.content_deploy) deploy_file = tmp.name tmp_deploy_file = tmp.name if self.pin_file and not dryrun: with tempfile.NamedTemporaryFile(delete=False, suffix="pin.txt") as tmp: tmp.write(self.content_pin) pin_file = tmp.name tmp_pin_file = tmp.name else: env_file = env_file.get_path_or_uri() deploy_file = self.post_deploy_file if not dryrun: pin_file = self.pin_file env_path = self.address if self.is_containerized: if not dryrun: try: shell.check_output( singularity.shellcmd( self._container_img.path, f"[ -d '{env_path}' ]", args=self._singularity_args, envvars=self.get_singularity_envvars(), quiet=True, ), stderr=subprocess.PIPE, text=True, ) except subprocess.CalledProcessError as e: raise WorkflowError( "Unable to find environment in container image. " "Maybe a conda environment was modified without containerizing again " "(see snakemake --containerize)?\nDetails:\n{}\n{}".format( e, e.stderr ) ) return env_path else: # env should be present in the container return env_path setup_done_flag = get_env_setup_done_flag_file(Path(env_path)) # Check for broken environment if os.path.exists(env_path) and not setup_done_flag.exists(): if dryrun: logger.info( "Incomplete Conda environment {} will be recreated.".format( self.file.simplify_path() ) ) else: logger.info( "Removing incomplete Conda environment {}...".format( self.file.simplify_path() ) ) shutil.rmtree(env_path, ignore_errors=True) # Create environment if not already present. if not os.path.exists(env_path): if dryrun: logger.info( "Conda environment {} will be created.".format( self.file.simplify_path() ) ) return env_path logger.info(f"Creating conda environment {self.file.simplify_path()}...") env_archive = self.archive_file try: # Check if env archive exists. Use that if present. if os.path.exists(env_archive): logger.info("Installing archived conda packages.") pkg_list = os.path.join(env_archive, "packages.txt") if os.path.exists(pkg_list): # read packages in correct order # this is for newer env archives where the package list # was stored packages = [ os.path.join(env_archive, pkg.rstrip()) for pkg in open(pkg_list) ] else: # guess order packages = glob(os.path.join(env_archive, "*.tar.bz2")) # install packages manually from env archive cmd = " ".join( [ "conda", "create", "--quiet", "--no-shortcuts" if ON_WINDOWS else "", "--yes", "--no-default-packages", f"--prefix '{env_path}'", ] + packages ) if self._container_img: cmd = singularity.shellcmd( self._container_img.path, cmd, args=self._singularity_args, envvars=self.get_singularity_envvars(), ) out = shell.check_output(cmd, stderr=subprocess.STDOUT, text=True) else: def create_env(env_file, filetype="yaml"): # Copy env file to env_path (because they can be on # different volumes and singularity should only mount one). # In addition, this allows to immediately see what an # environment in .snakemake/conda contains. target_env_file = env_path + f".{filetype}" copy_permission_safe(env_file, target_env_file) logger.info("Downloading and installing remote packages.") strict_priority = ( ["conda config --set channel_priority strict &&"] if self._container_img else [] ) subcommand = ["conda"] yes_flag = ["--yes"] if filetype == "yaml": subcommand.append("env") yes_flag = [] cmd = ( strict_priority + subcommand + [ "create", "--quiet", "--no-default-packages", f'--file "{target_env_file}"', f'--prefix "{env_path}"', ] + yes_flag ) cmd = " ".join(cmd) if self._container_img: cmd = singularity.shellcmd( self._container_img.path, cmd, args=self._singularity_args, envvars=self.get_singularity_envvars(), ) out = shell.check_output( cmd, stderr=subprocess.STDOUT, text=True ) # cleanup if requested if self._cleanup is CondaCleanupMode.tarballs: logger.info("Cleaning up conda package tarballs.") shell.check_output("conda clean -y --tarballs", text=True) elif self._cleanup is CondaCleanupMode.cache: logger.info( "Cleaning up conda package tarballs and package cache." ) shell.check_output( "conda clean -y --tarballs --packages", text=True ) return out if pin_file is not None: try: logger.info( f"Using pinnings from {self.pin_file.get_path_or_uri()}." ) out = create_env(pin_file, filetype="pin.txt") except subprocess.CalledProcessError as e: # remove potential partially installed environment shutil.rmtree(env_path, ignore_errors=True) advice = "" if isinstance(self.file, LocalSourceFile): advice = ( " If that works, make sure to update the pin file with " f"'snakedeploy pin-conda-env {self.file.get_path_or_uri()}'." ) logger.warning( f"Failed to install conda environment from pin file ({self.pin_file.get_path_or_uri()}). " f"Trying regular environment definition file.{advice}" ) out = create_env(env_file, filetype="yaml") else: out = create_env(env_file, filetype="yaml") # Execute post-deploy script if present if deploy_file: target_deploy_file = env_path + ".post-deploy.sh" copy_permission_safe(deploy_file, target_deploy_file) self.execute_deployment_script(env_file, target_deploy_file) # Touch "done" flag file with open(setup_done_flag, "w") as _: pass logger.debug(out) logger.info( f"Environment for {self.file.get_path_or_uri()} created (location: {os.path.relpath(env_path)})" ) except subprocess.CalledProcessError as e: # remove potential partially installed environment shutil.rmtree(env_path, ignore_errors=True) raise CreateCondaEnvironmentException( f"Could not create conda environment from {env_file}:\nCommand:\n{e.cmd}\nOutput:\n{e.output}" ) if tmp_env_file: # temporary file was created os.remove(tmp_env_file) if tmp_deploy_file: os.remove(tmp_deploy_file) if tmp_pin_file: os.remove(tmp_pin_file) return env_path
[docs] @classmethod def get_singularity_envvars(self): return {"CONDA_PKGS_DIRS": f"/tmp/conda/{uuid.uuid4()}"}
def __hash__(self): # this hash is only for object comparison, not for env paths if self.name: return hash(self.name) elif self.dir: return hash(self.dir) else: return hash(self.file) def __eq__(self, other): if isinstance(other, Env): if self.name: return self.name == other.name elif self.dir: return self.dir == other.dir else: return self.file == other.file return False
[docs] class Conda: instances = dict() lock = threading.Lock() def __new__(cls, container_img=None, prefix_path=None, check=False): with cls.lock: if container_img not in cls.instances: inst = super().__new__(cls) cls.instances[container_img] = inst return inst else: return cls.instances[container_img] def __init__(self, container_img=None, prefix_path=None, check=False): if not self.is_initialized: # avoid superfluous init calls from snakemake.deployment import singularity from snakemake.shell import shell if isinstance(container_img, singularity.Image): container_img = container_img.path self.container_img = container_img try: self.info = json.loads( shell.check_output(self._get_cmd("conda info --json"), text=True) ) except subprocess.CalledProcessError as e: raise WorkflowError( "Error running conda info. " f"Is conda installed and accessible? Error: {e}" ) if prefix_path is None or container_img is not None: self.prefix_path = self.info["conda_prefix"] else: self.prefix_path = prefix_path self.platform = self.info["platform"] # check conda installation if check: self._check() @property def is_initialized(self): return hasattr(self, "prefix_path") def _get_cmd(self, cmd): if self.container_img: return singularity.shellcmd(self.container_img, cmd, quiet=True) return cmd def _check(self): from snakemake.shell import shell frontend = "conda" # Use type here since conda now is a function. # type allows to check for both functions and regular commands. if not ON_WINDOWS or shell.get_executable(): locate_cmd = f"type {frontend}" else: locate_cmd = f"where {frontend}" try: shell.check_output( self._get_cmd(locate_cmd), stderr=subprocess.STDOUT, text=True ) except subprocess.CalledProcessError as e: if self.container_img: msg = ( f"The '{frontend}' command is not " "available inside " "your singularity container " "image. Snakemake mounts " "your conda installation " "into singularity. " "Sometimes, this can fail " "because of shell restrictions. " "It has been tested to work " "with docker://ubuntu, but " "it e.g. fails with " "docker://bash " ) else: msg = ( f"The '{frontend}' command is not " "available in the " f"shell {shell.get_executable()} that will be " "used by Snakemake. You have " "to ensure that it is in your " "PATH, e.g., first activating " "the conda base environment " "with `conda activate base`." ) raise CreateCondaEnvironmentException(msg) try: self._check_version() self._check_condarc() except subprocess.CalledProcessError as e: raise CreateCondaEnvironmentException( "Unable to check conda installation:\n" + e.stderr.decode() ) def _check_version(self): from snakemake.shell import shell from packaging.version import Version version = shell.check_output( self._get_cmd("conda --version"), stderr=subprocess.PIPE, text=True ) version_matches = re.findall(r"\d+\.\d+\.\d+", version) if len(version_matches) != 1: raise WorkflowError( f"Unable to determine conda version. 'conda --version' returned {version}" ) else: version = version_matches[0] if Version(version) < Version(MIN_CONDA_VER): raise CreateCondaEnvironmentException( f"Conda must be version {MIN_CONDA_VER} or later, found version {version}. " "Please update conda to the latest version. " "Note that you can also install conda into the snakemake environment " "without modifying your main conda installation." ) def _check_condarc(self): if self.container_img: # Do not check for strict priorities when running conda in an image # Instead, we set priorities to strict ourselves in the image. return from snakemake.shell import shell res = json.loads( shell.check_output( self._get_cmd("conda config --get channel_priority --json"), text=True, stderr=subprocess.PIPE, ) ) if res["get"].get("channel_priority") != "strict": logger.warning( "Your conda installation is not configured to use strict channel priorities. " "This is however important for having robust and correct environments (for details, " "see https://conda-forge.org/docs/user/tipsandtricks.html). " "Please consider to configure strict priorities by executing " "'conda config --set channel_priority strict'." )
[docs] def bin_path(self): if ON_WINDOWS: return os.path.join(self.prefix_path, "Scripts") else: return os.path.join(self.prefix_path, "bin")
[docs] def shellcmd(self, env_address, cmd): # get path to activate script activate = os.path.join(self.bin_path(), "activate") if ON_WINDOWS: activate = activate.replace("\\", "/") env_address = env_address.replace("\\", "/") return f"source {activate} '{env_address}'; {cmd}"
[docs] def shellcmd_win(self, env_address, cmd): """Prepend the windows activate bat script.""" # get path to activate script activate = os.path.join(self.bin_path(), "activate.bat").replace("\\", "/") env_address = env_address.replace("\\", "/") return f'"{activate}" "{env_address}"&&{cmd}'
[docs] class CondaEnvSpec(ABC):
[docs] @abstractmethod def apply_wildcards(self, wildcards): ...
[docs] @abstractmethod def get_conda_env(
self, workflow, envs_dir=None, container_img=None, cleanup=None ): ...
[docs] @abstractmethod def check(self): ...
@property def is_file(self): return False @property @abstractmethod def contains_wildcard(self): ... @abstractmethod def __hash__(self): ... @abstractmethod def __eq__(self, other): ...
[docs] class CondaEnvFileSpec(CondaEnvSpec): def __init__(self, filepath, rule=None): if isinstance(filepath, SourceFile): self.file = IOFile(str(filepath.get_path_or_uri()), rule=rule) elif isinstance(filepath, _IOFile): self.file = filepath else: self.file = IOFile(filepath, rule=rule)
[docs] def apply_wildcards(self, wildcards, rule): filepath = self.file.apply_wildcards(wildcards) if is_local_file(filepath): # Normalize 'file:///my/path.yml' to '/my/path.yml' filepath = parse_uri(filepath).uri_path return CondaEnvFileSpec(filepath, rule)
[docs] def check(self): self.file.check()
[docs] def get_conda_env(self, workflow, envs_dir=None, container_img=None, cleanup=None): return Env( workflow, env_file=self.file, envs_dir=envs_dir, container_img=container_img, cleanup=cleanup, )
@property def is_file(self): return True @property def contains_wildcard(self): return contains_wildcard(self.file) def __hash__(self): return hash(self.file) def __eq__(self, other): return self.file == other.file
[docs] class CondaEnvDirSpec(CondaEnvSpec): def __init__(self, path, rule=None): if isinstance(path, SourceFile): self.path = IOFile(str(path.get_path_or_uri()), rule=rule) elif isinstance(path, _IOFile): self.path = path else: self.path = IOFile(path, rule=rule)
[docs] def apply_wildcards(self, wildcards, rule): filepath = self.path.apply_wildcards(wildcards) if is_local_file(filepath): # Normalize 'file:///my/path.yml' to '/my/path.yml' filepath = parse_uri(filepath).uri_path return CondaEnvDirSpec(filepath, rule)
[docs] def check(self): pass
[docs] def get_conda_env(self, workflow, envs_dir=None, container_img=None, cleanup=None): return Env( workflow, env_dir=self.path, envs_dir=envs_dir, container_img=container_img, cleanup=cleanup, )
@property def is_file(self): return True @property def contains_wildcard(self): return contains_wildcard(self.path) def __hash__(self): return hash(self.path) def __eq__(self, other): return self.path == other.file
[docs] class CondaEnvNameSpec(CondaEnvSpec): def __init__(self, name: str): self.name = name
[docs] def apply_wildcards(self, wildcards, _): return CondaEnvNameSpec(apply_wildcards(self.name, wildcards))
[docs] def get_conda_env(self, workflow, envs_dir=None, container_img=None, cleanup=None): return Env( workflow, env_name=self.name, envs_dir=envs_dir, container_img=container_img, cleanup=cleanup, )
[docs] def check(self): # not a file, nothing to check here pass
@property def contains_wildcard(self): return contains_wildcard(self.name) def __hash__(self): return hash(self.name) def __eq__(self, other): return self.name == other.name
[docs] class CondaEnvSpecType(Enum): FILE = "file" NAME = "name" DIR = "dir"
[docs] @classmethod def from_spec(cls, spec: Union[str, SourceFile, Path]): if isinstance(spec, SourceFile): if isinstance(spec, LocalSourceFile): spec = spec.get_path_or_uri() else: spec = spec.get_filename() elif isinstance(spec, Path): spec = str(spec) if spec.endswith(".yaml") or spec.endswith(".yml"): return cls.FILE elif is_local_file(spec) and os.path.isdir(spec): return cls.DIR else: return cls.NAME