Branching workflows in Wagtail

based on value of specified Page field

I wrote this post to demonstrate and document a working solution to this Stack Overflow question, which I asked on September 2nd, 2021. Big ups to LB (Ben Johnston), who outlined the solution and provided a proof of concept.

The problem

Suppose we have an organization that annually publishes a booklet consisting of daily reflections. This booklet is published towards the end of the year, for use in the coming year. The daily reflections are written by different authors, and we would like to have a publishing workflow where, for instance, the daily reflections from January to June are reviewed by one group, and those from July to December are reviewed by another group, as illustrated below:

[Figure: Branching workflows based on the value of a specified Page field]

We have a DailyReflectionPage Model with a reflection_date field that forms the basis for the Page's slug, which is in the form YYYY-MM-DD. Here's an extract of the Page model:

class DailyReflectionPage(Page):
    """
    The Daily Reflection Model
    """
    ...
    ...

    reflection_date = models.DateField("Reflection Date", max_length=254)
    ...
    ...
    @cached_property
    def date(self):
        """
        Returns the Reflection's date as a string in %Y-%m-%d format
        """
        fmt = "%Y-%m-%d"
        date_as_string = (self.reflection_date).strftime(fmt)
        return date_as_string      
    ...
    ...
    def full_clean(self, *args, **kwargs):
        # first call the built-in cleanups (including default slug generation)
        super(DailyReflectionPage, self).full_clean(*args, **kwargs)

        # now make your additional modifications
        # (compare by value; `is not` would test object identity, not string equality)
        if self.slug != self.date:
            self.slug = self.date
    ...
    ...

If you want to see the complete Page Model, then have a look at the models.py file in the reflections app. This link points to a commit before implementation of the solution outlined herein.
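
To make the slug rule in the extract above concrete, here is a minimal standalone sketch in plain Python (no Django or Wagtail needed). The helper names slug_for and needs_slug_update are hypothetical and are not part of the project; they only mirror what the date property and the full_clean override do.

```python
from datetime import date

SLUG_FORMAT = "%Y-%m-%d"  # same format string used by the `date` property


def slug_for(reflection_date: date) -> str:
    """Return the YYYY-MM-DD string that the page uses as its slug."""
    return reflection_date.strftime(SLUG_FORMAT)


def needs_slug_update(current_slug: str, reflection_date: date) -> bool:
    """Compare by value, since strftime builds a new string on each call."""
    return current_slug != slug_for(reflection_date)


print(slug_for(date(2022, 3, 9)))  # 2022-03-09
```

So a reflection dated 9 March 2022 ends up with the slug 2022-03-09, whatever slug Wagtail generated from the title.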

The solution

I initially created a gist showing the full implementation of LB's solution. It consists only of the models.py file, so if you just want to jump straight to the basic solution, you can look at that. However, if you would like to see a more complete solution within the wider context of an actual Wagtail project, have a look at the sample Wagtail project I created to accompany this post. The source code is available at github.com/engineervix/wagtail-branching-workflows.

0. First things first

If you would like to follow along, starting from the point before implementing the solution, you can clone the repo and check out commit 5745df:

git clone https://github.com/engineervix/wagtail-branching-workflows.git
cd wagtail-branching-workflows
git checkout 5745df

The project uses docker and docker-compose, so make sure that these are installed and working on your machine:

# check that you have docker on your machine
docker -v

# check that you have docker-compose on your machine
docker-compose -v

Then create the required .env files:

cp -v app/.envs/.dev.env.sample app/.envs/.dev.env
cp -v app/.envs/.test.env.sample app/.envs/.test.env

Build the images and spin up the containers:

docker-compose up -d --build

You'll have to wait a few seconds for some processes to initialize / run (postgres, database migrations, browser-sync, Django server, etc.). You can check the status via

docker-compose logs web

When all set, you should see something like this:

web_1  | Performing system checks...
web_1  | 
web_1  | [Browsersync] Proxying: http://127.0.0.1:8000
web_1  | [Browsersync] Access URLs:
web_1  |  -----------------------------------
web_1  |        Local: http://localhost:3000
web_1  |     External: http://172.19.0.3:3000
web_1  |  -----------------------------------
web_1  |           UI: http://localhost:3001
web_1  |  UI External: http://localhost:3001
web_1  |  -----------------------------------
web_1  | [Browsersync] Watching files...
web_1  | System check identified no issues (0 silenced).
web_1  | 
web_1  | Django version 3.2.8, using settings 'config.settings.dev'
web_1  | Development server is running at http://0.0.0.0:8000/
web_1  | Using the Werkzeug debugger (http://werkzeug.pocoo.org/)
web_1  | Quit the server with CONTROL-C.
web_1  | [Browsersync] Reloading Browsers... (buffered 2 events)
web_1  |  * Debugger is active!
web_1  |  * Debugger PIN: 104-102-219

You can now proceed to create a superuser:

docker-compose exec web ./manage.py createsuperuser

Load initial data:

docker-compose exec web ./manage.py load_initial_data

This initial data includes 6 users with the following details:

No.  Email Address  Password            Group       First Name  Last Name
1                   WriterPassword1     Writers     John        Doe
2                   WriterPassword2     Writers     Jane        Doe
3                   WriterPassword3     Writers     Another     Writer
4                   ModeratorPassword1  Moderators  Gina        Stephenson
5                   ModeratorPassword2  Moderators  George      Benson
6                   ApproverPassword0   Approvers   Connie      Montgomery

You can access the dev server at http://127.0.0.1:3009. This project uses MailDev for viewing and testing emails generated during development. The MailDev server is accessible at http://localhost:1089.

1. Update the DailyReflectionPage model

First, we add a cached property called date_in_first_semester:

@cached_property
def date_in_first_semester(self):
    """
    Returns True if Reflection's date
    is in the first half of the year
    """
    month = (self.reflection_date).month
    return month <= 6

Then, we add a method called get_approval_group_key, which returns a key identifying the group that should review the page. Here it returns 'A' or 'B', but a simple Boolean would work just as well (feel free to adapt this to your liking).

def get_approval_group_key(self):
    # custom logic here that checks all the date stuff
    if self.date_in_first_semester:
        return "A"
    return "B"
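
As a quick sanity check of the rule, here is a minimal sketch of the same month-to-key mapping as a plain function, so it can be tried without a database or a Page instance. The function name approval_group_key is hypothetical; in the project the logic lives on the model as shown above.

```python
from datetime import date


def approval_group_key(reflection_date: date) -> str:
    """Return "A" for January to June and "B" for July to December."""
    return "A" if reflection_date.month <= 6 else "B"


# The boundary falls between 30 June and 1 July
for d in (date(2022, 6, 30), date(2022, 7, 1)):
    print(d.isoformat(), approval_group_key(d))
```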

2. Create a new Task

We will now create a new Task that extends the Wagtail Task class.

The following official Wagtail resources are essential in gaining an understanding of what's going on:

  1. how to add a new Task Type
  2. Task model reference
  3. source code of the built-in GroupApprovalTask

Here's the code, which goes in the reflections app's models.py:

# only **additional** imports are shown here
from django import forms
...
...
from django.contrib.auth.models import Group
# from wagtail.core.models import Page
from wagtail.core.models import Page, Task, TaskState, WorkflowState
...
...

class SplitGroupApprovalTask(Task):

    ## note: this is the simplest approach, two fields of linked groups, you could further refine this approach as needed.

    groups_a = models.ManyToManyField(
        Group,
        verbose_name="for Jan - June Daily Reflections",
        help_text="Pages at this step in a workflow will be moderated or approved by these groups of users",
        related_name="split_task_group_a",
    )
    groups_b = models.ManyToManyField(
        Group,
        verbose_name="for Jul - Dec Daily Reflections",
        help_text="Pages at this step in a workflow will be moderated or approved by these groups of users",
        related_name="split_task_group_b",
    )

    admin_form_fields = Task.admin_form_fields + ["groups_a", "groups_b"]
    admin_form_widgets = {
        "groups_a": forms.CheckboxSelectMultiple,
        "groups_b": forms.CheckboxSelectMultiple,
    }

    def get_approval_groups(self, page):
        """This method gets used by all checks when determining what group to allow/assign this Task to"""

        # recommend some checks here, what if `get_approval_group_key` is not on the Page?

        # here's a simple check
        if hasattr(page.specific, "get_approval_group_key"):
            approval_group = page.specific.get_approval_group_key()
        else:
            # arbitrarily assign to group A 
            # (you could instead do something else)
            approval_group = "A"

        if approval_group == "A":
            return self.groups_a

        return self.groups_b

    # each of the following methods is implemented below, each checking for the correct groups for the Page when called
    # def start(self, ...etc)
    # def user_can_access_editor(self, ...etc)
    # def page_locked_for_user(self, ...etc)
    # def user_can_lock(self, ...etc)
    # def user_can_unlock(self, ...etc)
    # def get_task_states_user_can_moderate(self, ...etc)

    def start(self, workflow_state, user=None):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        approval_groups = self.get_approval_groups(workflow_state.page)

        if workflow_state.page.locked_by:
            # If the person who locked the page isn't in one of the groups, unlock the page
            # if not workflow_state.page.locked_by.groups.filter(id__in=self.groups.all()).exists():
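            # (this task has no `groups` field, so check the locking user against the dynamic approval groups)
            if not workflow_state.page.locked_by.groups.filter(
                id__in=approval_groups.all()
            ).exists():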
                workflow_state.page.locked = False
                workflow_state.page.locked_by = None
                workflow_state.page.locked_at = None
                workflow_state.page.save(
                    update_fields=["locked", "locked_by", "locked_at"]
                )

        return super().start(workflow_state, user=user)

    def user_can_access_editor(self, page, user):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        approval_groups = self.get_approval_groups(page)

        # return self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser
        return (
            approval_groups.filter(id__in=user.groups.all()).exists()
            or user.is_superuser
        )

    def page_locked_for_user(self, page, user):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        approval_groups = self.get_approval_groups(page)

        # return not (self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser)
        return not (
            approval_groups.filter(id__in=user.groups.all()).exists()
            or user.is_superuser
        )

    def user_can_lock(self, page, user):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        approval_groups = self.get_approval_groups(page)

        # return self.groups.filter(id__in=user.groups.all()).exists()
        return approval_groups.filter(id__in=user.groups.all()).exists()

    def user_can_unlock(self, page, user):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        # approval_groups = self.get_approval_groups(page)
        return False

    def get_actions(self, page, user):
        # essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
        approval_groups = self.get_approval_groups(page)

        if (
            approval_groups.filter(id__in=user.groups.all()).exists()
            or user.is_superuser
        ):
            return [
                ("reject", "Request changes", True),
                ("approve", "Approve", False),
                ("approve", "Approve with comment", True),
            ]

        return super().get_actions(page, user)

    def get_task_states_user_can_moderate(self, user, **kwargs):

        # not a very DRY approach, but it works!

        if user.is_superuser:
            return TaskState.objects.filter(
                status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr
            )
        elif self.groups_a.filter(id__in=user.groups.all()).exists():
            return TaskState.objects.filter(
                status=TaskState.STATUS_IN_PROGRESS,
                task=self.task_ptr,
                workflow_state__in=WorkflowState.objects.filter(
                    page__in=DailyReflectionPage.objects.filter(
                        reflection_date__month__lte=6
                    )
                ),
            )
        elif self.groups_b.filter(id__in=user.groups.all()).exists():
            return TaskState.objects.filter(
                status=TaskState.STATUS_IN_PROGRESS,
                task=self.task_ptr,
                workflow_state__in=WorkflowState.objects.filter(
                    page__in=DailyReflectionPage.objects.filter(reflection_date__month__gt=6)
                ),
            )
        else:
            return TaskState.objects.none()

    @classmethod
    def get_description(cls):
        return "Groups are assigned to approve this task based on reflection month"

    class Meta:
        verbose_name = "reflection-month-dependent approval task"
        verbose_name_plural = "reflection-month-dependent approval tasks"
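
One detail worth noting in get_task_states_user_can_moderate: because the branches are chained with elif, a user who belongs to a group in both groups_a and groups_b would only be offered the January to June task states. Whether that matters depends on how you assign groups. The following plain-Python sketch models an alternative in which the two scopes are combined. It does not touch Django at all; the function name and the use of sets of group names are assumptions made purely for illustration.

```python
def months_user_can_moderate(user_groups, groups_a, groups_b, is_superuser=False):
    """Return the set of reflection months (1-12) a user may moderate.

    user_groups, groups_a and groups_b are sets of group names (hypothetical
    stand-ins for the ManyToMany relations on the task).
    """
    first_half = set(range(1, 7))    # January to June
    second_half = set(range(7, 13))  # July to December

    if is_superuser:
        return first_half | second_half

    months = set()
    if user_groups & groups_a:  # user shares at least one group with groups_a
        months |= first_half
    if user_groups & groups_b:  # independent check, not an elif
        months |= second_half
    return months


print(sorted(months_user_can_moderate({"Moderators"}, {"Moderators"}, {"Approvers"})))
```

In the ORM version, the same idea could presumably be expressed by combining the two filtered TaskState querysets rather than returning from the first matching branch, but that variant is untested against this project.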

3. Email Notifications

By default, email notifications are sent upon workflow submission, approval and rejection, and upon submission to the built-in GroupApprovalTask. We will replicate this behaviour in our SplitGroupApprovalTask. This is a three-step process:

  1. Create a mail.py file within our reflections app and add the following classes:
    • A base Notifier to send updates for SplitGroupApprovalTask events
    • A notifier to send updates for SplitGroupApprovalTask submission events
  2. Create a signal_handlers.py file within our reflections app. In here, we instantiate the notifier and connect it to the task_submitted signal via the register_signal_handlers() function.
  3. Run register_signal_handlers() upon loading our reflections app

Here's the code:

# Step 1: reflections/mail.py

from django.conf import settings
from django.contrib.auth import get_user_model
from wagtail.admin.mail import EmailNotificationMixin, Notifier
from wagtail.core.models import TaskState

from mysite.reflections.models import SplitGroupApprovalTask


class BaseSplitGroupApprovalTaskStateEmailNotifier(EmailNotificationMixin, Notifier):
    """A base notifier to send updates for SplitGroupApprovalTask events"""

    def __init__(self):
        # Allow TaskState to send notifications
        # (note the trailing comma: a one-element tuple of valid classes)
        super().__init__((TaskState,))

    def can_handle(self, instance, **kwargs):
        if super().can_handle(instance, **kwargs) and isinstance(
            instance.task.specific, SplitGroupApprovalTask
        ):
            # Don't send notifications if a Task has been cancelled and then resumed - ie page was updated to a new revision
            return not TaskState.objects.filter(
                workflow_state=instance.workflow_state,
                task=instance.task,
                status=TaskState.STATUS_CANCELLED,
            ).exists()
        return False

    def get_context(self, task_state, **kwargs):
        context = super().get_context(task_state, **kwargs)
        context["page"] = task_state.workflow_state.page
        context["task"] = task_state.task.specific
        return context

    def get_recipient_users(self, task_state, **kwargs):

        triggering_user = kwargs.get("user", None)

        approval_groups = task_state.task.specific.get_approval_groups(
            task_state.workflow_state.page
        )

        group_members = get_user_model().objects.filter(
            groups__in=approval_groups.all()
        )

        recipients = group_members

        include_superusers = getattr(
            settings, "WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS", True
        )
        if include_superusers:
            superusers = get_user_model().objects.filter(is_superuser=True)
            recipients = recipients | superusers

        if triggering_user:
            recipients = recipients.exclude(pk=triggering_user.pk)

        return recipients


class SplitGroupApprovalTaskStateSubmissionEmailNotifier(
    BaseSplitGroupApprovalTaskStateEmailNotifier
):
    """A notifier to send updates for SplitGroupApprovalTask submission events"""

    notification = "submitted"


# Step 2: reflections/signal_handlers.py

from wagtail.core.signals import task_submitted

from mysite.reflections.mail import (
    SplitGroupApprovalTaskStateSubmissionEmailNotifier,
)

task_submission_email_notifier = SplitGroupApprovalTaskStateSubmissionEmailNotifier()


def register_signal_handlers():
    task_submitted.connect(
        task_submission_email_notifier,
        dispatch_uid="task_submitted_email_notification",
    )


# Step 3: reflections/apps.py

from django.apps import AppConfig


class ReflectionsConfig(AppConfig):
    name = "mysite.reflections"

    def ready(self):
        from .signal_handlers import register_signal_handlers

        register_signal_handlers()
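
The recipient selection in get_recipient_users boils down to a little set arithmetic: members of the approval groups, plus superusers if WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS allows it, minus whoever triggered the event. Here is a minimal sketch of that logic with plain sets of user IDs standing in for querysets. The function name recipient_ids and its parameters are hypothetical and exist only for this illustration.

```python
def recipient_ids(group_member_ids, superuser_ids,
                  triggering_user_id=None, include_superusers=True):
    """Return the IDs of users who should receive the notification."""
    recipients = set(group_member_ids)

    # Mirrors the WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS check
    if include_superusers:
        recipients |= set(superuser_ids)

    # The user who submitted the page does not need to notify themselves
    if triggering_user_id is not None:
        recipients.discard(triggering_user_id)

    return recipients


# Moderators 4 and 5, superuser 1, page submitted by user 4
print(sorted(recipient_ids({4, 5}, {1}, triggering_user_id=4)))  # [1, 5]
```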

You can check out the Extending Wagtail --> How to add new Task types --> Adding notifications section of the Wagtail Docs for more information.

In my initial stages of trying to figure this out, the example source code in the docs was not up to date. I submitted a PR to fix this, and it was merged.

You might also wanna have a look at the workflow settings reference for further workflow customization.

Well, that's about it! You can run migrations, log into the Wagtail Admin, create some content, some workflows and tasks ... check out the video below to see this in action!

Demo


Did you find this article valuable?

Support Victor Miti by becoming a sponsor. Any amount is appreciated!