Using Celery in Django Production Setup
Best practices for writing Celery tasks for production
Introduction
It is not rare for a Django web application to contain one or more asynchronous Celery tasks, whether periodic or sporadic. Asynchronous tasks handle time-consuming user requests such as sending emails in the background so as to prevent the foreground HTTP request from timing out. They are essential for any non-hello world web application.
After I started using Celery for async tasks in the Konigle web app, I made several mistakes in writing good Celery tasks. Over time, I came across many tips that helped me overcome them. This is an attempt to list them so that you don't make the same mistakes! I will include code snippets wherever possible to make the concepts clear. Let's go.
Always write idempotent tasks
So, what does it actually mean? This means that the task shouldn't cause unintended effects even if called multiple times with the same arguments.
Since the tasks are run in a distributed setup, it is possible that the same task can be run more than once. Hence, the task should be written to handle this possibility.
Let's consider a simple task that sends a welcome email to new signups in a Django app -
python
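from celery import shared_task
from django.contrib.auth.models import User
from django.db.transaction import atomic


def send_welcome_email(user: User) -> bool:
    # placeholder: send the email and return True on success
    pass


@shared_task()
def user_onboard_task(user_id: int):
    with atomic():
        # lock the row so that concurrent runs for the same user are serialized
        # (`onboarded` is assumed to be a boolean field on your user model)
        user = User.objects.select_for_update().get(pk=user_id)
        if user.onboarded:
            # an earlier run already sent the email
            return
        sent = send_welcome_email(user)
        if sent:
            user.onboarded = True
            user.save(update_fields=['onboarded'])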
The task above checks a flag to make sure the email has not been sent before it proceeds. It also takes a database row lock to cover the case where two instances of the same task run simultaneously with the same arguments. Whichever task gets the lock first sends the email and updates the flag; the other one then sees that the email has already been sent and simply returns. This makes the task idempotent: it can be called any number of times without unintended effects, which in this case would mean sending the welcome email multiple times. That is not a good onboarding experience for any user!
Handle Task Failures
It is natural for a task to fail. For example, even the simplest task of sending an email can fail due to network error or provider API downtime, or some other reason. A production-ready task must handle failures and include a retry strategy.
When it comes to Celery, there are two ways to retry a failed task.
- Retry the task on a specific exception from within the task. You can catch known exceptions and retry the task after a delay, as shown below. Celery supports both a default retry delay and a per-call override.
python
from celery import shared_task
from django.contrib.auth.models import User
from django.db.transaction import atomic


# placeholder exceptions; substitute the ones your email provider raises
class NetworkError(Exception):
    pass


class APIError(Exception):
    pass


def send_welcome_email(user: User) -> bool:
    pass


@shared_task(bind=True, default_retry_delay=30)
def user_onboard_task(self, user_id: int):
    with atomic():
        # maybe it is not a good idea to hold on to the database lock for such a long time.
        # a distributed lock will suffice
        user = User.objects.select_for_update().get(pk=user_id)
        if user.onboarded:
            return
        try:
            sent = send_welcome_email(user)
        except NetworkError as exc:
            # retry after 10 seconds (countdown is given in seconds)
            raise self.retry(exc=exc, countdown=10)
        except APIError as exc:
            # this will retry after the default delay of 30 seconds
            raise self.retry(exc=exc)
        if sent:
            user.onboarded = True
            user.save(update_fields=['onboarded'])
- Configure the task for auto-retry. Starting with Celery 4.0, you can configure a task to retry automatically on known exceptions. Celery retries the task after the specified delay, up to a specified number of attempts. I will not repeat the details here; the instructions for auto-retry are provided in the Celery documentation.
Making the task idempotent is even more important when you are retrying. A task can fail halfway and leave its work in an intermediate state. Without idempotency, retrying may lead to unintended effects.
For example, if the task is responsible for sending a welcome email and SMS to the user, and if it fails after sending the email, retrying will send the email again if you don't make the task idempotent.
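One way to handle the email-plus-SMS case is to track each step separately, so that a retry skips the steps that already succeeded. The sketch below uses plain Python to keep it self-contained. `OnboardingState` is a hypothetical stand-in for per-user flags that would normally be model fields, and the two senders are fakes that simulate one SMS failure.

```python
from dataclasses import dataclass


@dataclass
class OnboardingState:
    """Hypothetical per-user record; in Django these would be model fields."""
    email_sent: bool = False
    sms_sent: bool = False


def onboard(state, send_email, send_sms):
    """Run each step at most once, recording success before moving on."""
    if not state.email_sent:
        send_email()
        state.email_sent = True  # in a real task, save this before the next step
    if not state.sms_sent:
        send_sms()               # may raise; the email flag is already set
        state.sms_sent = True


# Simulate a run where the SMS provider fails once, followed by a retry.
calls = {"email": 0, "sms": 0}


def send_email():
    calls["email"] += 1


def send_sms():
    calls["sms"] += 1
    if calls["sms"] == 1:
        raise ConnectionError("SMS provider unavailable")


state = OnboardingState()
try:
    onboard(state, send_email, send_sms)  # first attempt fails at the SMS step
except ConnectionError:
    onboard(state, send_email, send_sms)  # the retry skips the email step
```

After the retry the email has been sent once and the SMS attempted twice. A crash between sending and saving a flag can still cause a duplicate, so the flags narrow the window rather than close it completely.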
Worker pool configuration
By default, the Celery worker uses the prefork pool, which executes tasks in separate child processes. Each process has its own memory, which makes this pool expensive for tasks that are purely I/O bound and spend most of their time waiting.
I/O-bound tasks - tasks that wait on external network requests such as API calls, or on file operations, and don't do any intensive computation. Sending emails and scraping a web page are good examples of I/O-bound tasks.
CPU- and memory-bound tasks - tasks that perform heavy computation (consuming a lot of CPU cycles) and require a lot of memory. Facial recognition, image processing, and data analysis are examples of CPU- and memory-bound tasks.
Generally, it is a good idea to separate your I/O-bound and CPU-bound tasks into different queues and run separate celery worker processes with appropriate worker pool configurations for these queues.
The worker handling I/O-bound tasks can use a lighter gevent or eventlet pool with a high concurrency factor.
celery --app=example worker --pool=gevent --concurrency=100
The worker handling CPU-bound tasks can use the default prefork pool with default concurrency.
celery --app=example worker --loglevel=info
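As a rough sketch of the queue split described above, the routing can live in Django settings. This assumes the Celery app loads its configuration from Django settings with `namespace="CELERY"`. The task paths and the queue names `io` and `cpu` are hypothetical.

```python
# settings.py (fragment)
# Assumes: app.config_from_object("django.conf:settings", namespace="CELERY")

# Send each task to the queue that matches its workload (names are made up).
CELERY_TASK_ROUTES = {
    "onboarding.tasks.user_onboard_task": {"queue": "io"},
    "media.tasks.resize_image": {"queue": "cpu"},
}

# Tasks without a matching route fall back to this queue.
CELERY_TASK_DEFAULT_QUEUE = "io"

# Each worker then consumes only its own queue:
#   celery --app=example worker --queues=io --pool=gevent --concurrency=100
#   celery --app=example worker --queues=cpu --loglevel=info
```

Without a `--queues` option, a worker consumes from the default queue only, so the routes and the worker commands need to agree on the queue names.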
Here is a very good article about celery execution pools
Ignore unnecessary task results
The task results can sometimes overload the result backend. Most of the time the tasks execute in the background and we don't need the results at all. In such cases, it is a good practice to ignore the task results if they are not required.
You can configure a task to ignore all of its results by setting a parameter, as below. If you need the result of a specific execution of the task, you can request it explicitly when calling the task.
from celery import shared_task


# by default all runs of this task ignore the result
@shared_task(ignore_result=True)
def sample_task():
    pass


# result will be stored and returned
result = sample_task.apply_async(ignore_result=False).get(timeout=30)
In fact, Celery provides a global setting, task_ignore_result, to ignore results by default.
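In a Django project the global switch could look like the fragment below. This is a sketch that assumes the Celery app loads Django settings with `namespace="CELERY"`, which is why the setting carries the `CELERY_` prefix.

```python
# settings.py (fragment)
# Assumes: app.config_from_object("django.conf:settings", namespace="CELERY")

# Do not store results unless a task or a call explicitly asks for them.
CELERY_TASK_IGNORE_RESULT = True
```

Tasks that do need their results can then opt back in with `ignore_result=False` on the task decorator or on an individual `apply_async` call.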
Avoid shared global variables
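Remember that a worker runs tasks in parallel: in separate child processes with the default prefork pool, or in threads or greenlets with the other pools. With prefork, each process gets its own copy of a module-level variable, so a value written by one task is not seen by tasks running in other processes. With threads or greenlets, the variable is shared and can be modified by several tasks at once. Either way, it is a bad idea to use global variables that are read and written by the tasks.
If global state cannot be avoided, guard access to it with a distributed lock. You can easily implement a fast, distributed lock using a caching backend such as Memcached. Memcached provides atomic operations on key-value pairs even in a distributed setup, so it can safely be used as a lock. Below is an example, though not a very good one.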
# assuming memcached is the default Django cache
import time
from contextlib import contextmanager

from celery import shared_task
from django.core.cache import cache

PAGES_SCRAPED = 0


@contextmanager
def scraper_lock(key, timeout: int = 30):
    timeout_at = time.monotonic() + timeout
    # cache.add is atomic: it succeeds only if the key does not exist yet
    status = cache.add(key, True, timeout)
    try:
        yield status
    finally:
        # release only a lock that we acquired and that has not expired yet
        if time.monotonic() < timeout_at and status:
            cache.delete(key)


@shared_task()
def scrape_web_page(url: str):
    global PAGES_SCRAPED
    with scraper_lock('page-scraped-lock') as locked:
        if locked:
            # scrape here
            PAGES_SCRAPED += 1
That's it for now. I am pretty sure that there are other tips that I missed or am not aware of. I will update the post when I come across new ones. Feel free to comment with any best practices for using Celery in production that I missed. Happy coding!