Your async Python code should be fast, but it's crawling. You've added async and await everywhere, but the performance is worse than synchronous code. The event loop is blocked, and you're not sure why.
import asyncio
import time

async def fetch_data(user_id):
    # This looks async but blocks everything
    time.sleep(2)  # Simulating database call
    return f"Data for user {user_id}"

async def main():
    start = time.time()
    tasks = [fetch_data(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Completed in {time.time() - start:.2f}s")

asyncio.run(main())
# Output on macOS terminal
$ python3 slow_async.py
Completed in 10.02s
This should take about 2 seconds with proper async, but it takes 10. Each task runs sequentially because time.sleep() blocks the entire event loop.
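For contrast, here's a minimal sketch of the same script once the simulated call is awaitable (await asyncio.sleep(2) instead of time.sleep(2)); the five coroutines overlap and the total drops to roughly 2 seconds:

import asyncio
import time

async def fetch_data(user_id):
    # Non-blocking: yields control to the event loop while "waiting"
    await asyncio.sleep(2)  # Simulating database call
    return f"Data for user {user_id}"

async def main():
    start = time.time()
    tasks = [fetch_data(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Completed in {time.time() - start:.2f}s")  # ~2.00s instead of ~10s

asyncio.run(main())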
Step 1: Understanding Event Loop Blocking
The event loop is single-threaded. When you use blocking calls like time.sleep(), requests.get(), or synchronous file operations, the entire loop freezes. No other coroutines can run.
import asyncio
import time
import requests

async def blocking_examples():
    # All of these block the event loop
    time.sleep(1)                   # Blocks
    requests.get('http://api.com')  # Blocks
    open('file.txt').read()         # Blocks

    # These don't block
    await asyncio.sleep(1)          # Yields control
    # Using aiohttp instead of requests
    # Using aiofiles for file I/O

async def main():
    print("Starting tasks...")
    # Both tasks should run concurrently
    await asyncio.gather(
        blocking_examples(),
        blocking_examples()
    )

asyncio.run(main())
The async keyword doesn't magically make code non-blocking. You need to use async-compatible libraries and operations.
Step 2: Identifying Blocking Operations
Use asyncio.create_task() with timing to spot blocking calls.
import asyncio
import time

async def task_one():
    print(f"Task 1 start: {time.time():.2f}")
    time.sleep(2)  # Blocking
    print(f"Task 1 end: {time.time():.2f}")

async def task_two():
    print(f"Task 2 start: {time.time():.2f}")
    await asyncio.sleep(2)  # Non-blocking
    print(f"Task 2 end: {time.time():.2f}")

async def main():
    start = time.time()
    # Create tasks
    t1 = asyncio.create_task(task_one())
    t2 = asyncio.create_task(task_two())
    await asyncio.gather(t1, t2)
    print(f"Total: {time.time() - start:.2f}s")

asyncio.run(main())
# Output shows sequential execution
$ python3 identify_blocking.py
Task 1 start: 1234567890.00
Task 1 end: 1234567892.00
Task 2 start: 1234567892.00
Task 2 end: 1234567894.00
Total: 4.00s
Task 2 waits for Task 1 to complete because Task 1 blocks. If both were non-blocking, total time would be 2 seconds.
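As a sanity check, here's a small sketch where both tasks use the non-blocking await asyncio.sleep(2); they interleave and the total comes out around 2 seconds:

import asyncio
import time

async def non_blocking_task(name):
    print(f"{name} start")
    await asyncio.sleep(2)  # Yields to the loop, so both tasks wait together
    print(f"{name} end")

async def main():
    start = time.time()
    await asyncio.gather(
        non_blocking_task("Task 1"),
        non_blocking_task("Task 2"),
    )
    print(f"Total: {time.time() - start:.2f}s")  # ~2.00s

asyncio.run(main())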
Step 3: Fix Blocking I/O with run_in_executor
When you must use blocking code, push it to a thread pool using run_in_executor().
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(user_id):
    # Synchronous blocking operation
    time.sleep(2)
    return f"Data for user {user_id}"

async def fetch_data_fixed(user_id, executor):
    loop = asyncio.get_running_loop()
    # Run blocking function in thread pool
    result = await loop.run_in_executor(
        executor,
        blocking_io_operation,
        user_id
    )
    return result

async def main():
    start = time.time()
    # Create thread pool with 5 workers
    executor = ThreadPoolExecutor(max_workers=5)
    tasks = [fetch_data_fixed(i, executor) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Completed in {time.time() - start:.2f}s")
    print(f"Results: {results}")
    executor.shutdown(wait=True)

asyncio.run(main())
# Output shows parallel execution
$ python3 fixed_async.py
Completed in 2.01s
Results: ['Data for user 0', 'Data for user 1', ...]
Now all tasks run concurrently. The blocking calls happen in separate threads, freeing the event loop.
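If you're on Python 3.9 or newer, asyncio.to_thread() wraps this same thread-pool pattern without managing an executor yourself. A minimal sketch of the equivalent fix:

import asyncio
import time

def blocking_io_operation(user_id):
    time.sleep(2)  # Synchronous blocking call
    return f"Data for user {user_id}"

async def fetch_data_fixed(user_id):
    # Runs the blocking function in asyncio's default thread pool
    return await asyncio.to_thread(blocking_io_operation, user_id)

async def main():
    start = time.time()
    results = await asyncio.gather(*(fetch_data_fixed(i) for i in range(5)))
    print(f"Completed in {time.time() - start:.2f}s")  # ~2s

asyncio.run(main())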
Step 4: Replace Blocking Libraries
Replace synchronous libraries with async alternatives for better performance.
import asyncio
import aiohttp
import aiofiles

# Bad: Using requests (blocking)
import requests

async def fetch_with_requests():
    # This blocks the event loop
    response = requests.get('https://api.github.com')
    return response.json()

# Good: Using aiohttp (non-blocking)
async def fetch_with_aiohttp():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com') as response:
            return await response.json()

# Bad: Synchronous file reading
async def read_file_sync():
    with open('large_file.txt', 'r') as f:
        content = f.read()  # Blocks
    return content

# Good: Async file reading
async def read_file_async():
    async with aiofiles.open('large_file.txt', 'r') as f:
        content = await f.read()  # Doesn't block
    return content

async def main():
    # All operations run concurrently
    results = await asyncio.gather(
        fetch_with_aiohttp(),
        fetch_with_aiohttp(),
        read_file_async()
    )
    print(f"Fetched {len(results)} results concurrently")

asyncio.run(main())
Install async libraries:
$ pip3 install aiohttp aiofiles
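One caveat when switching to aiohttp: create a single ClientSession and reuse it across requests instead of opening a new one per call. A short sketch fetching several URLs through one shared session (the repeated GitHub URL is just a stand-in for your own endpoints):

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    # Placeholder endpoints; substitute your own API URLs
    urls = ["https://api.github.com"] * 3
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_json(session, url) for url in urls))
    print(f"Fetched {len(results)} responses with one session")

asyncio.run(main())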
Step 5: Optimize Event Loop with uvloop
The default asyncio event loop is slower than it could be. uvloop is a drop-in replacement that's 2-4x faster.
import asyncio
import time

# Install: pip3 install uvloop
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    print("uvloop not available, using default loop")

async def benchmark_task():
    await asyncio.sleep(0)  # Yield control

async def benchmark():
    start = time.time()
    tasks = [benchmark_task() for _ in range(10000)]
    await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"10,000 tasks completed in {elapsed:.4f}s")

asyncio.run(benchmark())
# Without uvloop
$ python3 benchmark.py
10,000 tasks completed in 0.8234s
# With uvloop
$ python3 benchmark.py
10,000 tasks completed in 0.2156s
On macOS and Linux, uvloop provides significant performance improvements. It's not available on Windows, but the code falls back gracefully.
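Recent uvloop releases also expose uvloop.install(), a one-line shorthand for the policy call above; a sketch of the same guarded setup:

import asyncio

try:
    import uvloop
    uvloop.install()  # Equivalent to setting uvloop's EventLoopPolicy
except ImportError:
    pass              # Fall back to the default asyncio loop (e.g. on Windows)

async def main():
    await asyncio.sleep(0)

asyncio.run(main())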
Step 6: Profile Async Code
Use asyncio debug mode to find blocking operations.
import asyncio
import time

async def slow_coroutine():
    await asyncio.sleep(0.01)
    # This blocking call will trigger a debug-mode warning
    time.sleep(0.15)  # Blocks for >100ms
    await asyncio.sleep(0.01)

async def main():
    await slow_coroutine()

# Run with debug mode enabled
loop = asyncio.new_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
# Output shows a blocking warning
$ python3 profile_async.py
Executing <Task> took 0.151 seconds
Debug mode warns when tasks block for more than 100ms. This helps identify performance bottlenecks.
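You can get the same warnings without building a loop by hand: pass debug=True to asyncio.run() (or set the PYTHONASYNCIODEBUG=1 environment variable), and lower loop.slow_callback_duration if the 100ms default is too coarse. A brief sketch:

import asyncio
import time

async def slow_coroutine():
    time.sleep(0.15)  # Blocking call we want the debugger to flag

async def main():
    # 0.1s is the default threshold; lower it to catch shorter stalls
    asyncio.get_running_loop().slow_callback_duration = 0.05
    await slow_coroutine()

# debug=True enables the same checks as loop.set_debug(True)
asyncio.run(main(), debug=True)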
Step 7: Handle CPU-Bound Operations
For CPU-intensive work, use ProcessPoolExecutor instead of threads.
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
import hashlib

def cpu_intensive_task(data):
    # Simulate heavy computation
    result = hashlib.pbkdf2_hmac('sha256', data.encode(), b'salt', 100000)
    return result.hex()

async def process_data(items):
    loop = asyncio.get_running_loop()
    # Use process pool for CPU-bound work
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_intensive_task, item)
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    return results

async def main():
    start = time.time()
    items = [f"data_{i}" for i in range(20)]
    results = await process_data(items)
    print(f"Processed {len(results)} items in {time.time() - start:.2f}s")

if __name__ == "__main__":
    # The __main__ guard is required with process pools on macOS/Windows,
    # where worker processes re-import this module
    asyncio.run(main())
Threads don't help with CPU-bound operations due to Python's GIL. Use processes to leverage multiple CPU cores.
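A minimal sketch contrasting the two pools on the same workload; busy_work and run_with are illustrative names, and the workload is a pure-Python loop so the GIL is definitely in play. On a multi-core machine the process pool should finish markedly faster, though exact timings depend on your hardware:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def busy_work(n):
    # Pure-Python loop: holds the GIL, so threads cannot run it in parallel
    total = 0
    for i in range(n):
        total += i * i
    return total

async def run_with(executor_cls, jobs):
    loop = asyncio.get_running_loop()
    start = time.time()
    with executor_cls(max_workers=4) as executor:
        await asyncio.gather(*(loop.run_in_executor(executor, busy_work, n) for n in jobs))
    return time.time() - start

async def main():
    jobs = [2_000_000] * 8
    print(f"Threads:   {await run_with(ThreadPoolExecutor, jobs):.2f}s")
    print(f"Processes: {await run_with(ProcessPoolExecutor, jobs):.2f}s")

if __name__ == "__main__":  # __main__ guard needed for process pools on macOS/Windows
    asyncio.run(main())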
Step 8: Avoid Common Async Pitfalls
import asyncio

# Bad: Forgetting await
async def mistake_one():
    result = asyncio.sleep(1)  # Returns a coroutine object, doesn't wait
    print("This prints immediately")
    return result

# Good: Using await
async def correct_one():
    await asyncio.sleep(1)  # Actually waits
    print("This prints after 1 second")

# Bad: awaiting items one at a time in a plain for loop runs them sequentially
async def mistake_two():
    for item in [1, 2, 3]:  # Sync iteration
        await asyncio.sleep(0.1)

# Good: use an async comprehension when consuming an async iterator
async def correct_two():
    # process() and async_generator() stand in for your own coroutines
    results = [await process(item) async for item in async_generator()]

# Bad: Not handling exceptions
async def mistake_three():
    tasks = [risky_operation() for _ in range(10)]
    await asyncio.gather(*tasks)  # The first failure propagates; other results are lost

# Good: Using return_exceptions
async def correct_three():
    tasks = [risky_operation() for _ in range(10)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
Additional Tips
Set appropriate timeouts to prevent hanging operations:
async def fetch_with_timeout():
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")
        result = None
    return result

# For Python < 3.11
async def fetch_with_timeout_old():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
        result = None
    return result
Limit concurrency to prevent resource exhaustion:
import asyncio

async def limited_concurrency(items, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)

    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

async def main():
    items = range(100)  # 100 items, but only 10 run at a time
    results = await limited_concurrency(items, max_concurrent=10)
Your async code is now properly non-blocking. The event loop runs freely, concurrent operations complete faster, and you've eliminated the performance bottlenecks that made your async code feel slow.