# Source code for snakemake.path_modifier
__authors__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"
import os
from snakemake.exceptions import WorkflowError
from snakemake.io import is_callable, is_flagged, AnnotatedString, flag, get_flag_value
PATH_MODIFIER_FLAG = "path_modified"
# [docs]
class PathModifier:
    """Rewrite paths by prepending or replacing a prefix, then apply default storage.

    A modifier works in one of two mutually exclusive modes:

    * ``prefix``: the given prefix is prepended to every eligible path.
    * ``replace_prefix``: a mapping of old prefix -> new prefix; the matching
      leading part of an eligible path is swapped for its replacement.

    Independently of the prefix mode, ``modify`` passes every path through
    ``apply_default_storage``, which attaches a storage object when the
    workflow defines a default storage provider.
    """

    def __init__(self, replace_prefix: dict, prefix: str, workflow):
        """Set up the modifier.

        Args:
            replace_prefix: Mapping of prefixes to their replacements, or a
                falsy value to disable prefix replacement.
            prefix: Prefix to prepend to paths, or a falsy value to disable
                prefixing. A trailing ``/`` is appended if missing.
            workflow: Workflow object; its ``storage_settings`` and
                ``storage_registry`` are read by ``apply_default_storage``.

        ``prefix`` and ``replace_prefix`` must not both be given.
        """
        # Properties listed here are never prefix-modified (see replace_prefix).
        self.skip_properties = set()
        self.workflow = workflow
        self.trie = None
        self.prefix = None
        assert not (prefix and replace_prefix), (
            "prefix and replace_prefix are mutually exclusive"
        )
        if prefix:
            if not prefix.endswith("/"):
                prefix += "/"
            self.prefix = prefix
        if replace_prefix:
            # Imported lazily so datrie is only needed when prefixes are replaced.
            import datrie

            # The trie is created with the set of all characters that occur
            # in the prefixes to be replaced.
            alphabet = "".join({char for key in replace_prefix for char in key})
            self.trie = datrie.Trie(alphabet)
            for key, replacement in replace_prefix.items():
                self.trie[key] = replacement

    def modify(self, path, property=None):
        """Return ``path`` with prefix handling and default storage applied.

        Args:
            path: The path to modify (plain or annotated string).
            property: Name of the property the path belongs to; forwarded to
                ``replace_prefix`` where it is checked against
                ``skip_properties``.

        Returns:
            ``path`` itself if it was already modified by this instance or if
            nothing changed; otherwise the modified path, carrying the flags
            of the original and flagged with ``PATH_MODIFIER_FLAG``.
        """
        if get_flag_value(path, PATH_MODIFIER_FLAG) is self:
            # Path has been modified before and is reused now, no need to modify again.
            return path
        modified_path = self.apply_default_storage(self.replace_prefix(path, property))
        if modified_path == path:
            # nothing has changed
            return path
        # Important, update with previous flags in case of AnnotatedString #596
        if hasattr(path, "flags"):
            if not hasattr(modified_path, "flags"):
                modified_path = AnnotatedString(modified_path)
            modified_path.flags.update(path.flags)
            if is_flagged(modified_path, "multiext"):
                # The multiext flag holds a path of its own, which has to be
                # modified in the same way.
                modified_path.flags["multiext"] = self.apply_default_storage(
                    self.replace_prefix(modified_path.flags["multiext"], property)
                )
        # Flag the path as modified and return.
        modified_path = flag(modified_path, PATH_MODIFIER_FLAG, self)
        return modified_path

    def replace_prefix(self, path, property=None):
        """Prepend the configured prefix to ``path`` or replace its matching prefix.

        The path is returned unchanged if no prefix handling is configured,
        if ``property`` is in ``skip_properties``, if the path is absolute,
        starts with ``..``, is flagged as ``storage_object``, or is callable.

        Raises:
            WorkflowError: If more than one replacement prefix matches ``path``.
        """
        if (self.trie is None and self.prefix is None) or (
            property in self.skip_properties
            or os.path.isabs(path)
            or path.startswith("..")
            or is_flagged(path, "storage_object")
            or is_callable(path)
        ):
            # no replacement
            return path
        if self.trie is None:
            # prefix case
            return self.prefix + path

        prefixes = self.trie.prefix_items(str(path))
        if len(prefixes) > 1:
            # ambiguous prefixes
            raise WorkflowError(
                "Multiple prefixes ({}) match the path {}. Make sure that the replace_prefix statement "
                "in your module definition does not yield ambiguous matches.".format(
                    ", ".join(prefix[0] for prefix in prefixes), path
                )
            )
        if prefixes:
            # replace prefix
            prefix, replacement = prefixes[0]
            return replacement + path[len(prefix) :]
        # no matching prefix
        return path

    def apply_default_storage(self, path):
        """Attach a default-storage object to ``path`` where applicable.

        The path is returned unchanged if the workflow has no default storage
        provider, if the path is already flagged as ``storage_object``,
        ``local`` or ``sourcecache_entry``, or if it is an ``AnnotatedString``
        with a truthy ``callable``.

        Otherwise the query is built from the default storage prefix and the
        normalized path, a storage object is created for it, and the path is
        returned flagged with that storage object.

        Raises:
            WorkflowError: If the storage object reports the query as invalid.
        """
        from snakemake.storage import flag_with_storage_object

        def is_annotated_callable(value):
            return isinstance(value, AnnotatedString) and bool(value.callable)

        provider = self.workflow.storage_settings.default_storage_provider
        if (
            provider is None
            or is_flagged(path, "storage_object")
            or is_flagged(path, "local")
            or is_flagged(path, "sourcecache_entry")
            or is_annotated_callable(path)
        ):
            # no default remote needed
            return path

        # Treat an unset prefix as empty so that it does not end up in the
        # query as the text "None".
        prefix = self.workflow.storage_settings.default_storage_prefix or ""
        if prefix and not prefix.endswith("/"):
            prefix = f"{prefix}/"
        # This will convert any AnnotatedString to str
        query = f"{prefix}{os.path.normpath(path)}"
        storage_object = self.workflow.storage_registry.default_storage_provider.object(
            query
        )
        validation_res = storage_object.is_valid_query()
        if not validation_res:
            # Every fragment containing a placeholder must be an f-string,
            # otherwise the placeholder is emitted literally.
            raise WorkflowError(
                f"Error applying default storage provider {provider}. "
                "Make sure to provide a valid --default-storage-prefix "
                "(see https://snakemake.github.io/snakemake-plugin-catalog/plugins/"
                f"storage/{provider}.html). {validation_res}",
            )
        return flag_with_storage_object(path, storage_object)

    @property
    def modifies_prefixes(self):
        """Whether this modifier replaces prefixes (i.e. a replacement trie exists)."""
        return self.trie is not None