Python async grew up. By 2026 the modern primitives (TaskGroup, asyncio.timeout, structured concurrency) make most async code cleaner than the gather-and-pray patterns of 2020. This post is the working set.
TaskGroup (Python 3.11+)
import asyncio
async def fetch_user(uid):
...
async def fetch_orders(uid):
...
async def handler(uid):
async with asyncio.TaskGroup() as tg:
user_task = tg.create_task(fetch_user(uid))
orders_task = tg.create_task(fetch_orders(uid))
return user_task.result(), orders_task.result()
Both tasks run concurrently. If one fails: other is cancelled, exception propagates as ExceptionGroup.
Compare to the old asyncio.gather:
# OLD — lossy / clunky
results = await asyncio.gather(fetch_user(uid), fetch_orders(uid), return_exceptions=True)
# Exceptions are values. Or `gather(...)` raises first error and silently leaks the rest.
TaskGroup is strictly better. Use it.
asyncio.timeout (Python 3.11+)
async def fetch_with_timeout(url):
async with asyncio.timeout(5):
return await client.get(url)
Cleaner than asyncio.wait_for. Composes naturally:
async with asyncio.timeout(30):
async with asyncio.TaskGroup() as tg:
tg.create_task(slow_a())
tg.create_task(slow_b())
If 30s elapses: both tasks cancelled, TimeoutError raised.
Cancellation
async def worker():
try:
while True:
await asyncio.sleep(1)
do_work()
except asyncio.CancelledError:
await cleanup()
raise
Always re-raise CancelledError. Swallowing it breaks task cleanup.
For shielded sections (don’t cancel mid-write):
async def critical():
try:
await asyncio.shield(write_to_db())
except asyncio.CancelledError:
# Caller cancelled, but write_to_db completes.
raise
Use sparingly — shielded code blocks the cancel.
Concurrency limit
sem = asyncio.Semaphore(10)
async def fetch(url):
async with sem:
return await client.get(url)
async def main():
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(fetch(url))
Bound parallelism — don’t open 10000 sockets at once.
For larger pipelines: asyncio.Queue + N workers.
Backpressure with Queue
async def producer(q):
for item in source:
await q.put(item) # awaits if queue is full
for _ in range(N): await q.put(None) # poison pills
async def consumer(q):
while True:
item = await q.get()
if item is None: return
await process(item)
async def main():
q = asyncio.Queue(maxsize=100)
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(q))
for _ in range(10):
tg.create_task(consumer(q))
Bounded queue = bounded memory + automatic flow control.
Async iterators
async def stream_results():
async for row in db.iterate("SELECT ..."):
yield process(row)
async def main():
async for r in stream_results():
print(r)
For streamed pipelines, async generators are clean.
Sync code in async context
Don’t block the event loop. For CPU work or blocking IO:
import asyncio
result = await asyncio.to_thread(blocking_function, arg)
to_thread ships work to a thread pool. Default pool is fine for IO; use a ProcessPoolExecutor for CPU-bound work.
Common pitfall: forgotten tasks
async def f():
asyncio.create_task(send_email()) # detached: errors silently dropped
If send_email raises: nobody sees it. Use TaskGroup or save the reference and await/cleanup.
For fire-and-forget: log errors via a wrapper:
def fire_and_forget(coro):
async def wrap():
try: await coro
except Exception: log.exception("background task failed")
asyncio.create_task(wrap())
ExceptionGroup handling (3.11+)
TaskGroup raises ExceptionGroup if multiple tasks fail:
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(may_fail_a())
tg.create_task(may_fail_b())
except* ConnectionError as eg:
log.warning("connection issues: %s", eg.exceptions)
except* ValueError as eg:
log.error("validation: %s", eg.exceptions)
except* matches inside the group. Worth learning.
anyio
For libs that should support both asyncio and trio:
import anyio
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(work_a)
tg.start_soon(work_b)
Same structured concurrency, broader compatibility. FastAPI uses anyio under the hood. For most app code: pick asyncio or anyio and stick.
Common mistakes
1. asyncio.gather forgetting return_exceptions=True
First error raised; later tasks silently lost. Use TaskGroup.
2. Blocking calls in async functions
time.sleep(), requests.get(), sync DB. Pin the event loop. Use to_thread or async libs.
3. Detached create_task
Errors swallowed. Always own the task or wrap with try/except.
4. asyncio.run inside another async
Can’t nest event loops. If you need to call async from sync: asyncio.run once at the top.
5. Forgetting to close clients
client = httpx.AsyncClient()
# ... never closed → connection leak
Use async with httpx.AsyncClient() as c: or initialize once at app start with cleanup.
What I’d ship today
For new Python 3.11+ code:
- TaskGroup instead of gather.
- asyncio.timeout instead of wait_for.
async withfor resource lifecycle.- Bounded queues for fan-out/fan-in.
to_threadfor blocking work.- Structured cancellation: every coroutine handles CancelledError correctly.
Read this next
- FastAPI Streaming and SSE 2026
- FastAPI Background Tasks 2026
- Modern Python Tooling 2026
- Python Type Hints 2026
If you want my asyncio production patterns reference, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .