How to Use the await Keyword in Python
The await keyword is the foundation of Python's asynchronous programming. It tells Python: "Pause this function here, go do other work, and come back when this task is finished."
This mechanism allows a single thread to handle thousands of concurrent operations, such as web requests or database queries, without waiting for each one to complete individually.
Understanding the Mental Model
Think of a waiter in a restaurant:
Synchronous (No await): The waiter takes an order, walks to the kitchen, waits 20 minutes for the chef to cook it, brings it back, and only then takes the next customer's order. This is extremely inefficient.
Asynchronous (With await): The waiter takes an order, gives it to the kitchen, and immediately serves other tables while the food cooks. When the kitchen signals "Order Ready," the waiter returns to deliver it. This is much more efficient.
In Python terms:
- The Waiter = The Event Loop
- The Kitchen = I/O operations (network, database, disk)
await= "Go serve other tables until this is done"
Basic Usage
You can only use await inside functions defined with async def.
import asyncio
import time
async def say_after(delay: int, message: str) -> None:
"""Wait for delay seconds, then print message."""
await asyncio.sleep(delay)
print(message)
async def main():
print(f"Started at {time.strftime('%X')}")
# These run SEQUENTIALLY (one after another)
await say_after(1, "Hello")
await say_after(2, "World")
print(f"Finished at {time.strftime('%X')}")
if __name__ == "__main__":
asyncio.run(main())
Output:
Started at 21:23:59
Hello
World
Finished at 21:24:02
The total time is 3 seconds because each await completes before the next one starts.
Running Tasks Concurrently with asyncio.gather
The real power of async comes when you run multiple awaitables at the same time.
import asyncio
import time
async def brew_coffee() -> str:
print("Starting coffee...")
await asyncio.sleep(3)
return "Coffee ready"
async def toast_bread() -> str:
print("Starting toast...")
await asyncio.sleep(2)
return "Toast ready"
async def fry_eggs() -> str:
print("Starting eggs...")
await asyncio.sleep(2)
return "Eggs ready"
async def main():
start = time.perf_counter()
# Run all three AT THE SAME TIME
results = await asyncio.gather(
brew_coffee(),
toast_bread(),
fry_eggs()
)
elapsed = time.perf_counter() - start
print(f"\nResults: {results}")
print(f"Total time: {elapsed:.2f} seconds")
asyncio.run(main())
Output:
Starting coffee...
Starting toast...
Starting eggs...
Results: ['Coffee ready', 'Toast ready', 'Eggs ready']
Total time: 3.00 seconds
Running synchronously would take 3 + 2 + 2 = 7 seconds. With asyncio.gather, the total time equals the longest task (3 seconds), saving over 50% of the time.
Creating and Managing Tasks
For more control over concurrent execution, use asyncio.create_task():
import asyncio
async def fetch_data(source: str, delay: int) -> dict:
print(f"Fetching from {source}...")
await asyncio.sleep(delay)
return {"source": source, "data": f"Results from {source}"}
async def main():
# Create tasks (they start running immediately)
task1 = asyncio.create_task(fetch_data("API", 2))
task2 = asyncio.create_task(fetch_data("Database", 3))
task3 = asyncio.create_task(fetch_data("Cache", 1))
# Do other work while tasks run
print("Tasks are running in the background...")
# Await results when needed
result1 = await task1
result2 = await task2
result3 = await task3
print(f"Results: {result1}, {result2}, {result3}")
asyncio.run(main())
Output:
Tasks are running in the background...
Fetching from API...
Fetching from Database...
Fetching from Cache...
Results: {'source': 'API', 'data': 'Results from API'}, {'source': 'Database', 'data': 'Results from Database'}, {'source': 'Cache', 'data': 'Results from Cache'}
Handling Errors in Concurrent Tasks
When running multiple tasks, you need to handle potential failures:
import asyncio
async def risky_operation(name: str, should_fail: bool) -> str:
await asyncio.sleep(1)
if should_fail:
raise ValueError(f"{name} failed!")
return f"{name} succeeded"
async def main():
# With return_exceptions=True, exceptions become return values
results = await asyncio.gather(
risky_operation("Task A", should_fail=False),
risky_operation("Task B", should_fail=True),
risky_operation("Task C", should_fail=False),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i}: Error - {result}")
else:
print(f"Task {i}: {result}")
asyncio.run(main())
Output:
Task 0: Task A succeeded
Task 1: Error - Task B failed!
Task 2: Task C succeeded
Common Pitfall: Blocking the Event Loop
A critical mistake is using standard "blocking" functions inside async code. This freezes the entire event loop.
import asyncio
import time
async def bad_example():
# INCORRECT: Blocks the entire event loop
time.sleep(5)
print("This freezes everything!")
async def good_example():
# CORRECT: Yields control to the event loop
await asyncio.sleep(5)
print("Other tasks can run during this wait!")
Functions without await in front of them (like time.sleep(), requests.get(), or file I/O) will block your entire application. Use async alternatives:
| Blocking | Async Alternative |
|---|---|
time.sleep() | asyncio.sleep() |
requests | aiohttp or httpx |
| File I/O | aiofiles |
| Database queries | asyncpg, aiomysql |
Running Blocking Code Safely
When you must use blocking code, run it in a thread pool:
import asyncio
import time
def blocking_io_operation() -> str:
"""Simulates a blocking I/O operation."""
time.sleep(2)
return "Blocking operation complete"
async def main():
loop = asyncio.get_event_loop()
# Run blocking code in a thread pool
result = await loop.run_in_executor(
None, # Use default executor
blocking_io_operation
)
print(result)
asyncio.run(main())
Quick Reference
| Keyword/Function | Usage | Purpose |
|---|---|---|
async def | async def my_func(): | Defines a coroutine |
await | await coroutine() | Pause until coroutine completes |
asyncio.run() | asyncio.run(main()) | Entry point for async programs |
asyncio.gather() | await gather(a(), b()) | Run coroutines concurrently |
asyncio.create_task() | task = create_task(coro()) | Schedule coroutine to run soon |
asyncio.sleep() | await sleep(1) | Non-blocking delay |
Summary
- Use
async defto define coroutines that can be paused. - Use
awaitto pause execution until an async operation completes. - Use
asyncio.gather()to run multiple coroutines concurrently. - Never use blocking functions like
time.sleep()orrequests.get()in async code; use their async alternatives instead. - Run unavoidable blocking code in a thread pool using
run_in_executor().