Here’s a quick note about how to write tests for Django models that need to queue Celery tasks when they are created.
The Obvious Way
I have model Locator
. When it is created I want it to queue a Celery task to
asynchronously fetch and process a web page. First write a test that looks something like this:
class TestLocator(TestCase):
…
def test_queues_fetch_when_locator_created(self):
with self.settings(NOTES_FETCH_LOCATORS=True), \
patch.object(tasks, 'fetch_locator_page') as fetch_locator_page:
locator = Locator.objects.create(url='https://example.com/1')
fetch_locator_page.delay.assert_called_once_with(
locator.pk,
if_not_fetched_since=None)
The NOTES_FETCH_LOCATORS
setting will normally be false when running tests: it
avoids having to clutter all other tests with patches to prevent irrelevant
tasks from being queued.
The implementation is a handler for the post_save
signal:
def on_locator_post_save(sender, instance, created, **kwargs):
if created and settings.NOTES_FETCH_LOCATORS:
instance.queue_fetch_page()
where the queue_fatch_page
method is just a wrapper around invoking the
delay
method of the task, something like this:
class Locator(models.Model):
…
def queue_fetch_page(self):
"""Arrange for page to be fetched."""
from . import tasks
t = self.fetched.timestamp() if self.fetched else None
tasks.fetch_locator_page.delay(self.pk, if_not_fetched_since=t)
(The import
is inside the function because it cannot go at the top of the
module without creating circular imports.) I also needed to write some code in
the apps.py
to add ready
method that wires the signal handler to the
signal.
There is a problem with this, however.
The Problem
All the automated tests worked fine but when I tried creating Locator
instances on a running server I got Locator.DoesNotExist
exceptions raised
in the Celery worker processor. Naturally I could check and see that the
locator in question did exist. Mysterious!
The cause was (of course) a race condition, in this case a race between
- Celery queuing the task and it in turn attempting to retrieve the Locator entity from the database, and
- the transaction that creates the entity completing so that the entity is there to be retrieved.
The Fix: transaction.on_commit
If I were the perfectly logical TDD programmer I would at this point have rewritten the test to ensure the task is queued outside the transaction. As it was I didn’t think of this until later but let’s pretend I did:
class TestLocator(TestCase):
…
def test_queues_fetch_when_locator_created(self):
with self.settings(NOTES_FETCH_LOCATORS=True), \
patch.object(tasks, 'fetch_locator_page') as fetch_locator_page:
with transaction.atomic():
locator = Locator.objects.create(url='https://example.com/1')
self.assertFalse(fetch_locator_page.delay.called)
fetch_locator_page.delay.assert_called_once_with(
locator.pk,
if_not_fetched_since=None)
As of Django 1.9 there is a mechanism for achieving this:
transaction.on_commit
queues a hook function to be called
when the current transaction commits. So all we need is a minor change to the
queue method:
class Locator(models.Model):
…
def queue_fetch_page(self):
"""Arrange for page to be fetched."""
from . import tasks
t = self.fetched.timestamp() if self.fetched else None
transaction.on_commit(
lambda: tasks.fetch_locator_page.delay(self.pk, if_not_fetched_since=t))
If the creation of the Locator instance is inside a transaction then this
ensures we wait until it is known to exist before we queue the task that
requires it. If there is no wrapping transaction then on_commit
calls the
hook immediately, so it works either way.
The Problem with the Fix, and a Fix for That
The trouble is I could not get the unit tests to pass: the mock function
delay
was not being called. The cause of this is that the tests are
themselves all run within transaction.atomic()
, so the on_commit
hook is
not called until after the outer transaction, thus after the test has completed.
I tried various things before doing what in retrospect is the obvious fix: use
a different TestCase
subclass that does not wrap tests in a transaction. This
is in fact its superclass TransactionTestCase
, which instead
of using transactions to reset the database after each test, truncates the
database tables explicitly.
Lessons
Everything I needed to fix this problem was nicely documented in Django’s excellent website as footnotes to the respective features of the framework. This entry just draws together the scattered threads so I will do it right first time next time.
Moral: There is never a good time to not be worrying about concurrency in today’s crazy, messed-up world.