Source code for snakeface.apps.main.utils

__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2020-2021, Vanessa Sochat"
__license__ = "MPL 2.0"

from snakeface.settings import cfg
import subprocess
import threading

import tempfile
import os
import re


[docs]def get_workdir_choices(path=None): """Given the working directory set on init, return potential subdirectories.""" path = path or cfg.WORKDIR choices = [(path, "/")] # Recursive to working directory is default for root, dirs, files in sorted(os.walk(path)): for f in sorted(dirs): if f == "__pycache__": continue fullpath = os.path.join(root, f) # Ignore all hidden files and paths if "/." in f or "/." in fullpath or "/." in root: continue choices.append((fullpath, fullpath)) return choices
[docs]def get_snakefile_choices(path=None): """Given the working directory set on init, return all discovered snakefiles.""" path = path or cfg.WORKDIR choices = [] # Recursive to working directory is default for root, dirs, files in sorted(os.walk(path)): for f in sorted(files): fullpath = os.path.join(root, f) if re.search("Snakefile", f): choices.append((fullpath, fullpath)) return choices
[docs]def write_file(filename, content): """Write some text content to a file""" with open(filename, "w") as fd: fd.write(content) return filename
[docs]def read_file(filename): """Write some text content to a file""" with open(filename, "r") as fd: content = fd.read() return content
[docs]def get_tmpfile(prefix="", suffix=""): """get a temporary file with an optional prefix. By default, the file is closed (and just a name returned). Arguments: - prefix (str) : prefix with this string """ tmpdir = tempfile.gettempdir() prefix = os.path.join(tmpdir, os.path.basename(prefix)) fd, tmp_file = tempfile.mkstemp(prefix=prefix, suffix=suffix) os.close(fd) return tmp_file
[docs]class ThreadRunner(threading.Thread): """We need to be able to run a Snakemake job as a thread, and kill it if an exception is raised based on it's id """
[docs] def set_workflow(self, workflow): self.workflow = workflow
@property def thread_id(self): """Return the id of the thread, either attributed to the class or by matching the Thread instance """ if hasattr(self, "_thread_id"): return self._thread_id for thread_id, thread in threading._active.items(): if thread is self: return thread_id
[docs]class CommandRunner(object): """Wrapper to use subprocess to run a command. This is based off of pypi vendor distlib SubprocesMixin. """ def __init__(self): self.reset()
[docs] def reset(self): self.error = [] self.output = [] self.retval = None
[docs] def reader(self, stream, context): """Get output and error lines and save to command runner.""" # Make sure we save to the correct field lines = self.error if context == "stdout": lines = self.output while True: s = stream.readline() if not s: break lines.append(s.decode("utf-8")) stream.close()
[docs] def run_command( self, cmd, env=None, cancel_func=None, cancel_func_kwargs=None, **kwargs ): self.reset() cancel_func_kwargs = cancel_func_kwargs or {} # If we need to update the environment # **IMPORTANT: this will include envars from host. Absolutely cannot # be any secrets (they should be defined in the app settings file) envars = os.environ.copy() if env: envars.update(env) p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=envars, **kwargs ) # Create threads for error and output t1 = threading.Thread(target=self.reader, args=(p.stdout, "stdout")) t1.start() t2 = threading.Thread(target=self.reader, args=(p.stderr, "stderr")) t2.start() # Continue running unless cancel function is called counter = 0 while True: # Check on process for finished or cancelled if p.poll() != None: print("Return value found, stopping.") break # Check the cancel function every 100 loops elif ( counter % 10000 == 0 and cancel_func and cancel_func(**cancel_func_kwargs) ): print("Process is terminated") p.terminate() break counter += 1 # p.wait() t1.join() t2.join() self.retval = p.returncode return self.output