Source code for snakemake.executors.dryrun

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2023, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

from snakemake_interface_executor_plugins.executors.base import AbstractExecutor
from snakemake_interface_executor_plugins.jobs import (
    JobExecutorInterface,
)
from snakemake_interface_executor_plugins.settings import CommonSettings
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake.common import async_run

from snakemake.logging import logger


common_settings = CommonSettings(
    non_local_exec=False,
    dryrun_exec=True,
    implies_no_shared_fs=False,
    job_deploy_sources=False,
    pass_envvar_declarations_to_cmd=False,
    auto_deploy_default_storage_provider=False,
)


[docs] class Executor(AbstractExecutor): def run_job( self, job: JobExecutorInterface, ): job_info = SubmittedJobInfo(job=job) self.report_job_submission(job_info) self.report_job_success(job_info) def get_exec_mode(self): raise NotImplementedError() def printjob(self, job: JobExecutorInterface): super().printjob(job) if job.is_group(): for j in job.jobs: self.printcache(j) else: self.printcache(job) def printcache(self, job: JobExecutorInterface): cache_mode = self.workflow.get_cache_mode(job.rule) if cache_mode: if async_run(self.workflow.output_file_cache.exists(job, cache_mode)): logger.info( "Output file {} will be obtained from global between-workflow cache.".format( job.output[0] ) ) else: logger.info( "Output file {} will be written to global between-workflow cache.".format( job.output[0] ) ) def cancel(self): # nothing to do pass def shutdown(self): # nothing to do pass def handle_job_success(self, job: JobExecutorInterface): # nothing to do pass def handle_job_error(self, job: JobExecutorInterface): # nothing to do pass @property def cores(self): return self.workflow.resource_settings.cores