# Source code for sccross.utils

r"""
Miscellaneous utilities
"""

import os
import logging
import signal
import subprocess
import sys
from collections import defaultdict
from multiprocessing import Process
from typing import Any, List, Mapping, Optional, Iterable, TypeVar, Union
import numpy as np
import pandas as pd
import torch
import scipy.sparse
import numbers
import anndata
import h5py



Array = Union[np.ndarray, scipy.sparse.spmatrix]
BackedArray = Union[h5py.Dataset, anndata._core.sparse_dataset.SparseDataset]
AnyArray = Union[Array, BackedArray]
ArrayOrScalar = Union[np.ndarray, numbers.Number]
Kws = Optional[Mapping[str, Any]]
RandomState = Optional[Union[np.random.RandomState, int]]

T = TypeVar("T")  # Generic type var


EPS = 1e-7


#------------------------------ Numeric functions ------------------------------

def prod(x: Iterable) -> Any:
    r"""
    Multiply all elements of an iterable together

    Parameters
    ----------
    x
        Elements to multiply

    Returns
    -------
    prod
        Product of the elements (``1`` for an empty iterable)

    Note
    ----
    :func:`math.prod` is used when it can be imported; otherwise a plain
    loop keeps the function usable on Python<=3.7.
    """
    try:
        from math import prod as math_prod
        return math_prod(x)
    except ImportError:
        # Fallback for interpreters without ``math.prod``
        running = 1
        for factor in x:
            running = running * factor
        return running
def sigmoid(x: np.ndarray) -> np.ndarray:
    r"""
    Element-wise logistic sigmoid implemented with numpy

    Parameters
    ----------
    x
        Input

    Returns
    -------
    s
        Sigmoid(x), i.e. ``1 / (1 + exp(-x))``
    """
    denominator = 1 + np.exp(-x)
    return 1 / denominator
#------------------------------ Arrays & Matrices ------------------------------
def densify(arr: Array) -> np.ndarray:
    r"""
    Return a dense version of a matrix, whatever its original type.

    Parameters
    ----------
    arr
        Input array (either sparse or dense)

    Returns
    -------
    densified
        Densified array. Sparse inputs are expanded with ``toarray()``,
        :class:`numpy.ndarray` inputs are returned as-is, and anything else
        is passed through :func:`numpy.asarray`.
    """
    if scipy.sparse.issparse(arr):
        dense = arr.toarray()
    elif isinstance(arr, np.ndarray):
        dense = arr  # already dense, hand back the same object
    else:
        dense = np.asarray(arr)
    return dense
def col_var(
        X: Array, Y: Optional[Array] = None, bias: bool = False
) -> np.ndarray:
    r"""
    Column-wise variance / covariance (sparse friendly)

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix (optional), must have the same shape as ``X``
    bias
        If ``True`` the biased estimate is returned, otherwise the estimate
        is rescaled by ``n / (n - 1)`` with ``n`` the number of rows

    Returns
    -------
    col_var
        Column-wise variance, if only X is given.
        Column-wise covariance, if both X and Y are given.

    Raises
    ------
    ValueError
        If ``X`` and ``Y`` differ in shape
    """
    if Y is None:
        Y = X
    if X.shape != Y.shape:
        raise ValueError("X and Y should have the same shape!")
    n_obs = X.shape[0]
    bias_scaling = 1 if bias else n_obs / (n_obs - 1)

    if not (scipy.sparse.issparse(X) or scipy.sparse.issparse(Y)):
        # Purely dense inputs: E[XY] - E[X]E[Y] per column
        dense_cov = (X * Y).mean(axis=0) - X.mean(axis=0) * Y.mean(axis=0)
        return dense_cov * bias_scaling

    if not scipy.sparse.issparse(X):
        # Make sure the sparse operand is on the left so ``multiply`` exists;
        # the product is symmetric so the result is unaffected
        X, Y = Y, X
    mean_xy = np.asarray((X.multiply(Y)).mean(axis=0))
    mean_x = np.asarray(X.mean(axis=0))
    mean_y = np.asarray(Y.mean(axis=0))
    return (mean_xy - mean_x * mean_y).ravel() * bias_scaling
def col_pcc(X: Array, Y: Array) -> np.ndarray:
    r"""
    Column-wise Pearson's correlation coefficient (sparse friendly)

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix

    Returns
    -------
    pcc
        Column-wise Pearson's correlation coefficients, computed as the
        column covariance divided by the square root of the product of the
        column variances
    """
    covariance = col_var(X, Y)
    var_x = col_var(X)
    var_y = col_var(Y)
    return covariance / np.sqrt(var_x * var_y)
def col_spr(X: Array, Y: Array) -> np.ndarray:
    r"""
    Column-wise Spearman's rank correlation

    Each column of ``X`` and ``Y`` is densified and replaced by its ranks,
    then the column-wise Pearson correlation of the ranks is returned.

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix

    Returns
    -------
    spr
        Column-wise Spearman's rank correlations
    """
    # The module only imports ``scipy.sparse``; import the ranking routine
    # explicitly so this function does not depend on ``scipy.stats`` having
    # been loaded by some other package.
    from scipy.stats import rankdata

    X = densify(X)
    X = np.array([
        rankdata(X[:, i])
        for i in range(X.shape[1])
    ]).T
    Y = densify(Y)
    Y = np.array([
        rankdata(Y[:, i])
        for i in range(Y.shape[1])
    ]).T
    return col_pcc(X, Y)
def cov_mat(
        X: Array, Y: Optional[Array] = None, bias: bool = False
) -> np.ndarray:
    r"""
    Covariance matrix (sparse friendly)

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix (optional), must have as many rows as ``X``
    bias
        If ``True`` the biased estimate is returned, otherwise the estimate
        is rescaled by ``n / (n - 1)`` with ``n`` the number of rows

    Returns
    -------
    cov
        Covariance matrix, if only X is given.
        Cross-covariance matrix, if both X and Y are given.

    Raises
    ------
    ValueError
        If ``X`` and ``Y`` have different numbers of rows
    """
    # Column means are kept as row vectors (1 x n_cols) in both cases
    if scipy.sparse.issparse(X):
        X_mean = X.mean(axis=0)
    else:
        X_mean = X.mean(axis=0, keepdims=True)

    if Y is not None:
        if X.shape[0] != Y.shape[0]:
            raise ValueError("X and Y should have the same number of rows!")
        if scipy.sparse.issparse(Y):
            Y_mean = Y.mean(axis=0)
        else:
            Y_mean = Y.mean(axis=0, keepdims=True)
    else:
        Y, Y_mean = X, X_mean

    n_obs = X.shape[0]
    bias_scaling = 1 if bias else n_obs / (n_obs - 1)
    second_moment = (X.T @ Y) / n_obs
    return np.asarray(second_moment - X_mean.T @ Y_mean) * bias_scaling
def pcc_mat(
        X: Array, Y: Optional[Array] = None
) -> np.ndarray:
    r"""
    Pearson's correlation coefficient (sparse friendly)

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix (optional)

    Returns
    -------
    pcc
        Pearson's correlation matrix among columns of X,
        if only X is given.
        Pearson's correlation matrix between columns of X and columns of Y,
        if both X and Y are given.

    Note
    ----
    Small floating point errors are removed: the self-correlation matrix is
    symmetrized with a unit diagonal, and values marginally outside
    ``[-1, 1]`` are clamped. Deviations of ``EPS`` or more trigger an
    assertion failure.
    """
    X = X.astype(np.float64)
    Y = Y if Y is None else Y.astype(np.float64)
    X_std = np.sqrt(col_var(X))[np.newaxis, :]
    Y_std = X_std if Y is None else \
        np.sqrt(col_var(Y))[np.newaxis, :]
    pcc = cov_mat(X, Y) / X_std.T / Y_std
    if Y is None:
        assert (pcc - pcc.T).max() < EPS
        pcc = (pcc + pcc.T) / 2  # Remove small floating point errors
        assert np.abs(np.diag(pcc) - 1).max() < EPS
        np.fill_diagonal(pcc, 1)  # Remove small floating point errors
    overshoot_mask = pcc > 1
    if np.any(overshoot_mask):
        assert (pcc[overshoot_mask] - 1).max() < EPS
        pcc[overshoot_mask] = 1  # Remove small floating point errors
    # Mirror of the overshoot handling: perfectly anti-correlated columns can
    # land marginally below -1 for the same numerical reason
    undershoot_mask = pcc < -1
    if np.any(undershoot_mask):
        assert (-1 - pcc[undershoot_mask]).max() < EPS
        pcc[undershoot_mask] = -1  # Remove small floating point errors
    return pcc
def spr_mat(
        X: Array, Y: Optional[Array] = None
) -> np.ndarray:
    r"""
    Spearman's rank correlation

    Columns are densified and replaced by their ranks, then the Pearson
    correlation matrix of the ranks is returned.

    Parameters
    ----------
    X
        First design matrix
    Y
        Second design matrix (optional)

    Returns
    -------
    spr
        Spearman's correlation matrix among columns of X,
        if only X is given.
        Spearman's correlation matrix between columns of X and columns of Y,
        if both X and Y are given.
    """
    # The module only imports ``scipy.sparse``; import the ranking routine
    # explicitly so this function does not depend on ``scipy.stats`` having
    # been loaded by some other package.
    from scipy.stats import rankdata

    X = densify(X)
    X = np.array([
        rankdata(X[:, i])
        for i in range(X.shape[1])
    ]).T
    if Y is not None:
        Y = densify(Y)
        Y = np.array([
            rankdata(Y[:, i])
            for i in range(Y.shape[1])
        ]).T
    return pcc_mat(X, Y)
def tfidf(X: Array) -> Array:
    r"""
    TF-IDF normalization (following the Seurat v3 approach)

    The term frequency divides every entry by its row sum, and the inverse
    document frequency is the number of rows divided by the column sum.

    Parameters
    ----------
    X
        Input matrix

    Returns
    -------
    X_tfidf
        TF-IDF normalized matrix
    """
    n_rows = X.shape[0]
    idf = n_rows / X.sum(axis=0)
    if not scipy.sparse.issparse(X):
        row_totals = X.sum(axis=1, keepdims=True)
        return (X / row_totals) * idf
    # Sparse path: scale through ``multiply`` to avoid densifying ``X``
    inverse_row_totals = 1 / X.sum(axis=1)
    return X.multiply(inverse_row_totals).multiply(idf)
def prob_or(probs: List[float]) -> float:
    r"""
    Combine multiple probabilities in a logical OR manner.

    Parameters
    ----------
    probs
        Array of probabilities

    Returns
    -------
    prob
        Combined probability, i.e. one minus the product of the
        complements of ``probs``
    """
    complements = 1 - np.asarray(probs)
    return 1 - complements.prod()
def all_counts(x: Array) -> bool:
    r"""
    Check whether an array contains all counts

    An array "contains all counts" when every value is non-negative and
    numerically equal to its integer cast. For sparse input only the
    explicitly stored values are inspected.

    Parameters
    ----------
    x
        Array to check

    Returns
    -------
    is_counts
        Whether the array contains all counts. An array with nothing to
        inspect (empty dense array, or sparse matrix without stored entries)
        is reported as ``True``.
    """
    if scipy.sparse.issparse(x):
        x = x.tocsr().data
    if x.size == 0:
        # ``min()`` raises on zero-size arrays; an all-zero sparse matrix
        # lands here and trivially consists of counts
        return True
    if x.min() < 0:
        return False
    return np.allclose(x, x.astype(int))
#------------------------------ Global containers ------------------------------ processes: Mapping[int, Mapping[int, Process]] = defaultdict(dict) # id -> pid -> process #-------------------------------- Meta classes ---------------------------------
class SingletonMeta(type):

    r"""
    Meta class that makes every class using it a singleton: the first
    instantiation is cached and returned by all later instantiations.
    """

    # Shared cache: class object -> its single instance
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        # First instantiation: construct normally and remember the result
        instance = super().__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
#--------------------------------- Log manager ---------------------------------

class _CriticalFilter(logging.Filter):

    r"""
    Let through only records at level WARNING or above
    """

    def filter(self, record: logging.LogRecord) -> bool:
        level = record.levelno
        return level >= logging.WARNING


class _NonCriticalFilter(logging.Filter):

    r"""
    Let through only records below level WARNING
    """

    def filter(self, record: logging.LogRecord) -> bool:
        level = record.levelno
        return level < logging.WARNING
class LogManager(metaclass=SingletonMeta):

    r"""
    Manage loggers used in the package
    """

    def __init__(self) -> None:
        self._loggers = {}  # logger name -> configured logger
        self._log_file = None
        self._console_log_level = logging.INFO
        self._file_log_level = logging.DEBUG
        self._date_fmt = "%Y-%m-%d %H:%M:%S"
        self._console_fmt = "[%(levelname)s] %(name)s: %(message)s"
        self._file_fmt = (
            "%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s: %(message)s"
        )

    #------------------------------- Properties --------------------------------

    @property
    def log_file(self) -> str:
        r"""
        Configure log file
        """
        return self._log_file

    @log_file.setter
    def log_file(self, file_name: os.PathLike) -> None:
        self._log_file = file_name
        for logger in self._loggers.values():
            # Position of the first file handler of this logger, if any
            file_idx = next((
                idx for idx, handler in enumerate(logger.handlers)
                if isinstance(handler, logging.FileHandler)
            ), None)
            if file_idx is None:
                # No file handler yet: attach one if a file is configured
                if file_name:
                    logger.addHandler(self._create_file_handler())
                continue
            # Close the stale handler, then swap it or drop it
            logger.handlers[file_idx].close()
            if self.log_file:
                logger.handlers[file_idx] = self._create_file_handler()
            else:
                del logger.handlers[file_idx]

    @property
    def file_log_level(self) -> int:
        r"""
        Configure logging level in the log file
        """
        return self._file_log_level

    @file_log_level.setter
    def file_log_level(self, log_level: int) -> None:
        self._file_log_level = log_level
        for logger in self._loggers.values():
            # Only the first file handler of each logger is updated
            first_file_handler = next((
                handler for handler in logger.handlers
                if isinstance(handler, logging.FileHandler)
            ), None)
            if first_file_handler is not None:
                first_file_handler.setLevel(self.file_log_level)

    @property
    def console_log_level(self) -> int:
        r"""
        Configure logging level printed in the console
        """
        return self._console_log_level

    @console_log_level.setter
    def console_log_level(self, log_level: int) -> None:
        self._console_log_level = log_level
        for logger in self._loggers.values():
            # Exact type match on purpose: FileHandler subclasses
            # StreamHandler and must not be touched here
            console_handlers = [
                handler for handler in logger.handlers
                if type(handler) is logging.StreamHandler  # pylint: disable=unidiomatic-typecheck
            ]
            for handler in console_handlers:
                handler.setLevel(self.console_log_level)

    #---------------------------- Handler factories ----------------------------

    def _create_file_handler(self) -> logging.FileHandler:
        formatter = logging.Formatter(
            fmt=self._file_fmt, datefmt=self._date_fmt)
        handler = logging.FileHandler(self.log_file)
        handler.setLevel(self.file_log_level)
        handler.setFormatter(formatter)
        return handler

    def _create_console_handler(self, critical: bool) -> logging.StreamHandler:
        # Critical records go to stderr, everything else to stdout
        stream = sys.stderr if critical else sys.stdout
        filter_cls = _CriticalFilter if critical else _NonCriticalFilter
        handler = logging.StreamHandler(stream)
        handler.addFilter(filter_cls())
        handler.setLevel(self.console_log_level)
        handler.setFormatter(logging.Formatter(fmt=self._console_fmt))
        return handler

    #-------------------------------- Public API -------------------------------

    def get_logger(self, name: str) -> logging.Logger:
        r"""
        Get a logger by name

        A logger requested for the first time is configured with two console
        handlers (critical and non-critical) plus a file handler when a log
        file is set; later requests return the cached logger.
        """
        if name in self._loggers:
            return self._loggers[name]
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)  # lowest level
        for critical in (True, False):
            logger.addHandler(self._create_console_handler(critical))
        if self.log_file:
            logger.addHandler(self._create_file_handler())
        self._loggers[name] = logger
        return logger
# Module-level log manager; ``LogManager`` uses ``SingletonMeta``, so any later
# ``LogManager()`` call returns this same object.
log = LogManager()
def logged(obj: T) -> T:
    r"""
    Decorator that attaches a ``logger`` attribute to ``obj``

    The logger is obtained from the module-level log manager and is named
    after ``obj.__name__``. ``obj`` itself is returned.
    """
    logger_name = obj.__name__
    obj.logger = log.get_logger(logger_name)
    return obj
#---------------------------- Configuration Manager ----------------------------
@logged
class ConfigManager(metaclass=SingletonMeta):

    r"""
    Global configurations

    Every option is exposed as a property backed by an underscore-prefixed
    attribute; all options are initialized through their setters in
    ``__init__``, so setter-side validation and side effects also apply to
    the defaults.
    """

    def __init__(self) -> None:
        self.TMP_PREFIX = "CROSSTMP"
        self.ANNDATA_KEY = "__sccross__"
        self.CPU_ONLY = False
        self.CUDNN_MODE = "repeatability"
        self.MASKED_GPUS = []
        self.ARRAY_SHUFFLE_NUM_WORKERS = 0
        self.GRAPH_SHUFFLE_NUM_WORKERS = 1
        self.FORCE_TERMINATE_WORKER_PATIENCE = 60
        self.DATALOADER_NUM_WORKERS = 0
        self.DATALOADER_FETCHES_PER_WORKER = 4
        self.DATALOADER_PIN_MEMORY = True
        self.CHECKPOINT_SAVE_INTERVAL = 10
        self.CHECKPOINT_SAVE_NUMBERS = 3
        self.PRINT_LOSS_INTERVAL = 10
        self.TENSORBOARD_FLUSH_SECS = 5
        self.ALLOW_TRAINING_INTERRUPTION = True

    @property
    def TMP_PREFIX(self) -> str:
        r"""
        Prefix of temporary files and directories created.
        Default value is ``"CROSSTMP"``.
        """
        return self._TMP_PREFIX

    @TMP_PREFIX.setter
    def TMP_PREFIX(self, tmp_prefix: str) -> None:
        self._TMP_PREFIX = tmp_prefix

    @property
    def ANNDATA_KEY(self) -> str:
        r"""
        Key in ``adata.uns`` for storing dataset configurations.
        Default value is ``"__sccross__"``.
        """
        return self._ANNDATA_KEY

    @ANNDATA_KEY.setter
    def ANNDATA_KEY(self, anndata_key: str) -> None:
        self._ANNDATA_KEY = anndata_key

    @property
    def CPU_ONLY(self) -> bool:
        r"""
        Whether computation should use only CPUs.
        Default value is ``False``.
        """
        return self._CPU_ONLY

    @CPU_ONLY.setter
    def CPU_ONLY(self, cpu_only: bool) -> None:
        self._CPU_ONLY = cpu_only
        # ``_DATALOADER_NUM_WORKERS`` is only read when CPU-only mode is being
        # enabled (short-circuit), so the ``False`` default set early in
        # ``__init__`` never touches the not-yet-initialized attribute.
        if self._CPU_ONLY and self._DATALOADER_NUM_WORKERS:
            self.logger.warning(
                "It is recommended to set `DATALOADER_NUM_WORKERS` to 0 "
                "when using CPU_ONLY mode. Otherwise, deadlocks may happen "
                "occationally."
            )

    @property
    def CUDNN_MODE(self) -> str:
        r"""
        CuDNN computation mode, should be one of {"repeatability", "performance"}.
        Default value is ``"repeatability"``.

        Note
        ----
        As of now, due to the use of :meth:`torch.Tensor.scatter_add_`
        operation, the results are not completely reproducible even when
        ``CUDNN_MODE`` is set to ``"repeatability"``, if GPU is used as
        computation device. Exact repeatability can only be achieved on CPU.
        The situation might change with new releases of :mod:`torch`.
        """
        return self._CUDNN_MODE

    @CUDNN_MODE.setter
    def CUDNN_MODE(self, cudnn_mode: str) -> None:
        if cudnn_mode not in ("repeatability", "performance"):
            raise ValueError("Invalid mode!")
        self._CUDNN_MODE = cudnn_mode
        # The two cuDNN flags are set from the selected mode: ``deterministic``
        # for "repeatability", ``benchmark`` for "performance".
        torch.backends.cudnn.deterministic = self._CUDNN_MODE == "repeatability"
        torch.backends.cudnn.benchmark = self._CUDNN_MODE == "performance"

    @property
    def MASKED_GPUS(self) -> List[int]:
        r"""
        A list of GPUs that should not be used when selecting computation device.
        This must be set before initializing any model, otherwise would be ineffective.
        Default value is ``[]``.
        """
        return self._MASKED_GPUS

    @MASKED_GPUS.setter
    def MASKED_GPUS(self, masked_gpus: List[int]) -> None:
        if masked_gpus:
            # pynvml is imported lazily, only when a non-empty mask is given
            import pynvml
            pynvml.nvmlInit()
            device_count = pynvml.nvmlDeviceGetCount()
            for item in masked_gpus:
                if item >= device_count:
                    raise ValueError(f"GPU device \"{item}\" is non-existent!")
        self._MASKED_GPUS = masked_gpus

    @property
    def ARRAY_SHUFFLE_NUM_WORKERS(self) -> int:
        r"""
        Number of background workers for array data shuffling.
        Default value is ``0``.
        """
        return self._ARRAY_SHUFFLE_NUM_WORKERS

    @ARRAY_SHUFFLE_NUM_WORKERS.setter
    def ARRAY_SHUFFLE_NUM_WORKERS(self, array_shuffle_num_workers: int) -> None:
        self._ARRAY_SHUFFLE_NUM_WORKERS = array_shuffle_num_workers

    @property
    def GRAPH_SHUFFLE_NUM_WORKERS(self) -> int:
        r"""
        Number of background workers for graph data shuffling.
        Default value is ``1``.
        """
        return self._GRAPH_SHUFFLE_NUM_WORKERS

    @GRAPH_SHUFFLE_NUM_WORKERS.setter
    def GRAPH_SHUFFLE_NUM_WORKERS(self, graph_shuffle_num_workers: int) -> None:
        self._GRAPH_SHUFFLE_NUM_WORKERS = graph_shuffle_num_workers

    @property
    def FORCE_TERMINATE_WORKER_PATIENCE(self) -> int:
        r"""
        Seconds to wait before force terminating unresponsive workers.
        Default value is ``60``.
        """
        return self._FORCE_TERMINATE_WORKER_PATIENCE

    @FORCE_TERMINATE_WORKER_PATIENCE.setter
    def FORCE_TERMINATE_WORKER_PATIENCE(self, force_terminate_worker_patience: int) -> None:
        self._FORCE_TERMINATE_WORKER_PATIENCE = force_terminate_worker_patience

    @property
    def DATALOADER_NUM_WORKERS(self) -> int:
        r"""
        Number of worker processes to use in data loader.
        Default value is ``0``.
        """
        return self._DATALOADER_NUM_WORKERS

    @DATALOADER_NUM_WORKERS.setter
    def DATALOADER_NUM_WORKERS(self, dataloader_num_workers: int) -> None:
        # Values above 8 are still accepted; only a warning is logged
        if dataloader_num_workers > 8:
            self.logger.warning(
                "Worker number 1-8 is generally sufficient, "
                "too many workers might have negative impact on speed."
            )
        self._DATALOADER_NUM_WORKERS = dataloader_num_workers

    @property
    def DATALOADER_FETCHES_PER_WORKER(self) -> int:
        r"""
        Number of fetches per worker per batch to use in data loader.
        Default value is ``4``.
        """
        return self._DATALOADER_FETCHES_PER_WORKER

    @DATALOADER_FETCHES_PER_WORKER.setter
    def DATALOADER_FETCHES_PER_WORKER(self, dataloader_fetches_per_worker: int) -> None:
        self._DATALOADER_FETCHES_PER_WORKER = dataloader_fetches_per_worker

    @property
    def DATALOADER_FETCHES_PER_BATCH(self) -> int:
        r"""
        Number of fetches per batch in data loader (read-only).
        Computed as ``max(1, DATALOADER_NUM_WORKERS) * DATALOADER_FETCHES_PER_WORKER``.
        """
        return max(1, self.DATALOADER_NUM_WORKERS) * self.DATALOADER_FETCHES_PER_WORKER

    @property
    def DATALOADER_PIN_MEMORY(self) -> bool:
        r"""
        Whether to use pin memory in data loader.
        Default value is ``True``.
        """
        return self._DATALOADER_PIN_MEMORY

    @DATALOADER_PIN_MEMORY.setter
    def DATALOADER_PIN_MEMORY(self, dataloader_pin_memory: bool) -> None:
        self._DATALOADER_PIN_MEMORY = dataloader_pin_memory

    @property
    def CHECKPOINT_SAVE_INTERVAL(self) -> int:
        r"""
        Automatically save checkpoints every n epochs.
        Default value is ``10``.
        """
        return self._CHECKPOINT_SAVE_INTERVAL

    @CHECKPOINT_SAVE_INTERVAL.setter
    def CHECKPOINT_SAVE_INTERVAL(self, checkpoint_save_interval: int) -> None:
        self._CHECKPOINT_SAVE_INTERVAL = checkpoint_save_interval

    @property
    def CHECKPOINT_SAVE_NUMBERS(self) -> int:
        r"""
        Maximal number of checkpoints to preserve at any point.
        Default value is ``3``.
        """
        return self._CHECKPOINT_SAVE_NUMBERS

    @CHECKPOINT_SAVE_NUMBERS.setter
    def CHECKPOINT_SAVE_NUMBERS(self, checkpoint_save_numbers: int) -> None:
        self._CHECKPOINT_SAVE_NUMBERS = checkpoint_save_numbers

    @property
    def PRINT_LOSS_INTERVAL(self) -> int:
        r"""
        Print loss values every n epochs.
        Default value is ``10``.
        """
        return self._PRINT_LOSS_INTERVAL

    @PRINT_LOSS_INTERVAL.setter
    def PRINT_LOSS_INTERVAL(self, print_loss_interval: int) -> None:
        self._PRINT_LOSS_INTERVAL = print_loss_interval

    @property
    def TENSORBOARD_FLUSH_SECS(self) -> int:
        r"""
        Flush tensorboard logs to file every n seconds.
        Default value is ``5``.
        """
        return self._TENSORBOARD_FLUSH_SECS

    @TENSORBOARD_FLUSH_SECS.setter
    def TENSORBOARD_FLUSH_SECS(self, tensorboard_flush_secs: int) -> None:
        self._TENSORBOARD_FLUSH_SECS = tensorboard_flush_secs

    @property
    def ALLOW_TRAINING_INTERRUPTION(self) -> bool:
        r"""
        Allow interruption before model training converges.
        Default value is ``True``.
        """
        return self._ALLOW_TRAINING_INTERRUPTION

    @ALLOW_TRAINING_INTERRUPTION.setter
    def ALLOW_TRAINING_INTERRUPTION(self, allow_training_interruption: bool) -> None:
        self._ALLOW_TRAINING_INTERRUPTION = allow_training_interruption
# Module-level configuration object; ``ConfigManager`` uses ``SingletonMeta``,
# so any later ``ConfigManager()`` call returns this same object.
config = ConfigManager()


#---------------------------- Interruption handling ----------------------------
@logged
class DelayedKeyboardInterrupt:  # pragma: no cover

    r"""
    Shield a code block from keyboard interruptions, delaying handling
    till the block is finished (adapted from
    `https://stackoverflow.com/a/21919644
    <https://stackoverflow.com/a/21919644>`_).

    On entering the context the SIGINT handler is replaced by one that only
    records the signal; on exit the previous handler is restored and, if a
    SIGINT arrived in the meantime, invoked with the recorded arguments.
    """

    def __init__(self):
        self.signal_received = None  # False, or (sig, frame) once SIGINT arrives
        self.old_handler = None  # handler that was active before ``__enter__``

    def __enter__(self):
        self.signal_received = False
        self.old_handler = signal.signal(signal.SIGINT, self._handler)

    def _handler(self, sig, frame):
        # Only remember the signal; it is replayed in ``__exit__``
        self.signal_received = (sig, frame)
        self.logger.debug("SIGINT received, delaying KeyboardInterrupt...")

    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.signal(signal.SIGINT, self.old_handler)
        # ``signal.signal`` may have returned ``SIG_IGN`` / ``SIG_DFL`` (or
        # ``None``) instead of a Python callable; those cannot be invoked, so
        # the delayed signal is only replayed through callable handlers.
        if self.signal_received and callable(self.old_handler):
            self.old_handler(*self.signal_received)
