Modern asyncio cheatsheet.
Define and run
import asyncio
async def main():
    """Entry-point coroutine: print a greeting."""
    print("hi")
asyncio.run(main())
One asyncio.run per process. Don’t nest event loops.
await
async def fetch(url, delay=1):
    """Simulate a slow call and hand ``url`` back unchanged.

    Args:
        url: Value returned to the caller once the wait is over.
        delay: Seconds to sleep before returning (default 1).

    Returns:
        The ``url`` argument, untouched.
    """
    await asyncio.sleep(delay)
    return url
async def main():
    """Await one ``fetch`` call and bind its return value."""
    result = await fetch("a")
await only inside async def.
Concurrent: TaskGroup (3.11+)
async def main():
    """Run two fetches concurrently in a TaskGroup and print both results."""
    async with asyncio.TaskGroup() as group:
        first = group.create_task(fetch("a"))
        second = group.create_task(fetch("b"))
    # Leaving the ``async with`` block waits for both tasks, so the results
    # are read only after it has exited.
    print(first.result(), second.result())
If one fails: cancels rest; raises ExceptionGroup.
gather (older)
results = await asyncio.gather(fetch("a"), fetch("b"))
# With error handling
results = await asyncio.gather(*coros, return_exceptions=True)
for r in results:
if isinstance(r, Exception): ...
TaskGroup preferred for new code.
Timeouts
async with asyncio.timeout(5):
result = await slow()
# raises TimeoutError after 5s
Or:
result = await asyncio.wait_for(slow(), timeout=5)
Cancellation
async def worker():
    """Run ``long_op``; on cancellation, clean up and propagate the cancel."""
    try:
        await long_op()
    except asyncio.CancelledError:
        await cleanup()
        raise  # always re-raise!
Swallowing CancelledError breaks the cancellation chain.
Run sync in async
result = await asyncio.to_thread(blocking_io, arg)
For sync libs in async context. Doesn’t help CPU-bound (GIL).
CPU work
from concurrent.futures import ProcessPoolExecutor
pool = ProcessPoolExecutor()
async def compute():
    """Run ``heavy_function(arg)`` in the executor ``pool`` and await the result."""
    running_loop = asyncio.get_running_loop()
    pending = running_loop.run_in_executor(pool, heavy_function, arg)
    return await pending
Semaphore (concurrency limit)
sem = asyncio.Semaphore(10)
async def fetch(url):
    """GET ``url`` while holding one slot of the shared semaphore ``sem``."""
    async with sem:
        response = await http.get(url)
        return response
Max 10 concurrent.
Queue (producer / consumer)
q = asyncio.Queue(maxsize=100)
async def producer():
    """Put 0..999 on the queue, then a ``None`` sentinel to mark the end."""
    for i in range(1000):
        await q.put(i)
    # Sent once, after the loop: tells the consumer there is nothing more.
    await q.put(None)  # poison pill
async def consumer():
    """Process queue items one by one until the ``None`` sentinel arrives."""
    while (item := await q.get()) is not None:
        await process(item)
async def main():
    """Run the producer and the consumer side by side in one TaskGroup."""
    async with asyncio.TaskGroup() as tg:
        for stage in (producer, consumer):
            tg.create_task(stage())
Event
e = asyncio.Event()
async def waiter():
    """Wait until the event ``e`` is set, then print ``done``."""
    await e.wait()
    print("done")
async def signaler():
    """Sleep for one second, then set the event ``e``."""
    await asyncio.sleep(1)
    e.set()
Lock
lock = asyncio.Lock()
async def critical():
    """Run the guarded section while holding ``lock``."""
    async with lock:
        # only one coroutine at a time
        ...
Rarely needed: asyncio is cooperative, so code between two awaits already runs without interruption. Use a lock only when a critical section spans an await.
Fire-and-forget (carefully)
async def bg(): ...
task = asyncio.create_task(bg())
# task runs; result ignored
# WARNING: errors silently swallowed unless you await or add done_callback
def on_done(t):
    """Done-callback that logs the exception of a failed background task.

    Args:
        t: The finished task (or future) this callback was attached to.
    """
    # exception() raises CancelledError on a cancelled task, so check first;
    # a cancelled task has no failure to report.
    if t.cancelled():
        return
    exc = t.exception()  # fetch once instead of calling it twice
    if exc is not None:
        log.error(exc)
task.add_done_callback(on_done)
Better: use TaskGroup or track the task.
asyncio.gather + cancellation gotcha
# If one fails, gather doesn't cancel others by default
results = await asyncio.gather(*tasks, return_exceptions=True)
TaskGroup does cancel siblings on error.
Run blocking with timeout
async def run_with_timeout():
    """Run ``blocking(arg)`` in a thread; return None if it takes over 5 s."""
    # NOTE(review): the timeout ends the await; presumably the worker thread
    # itself keeps running until ``blocking`` returns — confirm.
    try:
        async with asyncio.timeout(5):
            outcome = await asyncio.to_thread(blocking, arg)
    except TimeoutError:
        return None
    return outcome
Common mistakes
- asyncio.run inside an existing loop.
- Mixing time.sleep with async — blocks the loop.
- requests / psycopg2 (sync) in async handlers.
- Swallowing CancelledError.
- Detached create_task without callback — silent errors.
Read this next
If you want my asyncio production patterns library, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .