How to Fix Slow Python Async Performance: Event Loop Blocking and I/O Issues


Your async Python code should be fast, but it's crawling. You've added async and await everywhere, yet performance is worse than the synchronous version. The event loop is being blocked somewhere, and you're not sure why.


import asyncio
import time

async def fetch_data(user_id):
    # This looks async but blocks everything
    time.sleep(2)  # Simulating database call
    return f"Data for user {user_id}"

async def main():
    start = time.time()
    tasks = [fetch_data(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(f"Completed in {time.time() - start:.2f}s")

asyncio.run(main())
# Output on macOS terminal
$ python3 slow_async.py
Completed in 10.02s


This should take about 2 seconds with properly concurrent async code, but it takes 10. Each task runs sequentially because time.sleep() blocks the entire event loop.
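
Before digging into the details, here is the same example with the blocking call swapped for await asyncio.sleep(). The five tasks now overlap and the total drops to roughly two seconds:

import asyncio
import time

async def fetch_data(user_id):
    await asyncio.sleep(2)  # Yields to the event loop instead of blocking it
    return f"Data for user {user_id}"

async def main():
    start = time.time()
    results = await asyncio.gather(*(fetch_data(i) for i in range(5)))
    print(f"Completed in {time.time() - start:.2f}s")  # ~2.0s

asyncio.run(main())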


Step 1: Understanding Event Loop Blocking

The event loop is single-threaded. When you use blocking calls like time.sleep(), requests.get(), or synchronous file operations, the entire loop freezes. No other coroutines can run.

import asyncio
import time

async def blocking_examples():
    # All of these block the event loop:
    #   time.sleep(1)                     # Blocks
    #   requests.get('http://api.com')    # Blocks
    #   open('file.txt').read()           # Blocks
    time.sleep(1)  # Run just one of them to demonstrate

    # These don't block
    await asyncio.sleep(1)                # Yields control
    # Use aiohttp instead of requests for HTTP
    # Use aiofiles for file I/O

async def main():
    print("Starting tasks...")
    # These would run concurrently if nothing blocked, but time.sleep() serializes them
    await asyncio.gather(
        blocking_examples(),
        blocking_examples()
    )

asyncio.run(main())


The async keyword doesn't magically make code non-blocking. You need to use async-compatible libraries and operations.


Step 2: Identifying Blocking Operations

Use asyncio.create_task() with timing to spot blocking calls.

import asyncio
import time

async def task_one():
    print(f"Task 1 start: {time.time():.2f}")
    time.sleep(2)  # Blocking
    print(f"Task 1 end: {time.time():.2f}")

async def task_two():
    print(f"Task 2 start: {time.time():.2f}")
    await asyncio.sleep(2)  # Non-blocking
    print(f"Task 2 end: {time.time():.2f}")

async def main():
    start = time.time()
    
    # Create tasks
    t1 = asyncio.create_task(task_one())
    t2 = asyncio.create_task(task_two())
    
    await asyncio.gather(t1, t2)
    print(f"Total: {time.time() - start:.2f}s")

asyncio.run(main())
# Output shows sequential execution
$ python3 identify_blocking.py
Task 1 start: 1234567890.00
Task 1 end: 1234567892.00
Task 2 start: 1234567892.00
Task 2 end: 1234567894.00
Total: 4.00s


Task 2 can't even start until Task 1 finishes, because Task 1's time.sleep() never yields control back to the event loop. If both were non-blocking, the total time would be 2 seconds.
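
If you don't want to sprinkle timestamps by hand, a heartbeat coroutine can flag blocking for you: it sleeps on a fixed interval and reports whenever it wakes up much later than scheduled, which means something hogged the loop in between. A minimal sketch (loop_monitor and its thresholds are ad-hoc names and values for illustration, not a standard API):

import asyncio
import time

async def loop_monitor(interval=0.1, tolerance=0.5):
    # If the wakeup is much later than requested, something blocked the loop
    while True:
        before = time.perf_counter()
        await asyncio.sleep(interval)
        lag = time.perf_counter() - before - interval
        if lag > tolerance:
            print(f"Event loop was blocked for ~{lag:.2f}s")

async def suspicious_task():
    time.sleep(2)  # The blocking call the monitor should catch

async def main():
    monitor = asyncio.create_task(loop_monitor())
    await asyncio.sleep(0)    # Let the monitor start its first interval
    await suspicious_task()
    await asyncio.sleep(0.2)  # Give the monitor a chance to wake up and report
    monitor.cancel()

asyncio.run(main())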


Step 3: Fix Blocking I/O with run_in_executor

When you must use blocking code, push it to a thread pool using run_in_executor().

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_operation(user_id):
    # Synchronous blocking operation
    time.sleep(2)
    return f"Data for user {user_id}"

async def fetch_data_fixed(user_id, executor):
    loop = asyncio.get_running_loop()
    # Run blocking function in thread pool
    result = await loop.run_in_executor(
        executor,
        blocking_io_operation,
        user_id
    )
    return result

async def main():
    start = time.time()
    
    # Create thread pool with 5 workers
    executor = ThreadPoolExecutor(max_workers=5)
    
    tasks = [fetch_data_fixed(i, executor) for i in range(5)]
    results = await asyncio.gather(*tasks)
    
    print(f"Completed in {time.time() - start:.2f}s")
    print(f"Results: {results}")
    
    executor.shutdown(wait=True)

asyncio.run(main())
# Output shows parallel execution
$ python3 fixed_async.py
Completed in 2.01s
Results: ['Data for user 0', 'Data for user 1', ...]


Now all tasks run concurrently. The blocking calls happen in separate threads, freeing the event loop.
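
On Python 3.9+, asyncio.to_thread() wraps the same idea with less ceremony: it runs the function in the loop's default thread pool, so you don't have to create and shut down an executor yourself. A sketch of the example above rewritten with it:

import asyncio
import time

def blocking_io_operation(user_id):
    time.sleep(2)  # Still blocking, but it now runs in a worker thread
    return f"Data for user {user_id}"

async def main():
    start = time.time()
    # asyncio.to_thread() submits each call to the default thread pool
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io_operation, i) for i in range(5))
    )
    print(f"Completed in {time.time() - start:.2f}s")  # ~2s
    print(f"Results: {results}")

asyncio.run(main())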


Step 4: Replace Blocking Libraries

Replace synchronous libraries with async alternatives for better performance.

import asyncio
import aiohttp
import aiofiles

# Bad: Using requests (blocking)
import requests

async def fetch_with_requests():
    # This blocks the event loop
    response = requests.get('https://api.github.com')
    return response.json()

# Good: Using aiohttp (non-blocking)
async def fetch_with_aiohttp():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.github.com') as response:
            return await response.json()

# Bad: Synchronous file reading
async def read_file_sync():
    with open('large_file.txt', 'r') as f:
        content = f.read()  # Blocks
    return content

# Good: Async file reading
async def read_file_async():
    async with aiofiles.open('large_file.txt', 'r') as f:
        content = await f.read()  # Doesn't block
    return content

async def main():
    # All operations run concurrently
    # (read_file_async assumes large_file.txt exists next to the script)
    results = await asyncio.gather(
        fetch_with_aiohttp(),
        fetch_with_aiohttp(),
        read_file_async()
    )
    print(f"Completed {len(results)} operations concurrently")

asyncio.run(main())


Install async libraries:

$ pip3 install aiohttp aiofiles
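
Once aiohttp is installed, reuse a single ClientSession for a whole batch of requests instead of opening one per call; the session owns the connection pool. A minimal sketch (the GitHub URL is just a stand-in for your own endpoints):

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = ["https://api.github.com"] * 5  # Stand-in endpoints
    # One session (and one connection pool) shared by all requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_json(session, url) for url in urls))
    print(f"Fetched {len(results)} responses")

asyncio.run(main())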


Step 5: Optimize Event Loop with uvloop

The default asyncio event loop leaves performance on the table. uvloop is a drop-in replacement built on libuv, and its benchmarks show it running roughly 2-4x faster on I/O-heavy workloads.

import asyncio
import time

# Install: pip3 install uvloop
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    print("uvloop not available, using default loop")

async def benchmark_task():
    await asyncio.sleep(0)  # Yield control

async def benchmark():
    start = time.time()
    tasks = [benchmark_task() for _ in range(10000)]
    await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"10,000 tasks completed in {elapsed:.4f}s")

asyncio.run(benchmark())
# Without uvloop
$ python3 benchmark.py
10,000 tasks completed in 0.8234s

# With uvloop
$ python3 benchmark.py
10,000 tasks completed in 0.2156s


On macOS and Linux, uvloop provides significant performance improvements. It's not available on Windows, but the code falls back gracefully.
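
Newer uvloop releases also provide uvloop.install(), which sets the event loop policy in a single call. A sketch of the same fallback pattern using it:

import asyncio

try:
    import uvloop
    uvloop.install()  # Shorthand for setting uvloop.EventLoopPolicy()
except ImportError:
    pass  # Fall back to the default event loop (e.g. on Windows)

async def main():
    await asyncio.sleep(0)

asyncio.run(main())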


Step 6: Profile Async Code

Use asyncio debug mode to find blocking operations.

import asyncio
import time

async def slow_coroutine():
    await asyncio.sleep(0.01)
    # This will trigger a warning
    time.sleep(0.15)  # Blocks for >100ms
    await asyncio.sleep(0.01)

async def main():
    await slow_coroutine()

# Run with debug mode
loop = asyncio.new_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
# Output shows asyncio's slow-callback warning (logged via the asyncio logger)
$ python3 profile_async.py
Executing <Task pending name='Task-1' coro=<main() ...>> took 0.151 seconds


Debug mode logs a warning whenever a single callback or task step runs for longer than the loop's slow_callback_duration (100 ms by default). This helps identify blocking calls hiding inside otherwise-async code.
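
That threshold is adjustable, and debug mode can also be enabled without managing the loop yourself, via asyncio.run(..., debug=True) or the PYTHONASYNCIODEBUG=1 environment variable. A small sketch:

import asyncio
import time

async def main():
    # Lower the threshold so even shorter stalls get reported
    asyncio.get_running_loop().slow_callback_duration = 0.05  # default is 0.1s
    time.sleep(0.08)  # Now long enough to trigger the warning

# debug=True is equivalent to calling loop.set_debug(True)
asyncio.run(main(), debug=True)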


Step 7: Handle CPU-Bound Operations

For CPU-intensive work, use ProcessPoolExecutor instead of threads.

import asyncio
from concurrent.futures import ProcessPoolExecutor
import hashlib

def cpu_intensive_task(data):
    # Simulate heavy computation
    result = hashlib.pbkdf2_hmac('sha256', data.encode(), b'salt', 100000)
    return result.hex()

async def process_data(items):
    loop = asyncio.get_running_loop()
    
    # Use process pool for CPU-bound work
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_intensive_task, item)
            for item in items
        ]
        results = await asyncio.gather(*tasks)
    
    return results

async def main():
    import time
    start = time.time()

    items = [f"data_{i}" for i in range(20)]
    results = await process_data(items)

    print(f"Processed {len(results)} items in {time.time() - start:.2f}s")

# The __main__ guard matters here: with the spawn start method (the default on
# macOS and Windows), worker processes re-import this module, and without the
# guard they would try to re-run asyncio.run() themselves.
if __name__ == "__main__":
    asyncio.run(main())


Threads don't help with CPU-bound operations due to Python's GIL. Use processes to leverage multiple CPU cores.
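
To see the GIL effect directly, here is a rough comparison sketch that runs the same pure-Python workload once in a thread pool and once in a process pool (the worker counts and loop size are arbitrary; on a multi-core machine the process pool should finish noticeably faster):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_work(n):
    # Pure-Python loop: holds the GIL, so threads can't run it in parallel
    return sum(i * i for i in range(n))

async def run_in(executor, jobs):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    await asyncio.gather(*(loop.run_in_executor(executor, cpu_work, n) for n in jobs))
    return time.perf_counter() - start

async def main():
    jobs = [2_000_000] * 8
    with ThreadPoolExecutor(max_workers=4) as threads:
        t_threads = await run_in(threads, jobs)
    with ProcessPoolExecutor(max_workers=4) as processes:
        t_processes = await run_in(processes, jobs)
    print(f"Threads:   {t_threads:.2f}s")
    print(f"Processes: {t_processes:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())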


Step 8: Avoid Common Async Pitfalls

import asyncio

# Bad: Forgetting await
async def mistake_one():
    result = asyncio.sleep(1)  # Returns coroutine object, doesn't wait
    print("This prints immediately")
    return result

# Good: Using await
async def correct_one():
    await asyncio.sleep(1)  # Actually waits
    print("This prints after 1 second")

# Bad: Iterating an async generator with a plain for loop
async def mistake_two():
    for item in async_generator():  # TypeError: 'async_generator' object is not iterable
        print(item)

# Good: Using async for / an async comprehension
# (async_generator and process are placeholders for your own coroutines)
async def correct_two():
    results = [await process(item) async for item in async_generator()]
    return results

# Bad: Not handling exceptions
async def mistake_three():
    tasks = [risky_operation() for _ in range(10)]
    await asyncio.gather(*tasks)  # First failure propagates; the other results are lost

# Good: Using return_exceptions
async def correct_three():
    tasks = [risky_operation() for _ in range(10)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")


Additional Tips

Set appropriate timeouts to prevent hanging operations:

async def fetch_with_timeout():
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await slow_operation()
    except TimeoutError:
        print("Operation timed out")
        result = None
    
    return result

# For Python < 3.11
async def fetch_with_timeout_old():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out")
        result = None
    
    return result


Limit concurrency to prevent resource exhaustion:

import asyncio

async def limited_concurrency(items, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)
    
    tasks = [process_with_limit(item) for item in items]
    return await asyncio.gather(*tasks)

async def main():
    items = range(100)  # 100 items, but only 10 at a time
    results = await limited_concurrency(items, max_concurrent=10)


Your async code is now properly non-blocking. The event loop runs freely, concurrent operations complete faster, and you've eliminated the performance bottlenecks that made your async code feel slow.

