Python’s asyncio had a rocky decade. By Python 3.11+, it grew up: TaskGroup, ExceptionGroup, structured concurrency, real timeouts. By 2026, the patterns are stable and pleasant. This post is the working set.
If you’re still writing asyncio.gather and asyncio.wait, this post will save you bugs.
Structured concurrency — the big idea
Before TaskGroup:
results = await asyncio.gather(fetch_a(), fetch_b(), fetch_c())
The problem: if fetch_b() raises and fetch_a() is still running, what happens? Default: gather returns the exception and leaves fetch_a running in the background. Resource leaks. Surprising errors. Hours debugging.
With TaskGroup (Python 3.11+):
async with asyncio.TaskGroup() as tg:
a = tg.create_task(fetch_a())
b = tg.create_task(fetch_b())
c = tg.create_task(fetch_c())
# After exit, all three are done. None leak.
results = (a.result(), b.result(), c.result())
If any task raises, the group cancels the others, waits for them to finish (allowing cleanup), and raises an ExceptionGroup. If multiple tasks raise, all errors propagate.
This is structured concurrency: tasks have a defined lifetime tied to a scope. They cannot outlive their scope. It’s the same insight that made with blocks superior to manual f.close().
ExceptionGroup — handling multiple errors
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch_a())
tg.create_task(fetch_b())
except* ConnectionError as eg:
log.error("connection problems: %s", eg.exceptions)
except* TimeoutError as eg:
log.error("timeouts: %s", eg.exceptions)
except* is a Python 3.11 syntax — pattern-matches against exception groups. Each branch handles one exception type; multiple branches can fire if the group has multiple types.
Your old code:
try:
await fetch()
except ConnectionError:
...
Still works against single exceptions. Use except* only when you might handle a group.
Timeouts — the right way
Old:
try:
result = await asyncio.wait_for(fetch(), timeout=5)
except asyncio.TimeoutError:
...
wait_for is fine, but the modern equivalent is asyncio.timeout (3.11+):
async with asyncio.timeout(5):
result = await fetch()
Cleaner, composes better, and the cancellation semantics are correct. You can also reschedule:
async with asyncio.timeout(5) as cm:
if special_case:
cm.reschedule(asyncio.get_running_loop().time() + 30)
await fetch()
For “deadline” semantics across many calls, use timeout_at:
deadline = time.monotonic() + 30
async with asyncio.timeout_at(deadline):
...
Cancellation — what to know
Cancellation is async Python’s most subtle topic. Three rules:
1. Don’t catch CancelledError unless you re-raise
# ❌ swallowing — cancellation no longer works
try:
await fetch()
except CancelledError:
pass
# ✅ cleanup, then re-raise
try:
await fetch()
except CancelledError:
await cleanup()
raise
If you swallow CancelledError, the cancellation message is lost and the surrounding TaskGroup/timeout won’t know to stop trying.
2. shield to protect critical sections
async with asyncio.timeout(5):
await prepare()
await asyncio.shield(commit_to_db()) # don't cancel mid-commit
shield makes a coroutine uncancellable from the outside. Your DB commit completes; only then does the cancellation propagate.
3. Be cancellation-safe in resource handling
# ✅ Resource is released even if we're cancelled
async with conn.transaction():
await do_work()
Async context managers’ __aexit__ is called on cancellation. Use them everywhere instead of try/finally for resources.
Concurrency primitives — when each is right
asyncio.gather (still useful)
a, b, c = await asyncio.gather(fa(), fb(), fc(), return_exceptions=True)
For when you genuinely want return_exceptions=True semantics: collect successes and failures both, no all-or-nothing. Otherwise, prefer TaskGroup.
as_completed — process in completion order
for coro in asyncio.as_completed([fa(), fb(), fc()]):
result = await coro
process(result)
Useful when you want to make progress on the fast ones without waiting for the slow ones.
asyncio.Queue — pipelines
queue: asyncio.Queue[Job] = asyncio.Queue(maxsize=100)
async def producer():
for j in pull_jobs():
await queue.put(j)
await queue.put(None) # sentinel
async def consumer():
while True:
j = await queue.get()
if j is None:
break
await handle(j)
queue.task_done()
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
for _ in range(8):
tg.create_task(consumer())
Bounded queue → backpressure. Eight workers run in parallel. The pattern.
asyncio.Semaphore — bounded concurrency
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await client.get(url)
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(fetch_one(url))
Cap simultaneous in-flight requests to avoid overwhelming downstreams. Critical when you fan out to many tasks.
anyio — portable async
anyio is the cross-runtime equivalent: same code on asyncio and Trio. By 2026, anyio is the default in major libraries (httpx, FastAPI, AsyncIO bridges).
Key reasons to use anyio:
- Library code — write once, work in either runtime.
- Better cancellation semantics — anyio’s TaskGroup is sometimes safer than asyncio’s.
- Cleaner IO API — file IO, byte streams, networking all in one place.
import anyio
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(fa)
tg.start_soon(fb)
# at exit, both are done
anyio.run(main)
start_soon is the analog of create_task. create_task_group is the analog of TaskGroup. The semantics are equivalent (with edge-case improvements).
Sync code in async — asyncio.to_thread
A sync library you can’t avoid:
result = await asyncio.to_thread(blocking_function, arg1, arg2)
Runs the function on a thread pool, returns asynchronously. Use for:
- Sync libraries (PIL, hashlib for big inputs, pandas).
- Filesystem I/O on Windows where async file I/O is broken.
- CPU-light blocking work.
Don’t use for:
- CPU-heavy work — threads share the GIL, parallelism is limited. Use
ProcessPoolExecutorinstead. - Long-running blocking — exhausts the default thread pool. Spawn a real thread.
Common patterns
Fan-out, fan-in with results
async def get_all_users(ids: list[int]) -> list[User]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(i)) for i in ids]
return [t.result() for t in tasks]
The standard pattern. Replaces gather for almost every case.
Streaming results to a queue
async def producer(out: asyncio.Queue[User]):
async with asyncio.TaskGroup() as tg:
for i in ids:
tg.create_task(fetch_and_put(i, out))
await out.put(None) # done sentinel
Tasks push results into a queue as they complete; downstream consumes. Useful for streaming large fan-outs.
Retry with backoff
async def with_retry(coro_factory, *, attempts=3, base=0.2):
for i in range(1, attempts + 1):
try:
return await coro_factory()
except RetryableError:
if i == attempts:
raise
await asyncio.sleep(base * 2 ** (i - 1) * (0.5 + random.random()))
Exponential + jitter. Same as the client retry pattern .
Race two things, take the first
async def first_to_finish(a, b):
async with asyncio.timeout(10):
async with asyncio.TaskGroup() as tg:
ta = tg.create_task(a)
tb = tg.create_task(b)
done, _ = await asyncio.wait({ta, tb}, return_when=asyncio.FIRST_COMPLETED)
for t in {ta, tb} - done:
t.cancel()
return next(iter(done)).result()
A bit verbose. anyio’s start_soon + cancellation sometimes reads cleaner here.
Common mistakes
1. Forgetting to await
async def bad():
fetch() # ⚠ created but never awaited; warning at runtime
Always await or asyncio.create_task(...). The “RuntimeWarning: coroutine was never awaited” is your friend.
2. Mixing sync and async in the same function
If a function calls anything async, it must be async. There’s no “fire and forget” sync→async without asyncio.run (which can’t be called inside a running loop).
3. Long-running tasks without cancellation points
async def cpu_loop():
while True:
compute() # sync, blocks event loop
This freezes the event loop. Either use threads, processes, or insert await asyncio.sleep(0) to yield control.
4. Acquiring sync locks across awaits
lock = threading.Lock()
async def bad():
with lock:
await fetch() # holds the lock during await — deadlock-prone
Use asyncio.Lock for cross-await synchronization, and even then, hold it briefly.
5. asyncio.run() inside a running loop
Calling asyncio.run() from within an async function or notebook crashes. There’s only one loop per thread. Use await main() instead.
When to reach for sync
Async isn’t always the answer. Sync wins when:
- The whole call tree is sync (no I/O bound work that benefits from concurrency).
- The code path is short and serial.
- You want simpler debugging and stack traces.
For CPU-bound work or short scripts, sync is genuinely simpler. Don’t async-ify reflexively.
A real production pattern
Putting it together — a worker that fans out RAG retrieval, with timeouts and bounded concurrency:
async def retrieve_for_questions(questions: list[str]) -> list[list[Doc]]:
sem = asyncio.Semaphore(10)
async def one(q: str) -> list[Doc]:
async with sem:
async with asyncio.timeout(5):
emb = await embed(q)
docs = await pgvector_search(emb, k=8)
return docs
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(one(q)) for q in questions]
return [t.result() for t in tasks]
Cap concurrency at 10. Per-call 5-second timeout. Group ensures all complete or all are cancelled cleanly. Errors propagate as ExceptionGroup that the caller can handle.
Read this next
- Async/Await Explained — the basics.
- Modern Python Tooling — the surrounding toolchain.
- Tokio Async Fundamentals — same problem, Rust’s solution.
If you want a working async-FastAPI starter using these patterns, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .