Celery + Django cheatsheet.
Install
uv add celery redis
Project setup
# config/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# config/__init__.py
from .celery import app as celery_app
__all__ = ["celery_app"]
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TASK_TIME_LIMIT = 300
CELERY_TASK_SOFT_TIME_LIMIT = 270
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Task
# blog/tasks.py
from celery import shared_task
import logging
log = logging.getLogger(__name__)
@shared_task
def send_email_async(to, subject, body):
    log.info("sending to %s", to)
    send_email(to, subject, body)
Calling tasks
# Async (in queue)
send_email_async.delay("user@example.com", "subj", "body")
# More control
from datetime import datetime, timezone

send_email_async.apply_async(
    args=["user@example.com", "subj", "body"],
    countdown=60,  # run 60s from now (pass countdown OR eta, not both)
    # eta=datetime(2026, 1, 1, tzinfo=timezone.utc),  # run at a specific time
    queue="emails",
    priority=10,
)
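countdown is a shortcut for an ETA that many seconds in the future, which is why you pass one or the other. A minimal stdlib sketch of that equivalence; eta_from_countdown is a hypothetical helper for illustration, not part of Celery:

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(seconds, now=None):
    """Return the timezone-aware UTC datetime that lies `seconds` after `now`."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


fixed_now = datetime(2026, 1, 1, tzinfo=timezone.utc)
print(eta_from_countdown(60, now=fixed_now))  # 2026-01-01 00:01:00+00:00
```

Using timezone-aware datetimes for eta avoids surprises when the worker and the caller run in different time zones.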
Running
celery -A config worker -l info
celery -A config worker -l info -Q emails,default
celery -A config worker -l info --concurrency=4
Retries
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def flaky_task(self, x):
    try:
        do_something(x)
    except Exception as e:
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
bind=True gives access to self (the task instance).
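To see what the countdown expression above produces, here is a small stdlib sketch. self.request.retries starts at 0 on the first attempt, so with max_retries=3 the waits grow 60s, 120s, 240s. retry_countdowns is a hypothetical helper, not a Celery API:

```python
def retry_countdowns(max_retries, base=60):
    """Countdown in seconds before each retry, using 2 ** retries * base."""
    return [2 ** retries * base for retries in range(max_retries)]


print(retry_countdowns(3))  # [60, 120, 240]
```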
Auto-retry on specific exceptions
@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def task():
    ...
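Roughly how these options combine, as I understand Celery's behavior: the delay doubles from 1 second, is capped at retry_backoff_max, and with jitter enabled a random value between 0 and that cap is used. The sketch below is a stdlib approximation under those assumptions; backoff_delay is a hypothetical name, so check the Celery docs for the exact rules in your version:

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Approximate delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * 2 ** retries)
    if jitter:
        # Full jitter: pick any whole number of seconds from 0 up to the cap.
        delay = random.randrange(delay + 1)
    return delay


print([backoff_delay(n, jitter=False) for n in range(5)])  # [1, 2, 4, 8, 16]
print(backoff_delay(12, jitter=False))  # 600
```

Jitter spreads retries out so that many tasks failing at once do not all hit the recovering service at the same moment.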
Idempotent tasks
Make tasks safe to run twice:
@shared_task
def charge(payment_id, idempotency_key):
    p = Payment.objects.get(id=payment_id)
    if Charge.objects.filter(idempotency_key=idempotency_key).exists():
        return  # already charged
    Charge.objects.create(payment_id=payment_id, idempotency_key=idempotency_key, ...)
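A check-then-create like the one above can still race if two workers receive the same message at the same time. One common guard is a unique constraint on the key, so the database rejects the second insert. The sketch below uses stdlib sqlite3 as a stand-in for the Django ORM; the table and function are hypothetical. In Django the equivalent would be a unique=True field, typically combined with get_or_create:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE charge (payment_id INTEGER, idempotency_key TEXT UNIQUE)"
)


def charge(payment_id, idempotency_key):
    """Record the charge once; return False if this key was already used."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO charge (payment_id, idempotency_key) VALUES (?, ?)",
                (payment_id, idempotency_key),
            )
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: the unique key rejected it
    return True


print(charge(1, "key-abc"))  # True
print(charge(1, "key-abc"))  # False
```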
Periodic tasks (Celery Beat)
uv add django-celery-beat
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
celery -A config beat -l info
Schedule in admin OR settings:
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    "cleanup-old-logs": {
        "task": "blog.tasks.cleanup_logs",
        "schedule": crontab(hour=2, minute=0),
    },
    "every-minute": {
        "task": "blog.tasks.poll",
        "schedule": 60.0,
    },
}
Groups, chains, chords
from celery import group, chain, chord
# Sequential
chain(task1.s(), task2.s(), task3.s())()
# Parallel
group(task.s(x) for x in items)()
# Map-reduce
chord((task.s(x) for x in items), reducer.s())()
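If the three primitives are new to you, this plain-Python analogy may help. It is not Celery code: run_chain, run_group, and run_chord are hypothetical helpers that only mimic how results flow (a chain passes each result to the next task, a group collects results, a chord feeds the group's results to a callback):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(first_arg, *funcs):
    """Like chain: feed each function's return value into the next one."""
    return reduce(lambda value, func: func(value), funcs, first_arg)


def run_group(func, items):
    """Like group: apply func to every item in parallel, keep input order."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, items))


def run_chord(func, items, callback):
    """Like chord: run the group, then hand the list of results to callback."""
    return callback(run_group(func, items))


def double(x):
    return x * 2


print(run_chain(1, double, double, double))  # 8
print(run_group(double, [1, 2, 3]))          # [2, 4, 6]
print(run_chord(double, [1, 2, 3], sum))     # 12
```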
Result handling
result = task.delay(...)
result.get() # block until done (don't in views!)
result.get(timeout=10)
result.ready()
result.successful()
result.failed()
result.id
For large or long-lived results, store them in your own DB; don't rely on the result backend, where results expire after a day by default.
Signals
from celery.signals import task_failure, task_success
@task_failure.connect
def handle_failure(sender=None, task_id=None, exception=None, **kwargs):
    # stdlib logging takes %-style args, not arbitrary keyword arguments
    log.error("task %s failed: %r", task_id, exception)
Monitoring (Flower)
uv add flower
celery -A config flower
Web UI at :5555 showing tasks, workers, queues.
Sending emails async
@shared_task
def send_email_task(to, subject, body):
    send_mail(subject, body, "noreply@example.com", [to])
# In view:
send_email_task.delay(user.email, "Welcome", "Hi!")
Don't send synchronously from a view: the request blocks on SMTP.
Periodic cleanup pattern
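@shared_task
def cleanup_expired():
    # Assumes a custom Session model with an expires_at field;
    # Django's built-in Session model calls this field expire_date.
    Session.objects.filter(expires_at__lt=timezone.now()).delete()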
# Beat schedule daily at 3am
DB transaction safety
from django.db import transaction
def view(request):
    with transaction.atomic():
        order = Order.objects.create(...)
        transaction.on_commit(lambda: process_order.delay(order.id))
Without on_commit, a worker can pick up the task before the transaction commits, so the task can't find the row.
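The race is easy to reproduce without Celery. In this stdlib sketch, two sqlite3 connections stand in for the web request and the worker (both names are illustrative): the worker's connection sees no row until the writer commits.

```python
import os
import sqlite3
import tempfile


def count_orders(conn):
    """Count rows in orders, closing the cursor so no read lock lingers."""
    cur = conn.execute("SELECT COUNT(*) FROM orders")
    (n,) = cur.fetchone()
    cur.close()
    return n


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")
    web = sqlite3.connect(path)     # plays the request handler
    worker = sqlite3.connect(path)  # plays the Celery worker

    web.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")
    web.commit()

    web.execute("INSERT INTO orders (id) VALUES (1)")  # transaction still open
    seen_before = count_orders(worker)

    web.commit()
    seen_after = count_orders(worker)

    web.close()
    worker.close()

print(seen_before, seen_after)  # 0 1
```

transaction.on_commit closes this window by deferring the .delay() call until the row is visible to other connections.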
Long-running task with progress
@shared_task(bind=True)
def process(self, items):
    for i, item in enumerate(items):
        do(item)
        self.update_state(state="PROGRESS", meta={"done": i + 1, "total": len(items)})
result = process.delay(items)
# Poll: result.state, result.info
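When polling, result.info holds the meta dict only while the task is in the custom PROGRESS state; before that it is empty, and after a failure it is the exception. A small sketch of turning the pair into something displayable; describe_progress is a hypothetical helper that you would call as describe_progress(result.state, result.info):

```python
def describe_progress(state, info):
    """Turn a task's state and info into a short status string."""
    if state == "PROGRESS" and isinstance(info, dict):
        done, total = info["done"], info["total"]
        percent = 100 * done // total if total else 0
        return f"{done}/{total} ({percent}%)"
    if state == "SUCCESS":
        return "done"
    if state == "FAILURE":
        return f"failed: {info!r}"
    return "waiting"  # PENDING, STARTED, RETRY, ...


print(describe_progress("PENDING", None))                       # waiting
print(describe_progress("PROGRESS", {"done": 3, "total": 12}))  # 3/12 (25%)
print(describe_progress("SUCCESS", None))                       # done
```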
Common mistakes
- Calling result.get() synchronously in a view: defeats async.
- Long-running task without time_limit: workers get stuck.
- Task not idempotent + at-least-once delivery → duplicate work.
- Beat without DatabaseScheduler: schedule state lives in a local file and can be lost on restart or redeploy.
- Forgetting transaction.on_commit: task can't find the not-yet-committed row.