# Module metadata: author, copyright holder and license identifier.
__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2020-2021, Vanessa Sochat"
__license__ = "MPL 2.0"
from django.db.models.signals import pre_save
from django.db import models
from django.conf import settings
from django.urls import reverse
from django.contrib.postgres.fields import JSONField as DjangoJSONField
from snakeface.apps.main.utils import CommandRunner, write_file, get_tmpfile, read_file
from snakeface.argparser import SnakefaceParser
from snakeface.settings import cfg
from django.db.models import Field
import itertools
import json
import os
# Choices for the boolean ``private`` field of a workflow, as
# (stored value, human-readable label) pairs.
PRIVACY_CHOICES = (
    (False, "Public (The workflow collection will be accessible by anyone)"),
    (True, "Private (The workflow collection will be not listed.)"),
)
class JSONField(DjangoJSONField):
    """Default JSON field: the ``django.contrib.postgres`` JSONField, unchanged.

    This definition is replaced further down in the module when the default
    database engine is sqlite.
    """
# When the default database engine is sqlite, replace JSONField with a
# text-backed field that serializes values with the json module.
if "sqlite" in settings.DATABASES["default"]["ENGINE"]:

    class JSONField(Field):
        """JSON field stored in a ``text`` column (sqlite fallback)."""

        def db_type(self, connection):
            """Return the column type used to store the value."""
            return "text"

        def from_db_value(self, value, expression, connection):
            """Decode a value loaded from the database; None passes through."""
            if value is None:
                return value
            return self.to_python(value)

        def to_python(self, value):
            """Parse ``value`` as JSON.

            None is returned unchanged. A value that json.loads rejects with
            TypeError or ValueError (for example an object that is not a
            string) is also returned unchanged.
            """
            if value is None:
                return value
            try:
                decoded = json.loads(value)
            except (TypeError, ValueError):
                return value
            return decoded

        def get_prep_value(self, value):
            """Serialize ``value`` to a JSON string; None passes through."""
            if value is None:
                return value
            # json.dumps already returns a str, so no extra conversion is needed.
            return json.dumps(value)

        def value_to_string(self, obj):
            """Return the field's value on ``obj`` as given by value_from_object."""
            return self.value_from_object(obj)
# Choices for a workflow's ``status`` field; the stored value and the
# displayed label are the same string for every state.
RUNNING_CHOICES = [
    (state, state) for state in ("RUNNING", "NOTRUNNING", "CANCELLED")
]
class Workflow(models.Model):
    """A workflow is associated with a specific git repository and one or more
    workflow runs.

    The model stores the snakemake command, its arguments (``data``), the
    rendered dag, and the output, error and return value of a run, along with
    the users that own or contribute to the workflow.
    """

    add_date = models.DateTimeField("date published", auto_now_add=True)
    command = models.TextField(blank=False, null=False)
    # NOTE(review): the default is the string "{}", not an empty dict, so an
    # instance relying on the default holds a str here -- confirm intended.
    data = JSONField(blank=False, null=False, default="{}")
    dag = models.TextField(blank=True, null=True)
    error = models.TextField(blank=True, null=True)
    output = models.TextField(blank=True, null=True)
    modify_date = models.DateTimeField("date modified", auto_now=True)
    name = models.CharField(max_length=250, unique=True, blank=True, null=True)
    snakefile = models.TextField(blank=False, null=False, max_length=250)
    snakemake_id = models.TextField(blank=False, null=False)
    status = models.TextField(
        choices=RUNNING_CHOICES, default="NOTRUNNING", blank=False, null=False
    )
    thread = models.PositiveIntegerField(default=None, blank=True, null=True)
    retval = models.PositiveIntegerField(default=None, blank=True, null=True)
    workdir = models.TextField(blank=False, null=False, max_length=250)
    owners = models.ManyToManyField(
        "users.User",
        blank=True,
        default=None,
        related_name="workflow_owners",
        related_query_name="owners",
    )
    contributors = models.ManyToManyField(
        "users.User",
        related_name="workflow_contributors",
        related_query_name="contributor",
        blank=True,
        help_text="users with edit permission to the workflow",
        verbose_name="Contributors",
    )

    def get_absolute_url(self):
        """Return the url of the "workflow_details" view for this workflow."""
        return reverse("workflow_details", args=[str(self.id)])

    # Whether the workflow is private; the default comes from cfg.PRIVATE_ONLY
    private = models.BooleanField(
        choices=PRIVACY_CHOICES,
        default=cfg.PRIVATE_ONLY,
        verbose_name="Accessibility",
    )

    @property
    def message_fields(self):
        """Return the set of items found by iterating the ``msg`` of every
        status associated with the workflow.
        """
        fields = set()
        for status in self.workflowstatus_set.all():
            fields.update(status.msg)
        return fields

    def has_view_permission(self):
        """Return True if the workflow can be viewed.

        Always True in a notebook environment. Otherwise a public workflow is
        viewable by anyone, and a private one only by its members.
        """
        if cfg.NOTEBOOK or cfg.NOTEBOOK_ONLY:
            return True
        # NOTE(review): ``self.request`` is not defined in this class;
        # presumably the caller attaches it before calling -- confirm.
        return not self.private or self.request.user in self.members

    def update_command(self, command=None, do_save=False):
        """Given a command (or an automated save from the signal) update
        the command for the workflow.

        If ``command`` is not provided, the command is rebuilt from
        ``self.data`` with a SnakefaceParser and the workflow id is appended
        as a ``--wms-monitor-arg``. The instance is saved only if ``do_save``
        is True.
        """
        if command:
            self.command = command
        else:
            parser = SnakefaceParser()
            parser.load(self.data)
            # NOTE(review): when called from pre_save on a new instance,
            # self.id is presumably still None -- confirm that is acceptable.
            self.command = parser.command + " --wms-monitor-arg id=%s" % self.id
        if do_save:
            self.save()

    def update_dag(self, do_save=False):
        """given a snakefile, run the command to update the dag

        Nothing is generated when ``self.snakefile`` is unset or does not
        exist. The instance is saved only if ``do_save`` is True.
        """
        if self.snakefile and os.path.exists(self.snakefile):
            runner = CommandRunner()

            # First generate the dag, save to temporary dot file
            runner.run_command(
                ["snakemake", "--dag"], cwd=os.path.dirname(self.snakefile)
            )
            filename = write_file(
                get_tmpfile("snakeface-dag-", ".dot"), "".join(runner.output)
            )

            try:
                # Next generate the svg with dot, save to model
                runner.run_command(
                    ["dot", "-Tsvg", os.path.basename(filename)],
                    cwd=os.path.dirname(filename),
                )
                self.dag = "".join(runner.output)
            finally:
                # Always clean up the temporary file, even if dot fails
                if os.path.exists(filename):
                    os.remove(filename)

        # If running from a signal, would be infinite loop
        if do_save:
            self.save()

    def __str__(self):
        return "[workflow:%s]" % self.name

    def get_label(self):
        """Return the label for this model, "workflow"."""
        return "workflow"

    @property
    def members(self):
        """Return a list of all owners followed by all contributors."""
        return list(itertools.chain(self.owners.all(), self.contributors.all()))

    def get_report(self):
        """load the report file, if it exists. Returns None otherwise."""
        report_file = self._get_report_file()
        if report_file:
            return read_file(report_file)

    def reset(self):
        """Empty all run related fields to prepare for a new run.

        Deletes all associated statuses and saves the workflow.
        """
        self.output = None
        self.error = None
        self.retval = None
        self.workflowstatus_set.all().delete()
        self.save()

    def has_report(self):
        """returns True if the workflow command has a designated report, and
        the report file exists
        """
        return bool(self._get_report_file())

    def _get_report_file(self):
        """Return the full path of the report named in ``self.data``, or None.

        None is returned when ``self.data`` has no truthy "report" entry, or
        when the path (joined with ``self.workdir``) does not exist.
        """
        # The field default is the string "{}", so data might not be a dict
        data = self.data if isinstance(self.data, dict) else {}
        report_file = data.get("report")
        if not report_file:
            return None
        fullpath = os.path.join(self.workdir, report_file)
        return fullpath if os.path.exists(fullpath) else None

    def has_edit_permission(self):
        """If we are running in a notebook environment, there is just one user
        that has edit access to anything. Otherwise, the user must not be
        anonymous, must be a member (owner or contributor) and the workflow
        must be private.
        """
        if cfg.NOTEBOOK or cfg.NOTEBOOK_ONLY:
            return True
        # NOTE(review): ``self.request`` is not defined in this class, and
        # requiring ``self.private`` means members cannot edit a public
        # workflow -- confirm both are intended.
        return (
            not self.request.user.is_anonymous
            and self.request.user in self.members
            and self.private
        )

    class Meta:
        app_label = "main"
class WorkflowStatus(models.Model):
    """A workflow status is a status message sent from running a workflow."""

    # Disabled field, kept for reference:
    # executor = models.TextField(null=False, blank=False)
    add_date = models.DateTimeField("date published", auto_now_add=True)
    modify_date = models.DateTimeField("date modified", auto_now=True)
    # NOTE(review): the default is the string "{}", not an empty dict --
    # confirm intended.
    msg = JSONField(default="{}", null=False, blank=False)
    # Statuses are removed together with the workflow they belong to.
    workflow = models.ForeignKey(
        "main.Workflow",
        on_delete=models.CASCADE,
        null=False,
        blank=False,
    )
def update_workflow(sender, instance, **kwargs):
    """Signal receiver that refreshes a workflow's dag and then its command.

    Both updates are called without arguments, so neither one saves the
    instance itself.
    """
    # Order matters only in that it mirrors the original: dag first.
    instance.update_dag()
    instance.update_command()
# Run update_workflow before every save of a Workflow.
pre_save.connect(receiver=update_workflow, sender=Workflow)