Source code for snakemake.executors.touch

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"

import time

from snakemake_interface_executor_plugins.executors.real import RealExecutor
from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
from snakemake_interface_executor_plugins.jobs import (
    JobExecutorInterface,
)
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.settings import CommonSettings
from snakemake.common import async_run

from snakemake.exceptions import print_exception


common_settings = CommonSettings(
    non_local_exec=False,
    implies_no_shared_fs=False,
    job_deploy_sources=False,
    touch_exec=True,
    pass_envvar_declarations_to_cmd=False,
    auto_deploy_default_storage_provider=False,
)


[docs] class Executor(RealExecutor): def run_job( self, job: JobExecutorInterface, ): job_info = SubmittedJobInfo(job=job) try: time.sleep(0.1) if job.output: async def touch(): for f in job.output: if f.is_storage and await f.exists_in_storage(): await f.touch() elif await f.exists_local(): f.touch() async_run(touch()) self.report_job_submission(job_info) self.report_job_success(job_info) except OSError as ex: print_exception(ex, self.workflow.linemaps) self.report_job_error(job_info) def get_exec_mode(self): raise NotImplementedError() def handle_job_success(self, job: JobExecutorInterface): super().handle_job_success(job) def cancel(self): # nothing to do pass def shutdown(self): # nothing to do pass def get_python_executable(self): raise NotImplementedError() @property def cores(self): return self.workflow.resource_settings.cores