#--------------------------- Constrained data frame ----------------------------
@logged
class ConstrainedDataFrame(pd.DataFrame):

    r"""
    Data frame with certain format constraints

    Note
    ----
    Format constraints are checked and maintained automatically:
    the data is passed through :meth:`rectify` and :meth:`verify` on
    construction, and through :meth:`verify` after every item assignment.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Build a regular frame first, so that rectification and
        # verification operate on plain pandas objects
        frame = self.rectify(pd.DataFrame(*args, **kwargs))
        self.verify(frame)
        super().__init__(frame)

    def __setitem__(self, key, value) -> None:
        super().__setitem__(key, value)
        self.verify(self)

    @property
    def _constructor(self) -> type:
        return self.__class__

    @classmethod
    def rectify(cls, df: pd.DataFrame) -> pd.DataFrame:
        r"""
        Rectify data frame for format integrity

        The base implementation returns the input unchanged.

        Parameters
        ----------
        df
            Data frame to be rectified

        Returns
        -------
        rectified_df
            Rectified data frame
        """
        return df

    @classmethod
    def verify(cls, df: pd.DataFrame) -> None:
        r"""
        Verify data frame for format integrity

        The base implementation performs no checks.

        Parameters
        ----------
        df
            Data frame to be verified
        """

    @property
    def df(self) -> pd.DataFrame:
        r"""
        Convert to regular data frame
        """
        plain = pd.DataFrame(self)
        return plain

    def __repr__(self) -> str:
        r"""
        Note
        ----
        The representation is produced from the regular data frame rather
        than from ``self`` in order to bypass integrity verification:
        when the terminal is too narrow, :mod:`pandas` splits the data frame
        internally, which would make format verification fail.
        """
        plain = self.df
        return repr(plain)
#--------------------------- Other utility functions ---------------------------
def get_chained_attr(x: Any, attr: str) -> Any:
    r"""
    Resolve a (possibly dotted) attribute name on an object.

    Parameters
    ----------
    x
        Object to get attribute from
    attr
        Attribute name, where ``"a.b"`` means attribute ``b`` of
        attribute ``a``

    Returns
    -------
    attr_value
        Attribute value

    Raises
    ------
    AttributeError
        If any component of the chain is missing
    """
    current = x
    for component in attr.split("."):
        if hasattr(current, component):
            current = getattr(current, component)
            continue
        # Report the full requested chain, not just the missing component
        raise AttributeError(f"{attr} not found!")
    return current
def in_ipynb() -> bool:  # pragma: no cover
    r"""
    Determine whether running in an ipynb environment.

    Returns
    -------
    flag
        ``True`` if :mod:`IPython` is importable and the active shell's
        class is named ``"ZMQInteractiveShell"``, ``False`` otherwise
    """
    try:
        import IPython
        shell = IPython.get_ipython()
        return shell.__class__.__name__ == "ZMQInteractiveShell"
    except ModuleNotFoundError:
        return False
def smart_tqdm(*args, **kwargs):
    r"""
    Create a progress bar with ``tqdm.notebook.tqdm`` or ``tqdm.tqdm``,
    chosen according to the return value of :func:`in_ipynb`.

    All arguments are forwarded unchanged to the selected class.

    Returns
    -------
    tqdm
        A tqdm instance
    """
    if in_ipynb():
        from tqdm.notebook import tqdm as progress_bar  # pragma: no cover
    else:
        from tqdm import tqdm as progress_bar
    return progress_bar(*args, **kwargs)
def get_rs(x: RandomState = None) -> np.random.RandomState:
    r"""
    Get random state object

    Parameters
    ----------
    x
        Object that can be converted to a random state object:
        an existing :class:`numpy.random.RandomState` (returned as-is),
        an integer seed (Python or NumPy integer), or ``None``

    Returns
    -------
    rs
        Random state object. For anything that is neither a random state
        nor an integer, the global :mod:`numpy.random` module is returned.
    """
    if isinstance(x, np.random.RandomState):
        return x
    # ``numbers.Integral`` also covers NumPy integer scalars (e.g. the result
    # of indexing an integer array), which a plain ``int`` check would
    # silently ignore in favor of the global random state
    if isinstance(x, numbers.Integral):
        return np.random.RandomState(x)
    return np.random
@logged
def run_command(
        command: str,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        log_command: bool = True, print_output: bool = True,
        err_message: Optional[Mapping[int, str]] = None, **kwargs
) -> Optional[List[str]]:
    r"""
    Run an external command and get realtime output

    Parameters
    ----------
    command
        A string containing the command to be executed
    stdout
        Where to redirect stdout
    stderr
        Where to redirect stderr
    log_command
        Whether to log the command being executed (log level is INFO)
    print_output
        Whether to print stdout of the command.
        If ``stdout`` is PIPE and ``print_output`` is set to False,
        the output will be returned as a list of output lines.
    err_message
        Look up dict of error message (indexed by error code);
        the key ``"__default__"`` is used for codes without their own entry
    **kwargs
        Other keyword arguments to be passed to :class:`subprocess.Popen`

    Returns
    -------
    output_lines
        A list of output lines (only returned if ``stdout`` is PIPE
        and ``print_output`` is False)

    Raises
    ------
    RuntimeError
        If the command exits with a non-zero return code
    """
    if log_command:
        run_command.logger.info("Executing external command: %s", command)
    executable = command.split(" ")[0]
    # NOTE(review): the command string is run through the shell
    # (``shell=True``); callers must not build it from untrusted input.
    with subprocess.Popen(command, stdout=stdout, stderr=stderr,
                          shell=True, **kwargs) as p:
        if stdout == subprocess.PIPE:
            prompt = f"{executable} ({p.pid}): "
            output_lines = []
            # Iterating the pipe yields each line as it arrives and stops at
            # EOF, so no empty "line" is emitted for end-of-stream and there
            # is no busy polling if the child closes stdout before exiting
            for raw_line in p.stdout:
                line = raw_line.strip()
                if isinstance(line, bytes):  # str already if text mode was requested
                    line = line.decode()
                if print_output:
                    print(prompt + line)
                else:
                    output_lines.append(line)
        else:
            output_lines = None
        ret = p.wait()
    if ret != 0:
        err_message = err_message or {}
        if ret in err_message:
            err_message = " " + err_message[ret]
        elif "__default__" in err_message:
            err_message = " " + err_message["__default__"]
        else:
            err_message = ""
        raise RuntimeError(
            f"{executable} exited with error code: {ret}.{err_message}")
    if stdout == subprocess.PIPE and not print_output:
        return output_lines