Celery + Django cheatsheet.

Install

uv add celery redis

Project setup

# config/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery("myapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
# config/__init__.py
from .celery import app as celery_app
__all__ = ["celery_app"]
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_TASK_TIME_LIMIT = 300
CELERY_TASK_SOFT_TIME_LIMIT = 270
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Task

# blog/tasks.py
from celery import shared_task
import logging

log = logging.getLogger(__name__)

@shared_task
def send_email_async(to, subject, body):
    log.info("sending to %s", to)
    send_email(to, subject, body)

Calling tasks

# Async (in queue)
send_email_async.delay("[email protected]", "subj", "body")

# More control
send_email_async.apply_async(
    args=["[email protected]", "subj", "body"],
    countdown=60,                   # delay 60s
    eta=datetime(2026, 1, 1),       # run at specific time
    queue="emails",
    priority=10,
)

Running

celery -A config worker -l info
celery -A config worker -l info -Q emails,default
celery -A config worker -l info --concurrency=4

Retries

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def flaky_task(self, x):
    try:
        do_something(x)
    except Exception as e:
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)

bind=True gives access to self (the task instance).

Auto-retry on specific exceptions

@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def task():
    ...

Idempotent tasks

Make tasks safe to run twice:

@shared_task
def charge(payment_id, idempotency_key):
    p = Payment.objects.get(id=payment_id)
    if Charge.objects.filter(idempotency_key=idempotency_key).exists():
        return                       # already charged
    Charge.objects.create(payment_id=payment_id, idempotency_key=idempotency_key, ...)

Periodic tasks (Celery Beat)

uv add django-celery-beat
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
celery -A config beat -l info

Schedule in admin OR settings:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "cleanup-old-logs": {
        "task": "blog.tasks.cleanup_logs",
        "schedule": crontab(hour=2, minute=0),
    },
    "every-minute": {
        "task": "blog.tasks.poll",
        "schedule": 60.0,
    },
}

Groups, chains, chords

from celery import group, chain, chord

# Sequential
chain(task1.s(), task2.s(), task3.s())()

# Parallel
group(task.s(x) for x in items)()

# Map-reduce
chord((task.s(x) for x in items), reducer.s())()

Result handling

result = task.delay(...)

result.get()                    # block until done (don't in views!)
result.get(timeout=10)
result.ready()
result.successful()
result.failed()
result.id

For long results, store in DB; don’t rely on result backend.

Signals

from celery.signals import task_failure, task_success

@task_failure.connect
def handle_failure(sender=None, task_id=None, exception=None, **kwargs):
    log.error("task failed", task_id=task_id, exc=exception)

Monitoring (Flower)

uv add flower
celery -A config flower

Web UI at :5555 showing tasks, workers, queues.

Sending emails async

@shared_task
def send_email_task(to, subject, body):
    send_mail(subject, body, "[email protected]", [to])

# In view:
send_email_task.delay(user.email, "Welcome", "Hi!")

Never send sync — request blocks on SMTP.

Periodic cleanup pattern

@shared_task
def cleanup_expired():
    Session.objects.filter(expires_at__lt=timezone.now()).delete()

# Beat schedule daily at 3am

DB transaction safety

from django.db import transaction

def view(request):
    with transaction.atomic():
        order = Order.objects.create(...)
        transaction.on_commit(lambda: process_order.delay(order.id))

Without on_commit, the task runs before commit — task can’t find the row.

Long-running task with progress

@shared_task(bind=True)
def process(self, items):
    for i, item in enumerate(items):
        do(item)
        self.update_state(state="PROGRESS", meta={"done": i + 1, "total": len(items)})
result = process.delay(items)
# Poll: result.state, result.info

Common mistakes

  • Calling result.get() synchronously in a view — defeats async.
  • Long-running task without time_limit — workers stuck.
  • Task not idempotent + at-least-once delivery → duplicate work.
  • Beat without DatabaseScheduler — schedule lost on restart.
  • Forgetting transaction.on_commit — task can’t find committed row.

Read this next

If you want my Celery setup + retry patterns, they’re at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .