# Source code for snakemake.logging

from __future__ import annotations

__author__ = "Johannes Köster"
__copyright__ = "Copyright 2022, Johannes Köster"
__email__ = "johannes.koester@uni-due.de"
__license__ = "MIT"


import logging
import logging.handlers
import platform
import time
import datetime
import sys
import os
import json
import threading
from queue import Queue
from functools import partial
from typing import TYPE_CHECKING
import textwrap
from typing import List, Optional
from snakemake_interface_logger_plugins.base import LogHandlerBase
from snakemake_interface_logger_plugins.settings import OutputSettingsLoggerInterface
from snakemake_interface_logger_plugins.common import LogEvent

if TYPE_CHECKING:
    from snakemake_interface_executor_plugins.settings import ExecMode
    from snakemake.settings.enums import Quietness


def timestamp():
    """Return the current local time (``time.asctime()``) wrapped in square brackets."""
    now = time.asctime()
    return "[" + now + "]"
def show_logs(logs):
    """Yield the lines needed to display the content of each given log file.

    For every path in *logs* a header ``Logfile <path>:`` is yielded, followed by
    the file's lines framed by ``=`` rulers. The ruler width is the longest of the
    header and the content lines, capped at 80.

    A missing file, a file that cannot be decoded as text, or an empty file yields
    a single explanatory line instead. Processing then continues with the next path,
    so one bad log file does not hide the others.

    Args:
        logs: iterable of log file paths.

    Yields:
        str: lines to print.
    """
    for f in logs:
        try:
            with open(f, "r") as log_file:
                content = log_file.read()
        except FileNotFoundError:
            yield f"Logfile {f} not found."
            continue  # previously `return`, which skipped all remaining logs
        except UnicodeDecodeError:
            yield f"Logfile {f} is not a text file."
            continue
        lines = content.splitlines()
        logfile_header = f"Logfile {f}:"
        if not lines:
            yield logfile_header + " empty file"
            continue
        yield logfile_header
        max_len = min(max(max(len(line) for line in lines), len(logfile_header)), 80)
        ruler = "=" * max_len
        yield ruler
        yield from lines
        yield ruler
def format_dict(dict_like, omit_keys=None, omit_values=None):
    """Render a mapping as comma-separated ``name=value`` pairs.

    Args:
        dict_like: a ``dict`` or a snakemake ``Namedlist``.
        omit_keys: names to leave out (default: none).
        omit_values: values to leave out (default: none).

    Returns:
        str: e.g. ``"a=1, b=2"``.

    Raises:
        ValueError: if *dict_like* is neither a ``dict`` nor a ``Namedlist``.
    """
    from snakemake.io import Namedlist

    skipped_keys = omit_keys or []
    skipped_values = omit_values or []
    if not isinstance(dict_like, (Namedlist, dict)):
        raise ValueError(
            "bug: format_dict applied to something neither a dict nor a Namedlist"
        )
    pairs = []
    for name, value in dict_like.items():
        if name in skipped_keys or value in skipped_values:
            continue
        pairs.append(f"{name}={value}")
    return ", ".join(pairs)
# Resource mappings carry the internal "_cores"/"_nodes" entries, which are hidden.
format_resources = partial(format_dict, omit_keys={"_nodes", "_cores"})
# Wildcards need no filtering, so the generic formatter is reused unchanged.
format_wildcards = format_dict
def format_resource_names(resources, omit_resources=("_cores", "_nodes")):
    """Join resource names with ``", "``, skipping internal bookkeeping entries.

    Args:
        resources: iterable of resource names.
        omit_resources: names to skip. Defaults to the internal ``_cores`` and
            ``_nodes`` entries. The default is an immutable tuple, so it cannot be
            mutated and leak state between calls.

    Returns:
        str: the remaining names joined by ``", "``.
    """
    return ", ".join(name for name in resources if name not in omit_resources)
def format_percentage(done, total):
    """Render ``done / total`` as a percentage string without superfluous precision.

    Exact completion and zero progress return ``"100%"`` and ``"0%"``. Otherwise
    the number of decimals is increased until the rounded value is neither
    ``"0%"`` nor ``"100%"``, so partial progress never looks complete or absent.
    """
    if done == total:
        return "100%"
    if done == 0:
        return "0%"
    ratio = done / total
    digits = 0
    rendered = f"{ratio:.{digits}%}"
    while rendered in ("100%", "0%"):
        digits += 1
        rendered = f"{ratio:.{digits}%}"
    return rendered
def get_event_level(record: logging.LogRecord) -> tuple[LogEvent, str]:
    """Return the Snakemake event and the standard level name of *record*.

    Args:
        record (logging.LogRecord): the record to inspect.

    Returns:
        tuple[LogEvent, str]: ``(event, levelname)``. ``event`` is the record's
        own ``event`` attribute (typically supplied via ``extra=``), or ``None``
        when the record has none. ``levelname`` is always the record's stdlib
        level name.
    """
    snakemake_event = vars(record).get("event")
    return snakemake_event, record.levelname
def is_quiet_about(quiet: "Quietness", msg_type: str):
    """Tell whether messages of *msg_type* are suppressed by *quiet*.

    Args:
        quiet: collection of ``Quietness`` members currently in effect.
        msg_type: name of a message category, parsed via ``Quietness.parse_choice``.

    Returns:
        bool: ``True`` if everything is silenced (``Quietness.ALL``) or this
        particular category is.
    """
    from snakemake.settings.enums import Quietness

    requested = Quietness.parse_choice(msg_type)
    return any(level in quiet for level in (Quietness.ALL, requested))
class DefaultFormatter(logging.Formatter):
    """Formatter that renders Snakemake's structured log events as text.

    Records carrying an ``event`` attribute are dispatched to an event-specific
    ``format_*`` method. Records without a known event are rendered as their
    plain message.
    """

    def __init__(
        self,
        quiet: "Quietness",
        show_failed_logs: bool = False,
        printshellcmds: bool = False,
    ):
        """
        Args:
            quiet: collection of ``Quietness`` levels in effect; ``None`` means none.
            show_failed_logs: append the content of log files to error reports.
            printshellcmds: render shell-command events (otherwise they become "").
        """
        # Initialise the base Formatter so inherited helpers (formatTime,
        # formatException, ...) find the attributes they rely on.
        super().__init__()
        self.quiet = set() if quiet is None else quiet
        self.show_failed_logs = show_failed_logs
        self.printshellcmds = printshellcmds
        self.last_msg_was_job_info = False

    def format(self, record):
        """
        Override format method to format Snakemake-specific log messages.
        """
        event, _level = get_event_level(record)
        record_dict = record.__dict__.copy()

        def default_formatter(rd):
            return rd["msg"]

        formatters = {
            None: default_formatter,
            LogEvent.JOB_INFO: self.format_job_info,
            LogEvent.JOB_ERROR: self.format_job_error,
            LogEvent.JOB_FINISHED: self.format_job_finished,
            LogEvent.GROUP_INFO: self.format_group_info,
            LogEvent.GROUP_ERROR: self.format_group_error,
            LogEvent.SHELLCMD: self.format_shellcmd,
            LogEvent.RUN_INFO: self.format_run_info,
            LogEvent.DEBUG_DAG: self.format_dag_debug,
            LogEvent.PROGRESS: self.format_progress,
        }

        formatter = formatters.get(event, default_formatter)
        return formatter(record_dict)

    def format_info(self, msg):
        """
        Format 'info' level messages.
        """
        output = []
        # Check if 'indent' is specified
        indent = " " if msg.get("indent", False) else ""

        # Split the message by lines in case it's multiline
        lines = msg["msg"].split("\n")

        # Apply indentation to each line
        for line in lines:
            output.append(f"{indent}{line}")

        # Return the formatted message as a single string with newlines
        return "\n".join(output)

    def format_run_info(self, msg):
        """Format the run_info log messages."""
        return msg["msg"]  # Log the message directly

    def format_host(self, msg):
        """Format for host log."""
        return f"host: {platform.node()}"

    def format_job_info(self, msg):
        """Format for job_info log."""
        output = []

        output.append(timestamp())
        if msg["rule_msg"]:
            output.append(f"Job {msg['jobid']}: {msg['rule_msg']}")

            if not is_quiet_about(self.quiet, "reason"):
                output.append(f"Reason: {msg['reason']}")
        else:
            output.append("\n".join(self._format_job_info(msg)))

        return "\n".join(output)

    def format_group_info(self, msg):
        """Format for group_info log."""
        msg = f"{timestamp()} {msg['msg']}"

        return msg

    def format_job_error(self, msg):
        """Format for job_error log."""
        output = []
        output.append(timestamp())
        output.append("\n".join(self._format_job_error(msg)))
        return "\n".join(output)

    def format_group_error(self, msg):
        """Format for group_error log."""
        output = []
        output.append(timestamp())
        output.append("\n".join(self._format_group_error(msg)))
        return "\n".join(output)

    def format_progress(self, msg):
        """Format for progress log."""
        done = msg["done"]
        total = msg["total"]
        return f"{done} of {total} steps ({format_percentage(done, total)}) done"

    def format_job_finished(self, msg):
        """Format for job_finished log."""
        return f"{timestamp()}\n{msg['msg']}"

    def format_shellcmd(self, msg):
        """Format for shellcmd log."""
        if self.printshellcmds:
            return msg["msg"]
        return ""

    def format_d3dag(self, msg):
        """Format for d3dag log."""
        return json.dumps({"nodes": msg["nodes"], "links": msg["edges"]})

    def format_dag_debug(self, msg):
        """Format for dag_debug log."""
        output = []

        if "file" in msg:
            output.append(
                f"file {msg['file']}:\n {msg['msg']}\n{textwrap.indent(str(msg['exception']), ' ')}"
            )
        else:
            job = msg["job"]
            output.append(
                f"{msg['status']} job {job.rule.name}\n wildcards: {format_wildcards(job.wildcards)}"
            )

        return "\n".join(output)

    def _format_job_info(self, msg):
        """Helper method to format job info details."""

        def format_item(item, omit=None, valueformat=str):
            value = msg[item]
            if value != omit:
                return f" {item}: {valueformat(value)}"

        output = [
            f"{'local' if msg['local'] else ''}{'checkpoint' if msg['is_checkpoint'] else 'rule'} {msg['rule_name']}:"
        ]
        for item in ["input", "output", "log"]:
            fmt = format_item(item, omit=[], valueformat=", ".join)
            if fmt:
                output.append(fmt)

        singleitems = ["jobid", "benchmark"]
        if not is_quiet_about(self.quiet, "reason"):
            singleitems.append("reason")
        for item in singleitems:
            fmt = format_item(item, omit=None)
            if fmt:
                output.append(fmt)

        wildcards = format_wildcards(msg["wildcards"])
        if wildcards:
            output.append(f" wildcards: {wildcards}")

        for item, omit in zip("priority threads".split(), [0, 1]):
            fmt = format_item(item, omit=omit)
            if fmt:
                output.append(fmt)

        resources = format_resources(msg["resources"])
        if resources:
            output.append(f" resources: {resources}")

        return output

    def _format_job_error(self, msg):
        """Helper method to format job error details."""
        output = [f"Error in rule {msg['rule_name']}:"]

        # Check the field that is actually printed. After QueueHandler.prepare
        # the record's "msg" is always a string ("None" for None), so testing it
        # would print "message: None" for rules that define no message.
        if msg["rule_msg"]:
            output.append(f" message: {msg['rule_msg']}")

        output.append(f" jobid: {msg['jobid']}")
        if msg["input"]:
            output.append(f" input: {', '.join(msg['input'])}")
        if msg["output"]:
            output.append(f" output: {', '.join(msg['output'])}")
        if msg["log"]:
            output.append(
                f" log: {', '.join(msg['log'])} (check log file(s) for error details)"
            )
        if msg["conda_env"]:
            output.append(f" conda-env: {msg['conda_env']}")
        if msg["shellcmd"]:
            output.append(
                f" shell:\n {msg['shellcmd']}\n (command exited with non-zero exit code)"
            )

        for key, value in msg["aux"].items():
            output.append(f" {key}: {value}")

        if self.show_failed_logs and msg["log"]:
            output.extend(show_logs(msg["log"]))

        return output

    def _format_group_error(self, msg):
        """Helper method to format group error details."""
        output = []

        if msg["msg"]:
            output.append(f" message: {msg['msg']}")

        if msg["aux_logs"]:
            output.append(
                f" log: {', '.join(msg['aux_logs'])} (check log file(s) for error details)"
            )

        output.append(" jobs:")
        for info in msg["job_error_info"]:
            output.append(f" rule {info['name']}:")
            output.append(f" jobid: {info['jobid']}")
            if info["output"]:
                output.append(f" output: {', '.join(info['output'])}")
            if info["log"]:
                output.append(
                    f" log: {', '.join(info['log'])} (check log file(s) for error details)"
                )

        logs = msg["aux_logs"] + [
            f for info in msg["job_error_info"] for f in info["log"]
        ]
        if self.show_failed_logs and logs:
            output.extend(show_logs(logs))

        return output
class DefaultFilter:
    """Decide which records reach Snakemake's default handlers.

    Filtering depends on the configured ``Quietness`` levels, on whether DAG
    debugging is enabled, and on whether this is a dry-run.
    """

    def __init__(self, quiet, debug_dag, dryrun) -> None:
        """
        Args:
            quiet: collection of ``Quietness`` members; ``None`` means none.
            debug_dag: if false, ``LogEvent.DEBUG_DAG`` records are dropped.
            dryrun: whether the current run is a dry-run.
        """
        if quiet is None:
            quiet = set()
        self.quiet = quiet
        self.debug_dag = debug_dag
        self.dryrun = dryrun

    def filter(self, record):
        """Return ``True`` if *record* should be emitted, ``False`` otherwise."""
        from snakemake.settings.enums import Quietness

        event, _level = get_event_level(record)
        # In a dry-run the run summary is always shown. This must compare the
        # event: ``level`` is the stdlib level name (e.g. "INFO") and can never
        # equal "run_info".
        if self.dryrun and event == LogEvent.RUN_INFO:
            return True

        if Quietness.ALL in self.quiet and not self.dryrun:
            return False

        quietness_map = {
            LogEvent.JOB_INFO: Quietness.RULES,
            LogEvent.GROUP_INFO: Quietness.RULES,
            LogEvent.JOB_ERROR: Quietness.RULES,
            LogEvent.GROUP_ERROR: Quietness.RULES,
            LogEvent.PROGRESS: Quietness.PROGRESS,
            LogEvent.SHELLCMD: Quietness.PROGRESS,
            LogEvent.JOB_FINISHED: Quietness.PROGRESS,
            LogEvent.RESOURCES_INFO: Quietness.PROGRESS,
            LogEvent.RUN_INFO: Quietness.PROGRESS,
        }

        # Check quietness for specific levels
        if event in quietness_map:
            if quietness_map[event] in self.quiet:
                return False

        # Handle dag_debug specifically
        if event == LogEvent.DEBUG_DAG and not self.debug_dag:
            return False

        return True
class ColorizingTextHandler(logging.StreamHandler):
    """
    Custom handler that combines colorization and Snakemake-specific formatting.
    """

    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
    RESET_SEQ = "\033[0m"
    COLOR_SEQ = "\033[%dm"
    BOLD_SEQ = "\033[1m"

    # ANSI foreground colour (added to 30 in decorate()) per stdlib level name.
    colors = {
        "WARNING": YELLOW,
        "INFO": GREEN,
        "DEBUG": BLUE,
        "CRITICAL": MAGENTA,
        "ERROR": RED,
    }

    # INFO records carrying one of these events are shown in the WARNING colour.
    yellow_info_events = [
        LogEvent.RUN_INFO,
        LogEvent.SHELLCMD,
        LogEvent.JOB_STARTED,
        None,  # To mimic old coloring where log.info was mapped to log.warn
    ]

    def __init__(
        self,
        nocolor=False,
        stream=None,
        mode=None,
        formatter: Optional[logging.Formatter] = None,
        filter: Optional[logging.Filter] = None,
    ):
        """
        Args:
            nocolor: disable ANSI colours unconditionally.
            stream: output stream. ``None`` means ``sys.stderr``, looked up at
                construction time rather than at import time, so a later
                replacement of ``sys.stderr`` is honoured.
            mode: execution mode, used to decide whether colours are supported.
            formatter: optional formatter to install.
            filter: optional filter to install.
        """
        if stream is None:
            stream = sys.stderr
        super().__init__(stream=stream)
        self.last_msg_was_job_info = False
        self._output_lock = threading.Lock()
        self.nocolor = nocolor or not self.can_color_tty(mode)
        self.mode = mode

        if formatter:
            self.setFormatter(formatter)
        if filter:
            self.addFilter(filter)

    def can_color_tty(self, mode):
        """
        Colors are supported when:
        1. Terminal is not "dumb"
        2. Running in subprocess mode
        3. Using a TTY on non-Windows systems
        """
        from snakemake_interface_executor_plugins.settings import ExecMode

        # Case 1: Check if terminal is "dumb"
        if os.environ.get("TERM") == "dumb":
            return False

        # Case 2: Always support colors in subprocess mode
        if mode == ExecMode.SUBPROCESS:
            return True

        # Case 3: Support colors on TTY except for Windows
        is_windows = platform.system() == "Windows"
        has_tty = self.is_tty

        if has_tty and not is_windows:
            return True

        return False

    @property
    def is_tty(self):
        """Truthy if the underlying stream reports being a TTY."""
        isatty = getattr(self.stream, "isatty", None)
        return isatty and isatty()

    def emit(self, record):
        """
        Emit a log message with custom formatting and color.
        """
        with self._output_lock:
            try:
                event, level = get_event_level(record)

                if event == LogEvent.JOB_INFO:
                    if not self.last_msg_was_job_info:
                        self.stream.write(
                            "\n"
                        )  # Add a blank line before a new job_info message
                    self.last_msg_was_job_info = True
                else:
                    # Reset flag if the message is not a 'job_info'
                    self.last_msg_was_job_info = False

                formatted_message = self.format(record)
                # QueueHandler.prepare stringifies a None message to "None".
                if formatted_message == "None":
                    return

                # Apply color to the formatted message
                self.stream.write(self.decorate(record, formatted_message))
                self.stream.write(getattr(self, "terminator", "\n"))
                self.flush()
            except BrokenPipeError:
                raise
            except (KeyboardInterrupt, SystemExit):
                pass  # Ignore exceptions for these cases, all errors have been handled before.
            except Exception:
                self.handleError(record)

    def decorate(self, record, message):
        """
        Add color to the log message based on its level.
        """
        message = [message]
        event, level = get_event_level(record)

        if not self.nocolor and record.levelname in self.colors:
            if level == "INFO" and event in self.yellow_info_events:
                color = self.colors["WARNING"]
            else:
                color = self.colors[record.levelname]

            message.insert(0, self.COLOR_SEQ % (30 + color))
            message.append(self.RESET_SEQ)

        return "".join(message)
class LoggerManager:
    """Own the handler setup of a logger.

    Records are routed through a ``QueueHandler`` and dispatched by a background
    ``QueueListener`` to stream, plugin and log-file handlers.
    """

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self.initialized = False
        self.queue_listener = None
        # QueueHandler attached to self.logger by the most recent setup() call.
        self.queue_handler = None
        self.mode = None
        self.needs_rulegraph = False
        # Maps each file-writing handler to the path it writes to.
        self.logfile_handlers = {}
        self.settings: Optional[OutputSettingsLoggerInterface] = None

    def setup(
        self,
        mode: "ExecMode",
        handlers: List[LogHandlerBase],
        settings: OutputSettingsLoggerInterface,
    ):
        """Configure handlers for *mode* and start the background queue listener.

        In subprocess mode only errors go to a default stream handler. In remote
        mode a default stream handler is used. Otherwise the given plugin
        handlers are configured, with a default stream handler added if none of
        them writes to a stream. Log-file handlers registered beforehand (see
        ``setup_logfile``) are attached as well.

        Raises:
            ValueError: if more than one stream handler is configured.
        """
        from snakemake_interface_executor_plugins.settings import ExecMode

        # A repeated setup() (e.g. several API runs in one process) must not keep
        # the old listener thread alive or leave its QueueHandler attached;
        # otherwise records pile up in a queue that nobody drains any more.
        self.stop()
        if self.queue_handler is not None:
            self.logger.removeHandler(self.queue_handler)
            self.queue_handler = None

        self.mode = mode
        self.settings = settings
        self.initialized = True

        stream_handlers = []
        other_handlers = []
        if self.mode == ExecMode.SUBPROCESS:
            handler = self._default_streamhandler()
            handler.setLevel(logging.ERROR)
            stream_handlers.append(handler)
        elif self.mode == ExecMode.REMOTE:
            stream_handlers.append(self._default_streamhandler())
        elif handlers:
            for handler in handlers:
                if handler.needs_rulegraph:
                    self.needs_rulegraph = True
                configured_handler = self._configure_plugin_handler(handler)
                if configured_handler.writes_to_file:
                    self.logfile_handlers[configured_handler] = (
                        configured_handler.baseFilename
                    )
                elif configured_handler.writes_to_stream:
                    stream_handlers.append(configured_handler)
                else:
                    other_handlers.append(configured_handler)

        if len(stream_handlers) > 1:
            raise ValueError("More than 1 stream log handler specified!")
        elif len(stream_handlers) == 0:
            # we dont have any stream_handlers from plugin(s) so give us the default one
            stream_handlers.append(self._default_streamhandler())

        all_handlers = (
            stream_handlers + other_handlers + list(self.logfile_handlers.keys())
        )
        q = Queue(-1)
        self.queue_listener = logging.handlers.QueueListener(
            q,
            *all_handlers,
            respect_handler_level=True,
        )
        self.queue_listener.start()
        self.logger.setLevel(logging.DEBUG if settings.verbose else logging.INFO)
        self.queue_handler = logging.handlers.QueueHandler(q)
        self.logger.addHandler(self.queue_handler)

    def _configure_plugin_handler(self, plugin):
        """Give a plugin handler the default filter/formatter if it lacks its own."""
        if not plugin.has_filter:
            plugin.addFilter(self._default_filter())
        if not plugin.has_formatter:
            plugin.setFormatter(self._default_formatter())
        return plugin

    def _default_filter(self):
        return DefaultFilter(
            self.settings.quiet, self.settings.debug_dag, self.settings.dryrun
        )

    def _default_formatter(self):
        return DefaultFormatter(
            self.settings.quiet,
            self.settings.show_failed_logs,
            self.settings.printshellcmds,
        )

    def _default_filehandler(self, logfile):
        logfile_handler = logging.FileHandler(logfile)
        logfile_handler.setFormatter(self._default_formatter())
        logfile_handler.addFilter(self._default_filter())
        logfile_handler.setLevel(
            logging.DEBUG if self.settings.verbose else logging.INFO
        )
        logfile_handler.name = "DefaultLogFileHandler"
        return logfile_handler

    def _default_streamhandler(self):
        stream_handler = ColorizingTextHandler(
            nocolor=self.settings.nocolor,
            stream=sys.stdout if self.settings.stdout else sys.stderr,
            mode=self.mode,
        )
        stream_handler.addFilter(self._default_filter())
        stream_handler.setFormatter(self._default_formatter())
        stream_handler.name = "DefaultStreamHandler"
        return stream_handler

    def get_logfile(self):
        """Return the paths of all registered log files."""
        return self.logfile_handlers.values()

    def logfile_hint(self):
        """Log the logfile location if applicable and return the log file paths.

        The hint is only logged in default execution mode outside of dry-runs.
        """
        from snakemake_interface_executor_plugins.settings import ExecMode

        logfiles = self.logfile_handlers.values()
        if self.mode == ExecMode.DEFAULT and not self.settings.dryrun and logfiles:
            log_paths = ", ".join([os.path.abspath(p) for p in logfiles])
            self.logger.info(f"Complete log(s): {log_paths}")

        return logfiles

    def cleanup_logfile(self):
        """Close all log-file handlers (default execution mode only)."""
        from snakemake_interface_executor_plugins.settings import ExecMode

        if self.mode == ExecMode.DEFAULT:
            for handler in self.logfile_handlers.keys():
                handler.close()

    def setup_logfile(self):
        """Register a timestamped file under ``.snakemake/log`` as a log target.

        Only done in default execution mode outside of dry-runs. An ``OSError``
        while creating the file is logged instead of raised.
        """
        from snakemake_interface_executor_plugins.settings import ExecMode

        if self.mode == ExecMode.DEFAULT and not self.settings.dryrun:
            try:
                os.makedirs(os.path.join(".snakemake", "log"), exist_ok=True)
                logfile = os.path.abspath(
                    os.path.join(
                        ".snakemake",
                        "log",
                        datetime.datetime.now().isoformat().replace(":", "")
                        + ".snakemake.log",
                    )
                )
                handler = self._default_filehandler(logfile)
                self.logfile_handlers[handler] = logfile

            except OSError as e:
                self.logger.error(f"Failed to setup log file: {e}")

    def stop(self):
        """Stop the queue listener if it is running; safe to call repeatedly."""
        # QueueListener.stop() fails on older Pythons if the listener thread
        # was never started, hence the explicit _thread check.
        if self.queue_listener is not None and self.queue_listener._thread is not None:
            self.queue_listener.stop()
# Module-wide logger used throughout Snakemake, plus the manager that attaches
# (and later stops) its handlers.
logger = logging.getLogger(__name__)
logger_manager = LoggerManager(logger=logger)