Modern asyncio cheatsheet.

Define and run

import asyncio

async def main():
    print("hi")

asyncio.run(main())

One asyncio.run per process. Don’t nest event loops.

await

async def fetch(url):
    await asyncio.sleep(1)
    return url

async def main():
    result = await fetch("a")

await only inside async def.

Concurrent: TaskGroup (3.11+)

async def main():
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch("a"))
        b = tg.create_task(fetch("b"))
    print(a.result(), b.result())

If one fails: cancels rest; raises ExceptionGroup.

gather (older)

results = await asyncio.gather(fetch("a"), fetch("b"))

# With error handling
results = await asyncio.gather(*coros, return_exceptions=True)
for r in results:
    if isinstance(r, Exception): ...

TaskGroup preferred for new code.

Timeouts

async with asyncio.timeout(5):
    result = await slow()
# raises TimeoutError after 5s

Or:

result = await asyncio.wait_for(slow(), timeout=5)

Cancellation

async def worker():
    try:
        await long_op()
    except asyncio.CancelledError:
        await cleanup()
        raise            # always re-raise!

Swallowing CancelledError breaks the cancellation chain.

Run sync in async

result = await asyncio.to_thread(blocking_io, arg)

For sync libs in async context. Doesn’t help CPU-bound (GIL).

CPU work

from concurrent.futures import ProcessPoolExecutor

pool = ProcessPoolExecutor()

async def compute():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, heavy_function, arg)

Semaphore (concurrency limit)

sem = asyncio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await http.get(url)

Max 10 concurrent.

Queue (producer / consumer)

q = asyncio.Queue(maxsize=100)

async def producer():
    for i in range(1000):
        await q.put(i)
    await q.put(None)        # poison pill

async def consumer():
    while True:
        item = await q.get()
        if item is None: break
        await process(item)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Event

e = asyncio.Event()

async def waiter():
    await e.wait()
    print("done")

async def signaler():
    await asyncio.sleep(1)
    e.set()

Lock

lock = asyncio.Lock()

async def critical():
    async with lock:
        # only one coroutine at a time
        ...

Rarely needed (asyncio is cooperative; serialization is the default).

Fire-and-forget (carefully)

async def bg(): ...

task = asyncio.create_task(bg())
# task runs; result ignored
# WARNING: errors silently swallowed unless you await or add done_callback

def on_done(t):
    if t.exception():
        log.error(t.exception())

task.add_done_callback(on_done)

Better: use TaskGroup or track the task.

asyncio.gather + cancellation gotcha

# If one fails, gather doesn't cancel others by default
results = await asyncio.gather(*tasks, return_exceptions=True)

TaskGroup does cancel siblings on error.

Run blocking with timeout

async def run_with_timeout():
    try:
        async with asyncio.timeout(5):
            return await asyncio.to_thread(blocking, arg)
    except TimeoutError:
        return None

Common mistakes

  • asyncio.run inside an existing loop.
  • Mixing time.sleep with async — blocks the loop.
  • requests / psycopg2 (sync) in async handlers.
  • Swallowing CancelledError.
  • Detached create_task without callback — silent errors.

Read this next

If you want my asyncio production patterns library, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .