How to Use the await Keyword in Python

The await keyword is the foundation of Python's asynchronous programming. It tells Python: "Pause this function here, go do other work, and come back when this task is finished."

This mechanism lets a single thread juggle thousands of concurrent I/O-bound operations, such as web requests or database queries, instead of sitting idle while each one completes.

Understanding the Mental Model

Think of a waiter in a restaurant:

Synchronous (No await): The waiter takes an order, walks to the kitchen, waits 20 minutes for the chef to cook it, brings it back, and only then takes the next customer's order. This is extremely inefficient.

Asynchronous (With await): The waiter takes an order, gives it to the kitchen, and immediately serves other tables while the food cooks. When the kitchen signals "Order Ready," the waiter returns to deliver it. This is much more efficient.

In Python terms:

  • The Waiter = The Event Loop
  • The Kitchen = I/O operations (network, database, disk)
  • await = "Go serve other tables until this is done"
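The analogy can be sketched in a few lines. This is only an illustration: serve_table, the table names, and the cooking times are made up for the example, and asyncio.gather (covered later in this article) is used simply to let both tables be served at once.

```python
import asyncio


async def serve_table(table: str, cooking_time: float, log: list[str]) -> None:
    log.append(f"{table}: order taken")
    # The kitchen is busy; the waiter (event loop) is free to serve other tables.
    await asyncio.sleep(cooking_time)
    log.append(f"{table}: food delivered")


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        serve_table("Table 1", 0.2, log),
        serve_table("Table 2", 0.1, log),
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Both orders are taken before any food is delivered, and Table 2 is served first because its food is ready sooner.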

Basic Usage

You can only use await inside functions defined with async def.

import asyncio
import time


async def say_after(delay: int, message: str) -> None:
    """Wait for delay seconds, then print message."""
    await asyncio.sleep(delay)
    print(message)


async def main():
    print(f"Started at {time.strftime('%X')}")

    # These run SEQUENTIALLY (one after another)
    await say_after(1, "Hello")
    await say_after(2, "World")

    print(f"Finished at {time.strftime('%X')}")


if __name__ == "__main__":
    asyncio.run(main())

Output:

Started at 21:23:59
Hello
World
Finished at 21:24:02

The total time is 3 seconds because each await completes before the next one starts.
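It may also help to see what await hands back. In the sketch below (add is a hypothetical helper, not part of the example above), calling the coroutine function only creates a coroutine object; the body runs once it is awaited, and the await expression evaluates to the return value.

```python
import asyncio
import inspect


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return a + b


async def main() -> int:
    pending = add(1, 2)  # nothing has run yet: this is only a coroutine object
    print(inspect.iscoroutine(pending))  # True
    total = await pending  # the body runs now; await evaluates to its return value
    print(total)  # 3
    return total


if __name__ == "__main__":
    asyncio.run(main())
```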

Running Tasks Concurrently with asyncio.gather

The real power of async comes when you run multiple awaitables at the same time.

import asyncio
import time


async def brew_coffee() -> str:
    print("Starting coffee...")
    await asyncio.sleep(3)
    return "Coffee ready"


async def toast_bread() -> str:
    print("Starting toast...")
    await asyncio.sleep(2)
    return "Toast ready"


async def fry_eggs() -> str:
    print("Starting eggs...")
    await asyncio.sleep(2)
    return "Eggs ready"


async def main():
    start = time.perf_counter()

    # Run all three AT THE SAME TIME
    results = await asyncio.gather(
        brew_coffee(),
        toast_bread(),
        fry_eggs()
    )

    elapsed = time.perf_counter() - start

    print(f"\nResults: {results}")
    print(f"Total time: {elapsed:.2f} seconds")


asyncio.run(main())

Output:

Starting coffee...
Starting toast...
Starting eggs...

Results: ['Coffee ready', 'Toast ready', 'Eggs ready']
Total time: 3.00 seconds

Time Savings

Run one after another, the three tasks would take 3 + 2 + 2 = 7 seconds. With asyncio.gather, the total time is roughly that of the longest task (3 seconds), a saving of more than 50%.
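To check that arithmetic yourself, here is a rough sketch that times both approaches. The delays are scaled down to 0.3, 0.2 and 0.2 seconds (an assumption made so the script finishes quickly), and run_sequentially and run_concurrently are illustrative names.

```python
import asyncio
import time

DELAYS = (0.3, 0.2, 0.2)  # scaled-down stand-ins for the 3 s, 2 s and 2 s tasks


async def run_sequentially() -> float:
    start = time.perf_counter()
    for delay in DELAYS:
        await asyncio.sleep(delay)  # each wait finishes before the next starts
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.sleep(delay) for delay in DELAYS))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    sequential = await run_sequentially()
    concurrent = await run_concurrently()
    print(f"Sequential: {sequential:.2f} s")
    print(f"Concurrent: {concurrent:.2f} s")
    print(f"Saved: {1 - concurrent / sequential:.0%}")
    return sequential, concurrent


if __name__ == "__main__":
    asyncio.run(main())
```

The sequential run should take about the sum of the delays and the concurrent run about the longest one; exact figures vary slightly from machine to machine.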

Creating and Managing Tasks

For more control over concurrent execution, use asyncio.create_task():

import asyncio


async def fetch_data(source: str, delay: int) -> dict:
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    return {"source": source, "data": f"Results from {source}"}


async def main():
    # Create tasks (they are scheduled now and start as soon as main() awaits)
    task1 = asyncio.create_task(fetch_data("API", 2))
    task2 = asyncio.create_task(fetch_data("Database", 3))
    task3 = asyncio.create_task(fetch_data("Cache", 1))

    # Do other work before waiting on the tasks
    print("Tasks are running in the background...")

    # Await results when needed
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"Results: {result1}, {result2}, {result3}")


asyncio.run(main())

Output:

Tasks are running in the background...
Fetching from API...
Fetching from Database...
Fetching from Cache...
Results: {'source': 'API', 'data': 'Results from API'}, {'source': 'Database', 'data': 'Results from Database'}, {'source': 'Cache', 'data': 'Results from Cache'}
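Notice that "Tasks are running in the background..." is printed before any "Fetching from..." line. create_task() only schedules the coroutine; it gets its first turn when the current coroutine yields to the event loop. A small sketch of that ordering, using a hypothetical worker coroutine and an events list for bookkeeping:

```python
import asyncio


async def worker(name: str, events: list[str]) -> None:
    events.append(f"{name} started")
    await asyncio.sleep(0.01)
    events.append(f"{name} finished")


async def main() -> list[str]:
    events: list[str] = []
    task = asyncio.create_task(worker("worker", events))
    events.append("task created")  # the worker has not run yet
    await asyncio.sleep(0)  # yield once so the event loop can start the task
    events.append("main resumed")
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the default task factory, the recorded order is: task created, worker started, main resumed, worker finished.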

Handling Errors in Concurrent Tasks

When running multiple tasks, you need to handle potential failures:

import asyncio


async def risky_operation(name: str, should_fail: bool) -> str:
    await asyncio.sleep(1)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"


async def main():
    # With return_exceptions=True, exceptions become return values
    results = await asyncio.gather(
        risky_operation("Task A", should_fail=False),
        risky_operation("Task B", should_fail=True),
        risky_operation("Task C", should_fail=False),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i}: Error - {result}")
        else:
            print(f"Task {i}: {result}")


asyncio.run(main())

Output:

Task 0: Task A succeeded
Task 1: Error - Task B failed!
Task 2: Task C succeeded
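For comparison, here is a sketch of the default behavior, where return_exceptions is left at False. The first exception propagates out of the await, so it is caught with an ordinary try/except. The delay is shortened to 0.01 seconds for the example.

```python
import asyncio


async def risky_operation(name: str, should_fail: bool) -> str:
    await asyncio.sleep(0.01)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"


async def main() -> str:
    try:
        results = await asyncio.gather(
            risky_operation("Task A", should_fail=False),
            risky_operation("Task B", should_fail=True),
        )
    except ValueError as exc:
        # Without return_exceptions=True, the first exception is raised here
        # and the results of the other awaitables are not returned.
        return f"gather raised: {exc}"
    return f"all succeeded: {results}"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this mode the other awaitables are not cancelled; they keep running, but their results are not returned by that gather call.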

Common Pitfall: Blocking the Event Loop

A critical mistake is using standard "blocking" functions inside async code. This freezes the entire event loop.

import asyncio
import time


async def bad_example():
    # INCORRECT: Blocks the entire event loop
    time.sleep(5)
    print("This freezes everything!")


async def good_example():
    # CORRECT: Yields control to the event loop
    await asyncio.sleep(5)
    print("Other tasks can run during this wait!")

Blocking Calls

Any call that waits without yielding to the event loop (such as time.sleep(), requests.get(), or ordinary file I/O) stalls every task in your application until it returns. Prefer async alternatives:

Blocking         | Async Alternative
-----------------|-------------------
time.sleep()     | asyncio.sleep()
requests         | aiohttp or httpx
File I/O         | aiofiles
Database queries | asyncpg, aiomysql
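The cost of a blocking call is easy to measure. The sketch below runs two waits under asyncio.gather, once with time.sleep() and once with asyncio.sleep(). The helper names (blocking_wait, cooperative_wait, timed_pair) and the 0.1 second delay are chosen for illustration only.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def blocking_wait(delay: float) -> None:
    time.sleep(delay)  # blocks the event loop: nothing else can run meanwhile


async def cooperative_wait(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the event loop while waiting


async def timed_pair(
    wait: Callable[[float], Awaitable[None]], delay: float = 0.1
) -> float:
    """Run two copies of `wait` under gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(wait(delay), wait(delay))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    blocked = await timed_pair(blocking_wait)
    cooperative = await timed_pair(cooperative_wait)
    print(f"time.sleep:    {blocked:.2f} s")
    print(f"asyncio.sleep: {cooperative:.2f} s")
    return blocked, cooperative


if __name__ == "__main__":
    asyncio.run(main())
```

The blocking pair takes about twice the delay because the waits happen back to back, while the cooperative pair overlaps and takes about one delay.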

Running Blocking Code Safely

When you must use blocking code, run it in a thread pool:

import asyncio
import time


def blocking_io_operation() -> str:
    """Simulates a blocking I/O operation."""
    time.sleep(2)
    return "Blocking operation complete"


async def main():
    # Inside a coroutine, get_running_loop() is preferred over get_event_loop()
    loop = asyncio.get_running_loop()

    # Run blocking code in a thread pool
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io_operation
    )

    print(result)


asyncio.run(main())
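On Python 3.9 and later, asyncio.to_thread() is a shorter way to do the same thing. The sketch below is a variation on the example above, not a replacement for it: the label parameter and the 0.1 second delay are assumptions added so that two calls can be told apart and run quickly.

```python
import asyncio
import time


def blocking_io_operation(label: str) -> str:
    """Simulates a blocking I/O operation."""
    time.sleep(0.1)
    return f"{label} complete"


async def main() -> list[str]:
    # asyncio.to_thread (Python 3.9+) runs each call in a worker thread,
    # so the two blocking calls overlap instead of running back to back.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io_operation, "first"),
        asyncio.to_thread(blocking_io_operation, "second"),
    )
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each call runs in its own thread, the event loop stays free for other tasks while they wait.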

Quick Reference

Keyword/Function      | Usage                      | Purpose
----------------------|----------------------------|--------------------------------
async def             | async def my_func():       | Defines a coroutine
await                 | await coroutine()          | Pause until coroutine completes
asyncio.run()         | asyncio.run(main())        | Entry point for async programs
asyncio.gather()      | await gather(a(), b())     | Run coroutines concurrently
asyncio.create_task() | task = create_task(coro()) | Schedule coroutine to run soon
asyncio.sleep()       | await sleep(1)             | Non-blocking delay

Summary

  • Use async def to define coroutines that can be paused.
  • Use await to pause execution until an async operation completes.
  • Use asyncio.gather() to run multiple coroutines concurrently.
  • Never use blocking functions like time.sleep() or requests.get() in async code; use their async alternatives instead.
  • Run unavoidable blocking code in a thread pool using run_in_executor().