Source code for snakemake.notebook

from abc import abstractmethod
import os
from pathlib import Path
import tempfile
import re

from snakemake.exceptions import WorkflowError
from snakemake.script import get_source, ScriptBase, PythonScript, RScript
from snakemake.logging import logger
from snakemake.common import is_local_file
from snakemake.common import ON_WINDOWS
from snakemake.sourcecache import SourceCache, infer_source_file
from snakemake.utils import format

# Patterns for lines of the form "Kernel started: <id>" / "Kernel shutdown: <id>".
# The run of non-whitespace characters after the colon is captured as the named
# group ``kernel_id``.
# NOTE(review): presumably these match Jupyter server log output; neither
# pattern is referenced in the part of the module visible here - confirm usage.
KERNEL_STARTED_RE = re.compile(r"Kernel started: (?P<kernel_id>\S+)")
KERNEL_SHUTDOWN_RE = re.compile(r"Kernel shutdown: (?P<kernel_id>\S+)")

def get_cell_sources(source):
    """Return the ``source`` entry of every cell in a serialized notebook.

    Args:
        source: Notebook content as a string, parsed with ``nbformat.reads``
            without converting between notebook format versions.

    Returns:
        A list with one item per cell, in notebook order.
    """
    # Imported lazily so the module can be loaded without nbformat installed.
    import nbformat

    parsed = nbformat.reads(source, as_version=nbformat.NO_CONVERT)
    cell_sources = []
    for cell in parsed["cells"]:
        cell_sources.append(cell["source"])
    return cell_sources
class JupyterNotebook(ScriptBase):
    """Base class for executing, drafting and editing Jupyter notebooks.

    Subclasses supply the language specific parts: ``get_preamble`` (called by
    ``draft``), ``get_language_name`` and ``get_interpreter_exec``.

    Attributes such as ``self.source``, ``self.log`` and ``self.local_path`` as
    well as ``self.evaluate`` and ``self._execute_cmd`` are not defined here;
    they are presumably provided by ``ScriptBase``.
    """

    editable = True

    def draft(self):
        """Write a skeleton notebook to ``self.local_path``.

        The skeleton consists of the tagged preamble cell followed by a single
        placeholder code cell, with the language name stored in the metadata.
        """
        import nbformat

        preamble = self.get_preamble()
        nb = nbformat.v4.new_notebook()
        self.insert_preamble_cell(preamble, nb)

        nb["cells"].append(nbformat.v4.new_code_cell("# start coding here"))
        nb["metadata"] = {"language_info": {"name": self.get_language_name()}}

        # os.makedirs("") raises, so only create the parent directory if the
        # path actually has a directory component.
        parent_dir = os.path.dirname(self.local_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(self.local_path, "wb") as out:
            out.write(nbformat.writes(nb).encode())

    def draft_and_edit(self, listen):
        """Create the skeleton notebook and immediately open it for editing.

        Args:
            listen: Edit settings, forwarded as ``edit`` to ``self.evaluate``.
        """
        self.draft()

        # draft() writes UTF-8 encoded bytes, so decode the same way, and close
        # the file handle deterministically.
        with open(self.local_path, encoding="utf-8") as notebook_file:
            self.source = notebook_file.read()

        self.evaluate(edit=listen)

    def write_script(self, preamble, fd):
        """Write the notebook with a fresh preamble cell to ``fd``.

        Args:
            preamble: Code for the preamble cell.
            fd: Binary file-like object receiving the serialized notebook.

        Raises:
            WorkflowError: If the notebook holds more than one preamble cell.
        """
        import nbformat

        nb = nbformat.reads(self.source, as_version=nbformat.NO_CONVERT)

        # Replace a possibly outdated preamble instead of stacking a second one.
        self.remove_preamble_cell(nb)
        self.insert_preamble_cell(preamble, nb)

        fd.write(nbformat.writes(nb).encode())

    def execute_script(self, fname, edit=None):
        """Run the notebook ``fname`` or open it in an interactive session.

        Args:
            fname: Path of the notebook file to execute or edit.
            edit: ``None`` for plain execution via ``jupyter-nbconvert``.
                Otherwise an object with ``ip``, ``port`` and ``draft_only``
                attributes describing the ``jupyter notebook`` server to start.
                After the server command returns, the notebook is stripped of
                its preamble and outputs and saved to ``self.local_path``.
        """
        import nbformat

        fname_out = self.log.get("notebook", None)
        if fname_out is None or edit:
            # No executed copy is written without a "notebook" log entry or
            # while editing.
            output_parameter = ""
        else:
            fname_out = os.path.join(os.getcwd(), fname_out)
            output_parameter = "--output {fname_out:q}"

        # The temporary directory is handed to the command as IPYTHONDIR.
        with tempfile.TemporaryDirectory() as tmp:
            if edit is not None:
                assert not edit.draft_only
                logger.info("Opening notebook for editing.")
                # {{fname:q}} survives this .format() call as {fname:q}; it is
                # presumably filled in by self._execute_cmd - confirm.
                cmd = (
                    "jupyter notebook --browser ':' --no-browser --log-level ERROR --ip {edit.ip} --port {edit.port} "
                    "--NotebookApp.quit_button=True {{fname:q}}".format(edit=edit)
                )
            else:
                cmd = (
                    "jupyter-nbconvert --log-level ERROR --execute {output_parameter} "
                    "--to notebook --ExecutePreprocessor.timeout=-1 {{fname:q}}".format(
                        output_parameter=output_parameter
                    )
                )

            if ON_WINDOWS:
                # Hand forward slashes to the command line on Windows.
                fname = fname.replace("\\", "/")
                fname_out = fname_out.replace("\\", "/") if fname_out else fname_out

            self._execute_cmd(
                cmd,
                fname_out=fname_out,
                fname=fname,
                additional_envvars={"IPYTHONDIR": tmp},
            )

            if edit:
                logger.info("Saving modified notebook.")
                nb = nbformat.read(fname, as_version=4)

                self.remove_preamble_cell(nb)

                # clean up all outputs
                for cell in nb["cells"]:
                    if "outputs" in cell:
                        cell["outputs"] = []
                    if "execution_count" in cell:
                        cell["execution_count"] = None

                nbformat.write(nb, self.local_path)

    def insert_preamble_cell(self, preamble, notebook):
        """Insert ``preamble`` as the first cell of ``notebook``.

        The cell is tagged ``snakemake-job-properties`` so that
        ``remove_preamble_cell`` can find it again.
        """
        import nbformat

        preamble_cell = nbformat.v4.new_code_cell(preamble)
        preamble_cell["metadata"]["tags"] = ["snakemake-job-properties"]
        notebook["cells"].insert(0, preamble_cell)

    def remove_preamble_cell(self, notebook):
        """Delete the cell tagged ``snakemake-job-properties``, if present.

        Raises:
            WorkflowError: If more than one cell carries the tag.
        """
        preambles = [
            i
            for i, cell in enumerate(notebook["cells"])
            if "snakemake-job-properties" in cell["metadata"].get("tags", [])
        ]
        if len(preambles) > 1:
            raise WorkflowError(
                "More than one snakemake preamble cell found in notebook. "
                "Please clean up the notebook first, by removing all or all but one of them."
            )
        elif len(preambles) == 1:
            preamble = preambles[0]
            # remove old preamble
            del notebook["cells"][preamble]

    @abstractmethod
    def get_language_name(self):
        """Return the language name stored in the notebook metadata."""
        ...

    @abstractmethod
    def get_interpreter_exec(self):
        """Return the file name of the interpreter executable."""
        ...
class PythonJupyterNotebook(JupyterNotebook):
    """Jupyter notebook whose preamble and kernel language are Python."""

    def get_preamble(self):
        """Return the Python preamble for the notebook.

        The preamble comes from ``PythonScript.generate_preamble`` and is
        extended by a statement that changes into the current working
        directory of this process.
        """
        # repr() produces a correctly quoted and escaped string literal. The
        # former r'...' form broke for directories containing a single quote
        # or ending in a backslash.
        preamble_addendum = f"import os; os.chdir({os.getcwd()!r});"

        return PythonScript.generate_preamble(
            self.path,
            self.cache_path,
            self.source,
            self.basedir,
            self.input,
            self.output,
            self.params,
            self.wildcards,
            self.threads,
            self.resources,
            self.log,
            self.config,
            self.rulename,
            self.conda_env,
            self.container_img,
            self.singularity_args,
            self.env_modules,
            self.bench_record,
            self.jobid,
            self.bench_iteration,
            self.cleanup_scripts,
            self.shadow_dir,
            self.is_local,
            preamble_addendum=preamble_addendum,
        )

    def get_language_name(self):
        """Return the language name stored in the notebook metadata."""
        return "python"

    def get_interpreter_exec(self):
        """Return the file name of the Python interpreter executable."""
        return "python"
class RJupyterNotebook(JupyterNotebook):
    """Jupyter notebook whose preamble and kernel language are R."""

    def get_preamble(self):
        """Return the R preamble for the notebook.

        The preamble comes from ``RScript.generate_preamble`` and is extended
        by a ``setwd`` call that changes into the current working directory of
        this process.
        """
        # Pasting the raw path between single quotes broke on Windows, where R
        # treats the backslashes as escape sequences, and on paths containing a
        # quote. repr() escapes backslashes and quotes with backslash escapes
        # that R string literals accept as well.
        preamble_addendum = f"setwd({os.getcwd()!r});"

        return RScript.generate_preamble(
            self.path,
            self.source,
            self.basedir,
            self.input,
            self.output,
            self.params,
            self.wildcards,
            self.threads,
            self.resources,
            self.log,
            self.config,
            self.rulename,
            self.conda_env,
            self.container_img,
            self.singularity_args,
            self.env_modules,
            self.bench_record,
            self.jobid,
            self.bench_iteration,
            self.cleanup_scripts,
            self.shadow_dir,
            preamble_addendum=preamble_addendum,
        )

    def get_language_name(self):
        """Return the language name stored in the notebook metadata."""
        return "r"

    def get_interpreter_exec(self):
        """Return the file name of the R scripting front-end executable."""
        # The executable shipped with R is spelled "Rscript"; "RScript" does
        # not exist on case-sensitive file systems.
        return "Rscript"
def get_exec_class(language):
    """Look up the notebook executor class for a language identifier.

    Args:
        language: ``"jupyter_python"`` or ``"jupyter_r"``.

    Returns:
        The matching ``JupyterNotebook`` subclass.

    Raises:
        ValueError: If ``language`` is not one of the supported identifiers.
    """
    supported = {
        "jupyter_python": PythonJupyterNotebook,
        "jupyter_r": RJupyterNotebook,
    }
    if language not in supported:
        raise ValueError("Unsupported notebook: Expecting Jupyter Notebook (.ipynb).")
    return supported[language]
def notebook(
    path,
    basedir,
    input,
    output,
    params,
    wildcards,
    threads,
    resources,
    log,
    config,
    rulename,
    conda_env,
    conda_base_path,
    container_img,
    singularity_args,
    env_modules,
    bench_record,
    jobid,
    bench_iteration,
    cleanup_scripts,
    shadow_dir,
    edit,
    sourcecache_path,
    runtime_sourcecache_path,
):
    """
    Load a notebook from the given basedir + path and execute, draft or edit it.

    ``path`` is first formatted with ``wildcards`` and ``params``. What happens
    next depends on ``edit``:

    * ``edit is None``: the notebook is executed.
    * ``edit.draft_only`` is true: a skeleton notebook is written and a message
      describing how to open it is logged.
    * otherwise: the notebook is opened for editing. If it does not exist yet,
      it is drafted first.

    In edit mode a notebook that does not exist yet has to end on ``.py.ipynb``
    or ``.r.ipynb``, because the language cannot be read from its content.

    Raises:
        WorkflowError: In edit mode, if the notebook is not a local file or if
            a notebook to be drafted has an unsupported suffix.
        ValueError: If no executor class exists for the detected language.
    """
    draft = False
    path = format(path, wildcards=wildcards, params=params)
    if edit is not None:
        if is_local_file(path):
            if not os.path.isabs(path):
                local_path = os.path.join(basedir, path)
            else:
                local_path = path
            if not os.path.exists(local_path):
                # draft the notebook, it does not exist yet
                language = None
                draft = True
                path = f"file://{os.path.abspath(local_path)}"
                if path.endswith(".py.ipynb"):
                    language = "jupyter_python"
                elif path.endswith(".r.ipynb"):
                    language = "jupyter_r"
                else:
                    raise WorkflowError(
                        "Notebook to edit has to end on .py.ipynb or .r.ipynb in order "
                        "to decide which programming language shall be used."
                    )
        else:
            raise WorkflowError(
                "Notebook {} is not local, but edit mode is only allowed for "
                "local notebooks.".format(path)
            )

    if not draft:
        path, source, language, is_local, cache_path = get_source(
            path,
            SourceCache(sourcecache_path, runtime_sourcecache_path),
            basedir,
            wildcards,
            params,
        )
    else:
        # Nothing can be loaded for a notebook that is still to be drafted.
        source = None
        cache_path = None
        is_local = True
        path = infer_source_file(path)

    exec_class = get_exec_class(language)

    executor = exec_class(
        path,
        cache_path,
        source,
        basedir,
        input,
        output,
        params,
        wildcards,
        threads,
        resources,
        log,
        config,
        rulename,
        conda_env,
        conda_base_path,
        container_img,
        singularity_args,
        env_modules,
        bench_record,
        jobid,
        bench_iteration,
        cleanup_scripts,
        shadow_dir,
        is_local,
    )

    if edit is None:
        executor.evaluate(edit=edit)
    elif edit.draft_only:
        executor.draft()
        msg = f"Generated skeleton notebook:\n{path} "
        if conda_env and not container_img:
            msg += (
                "\n\nEditing with VSCode:\nOpen notebook, run command 'Select notebook kernel' (Ctrl+Shift+P or Cmd+Shift+P), and choose:"
                "\n{}\n".format(
                    str(Path(conda_env) / "bin" / executor.get_interpreter_exec())
                )
            )
            msg += (
                "\nEditing with Jupyter CLI:"
                "\nconda activate {}\njupyter notebook {}\n".format(conda_env, path)
            )
        # Without this call the message assembled above was never shown.
        logger.info(msg)
    elif draft:
        executor.draft_and_edit(listen=edit)
    else:
        executor.evaluate(edit=edit)