Python async grew up. By 2026 the modern primitives (TaskGroup, asyncio.timeout, structured concurrency) make most async code cleaner than the gather-and-pray patterns of 2020. This post is the working set.

TaskGroup (Python 3.11+)

import asyncio

async def fetch_user(uid):
    ...

async def fetch_orders(uid):
    ...

async def handler(uid):
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(uid))
        orders_task = tg.create_task(fetch_orders(uid))
    return user_task.result(), orders_task.result()

Both tasks run concurrently. If one fails, the other is cancelled and the exception propagates wrapped in an ExceptionGroup.

Compare to the old asyncio.gather:

# OLD — lossy / clunky
results = await asyncio.gather(fetch_user(uid), fetch_orders(uid), return_exceptions=True)
# Exceptions come back as values you have to check by hand. Without return_exceptions,
# `gather(...)` raises the first error and leaves the other tasks running unsupervised.

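For work that should succeed or fail together, TaskGroup is the better default. Use it. gather with return_exceptions=True still fits the narrower case where you want every result back regardless of failures.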
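The gather leak is easy to reproduce. This is a small sketch with hypothetical coroutines (fails_fast, sibling); the final sleep is only there so the demo can observe the orphaned task completing.

```python
import asyncio

async def fails_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def sibling(log):
    await asyncio.sleep(0.05)
    log.append("sibling finished anyway")

async def demo():
    log = []
    try:
        # No return_exceptions: the first error propagates immediately.
        await asyncio.gather(fails_fast(), sibling(log))
    except ValueError:
        log.append("gather raised ValueError")
    # gather did not cancel the sibling; it is still running in the background.
    await asyncio.sleep(0.1)
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Nothing cancels or awaits the sibling after the error, which is exactly the situation TaskGroup rules out.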

asyncio.timeout (Python 3.11+)

async def fetch_with_timeout(url):
    async with asyncio.timeout(5):
        return await client.get(url)

Cleaner than asyncio.wait_for. Composes naturally:

async with asyncio.timeout(30):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(slow_a())
        tg.create_task(slow_b())

If 30s elapses: both tasks cancelled, TimeoutError raised.

Cancellation

async def worker():
    try:
        while True:
            await asyncio.sleep(1)
            do_work()
    except asyncio.CancelledError:
        await cleanup()
        raise

Always re-raise CancelledError. Swallowing it breaks task cleanup.
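Here is a runnable sketch of that pattern from the caller's side. The log list stands in for do_work and cleanup, which are not defined in this post.

```python
import asyncio

async def worker(log):
    try:
        while True:
            await asyncio.sleep(0.01)
            log.append("tick")  # stand-in for do_work()
    except asyncio.CancelledError:
        log.append("cleanup")  # stand-in for await cleanup()
        raise  # re-raise so the task ends up in the cancelled state

async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected here: we requested this cancellation ourselves
    return task.cancelled(), log[-1]

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the worker re-raises, task.cancelled() is True. If the cleanup should run on every exit path and not only on cancellation, try/finally is usually the simpler shape.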

For shielded sections (don’t cancel mid-write):

async def critical():
    try:
        await asyncio.shield(write_to_db())
    except asyncio.CancelledError:
        # The caller was cancelled, but write_to_db keeps running in the background.
        raise

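Use sparingly: shield does not block the cancel. The caller still gets CancelledError; only the inner operation keeps running, with nobody left awaiting its result. Keep a reference to the shielded task so it is not garbage-collected before it finishes.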
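A sketch of what shield does and does not protect. Here write_to_db is a hypothetical stand-in that just sleeps, and inner_tasks is an assumed helper set that holds a strong reference to the shielded task.

```python
import asyncio

async def write_to_db(log):
    await asyncio.sleep(0.05)  # stand-in for a real write
    log.append("write committed")

async def critical(log, inner_tasks):
    inner = asyncio.create_task(write_to_db(log))
    inner_tasks.add(inner)  # strong reference: the loop only holds weak ones
    inner.add_done_callback(inner_tasks.discard)
    await asyncio.shield(inner)

async def demo():
    log, inner_tasks = [], set()
    outer = asyncio.create_task(critical(log, inner_tasks))
    await asyncio.sleep(0.01)
    outer.cancel()  # cancel the caller mid-write
    try:
        await outer
    except asyncio.CancelledError:
        log.append("caller cancelled")
    await asyncio.sleep(0.1)  # let the shielded write finish
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller is cancelled first and the write commits afterwards, unobserved. If the write had failed, nothing in this sketch would notice, which is the main reason to reach for shield rarely.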

Concurrency limit

sem = asyncio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await client.get(url)

async def main():
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tg.create_task(fetch(url))

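Bound parallelism: don’t open 10000 sockets at once. Note that this still creates one task per URL; the semaphore only limits how many of them are inside client.get at the same time.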

For larger pipelines: asyncio.Queue + N workers.

Backpressure with Queue

N_CONSUMERS = 10

async def producer(q):
    for item in source:
        await q.put(item)  # waits while the queue is full
    for _ in range(N_CONSUMERS):
        await q.put(None)  # one poison pill per consumer

async def consumer(q):
    while True:
        item = await q.get()
        if item is None:
            return
        await process(item)

async def main():
    q = asyncio.Queue(maxsize=100)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(q))
        for _ in range(N_CONSUMERS):
            tg.create_task(consumer(q))

Bounded queue = bounded memory + automatic flow control.

Async iterators

async def stream_results():
    async for row in db.iterate("SELECT ..."):
        yield process(row)

async def main():
    async for r in stream_results():
        print(r)

For streamed pipelines, async generators are clean.

Sync code in async context

Don’t block the event loop. Hand blocking calls to a worker thread:

import asyncio

result = await asyncio.to_thread(blocking_function, arg)

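to_thread ships work to a thread pool. The default pool is fine for blocking IO. For CPU-bound work, threads generally won't run it in parallel because of the GIL, so use a ProcessPoolExecutor (via loop.run_in_executor) instead.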
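A quick way to confirm the loop stays responsive is to run a heartbeat alongside the blocking call. This is a sketch: blocking_function here is a hypothetical function that just sleeps, and the tick counts depend on timing.

```python
import asyncio
import time

def blocking_function(seconds):
    time.sleep(seconds)  # blocks its worker thread, not the event loop
    return "done"

async def heartbeat(ticks, stop):
    # Keeps ticking only if the event loop is free to schedule it.
    while not stop.is_set():
        ticks.append(1)
        await asyncio.sleep(0.01)

async def demo():
    ticks, stop = [], asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    result = await asyncio.to_thread(blocking_function, 0.2)
    stop.set()
    await hb
    return result, len(ticks)

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If you call blocking_function(0.2) directly inside demo instead, the heartbeat records a single tick at most, because the loop is frozen for the whole 0.2 seconds.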

Common pitfall: forgotten tasks

async def f():
    asyncio.create_task(send_email())  # detached: errors silently dropped

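If send_email raises, nothing handles it; at best asyncio logs "Task exception was never retrieved" when the task is garbage-collected. The event loop also keeps only weak references to tasks, so an unreferenced task can disappear mid-run. Use TaskGroup, or save the reference and await it or clean it up.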

For fire-and-forget: log errors via a wrapper:

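_background_tasks = set()  # strong references so tasks are not garbage-collected
def fire_and_forget(coro):
    async def wrap():
        try:
            await coro
        except Exception:
            log.exception("background task failed")
    task = asyncio.create_task(wrap())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)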

ExceptionGroup handling (3.11+)

TaskGroup raises ExceptionGroup whenever a task fails, even if only one does:

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(may_fail_a())
        tg.create_task(may_fail_b())
except* ConnectionError as eg:
    log.warning("connection issues: %s", eg.exceptions)
except* ValueError as eg:
    log.error("validation: %s", eg.exceptions)

except* matches exceptions inside the group, and more than one except* clause can run for the same group. Worth learning.

anyio

For libs that should support both asyncio and trio:

import anyio

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(work_a)
        tg.start_soon(work_b)

Same structured concurrency, broader compatibility. FastAPI uses anyio under the hood (via Starlette). For most app code: pick asyncio or anyio and stick with it.

Common mistakes

1. asyncio.gather without return_exceptions=True

The first error is raised; the remaining tasks keep running with nobody awaiting them, so their results and errors are lost. Use TaskGroup.

2. Blocking calls in async functions

time.sleep(), requests.get(), sync DB drivers: each one stalls the event loop for every other task. Use to_thread or async libs.

3. Detached create_task

Errors swallowed. Always own the task or wrap with try/except.

4. asyncio.run inside another async

Can’t nest event loops: asyncio.run raises RuntimeError when a loop is already running. If you need to call async from sync, call asyncio.run once at the top.
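A small sketch of the failure and the fix, using hypothetical coroutines named inner, outer_wrong and outer_right.

```python
import asyncio

async def inner():
    return 42

async def outer_wrong():
    coro = inner()
    try:
        return asyncio.run(coro)  # a loop is already running here
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"

async def outer_right():
    # Inside async code, just await the coroutine.
    return await inner()

if __name__ == "__main__":
    print(asyncio.run(outer_wrong()))
    print(asyncio.run(outer_right()))
```

The only asyncio.run calls that work are the ones at the very top, in synchronous code.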

5. Forgetting to close clients

client = httpx.AsyncClient()
# ... never closed → connection leak

Use async with httpx.AsyncClient() as c, or create the client once at app start and close it (await client.aclose()) at shutdown.

What I’d ship today

For new Python 3.11+ code:

  • TaskGroup instead of gather.
  • asyncio.timeout instead of wait_for.
  • async with for resource lifecycle.
  • Bounded queues for fan-out/fan-in.
  • to_thread for blocking work.
  • Structured cancellation: every coroutine handles CancelledError correctly.

Read this next

If you want my asyncio production patterns reference, it’s at rajpoot.dev .

