I wrote this post in order to demonstrate and document a working solution to this Stack Overflow question which I asked on September 2nd, 2021. Big ups to LB Ben Johnston, who outlined the solution and provided a proof of concept.
Contents
The problem
Suppose we have an organization that annually publishes a booklet consisting of daily reflections. This booklet is published towards the end of the year, for use in the coming year. The daily reflections are written by different authors, and we would like to have a publishing workflow where, for instance, the daily reflections from January to June are reviewed by one group, and those from July to December are reviewed by another group, as illustrated below:
We have a DailyReflectionPage
Model with a reflection_date
field that forms the basis for the Page's slug, which is in the form YYYY-MM-DD
. Here's an extract of the Page model:
class DailyReflectionPage(Page):
"""
The Daily Reflection Model
"""
...
...
reflection_date = models.DateField("Reflection Date", max_length=254)
...
...
@cached_property
def date(self):
"""
Returns the Reflection's date as a string in %Y-%m-%d format
"""
fmt = "%Y-%m-%d"
date_as_string = (self.reflection_date).strftime(fmt)
return date_as_string
...
...
def full_clean(self, *args, **kwargs):
# first call the built-in cleanups (including default slug generation)
super(DailyReflectionPage, self).full_clean(*args, **kwargs)
# now make your additional modifications
if self.slug is not self.date:
self.slug = self.date
...
...
If you want to see the complete Page Model, then have a look at the models.py
file in the reflections
app. This link points to a commit before implementation of the solution outlined herein.
The solution
I initially created a gist showing the full implementation of LB's solution. It only consists of themodels.py
file, so if you just want to jump straight to the basic solution, then you can look at it. However, if you would like to see a more complete solution within the wider context of an actual Wagtail project, you can have a look at the sample Wagtail project I created to accompany this post. The source code is available at github.com/engineervix/wagtail-branching-workflows.
0. First things first
If you would like to follow along, starting from the point before implementing the solution, you can clone the repo and checkout commit 5745df
git clone https://github.com/engineervix/wagtail-branching-workflows.git
cd wagtail-branching-workflows
git checkout 5745df
The project uses docker and docker-compose, so make sure that these are installed and working on your machine:
# check that you have docker on your machine
docker -v
# check that you have docker-compose on your machine
docker-compose -v
Then create the required .env
files:
cp -v app/.envs/.dev.env.sample app/.envs/.dev.env
cp -v app/.envs/.test.env.sample app/.envs/.test.env
Build the images and spin up the containers:
docker-compose up -d --build
You'll have to wait a few seconds for some processes to initialize / run (postgres, database migrations, browser-sync, Django server, etc.). You can check the status via
docker-compose logs web
When all set, you should see something like this:
web_1 | Performing system checks...
web_1 |
web_1 | [Browsersync] Proxying: http://127.0.0.1:8000
web_1 | [Browsersync] Access URLs:
web_1 | -----------------------------------
web_1 | Local: http://localhost:3000
web_1 | External: http://172.19.0.3:3000
web_1 | -----------------------------------
web_1 | UI: http://localhost:3001
web_1 | UI External: http://localhost:3001
web_1 | -----------------------------------
web_1 | [Browsersync] Watching files...
web_1 | System check identified no issues (0 silenced).
web_1 |
web_1 | Django version 3.2.8, using settings 'config.settings.dev'
web_1 | Development server is running at http://0.0.0.0:8000/
web_1 | Using the Werkzeug debugger (http://werkzeug.pocoo.org/)
web_1 | Quit the server with CONTROL-C.
web_1 | [Browsersync] Reloading Browsers... (buffered 2 events)
web_1 | * Debugger is active!
web_1 | * Debugger PIN: 104-102-219
You can now proceed to create a superuser:
docker-compose exec web ./manage.py createsuperuser
Load initial data:
docker-compose exec web ./manage.py load_initial_data
This initial data includes 6 users with the following details:
No. | Email Address | Password | Group | First Name | Last Name |
1 | john.doe@example.com | WriterPassword1 | Writers | John | Doe |
2 | jane.doe@example.com | WriterPassword2 | Writers | Jane | Doe |
3 | another.writer@example.com | WriterPassword3 | Writers | Another | Writer |
4 | moderator.one@example.org | ModeratorPassword1 | Moderators | Gina | Stephenson |
5 | moderator.two@example.org | ModeratorPassword2 | Moderators | George | Benson |
6 | chief@example.org | ApproverPassword0 | Approvers | Connie | Montgomery |
You can access the dev server at http://127.0.0.1:3009. This project uses MailDev for viewing and testing emails generated during development. The MailDev server is accessible at http://localhost:1089.
1. Update the DailyReflectionPage
model
First, we add a method called date_in_first_semester
:
@cached_property
def date_in_first_semester(self):
"""
Returns True if Reflection's date
is in the first half of the year
"""
month = (self.reflection_date).month
return month <= 6
Then, we add a method called get_approval_group_key
which will return a simple Boolean or maybe something like 'A' or 'B' (feel free to suit this to your liking).
def get_approval_group_key(self):
# custom logic here that checks all the date stuff
if self.date_in_first_semester:
return "A"
return "B"
2. Create a new Task
We will now create a new Task
that extends the Wagtail Task
class.
The following official Wagtail resources are essential in gaining an understanding of what's going on:
- how to add a new Task Type
- Task model reference.
- source code of the built-in
GroupApprovalTask
# only **additional** imports are shown here
from django import forms
...
...
# from wagtail.core.models import Page
from wagtail.core.models import Group, Page, Task, TaskState, WorkflowState
...
...
class SplitGroupApprovalTask(Task):
## note: this is the simplest approach, two fields of linked groups, you could further refine this approach as needed.
groups_a = models.ManyToManyField(
Group,
verbose_name="for Jan - June Daily Reflections",
help_text="Pages at this step in a workflow will be moderated or approved by these groups of users",
related_name="split_task_group_a",
)
groups_b = models.ManyToManyField(
Group,
verbose_name="for Jul - Dec Daily Reflections",
help_text="Pages at this step in a workflow will be moderated or approved by these groups of users",
related_name="split_task_group_b",
)
admin_form_fields = Task.admin_form_fields + ["groups_a", "groups_b"]
admin_form_widgets = {
"groups_a": forms.CheckboxSelectMultiple,
"groups_b": forms.CheckboxSelectMultiple,
}
def get_approval_groups(self, page):
"""This method gets used by all checks when determining what group to allow/assign this Task to"""
# recommend some checks here, what if `get_approval_group` is not on the Page?
# here's a simple check
if hasattr(page.specific, "get_approval_group_key"):
approval_group = page.specific.get_approval_group_key()
else:
# arbitrarily assign to group A
# (you could instead do something else)
approval_group = "A"
if approval_group == "A":
return self.groups_a
return self.groups_b
# each of the following methods will need to be implemented, all checking for the correct groups for the Page when called
# def start(self, ...etc)
# def user_can_access_editor(self, ...etc)
# def page_locked_for_user(self, ...etc)
# def user_can_lock(self, ...etc)
# def user_can_unlock(self, ...etc)
# def get_task_states_user_can_moderate(self, ...etc)
def start(self, workflow_state, user=None):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
approval_groups = self.get_approval_groups(workflow_state.page)
if workflow_state.page.locked_by:
# If the person who locked the page isn't in one of the groups, unlock the page
# if not workflow_state.page.locked_by.groups.filter(id__in=self.groups.all()).exists():
if not approval_groups.filter(id__in=self.groups.all()).exists():
workflow_state.page.locked = False
workflow_state.page.locked_by = None
workflow_state.page.locked_at = None
workflow_state.page.save(
update_fields=["locked", "locked_by", "locked_at"]
)
return super().start(workflow_state, user=user)
def user_can_access_editor(self, page, user):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
approval_groups = self.get_approval_groups(page)
# return self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser
return (
approval_groups.filter(id__in=user.groups.all()).exists()
or user.is_superuser
)
def page_locked_for_user(self, page, user):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
approval_groups = self.get_approval_groups(page)
# return not (self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser)
return not (
approval_groups.filter(id__in=user.groups.all()).exists()
or user.is_superuser
)
def user_can_lock(self, page, user):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
approval_groups = self.get_approval_groups(page)
# return self.groups.filter(id__in=user.groups.all()).exists()
return approval_groups.filter(id__in=user.groups.all()).exists()
def user_can_unlock(self, page, user):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
# approval_groups = self.get_approval_groups(page)
return False
def get_actions(self, page, user):
# essentially a copy of this method on `GroupApprovalTask` but with the ability to have a dynamic 'group' returned.
approval_groups = self.get_approval_groups(page)
if (
approval_groups.filter(id__in=user.groups.all()).exists()
or user.is_superuser
):
return [
("reject", "Request changes", True),
("approve", "Approve", False),
("approve", "Approve with comment", True),
]
return super().get_actions(page, user)
def get_task_states_user_can_moderate(self, user, **kwargs):
# not a very DRY approach, but it works!
if user.is_superuser:
return TaskState.objects.filter(
status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr
)
elif self.groups_a.filter(id__in=user.groups.all()).exists():
return TaskState.objects.filter(
status=TaskState.STATUS_IN_PROGRESS,
task=self.task_ptr,
workflow_state__in=WorkflowState.objects.filter(
page__in=DailyReflectionPage.objects.filter(
reflection_date__month__lte=6
)
),
)
elif self.groups_b.filter(id__in=user.groups.all()).exists():
return TaskState.objects.filter(
status=TaskState.STATUS_IN_PROGRESS,
task=self.task_ptr,
workflow_state__in=WorkflowState.objects.filter(
page__in=DailyReflectionPage.objects.filter(reflection_date__month__gt=6)
),
)
else:
return TaskState.objects.none()
@classmethod
def get_description(cls):
return "Groups are assigned to approve this task based on reflection month"
class Meta:
verbose_name = "reflection-month-dependent approval task"
verbose_name_plural = "reflection-month-dependent approval tasks"
3. Email Notifications
By default, email notifications are sent upon workflow submission, approval and rejection, and upon submission to the built-in GroupApprovalTask
. We will replicate this behaviour in our SplitGroupApprovalTask
. This is a three-step process:
- Create a
mail.py
file within ourreflections
app and add the following classes:- A base
Notifier
to send updates forSplitGroupApprovalTask
events - A notifier to send updates for
SplitGroupApprovalTask
submission events
- A base
- Create a
signal_handlers.py
file within ourreflections
app. In here, we instantiate the notifier, and connect it to thetask_submitted
signal via theregister_signal_handlers()
method. - Run
register_signal_handlers()
upon loading ourreflections
app
Here's the code:
# Step 1: reflections/mail.py
from django.conf import settings
from django.contrib.auth import get_user_model
from wagtail.admin.mail import EmailNotificationMixin, Notifier
from wagtail.core.models import TaskState
from mysite.reflections.models import SplitGroupApprovalTask
class BaseSplitGroupApprovalTaskStateEmailNotifier(EmailNotificationMixin, Notifier):
"""A base notifier to send updates for SplitGroupApprovalTask events"""
def __init__(self):
# Allow TaskState to send notifications
super().__init__((TaskState))
def can_handle(self, instance, **kwargs):
if super().can_handle(instance, **kwargs) and isinstance(
instance.task.specific, SplitGroupApprovalTask
):
# Don't send notifications if a Task has been cancelled and then resumed - ie page was updated to a new revision
return not TaskState.objects.filter(
workflow_state=instance.workflow_state,
task=instance.task,
status=TaskState.STATUS_CANCELLED,
).exists()
return False
def get_context(self, task_state, **kwargs):
context = super().get_context(task_state, **kwargs)
context["page"] = task_state.workflow_state.page
context["task"] = task_state.task.specific
return context
def get_recipient_users(self, task_state, **kwargs):
triggering_user = kwargs.get("user", None)
approval_groups = task_state.task.specific.get_approval_groups(
task_state.workflow_state.page
)
group_members = get_user_model().objects.filter(
groups__in=approval_groups.all()
)
recipients = group_members
include_superusers = getattr(
settings, "WAGTAILADMIN_NOTIFICATION_INCLUDE_SUPERUSERS", True
)
if include_superusers:
superusers = get_user_model().objects.filter(is_superuser=True)
recipients = recipients | superusers
if triggering_user:
recipients = recipients.exclude(pk=triggering_user.pk)
return recipients
class SplitGroupApprovalTaskStateSubmissionEmailNotifier(
BaseSplitGroupApprovalTaskStateEmailNotifier
):
"""A notifier to send updates for SplitGroupApprovalTask submission events"""
notification = "submitted"
# Step 2: reflections/signal_handlers.py
from wagtail.core.signals import task_submitted
from mysite.reflections.mail import (
SplitGroupApprovalTaskStateSubmissionEmailNotifier,
)
task_submission_email_notifier = SplitGroupApprovalTaskStateSubmissionEmailNotifier()
def register_signal_handlers():
task_submitted.connect(
task_submission_email_notifier,
dispatch_uid="task_submitted_email_notification",
)
# Step 3: reflections/apps.py
from django.apps import AppConfig
class ReflectionsConfig(AppConfig):
name = "mysite.reflections"
def ready(self):
from .signal_handlers import register_signal_handlers
register_signal_handlers()
You can check out the Extending Wagtail --> How to add new Task types --> Adding notifications section of the Wagtail Docs for more information.
In my initial stages of trying to figure this out, the example source code in the docs was not up to date. I submitted a PR to fix this, and it was merged.
You might also wanna have a look at the workflow settings reference for further workflow customization.
Well, that's about it! You can run migrations, log into the Wagtail Admin, create some content, some workflows and tasks ... check out the video below to see this in action!