# Source code for snakeface.apps.main.tasks

__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2020-2021, Vanessa Sochat"
__license__ = "MPL 2.0"

from snakeface.settings import cfg
from django.contrib import messages
from django.shortcuts import redirect, get_object_or_404
from snakeface.apps.main.models import Workflow
from snakeface.apps.users.models import User
from snakeface.apps.main.utils import CommandRunner, ThreadRunner

import re

# Notebook run workflow functions


def run_workflow(request, wid, uid):
    """Check that a workflow run may start and, if so, launch it.

    The workflow and user are looked up by primary key (404 if either is
    missing). A run is only started when the user is one of the workflow's
    members, the workflow is not already "RUNNING", run_is_allowed() accepts
    the request, and a notebook setting (cfg.NOTEBOOK or cfg.NOTEBOOK_ONLY)
    is enabled. In every other case an informational message is attached to
    the request instead.

    Parameters:
        request: the incoming Django request, used for user-facing messages.
        wid: primary key of the Workflow to run.
        uid: primary key of the User requesting the run.

    Returns:
        A redirect to the "main:view_workflow" page of the workflow.
    """
    workflow = get_object_or_404(Workflow, pk=wid)
    user = get_object_or_404(User, pk=uid)
    notebook_mode = cfg.NOTEBOOK or cfg.NOTEBOOK_ONLY

    def back_to_workflow():
        # Every outcome ends on the workflow's detail page.
        return redirect("main:view_workflow", wid=workflow.id)

    # Ensure the user has permission to run the workflow
    if user not in workflow.members:
        messages.info(request, "You are not allowed to run this workflow.")
        return back_to_workflow()

    # The workflow cannot already be running
    if workflow.status == "RUNNING":
        messages.info(request, "This workflow is already running.")
        return back_to_workflow()

    # run_is_allowed() is called first on purpose: it attaches its own
    # message to the request when a limit has been reached.
    if not (run_is_allowed(request) and notebook_mode):
        messages.info(request, "Snakeface currently only supports notebook runs.")
        return back_to_workflow()

    workflow.reset()
    runner_thread = ThreadRunner(target=doRun, args=[workflow.id, user.id])
    runner_thread.setDaemon(True)
    runner_thread.set_workflow(workflow)
    runner_thread.start()

    # Remember the runner's thread id on the workflow
    workflow.thread = runner_thread.thread_id
    workflow.save()
    messages.success(request, "Workflow %s has started running." % workflow.id)
    return back_to_workflow()
# Permissions
def run_is_allowed(request):
    """Given a request, check that a new run is allowed.

    1. If running a notebook, the number of "RUNNING" workflows must be
       below cfg.MAXIMUM_NOTEBOOK_JOBS (a falsy value disables this quota).
    2. If not running a notebook, the requesting user's "RUNNING" workflows
       must be below cfg.USER_WORKFLOW_RUNS_LIMIT, and the total number of
       "RUNNING" workflows must be below cfg.USER_WORKFLOW_GLOBAL_RUNS_LIMIT.

    When a limit has been reached, an informational message explaining why
    is attached to the request.

    Parameters:
        request: the incoming Django request; request.user is used for the
            per-user count and the request receives any message.

    Returns:
        True if the run may start, False otherwise.
    """
    running_notebook = cfg.NOTEBOOK or cfg.NOTEBOOK_ONLY
    running_jobs = Workflow.objects.filter(status="RUNNING").count()

    if running_notebook:
        if cfg.MAXIMUM_NOTEBOOK_JOBS and running_jobs >= cfg.MAXIMUM_NOTEBOOK_JOBS:
            messages.info(
                request,
                "You already have the maximum %s jobs running."
                % cfg.MAXIMUM_NOTEBOOK_JOBS,
            )
            return False
        return True

    # The per-user count is only queried when it is actually needed.
    user_jobs = Workflow.objects.filter(user=request.user, status="RUNNING").count()

    # A limit is reached when the running count is at or above it. The
    # comparison was previously reversed, which refused every run while the
    # count was still at or below the limit.
    if user_jobs >= cfg.USER_WORKFLOW_RUNS_LIMIT:
        messages.info(
            request,
            "You are at your workflow active runs limit. Finish some and try again later.",
        )
        return False

    if running_jobs >= cfg.USER_WORKFLOW_GLOBAL_RUNS_LIMIT:
        messages.info(
            request,
            "The server is at the global limit of workflow runs. Try again later.",
        )
        return False

    return True
# Statuses
def serialize_workflow_statuses(workflow):
    """Turn a workflow's status records into a list of display-ready dicts.

    Each item of workflow.workflowstatus_set.all() carries a dict in its
    ``msg`` attribute. That dict is updated in place and collected:

    - "order": position of the status in the iteration.
    - "job": the existing job value, or "" when absent.
    - "msg": the message text; text mentioning "traceback" or "exception"
      (case-insensitive) is wrapped in <code> with newlines as <br>.
    - "level": an HTML badge whose class is derived from the level name
      ("secondary" for unknown or missing levels).

    Returns:
        The list of updated dicts, in iteration order.
    """
    badge_classes = {
        "debug": "primary",
        "dag_debug": "primary",
        "info": "info",
        "warning": "warning",
        "error": "danger",
    }

    serialized = []
    for position, status in enumerate(workflow.workflowstatus_set.all()):
        record = status.msg
        text = record.get("msg", "")
        badge_class = badge_classes.get(record.get("level"), "secondary")
        badge = "<span class='badge badge-%s'>%s</span>" % (
            badge_class,
            record.get("level", "info"),
        )

        # If it's a traceback, format as code
        looks_like_traceback = bool(text) and re.search(
            "traceback|exception", text, re.IGNORECASE
        )
        if looks_like_traceback:
            text = "<code>%s</code>" % text.replace("\n", "<br>")

        # Assign one key at a time, in this order, so new keys are inserted
        # into the dict in a stable sequence.
        record["order"] = position
        record.setdefault("job", "")
        record["msg"] = text
        record["level"] = badge
        serialized.append(record)

    return serialized
def doRun(wid, uid):
    """Run a workflow's command and record the result on the workflow.

    The workflow is marked "RUNNING" and saved, its command is split on
    single spaces and handed to a CommandRunner together with the user's
    token (as WMS_MONITOR_TOKEN) and a cancel check. Once run_command
    returns, the runner's error and output lines (joined with "<br>") and
    its return value are stored, and the status is set to "NOTRUNNING".

    Parameters:
        wid: primary key of the Workflow to run.
        uid: primary key of the User whose token is passed to the command.
    """
    workflow = Workflow.objects.get(pk=wid)
    user = User.objects.get(pk=uid)
    runner = CommandRunner()

    workflow.status = "RUNNING"
    workflow.save()

    def is_cancelled(wid):
        """Re-read the workflow and report whether its status is "CANCELLED"."""
        return Workflow.objects.get(pk=wid).status == "CANCELLED"

    # Run the command, update when finished
    command = workflow.command.split(" ")
    environment = {"WMS_MONITOR_TOKEN": user.token}
    runner.run_command(
        command,
        env=environment,
        cancel_func=is_cancelled,
        cancel_func_kwargs={"wid": wid},
    )

    line_break = "<br>"
    workflow.error = line_break.join(runner.error)
    workflow.output = line_break.join(runner.output)
    workflow.status = "NOTRUNNING"
    workflow.retval = runner.retval
    workflow.save()