# Source code for snakeface.argparser

__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2020-2021, Vanessa Sochat"
__license__ = "MPL 2.0"

from snakemake import get_argument_parser
from snakeface.settings import cfg
from snakeface import settings
from snakeface.apps.main.utils import get_snakefile_choices
from jinja2 import Template
import logging
import os
import json

# Module logger, registered under the fixed name "argparser"
logger = logging.getLogger("argparser")

# Absolute directory that holds this module; the form templates are looked up
# beneath it, in apps/main/templates/forms
here = os.path.abspath(os.path.dirname(__file__))
_form_template_parts = ("apps", "main", "templates", "forms")
templates = os.path.join(here, *_form_template_parts)


class SnakefaceParser:
    """A Snakeface Parser is a wrapper to an argparse.Parser, and aims
    to make it easy to loop over arguments and options, and generate various
    representations (e.g., an input field) for the interface. The point is
    not to use it to parse arguments and validate, but to output all fields
    to a front end form.
    """

    # NOTE: this runs once, when the class body is executed (i.e., on import),
    # and the result is shared by every instance. It is not refreshed on init.
    snakefiles = get_snakefile_choices()

    def __init__(self):
        """Load the parser, using the profile from cfg.PROFILE if it exists.

        The profile has to be resolved before the groups are generated: the
        groups are cached on first access, so a parser swapped in afterwards
        would never be reflected in the arguments.
        """
        self._groups = {}
        self._args = {}
        self._errors = []

        # A profile can further customize job submission
        if cfg.PROFILE and os.path.exists(cfg.PROFILE):
            print("Loading profile %s" % cfg.PROFILE)
            self.parser = get_argument_parser(cfg.PROFILE)
        else:
            self.parser = get_argument_parser()

        # Accessing the property populates self._groups and self._args
        self.groups

    def __str__(self):
        return "[snakeface-parser]"

    def __repr__(self):
        return self.__str__()

    @property
    def errors(self):
        """All collected validation errors, joined by a single space."""
        return " ".join(self._errors)

    def get(self, name, default=None):
        """A general get function to return an argument that might be nested
        under a group. These objects are the same as linked in _groups.

        Returns the provided default if no argument is stored under name.
        """
        return self._args.get(name, default)

    def load(self, argdict):
        """Load is a wrapper around set - we loop through a dictionary
        and set all arguments.

        argdict may be a dictionary, or a string of json that decodes to one.
        Names that do not match a known argument are skipped.
        """
        if isinstance(argdict, str):
            argdict = json.loads(argdict)
        for key, value in argdict.items():
            self.set(key, value)

    def set(self, name, value):
        """Set a value for an argument. This is typically what the user
        has selected. An unknown name is silently ignored.
        """
        arg = self._args.get(name)
        if arg:
            arg.value = value

    def to_dict(self):
        """The opposite of load, this function exports the arguments as a
        dictionary of argument name -> current value.
        """
        return {name: arg.value for name, arg in self._args.items()}

    @property
    def snakefile(self):
        """The value of the snakefile argument, or None if there is no such
        argument.
        """
        snakefile = self._args.get("snakefile")
        if snakefile:
            return snakefile.value
        return None

    def validate(self):
        """Ensure that all required args are defined.

        An argument counts as defined when it is known to the parser and has
        a non-empty value. The error list is reset on every call, so that
        repeated validation does not accumulate stale or duplicate messages.
        Returns True if everything required is defined, False otherwise
        (the messages are then available via the errors property).
        """
        self._errors = []
        valid = True
        for key in self.required:
            arg = self._args.get(key)
            # The argument object itself is always truthy, so the value is
            # what needs to be checked here
            if arg is None or not arg.value:
                self._errors.append("The %s is required." % key)
                valid = False
        return valid

    @property
    def required(self):
        """Names (argparse dest) of the arguments that must have a value."""
        return ["cores", "snakefile"]

    @property
    def command(self):
        """Given a loaded set of arguments, generate the command.

        Arguments without a value are skipped, as are arguments whose value
        equals the parser default (unless the argument is required). Boolean
        arguments contribute only their flag, all others the flag followed by
        the value. Positional arguments (no option strings) contribute only
        the value.
        """
        # NOTE(review): values are inserted as is, without shell quoting.
        # Confirm how callers execute this string before passing user input.
        required = self.required
        parts = ["snakemake"]
        for name, arg in self._args.items():
            if not arg.value:
                continue

            # If the value is set to the default, ignore it
            if arg.value == arg.action["default"] and name not in required:
                continue

            # The first option string is used as the flag, if there is one
            option_strings = arg.action["option_strings"]
            if option_strings:
                parts.append(option_strings[0])

            # A boolean is fully described by its flag
            if not arg.is_boolean:
                parts.append("%s" % arg.value)
        return " ".join(parts)

    @property
    def groups(self):
        """Return arguments organized by groups, with the intention to easily
        map into a form on the front end. The groups seem to have ALL
        arguments each, so we have to artificially separate them.

        The result is a dictionary of group title -> {dest: SnakefaceArgument}
        and is generated once, then served from self._groups. As a side
        effect, self._args is filled with a flattened dest -> argument lookup.
        """
        if self._groups:
            return self._groups

        # Generate an argument lookup based on dest
        required = self.required
        lookup = {}
        for action in self.parser._actions:
            argument = SnakefaceArgument(action, action.dest in required)

            # Define choices
            if action.dest == "snakefile":
                argument.update_choice_fields({"snakefile": self.snakefiles})

            # Set the wms monitor to be this server
            if action.dest == "wms_monitor":
                argument.value = settings.DOMAIN_NAME

            lookup[action.dest] = argument

        # This top level organizes into groups
        for group in self.parser._action_groups:
            group_dict = {
                a.dest: lookup.get(a.dest)
                for a in group._group_actions
                if self.include_argument(a.dest, group.title)
            }

            # Store a flattened representation to manipulate later
            self._args.update(group_dict)

            # Don't add empty groups
            if group_dict:
                self._groups[group.title] = group_dict
        return self._groups

    def include_argument(self, name, group):
        """Given an argument name, and a group name, skip if settings
        disable it. Returns True if the argument should be included.
        """
        # Never include these named arguments
        if name in ("help", "version"):
            return False

        # Skip groups based on specific configuration settings
        if not cfg.EXECUTOR_CLUSTER and group == "CLUSTER":
            return False
        if not cfg.EXECUTOR_GOOGLE_LIFE_SCIENCES and group == "GOOGLE_LIFE_SCIENCE":
            return False
        if not cfg.EXECUTOR_KUBERNETES and group == "KUBERNETES":
            return False
        if not cfg.EXECUTOR_TIBANNA and group == "TIBANNA":
            return False
        if not cfg.EXECUTOR_GA4GH_TES and group == "TES":
            return False
        if cfg.DISABLE_SINGULARITY and group == "SINGULARITY":
            return False
        if cfg.DISABLE_CONDA and group == "CONDA":
            return False
        if cfg.DISABLE_NOTEBOOKS and group == "NOTEBOOKS":
            return False
        return True
class SnakefaceArgument:
    """A Snakeface argument takes an action from a parser, and is able to
    easily generate front end views (e.g., a form element) for it
    """

    def __init__(self, action, required=False):
        """Wrap a parser action.

        action is expected to be an argparse action; its attribute dictionary
        (dest, nargs, const, default, help, option_strings, ...) is stored as
        self.action. required marks the form field as required.
        """
        self.action = action.__dict__

        # Templates are loaded on first use, and then kept on the instance
        self.boolean_template = ""
        self.text_template = ""
        self.choice_template = ""

        # Lookup of dest -> choices, see update_choice_fields
        self.choice_fields = {}
        self.value = ""
        self.required = required

    def __str__(self):
        return self.action["dest"]

    def __repr__(self):
        return self.__str__()

    def update_choice_fields(self, updates):
        """Merge a dictionary of dest -> choices into the choice fields. An
        argument is rendered as a choice field if its dest is a key here.
        """
        self.choice_fields.update(updates)

    @property
    def field_name(self):
        """A human friendly label derived from dest (e.g., dry_run -> Dry Run)"""
        return " ".join([x.capitalize() for x in self.action["dest"].split("_")])

    @property
    def is_boolean(self):
        """True if the action takes no value (nargs of 0) and has a truthy
        const, which is the case for an argparse store_true flag.
        """
        return bool(self.action["nargs"] == 0 and self.action["const"])

    def field(self):
        """Generate a form field for the argument. A choice field takes
        precedence over a boolean field, and a text field is the fallback.
        """
        if self.action["dest"] in self.choice_fields:
            return self.choice_field()
        if self.is_boolean:
            return self.boolean_field()
        return self.text_field()

    def load_template(self, path):
        """Given a path to a template file, load the template with jinja2.

        Returns the jinja2 Template, or an empty string (after logging a
        warning that names the path) if the file does not exist.
        """
        if os.path.exists(path):
            with open(path, "r") as fd:
                template = Template(fd.read())
            return template

        # NOTE(review): the field functions call render() on this result, so
        # a missing template still fails there with an AttributeError.
        logging.warning("%s does not exist, no template loaded.", path)
        return ""

    def boolean_field(self):
        """Generate a boolean field (radio button) via a jinja2 template"""
        # Ensure that we only load/read the file once
        if not self.boolean_template:
            self.boolean_template = self.load_template(
                os.path.join(templates, "boolean_field.html")
            )

        # The checked state is derived from the parser default only
        checked = "checked" if self.action["default"] == True else ""  # noqa: E712
        return self.boolean_template.render(
            label=self.field_name,
            help=self.action["help"],
            name=self.action["dest"],
            checked=checked,
            required="required" if self.required else "",
        )

    def text_field(self):
        """Generate a text field for using a pre-loaded jinja2 template"""
        if not self.text_template:
            self.text_template = self.load_template(
                os.path.join(templates, "text_field.html")
            )

        # NOTE(review): a truthy parser default wins over a value that was
        # set on the argument - confirm this is intended when a form is
        # rendered for previously loaded arguments.
        return self.text_template.render(
            name=self.action["dest"],
            default=self.action["default"] or self.value,
            label=self.field_name,
            help=self.action["help"],
            required="required" if self.required else "",
        )

    def choice_field(self):
        """Generate a choice field for using a pre-loaded jinja2 template.
        The choices are those registered for this argument's dest.
        """
        if not self.choice_template:
            self.choice_template = self.load_template(
                os.path.join(templates, "choice_field.html")
            )
        return self.choice_template.render(
            name=self.action["dest"],
            label=self.field_name,
            help=self.action["help"],
            required="required" if self.required else "",
            choices=self.choice_fields.get(self.action["dest"]),
        )