The Obvious Way
I have a model, Locator. When an instance is created I want it to queue a Celery task to
asynchronously fetch and process a web page. First I write a test that looks something like this:
    class TestLocator(TestCase):
        …
        def test_queues_fetch_when_locator_created(self):
            with self.settings(NOTES_FETCH_LOCATORS=True), \
                    patch.object(tasks, 'fetch_locator_page') as fetch_locator_page:
                locator = Locator.objects.create(url='https://example.com/1')
                fetch_locator_page.delay.assert_called_once_with(
                    locator.pk, if_not_fetched_since=None)
The NOTES_FETCH_LOCATORS setting will normally be false when running tests: this
avoids cluttering all the other tests with patches to prevent irrelevant
tasks from being queued.
The implementation is a handler for the post_save signal:
    def on_locator_post_save(sender, instance, created, **kwargs):
        if created and settings.NOTES_FETCH_LOCATORS:
            instance.queue_fetch_page()
The queue_fetch_page method is just a wrapper around invoking the
delay method of the task, something like this:
    class Locator(models.Model):
        …
        def queue_fetch_page(self):
            """Arrange for page to be fetched."""
            from . import tasks
            t = self.fetched.timestamp() if self.fetched else None
            tasks.fetch_locator_page.delay(self.pk, if_not_fetched_since=t)
(The import is inside the function because it cannot go at the top of the
module without creating circular imports.) I also needed to write some code in
apps.py to add a ready method that wires the signal handler to the
post_save signal.
There is a problem with this, however.
All the automated tests passed, but when I tried creating Locator
instances on a running server I got
Locator.DoesNotExist exceptions raised
in the Celery worker process. Naturally I could check and see that the
locator in question did exist. Mysterious!
The cause was (of course) a race condition, in this case a race between
- Celery queuing the task and it in turn attempting to retrieve the Locator entity from the database, and
- the transaction that creates the entity completing so that the entity is there to be retrieved.
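To make the interleaving concrete, here is a toy model of the race. It uses neither Django nor Celery: ToyDatabase and worker_fetch are hypothetical stand-ins, and the only assumption is that rows written inside an open transaction are invisible to other readers until the commit.

```python
# Toy model only: no Django, no Celery. All names here are hypothetical.

class ToyDatabase:
    """Rows written in a transaction are invisible to other readers until commit."""

    def __init__(self):
        self.committed = {}   # what another connection (e.g. a worker) can see
        self.pending = {}     # written by the open transaction, not yet visible

    def create(self, pk, url):
        self.pending[pk] = url

    def commit(self):
        self.committed.update(self.pending)
        self.pending.clear()


def worker_fetch(db, pk):
    """Stand-in for the task body: look the row up as a separate reader would."""
    if pk not in db.committed:
        raise LookupError(f"Locator {pk} does not exist")
    return db.committed[pk]


db = ToyDatabase()
db.create(1, "https://example.com/1")    # inside the request's transaction

# Unlucky interleaving: the worker picks the task up before the commit.
try:
    worker_fetch(db, 1)
    early_result = "found"
except LookupError:
    early_result = "missing"

db.commit()                               # the request finishes
late_result = worker_fetch(db, 1)         # the same lookup now succeeds

print(early_result, late_result)          # prints: missing https://example.com/1
```

By the time anyone inspects the database by hand the commit has long since happened, which is why the locator always appears to exist.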
The Fix: transaction.on_commit
If I were the perfectly logical TDD programmer I would at this point have rewritten the test to ensure the task is queued outside the transaction. As it was I didn’t think of this until later but let’s pretend I did:
    class TestLocator(TestCase):
        …
        def test_queues_fetch_when_locator_created(self):
            with self.settings(NOTES_FETCH_LOCATORS=True), \
                    patch.object(tasks, 'fetch_locator_page') as fetch_locator_page:
                with transaction.atomic():
                    locator = Locator.objects.create(url='https://example.com/1')
                    self.assertFalse(fetch_locator_page.delay.called)
                fetch_locator_page.delay.assert_called_once_with(
                    locator.pk, if_not_fetched_since=None)
As of Django 1.9 there is a mechanism for achieving this:
transaction.on_commit queues a hook function to be called
when the current transaction commits. So all we need is a minor change to the
queue_fetch_page method:
    class Locator(models.Model):
        …
        def queue_fetch_page(self):
            """Arrange for page to be fetched."""
            from . import tasks
            t = self.fetched.timestamp() if self.fetched else None
            transaction.on_commit(
                lambda: tasks.fetch_locator_page.delay(self.pk, if_not_fetched_since=t))
If the creation of the Locator instance is inside a transaction then this
ensures we wait until it is known to exist before we queue the task that
requires it. If there is no wrapping transaction then
on_commit calls the
hook immediately, so it works either way.
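The two cases can be sketched with a small stand-in for the database connection. This is a simplified imitation of the behaviour described above, not Django's implementation: ToyConnection and its methods are hypothetical names, and nesting and savepoints are ignored.

```python
from contextlib import contextmanager


class ToyConnection:
    """Hypothetical stand-in that imitates on_commit for a single transaction."""

    def __init__(self):
        self.in_transaction = False
        self.hooks = []

    @contextmanager
    def atomic(self):
        self.in_transaction = True
        try:
            yield
        except Exception:
            # Rolled back: pending hooks are thrown away, never run.
            self.in_transaction = False
            self.hooks.clear()
            raise
        # Committed: leave the transaction, then run hooks in order.
        self.in_transaction = False
        hooks, self.hooks = self.hooks, []
        for hook in hooks:
            hook()

    def on_commit(self, hook):
        if self.in_transaction:
            self.hooks.append(hook)   # defer until the commit
        else:
            hook()                    # no transaction: run straight away


conn = ToyConnection()
events = []

with conn.atomic():
    events.append("row created")
    conn.on_commit(lambda: events.append("task queued"))
    events.append("still inside transaction")

# Outside any transaction the hook runs immediately.
conn.on_commit(lambda: events.append("queued immediately"))

print(events)
```

The "task queued" entry lands after "still inside transaction", which is the ordering the Celery worker needs.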
The Problem with the Fix, and a Fix for That
The trouble is I could not get the unit tests to pass: the mock function
delay was not being called. The cause is that Django's TestCase runs each test
inside its own transaction.atomic() block, so the on_commit hook cannot fire
while the test is running. That outer transaction is rolled back rather than
committed when the test ends, so the hook is in fact never called at all.
I tried various things before doing what in retrospect is the obvious fix: use
a test-case class that does not wrap tests in a transaction. This
is in fact TestCase's superclass,
TransactionTestCase, which instead
of using transactions to reset the database after each test, truncates the
database tables explicitly.
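Both the failure and the fix show up in a toy model with nested transactions. As before this is an illustrative sketch rather than Django's code: NestingToyConnection and the rollback flag are hypothetical, and savepoints are left out. The outer block plays the role of the wrapper that TestCase puts around each test.

```python
from contextlib import contextmanager


class NestingToyConnection:
    """Hypothetical stand-in: hooks run only when the outermost block commits."""

    def __init__(self):
        self.depth = 0
        self.hooks = []

    @contextmanager
    def atomic(self, rollback=False):
        self.depth += 1
        failed = False
        try:
            yield
        except Exception:
            failed = True
            raise
        finally:
            self.depth -= 1
            if self.depth == 0:
                hooks, self.hooks = self.hooks, []
                # A rolled-back outermost transaction discards its hooks.
                if not (rollback or failed):
                    for hook in hooks:
                        hook()

    def on_commit(self, hook):
        if self.depth:
            self.hooks.append(hook)
        else:
            hook()


conn = NestingToyConnection()
calls = []

# Roughly what TestCase does: wrap the test in a transaction and roll it back.
with conn.atomic(rollback=True):            # the test runner's wrapper
    with conn.atomic():                     # the transaction in the test itself
        conn.on_commit(lambda: calls.append("delay"))
    called_during_test = list(calls)        # inner block exited, nothing ran
called_after_test = list(calls)             # wrapper rolled back, still nothing

# Roughly what TransactionTestCase allows: no wrapper, so the commit is real.
with conn.atomic():
    conn.on_commit(lambda: calls.append("delay"))
called_without_wrapper = list(calls)

print(called_during_test, called_after_test, called_without_wrapper)
```

With the wrapper in place the hook is neither run during the test nor afterwards; without it, the hook runs as soon as the test's own transaction commits.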
Everything I needed to fix this problem was nicely documented on Django’s excellent website as footnotes to the respective features of the framework. This entry just draws together the scattered threads so I will do it right first time next time.
Moral: There is never a good time to not be worrying about concurrency in today’s crazy, messed-up world.