Python’s asyncio had a rocky decade. With Python 3.11, it grew up: TaskGroup, ExceptionGroup, structured concurrency, real timeouts. By 2026, the patterns are stable and pleasant. This post is the working set.

If you’re still writing asyncio.gather and asyncio.wait, this post will save you bugs.

Structured concurrency — the big idea

Before TaskGroup:

results = await asyncio.gather(fetch_a(), fetch_b(), fetch_c())

The problem: if fetch_b() raises and fetch_a() is still running, what happens? By default, gather propagates the first exception to the caller but does not cancel the other awaitables, so fetch_a keeps running in the background. Resource leaks. Surprising errors. Hours debugging.
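A minimal sketch of that leak, using hypothetical stand-ins (slow, boom, demo) rather than real fetchers. It checks whether the sibling task is still alive after gather has already raised:

```python
import asyncio

async def slow(log):
    # Stand-in for fetch_a: still in flight when its sibling fails.
    await asyncio.sleep(0.05)
    log.append("slow finished")

async def boom():
    # Stand-in for fetch_b: fails immediately.
    raise ValueError("boom")

async def demo():
    log = []
    slow_task = asyncio.create_task(slow(log))
    try:
        await asyncio.gather(slow_task, boom())
    except ValueError:
        log.append("gather raised")
    still_running = not slow_task.done()  # gather did not cancel the sibling
    await slow_task                       # tidy up so the demo itself leaks nothing
    return log, still_running

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller has already moved on to error handling while slow is still doing work nobody is waiting for.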

With TaskGroup (Python 3.11+):

async with asyncio.TaskGroup() as tg:
    a = tg.create_task(fetch_a())
    b = tg.create_task(fetch_b())
    c = tg.create_task(fetch_c())

# After exit, all three are done. None leak.
results = (a.result(), b.result(), c.result())

If any task raises, the group cancels the others, waits for them to finish (allowing cleanup), and raises an ExceptionGroup. If multiple tasks raise, all errors propagate.

This is structured concurrency: tasks have a defined lifetime tied to a scope. They cannot outlive their scope. It’s the same insight that made with blocks superior to manual f.close().

ExceptionGroup — handling multiple errors

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(fetch_a())
        tg.create_task(fetch_b())
except* ConnectionError as eg:
    log.error("connection problems: %s", eg.exceptions)
except* TimeoutError as eg:
    log.error("timeouts: %s", eg.exceptions)

except* is new syntax in Python 3.11 that matches against exception groups. Each branch handles one exception type; multiple branches can fire if the group contains multiple types.

Your old code:

try:
    await fetch()
except ConnectionError:
    ...

Still works against single exceptions. Use except* only when you might handle a group.

Timeouts — the right way

Old:

try:
    result = await asyncio.wait_for(fetch(), timeout=5)
except asyncio.TimeoutError:
    ...

wait_for is fine, but the modern equivalent is asyncio.timeout (3.11+):

async with asyncio.timeout(5):
    result = await fetch()

Cleaner, composes better, and the cancellation semantics are correct. You can also reschedule:

async with asyncio.timeout(5) as cm:
    if special_case:
        cm.reschedule(asyncio.get_running_loop().time() + 30)
    await fetch()

For “deadline” semantics across many calls, use timeout_at:

deadline = asyncio.get_running_loop().time() + 30    # timeout_at expects event-loop time
async with asyncio.timeout_at(deadline):
    ...

Cancellation — what to know

Cancellation is async Python’s most subtle topic. Three rules:

1. Don’t catch CancelledError unless you re-raise

# ❌ swallowing: cancellation no longer works
try:
    await fetch()
except asyncio.CancelledError:
    pass

# ✅ cleanup, then re-raise
try:
    await fetch()
except asyncio.CancelledError:
    await cleanup()
    raise

If you swallow CancelledError, the task keeps running as if nothing happened, and the surrounding TaskGroup or timeout that requested the cancellation can no longer stop it.
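A small sketch of the re-raise rule with a hypothetical worker. Cleanup runs, the exception is re-raised, and the task ends up properly marked as cancelled:

```python
import asyncio

async def worker(log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")
        raise  # let the canceller see that the task really stopped

async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # give the worker a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(demo()))
```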

2. shield to protect critical sections

async with asyncio.timeout(5):
    await prepare()
    await asyncio.shield(commit_to_db())   # don't cancel mid-commit

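shield runs the inner awaitable as its own task, so cancelling the outer code does not cancel it. The await itself is still interrupted: when the timeout fires, the caller sees the cancellation immediately while commit_to_db() keeps running in the background. Keep a reference to the shielded task and await it later if you need to know the commit finished.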

3. Be cancellation-safe in resource handling

# ✅ Resource is released even if we're cancelled
async with conn.transaction():
    await do_work()

Async context managers’ __aexit__ is called on cancellation. Use them everywhere instead of try/finally for resources.
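To illustrate, here is a sketch with a hypothetical resource built on contextlib.asynccontextmanager. The task is cancelled while inside the block and the release step still runs:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(log):
    log.append("acquired")
    try:
        yield
    finally:
        log.append("released")  # runs on normal exit, error, or cancellation

async def worker(log):
    async with resource(log):
        await asyncio.sleep(10)

async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker enter the context manager
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```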

Concurrency primitives — when each is right

asyncio.gather (still useful)

a, b, c = await asyncio.gather(fa(), fb(), fc(), return_exceptions=True)

For when you genuinely want return_exceptions=True semantics: collect successes and failures both, no all-or-nothing. Otherwise, prefer TaskGroup.
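With return_exceptions=True the result list mixes values and exception objects, so the caller has to separate them. A minimal sketch with hypothetical ok and bad coroutines:

```python
import asyncio

async def ok(value):
    return value

async def bad():
    raise ValueError("nope")

async def demo():
    results = await asyncio.gather(ok(1), bad(), ok(3), return_exceptions=True)
    # Results keep input order; failures appear as exception instances.
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures

if __name__ == "__main__":
    print(asyncio.run(demo()))
```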

as_completed — process in completion order

for coro in asyncio.as_completed([fa(), fb(), fc()]):
    result = await coro
    process(result)

Useful when you want to make progress on the fast ones without waiting for the slow ones.
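A runnable sketch with a hypothetical job coroutine, showing that results arrive in completion order rather than submission order:

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def demo():
    order = []
    # "slow" is submitted first but finishes last.
    for next_done in asyncio.as_completed([job("slow", 0.1), job("fast", 0.01)]):
        order.append(await next_done)
    return order

if __name__ == "__main__":
    print(asyncio.run(demo()))
```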

asyncio.Queue — pipelines

NUM_WORKERS = 8
queue: asyncio.Queue[Job | None] = asyncio.Queue(maxsize=100)

async def producer():
    for j in pull_jobs():
        await queue.put(j)
    for _ in range(NUM_WORKERS):
        await queue.put(None)    # one sentinel per consumer, or the rest wait forever

async def consumer():
    while True:
        j = await queue.get()
        if j is None:
            break
        await handle(j)
        queue.task_done()

async with asyncio.TaskGroup() as tg:
    tg.create_task(producer())
    for _ in range(NUM_WORKERS):
        tg.create_task(consumer())

Bounded queue → backpressure. Eight workers run concurrently. This is the standard pipeline pattern.

asyncio.Semaphore — bounded concurrency

sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await client.get(url)

async with asyncio.TaskGroup() as tg:
    for url in urls:
        tg.create_task(fetch_one(url))

Cap simultaneous in-flight requests to avoid overwhelming downstreams. Critical when you fan out to many tasks.

anyio — portable async

anyio is the cross-runtime equivalent: the same code runs on asyncio and Trio. By 2026, major libraries such as httpx and FastAPI (through Starlette), along with various asyncio bridge libraries, build on anyio.

Key reasons to use anyio:

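  • Library code: write once, work in either runtime.
  • Stricter cancellation semantics: anyio follows Trio’s level-triggered model, where a cancelled scope keeps raising at every await until the scope exits, so a swallowed cancellation is harder to lose than with asyncio’s one-shot CancelledError.
  • Cleaner IO API: file IO, byte streams, networking all in one place.

import anyio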

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(fa)
        tg.start_soon(fb)

    # at exit, both are done

anyio.run(main)

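start_soon is the analog of create_task, except that it takes the async function and its arguments rather than a coroutine object and returns no task handle. create_task_group is the analog of TaskGroup. The semantics are equivalent (with edge-case improvements).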

Sync code in async — asyncio.to_thread

A sync library you can’t avoid:

result = await asyncio.to_thread(blocking_function, arg1, arg2)

Runs the function on a thread pool, returns asynchronously. Use for:

  • Sync libraries (PIL, hashlib for big inputs, pandas).
  • Filesystem I/O: asyncio has no native async file API, so blocking file calls belong on a thread.
  • CPU-light blocking work.

Don’t use for:

  • CPU-heavy work — threads share the GIL, parallelism is limited. Use ProcessPoolExecutor instead.
  • Long-running blocking — exhausts the default thread pool. Spawn a real thread.

Common patterns

Fan-out, fan-in with results

async def get_all_users(ids: list[int]) -> list[User]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(i)) for i in ids]
    return [t.result() for t in tasks]

The standard pattern. Replaces gather for almost every case.

Streaming results to a queue

async def producer(ids: list[int], out: asyncio.Queue[User | None]):
    async with asyncio.TaskGroup() as tg:
        for i in ids:
            tg.create_task(fetch_and_put(i, out))
    await out.put(None)        # done sentinel

Tasks push results into a queue as they complete; downstream consumes. Useful for streaming large fan-outs.

Retry with backoff

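import asyncio
import random

async def with_retry(coro_factory, *, attempts=3, base=0.2):
    # coro_factory must build a fresh coroutine per call; a coroutine object can only be awaited once
    for i in range(1, attempts + 1):
        try:
            return await coro_factory()
        except RetryableError:
            if i == attempts:
                raise    # out of attempts: surface the last error
            # exponential backoff scaled by a random factor in [0.5, 1.5)
            await asyncio.sleep(base * 2 ** (i - 1) * (0.5 + random.random()))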

Exponential backoff plus jitter, the same as the client retry pattern.
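A usage sketch for the helper. RetryableError and make_flaky are hypothetical, defined here only so the example runs. The helper takes a factory because each attempt needs a fresh coroutine:

```python
import asyncio
import random

class RetryableError(Exception):
    """Hypothetical marker for errors worth retrying."""

async def with_retry(coro_factory, *, attempts=3, base=0.2):
    for i in range(1, attempts + 1):
        try:
            return await coro_factory()
        except RetryableError:
            if i == attempts:
                raise
            await asyncio.sleep(base * 2 ** (i - 1) * (0.5 + random.random()))

def make_flaky(failures):
    # Returns an async function that fails `failures` times, then succeeds.
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RetryableError(f"attempt {calls['n']} failed")
        return calls["n"]

    return flaky

if __name__ == "__main__":
    # Pass the function itself, not flaky(): with_retry calls it once per attempt.
    print(asyncio.run(with_retry(make_flaky(2), base=0.001)))
```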

Race two things, take the first

async def first_to_finish(a, b):
    async with asyncio.timeout(10):
        async with asyncio.TaskGroup() as tg:
            ta = tg.create_task(a)
            tb = tg.create_task(b)
            done, _ = await asyncio.wait({ta, tb}, return_when=asyncio.FIRST_COMPLETED)
            for t in {ta, tb} - done:
                t.cancel()
            return next(iter(done)).result()

A bit verbose. anyio’s start_soon + cancellation sometimes reads cleaner here.

Common mistakes

1. Forgetting to await

async def bad():
    fetch()        # ⚠ created but never awaited; warning at runtime

Always await the call, or wrap it in asyncio.create_task(...) and keep a reference to the task. The “RuntimeWarning: coroutine was never awaited” is your friend.
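If you do start a task without awaiting it right away, hold a strong reference, because the event loop only keeps weak references to tasks. One common way to do that is sketched below; spawn and background_tasks are hypothetical names:

```python
import asyncio

background_tasks = set()

def spawn(coro):
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference while running
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task

async def fetch():
    await asyncio.sleep(0.01)
    return "payload"

async def demo():
    task = spawn(fetch())
    tracked_while_running = task in background_tasks
    result = await task
    await asyncio.sleep(0)  # let any pending done-callbacks run
    return result, tracked_while_running, len(background_tasks)

if __name__ == "__main__":
    print(asyncio.run(demo()))
```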

2. Mixing sync and async in the same function

If a function needs to await anything, it must be async. There’s no “fire and forget” sync→async call without asyncio.run (which can’t be called inside a running loop).

3. Long-running tasks without cancellation points

async def cpu_loop():
    while True:
        compute()        # sync, blocks event loop

This freezes the event loop. Move the work to a thread or process, or insert await asyncio.sleep(0) inside the loop to yield control.

4. Acquiring sync locks across awaits

lock = threading.Lock()
async def bad():
    with lock:
        await fetch()        # holds the lock during await — deadlock-prone

Use asyncio.Lock for cross-await synchronization, and even then, hold it briefly.

5. asyncio.run() inside a running loop

Calling asyncio.run() from within an async function or a notebook cell raises RuntimeError, because a thread can have only one running event loop. Use await main() instead.
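The failure is easy to reproduce. In this sketch, inner and outer are hypothetical; the nested asyncio.run call raises and a plain await does the job instead:

```python
import asyncio

async def inner():
    return 42

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)  # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()       # avoid a "never awaited" warning for the unused coroutine
        return await inner(), str(exc)

if __name__ == "__main__":
    print(asyncio.run(outer()))
```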

When to reach for sync

Async isn’t always the answer. Sync wins when:

  • The whole call tree is sync (no I/O-bound work that benefits from concurrency).
  • The code path is short and serial.
  • You want simpler debugging and stack traces.

For CPU-bound work or short scripts, sync is genuinely simpler. Don’t async-ify reflexively.

A real production pattern

Putting it together — a worker that fans out RAG retrieval, with timeouts and bounded concurrency:

async def retrieve_for_questions(questions: list[str]) -> list[list[Doc]]:
    sem = asyncio.Semaphore(10)

    async def one(q: str) -> list[Doc]:
        async with sem:
            async with asyncio.timeout(5):
                emb = await embed(q)
                docs = await pgvector_search(emb, k=8)
                return docs

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(one(q)) for q in questions]
    return [t.result() for t in tasks]

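The semaphore caps concurrency at 10. Each call gets a 5-second timeout that starts only after it acquires the semaphore, so time spent waiting for a slot does not count. The group ensures all tasks complete or all are cancelled cleanly. Errors propagate as an ExceptionGroup that the caller can handle.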

Read this next

If you want a working async-FastAPI starter using these patterns, it’s at rajpoot.dev.

