How to Spot and Resolve Memory Leaks in Python AsyncIO



Step 1: Understanding AsyncIO Memory Leaks


Memory leaks in asyncio applications occur when coroutines, tasks, or resources aren't properly cleaned up. These leaks gradually consume more memory until your application crashes or becomes unresponsive.


Here's a typical scenario that creates a memory leak:

import asyncio
import weakref

# Track all running tasks - this will show the leak
running_tasks = weakref.WeakSet()

async def leaky_background_task(data):
    """Loop forever, doubling ``data`` once per second.

    Deliberately leaky demo: there is no exit condition, so the coroutine
    only ends when its task is cancelled.
    """
    while 1:
        # Stand-in for real work on the payload.
        await asyncio.sleep(1)
        doubled = data * 2  # the frame keeps ``data`` alive while it runs
        
async def create_leaky_tasks():
    """Spawn 100 never-ending background tasks, wait two seconds, print the count.

    Nothing here cancels or awaits the spawned tasks; the only handle kept
    is the weak one stored in ``running_tasks``.
    """
    for index in range(100):
        # Only a weak reference is recorded for each new task.
        running_tasks.add(
            asyncio.create_task(leaky_background_task(f"data_{index}"))
        )

    # Returning from this coroutine does not stop the spawned tasks by itself.
    await asyncio.sleep(2)
    print(f"Active tasks: {len(running_tasks)}")

# Run this and watch memory usage grow
asyncio.run(create_leaky_tasks())


Running this code shows tasks accumulating in memory. The program exits but leaves unfinished tasks, causing warnings:

$ python leak_example.py
Active tasks: 100
sys:1: RuntimeWarning: coroutine 'leaky_background_task' was never awaited


Step 2: Identifying Common Memory Leak Patterns


AsyncIO memory leaks typically follow these patterns. Let's examine each with detection code:


Pattern 1: Uncancelled Background Tasks

import asyncio
import tracemalloc
import gc

# Start memory tracking
tracemalloc.start()

async def monitor_memory():
    """Print the currently traced memory (in MB) every two seconds, forever."""
    while 1:
        traced = tracemalloc.get_traced_memory()  # (current, peak) in bytes
        current = traced[0]
        print(f"Current memory: {current / 1024 / 1024:.2f} MB")
        await asyncio.sleep(2)

async def leaky_worker(queue):
    """Consume ``queue`` forever, hoarding every item in a list that is never trimmed."""
    hoarded = []  # gains one entry per item for the lifetime of the task

    try:
        while True:
            hoarded.append(await queue.get())  # never cleared
            # Stand-in for per-item processing.
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        # No cleanup is performed; the cancellation is simply propagated.
        raise

async def main_with_leak():
    """Run ten leaky workers over 100 queued items, then return without cancelling them."""
    queue = asyncio.Queue()

    # Periodic memory report running alongside the workers.
    monitor_task = asyncio.create_task(monitor_memory())

    # Ten workers; the list is built but never used for cleanup.
    workers = [asyncio.create_task(leaky_worker(queue)) for _ in range(10)]

    # Feed the queue.
    for i in range(100):
        await queue.put(f"item_{i}")

    # Give the workers some time, then leave.
    await asyncio.sleep(5)

    # Only the monitor is cancelled; the worker tasks are left as they are.
    monitor_task.cancel()

# This will show growing memory usage
asyncio.run(main_with_leak())


Pattern 2: Event Loop Reference Cycles

import asyncio
import sys

class LeakyConnection:
    """Demo connection whose self-rescheduling handler forms a reference cycle."""

    def __init__(self, loop):
        self.loop = loop
        self.callbacks = []
        self.data_buffer = []

        # The closure captures ``self`` and ``self`` stores the closure:
        # that is the circular reference this demo is about.
        def _tick():
            return self.process_data()

        self.handler = _tick
        self.loop.call_soon(self.handler)

    def process_data(self):
        """Append a 4000-character chunk and re-arm the handler 0.1 s later."""
        chunk = "data" * 1000
        self.data_buffer.append(chunk)
        # Each run schedules the next one, so the loop keeps holding the handler.
        self.loop.call_later(0.1, self.handler)

    async def close(self):
        """Empty both lists and drop the handler attribute."""
        self.callbacks.clear()
        self.data_buffer.clear()
        self.handler = None

async def create_leak_via_references():
    """Create 50 ``LeakyConnection`` objects, wait three seconds, report on them.

    ``close()`` is intentionally never called on any of the connections.
    """
    loop = asyncio.get_running_loop()
    connections = [LeakyConnection(loop) for _ in range(50)]

    await asyncio.sleep(3)

    # No close() calls here: every connection is still referenced.
    print(f"Created {len(connections)} leaky connections")
    print(f"Reference count: {sys.getrefcount(connections[0])}")

asyncio.run(create_leak_via_references())


Pattern 3: Unfinished Async Generators

import asyncio

async def leaky_generator():
    """Yield the integers 0..999999 one by one, pausing 1 ms after each.

    The full list is built up front and stays alive for as long as the
    generator object does.
    """
    large_data = list(range(1000000))  # 1M integers

    try:
        for value in large_data:
            yield value
            await asyncio.sleep(0.001)
    finally:
        # Runs only once the generator is exhausted or closed.
        print("Generator cleanup called")
        
async def consume_partially():
    """Read the first ten items from ``leaky_generator`` and stop, without closing it."""
    gen = leaky_generator()

    count = 0
    async for _ in gen:
        count += 1
        if count < 10:
            continue
        break  # stop early: the generator is left unfinished

    # ``gen`` is not closed here, so its frame still holds the big list.
    print(f"Consumed {count} items")

# Memory isn't freed properly
asyncio.run(consume_partially())


Step 3: Implementing Detection Tools


Before fixing leaks, you need to detect them. Here's a comprehensive detection setup:

import asyncio
import gc
import tracemalloc
import weakref
from typing import Dict, Set
import psutil
import os

class MemoryLeakDetector:
    """Collects tracemalloc snapshots and weakly tracks tasks to help spot leaks.

    Creating an instance starts ``tracemalloc``.
    """

    def __init__(self):
        tracemalloc.start()
        self.snapshots = []  # list of (label, snapshot) pairs, in capture order
        self.task_refs = weakref.WeakSet()
        self.baseline_memory = 0  # 0 means "no baseline recorded yet"

    def take_snapshot(self, label: str):
        """Store a labelled tracemalloc snapshot and return it with process memory figures.

        Returns a dict with the label, RSS and VMS in MB, and the snapshot.
        """
        gc.collect()  # collect garbage first so it does not show up as growth

        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append((label, snapshot))

        # Process-level numbers complement the Python-level trace.
        mem_info = psutil.Process(os.getpid()).memory_info()
        report = {'label': label}
        report['rss'] = mem_info.rss / 1024 / 1024  # MB
        report['vms'] = mem_info.vms / 1024 / 1024  # MB
        report['snapshot'] = snapshot
        return report

    def compare_snapshots(self, earlier_label: str, later_label: str):
        """Print the ten largest per-line differences between two labelled snapshots.

        Returns the full list of differences, or ``None`` when either label
        has no stored snapshot.
        """
        def first_with(label):
            # The first snapshot stored under ``label`` wins.
            for name, snap in self.snapshots:
                if name == label:
                    return snap
            return None

        earlier = first_with(earlier_label)
        later = first_with(later_label)

        if not (earlier and later):
            return None

        top_stats = later.compare_to(earlier, 'lineno')

        print(f"\n=== Memory changes from '{earlier_label}' to '{later_label}' ===")
        for stat in top_stats[:10]:
            print(stat)

        return top_stats

    def track_task(self, task):
        """Remember ``task`` through a weak reference."""
        self.task_refs.add(task)

    def get_active_tasks(self):
        """Return the tracked tasks that have not finished yet."""
        unfinished = []
        for task in self.task_refs:
            if task.done():
                continue
            unfinished.append(task)
        return unfinished

    async def monitor_loop(self, interval: float = 1.0):
        """Every ``interval`` seconds, warn about high task counts or RSS growth."""
        while True:
            active_tasks = self.get_active_tasks()

            # More than 100 live tasks is treated as suspicious.
            if len(active_tasks) > 100:
                print(f"WARNING: {len(active_tasks)} active tasks!")

            # Resident set size of this process, in MB.
            rss_bytes = psutil.Process(os.getpid()).memory_info().rss
            current_memory = rss_bytes / 1024 / 1024

            if self.baseline_memory == 0:
                # The first reading becomes the baseline.
                self.baseline_memory = current_memory
            elif current_memory > self.baseline_memory * 1.5:
                print(f"WARNING: Memory grown by 50%! Current: {current_memory:.2f} MB")

            await asyncio.sleep(interval)

# Example usage of detector
async def test_with_detector():
    """Walk through a detector session: snapshot, spawn tasks, cancel some, compare."""
    detector = MemoryLeakDetector()

    # Background monitoring for the duration of the demo.
    monitor = asyncio.create_task(detector.monitor_loop())

    # Baseline snapshot before any task exists.
    detector.take_snapshot("start")

    # Fifty long sleeps stand in for a potential leak.
    tasks = []
    for _ in range(50):
        sleeper = asyncio.create_task(asyncio.sleep(100))
        detector.track_task(sleeper)
        tasks.append(sleeper)

    await asyncio.sleep(2)
    detector.take_snapshot("after_creating_tasks")

    # Cancel the first half only.
    for sleeper in tasks[:25]:
        sleeper.cancel()

    await asyncio.sleep(1)
    detector.take_snapshot("after_partial_cleanup")

    # Report what changed between the three snapshots.
    detector.compare_snapshots("start", "after_creating_tasks")
    detector.compare_snapshots("after_creating_tasks", "after_partial_cleanup")

    # Final cleanup: stop the monitor and cancel every task.
    monitor.cancel()
    for sleeper in tasks:
        sleeper.cancel()

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Expected: every awaited task has been cancelled.
        pass

asyncio.run(test_with_detector())


Step 4: Fixing Memory Leaks - Complete Solutions


Here are proven solutions for each leak pattern:


Solution 1: Proper Task Management

import asyncio
from contextlib import asynccontextmanager
from typing import Set

class TaskManager:
    """Tracks asyncio tasks and guarantees they are cancelled and awaited.

    Tasks are held in a strong-reference set while they run and are removed
    from it by a done callback as soon as they finish.
    """

    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro):
        """Schedule ``coro`` as a task, track it, and return the task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)

        # Drop the reference as soon as the task finishes.
        task.add_done_callback(self.tasks.discard)
        return task

    async def cancel_all(self):
        """Cancel every tracked task, wait for them, and report failures.

        A task that ends with an exception other than ``CancelledError``
        is reported with a ``Task failed with: ...`` line.
        """
        # Loop so that tasks created while we wait are cancelled as well.
        while self.tasks:
            # Snapshot first: the done callbacks empty ``self.tasks`` while
            # ``gather`` is awaited, so the live set cannot be used to walk
            # the results afterwards.
            pending = list(self.tasks)
            for task in pending:
                task.cancel()

            # return_exceptions=True: collect outcomes instead of raising.
            results = await asyncio.gather(*pending, return_exceptions=True)

            for result in results:
                if isinstance(result, asyncio.CancelledError):
                    continue  # expected outcome of cancel()
                if isinstance(result, Exception):
                    print(f"Task failed with: {result}")

            # Normally already done by the callbacks; kept as a safety net.
            self.tasks.difference_update(pending)

    @asynccontextmanager
    async def task_scope(self):
        """Async context manager yielding this manager; cancels all tasks on exit."""
        try:
            yield self
        finally:
            await self.cancel_all()

# Fixed version with proper cleanup
async def worker(worker_id: int, queue: asyncio.Queue,
                 batch_size: int = 10, idle_timeout: float = 1.0):
    """Consume ``queue`` in batches until cancelled.

    Defined at module level so other code in the file can schedule it.
    Every item taken from the queue is acknowledged with ``task_done()``
    once its batch has been processed, which lets ``queue.join()`` return.

    Args:
        worker_id: Identifier used in the cleanup message.
        queue: Source of work items.
        batch_size: Number of items that triggers a batch.
        idle_timeout: Seconds to wait for an item before flushing a
            partial batch.

    Raises:
        asyncio.CancelledError: Re-raised after pending items are flushed.
    """
    local_data = []

    async def flush():
        # Process whatever is buffered, acknowledge it, then empty the buffer.
        if not local_data:
            return
        await process_batch(local_data)
        for _ in local_data:
            queue.task_done()
        local_data.clear()  # Clear to prevent growth

    try:
        while True:
            try:
                # Use timeout to avoid infinite wait
                item = await asyncio.wait_for(
                    queue.get(),
                    timeout=idle_timeout
                )
            except asyncio.TimeoutError:
                # Queue went idle: do not sit on a partial batch forever,
                # otherwise queue.join() could never complete.
                await flush()
                continue

            local_data.append(item)
            if len(local_data) >= batch_size:
                await flush()

    except asyncio.CancelledError:
        # Proper cleanup on cancellation: if the cancel arrived mid-batch
        # the buffer is still intact and is processed here.
        await flush()
        print(f"Worker {worker_id} cleaned up")
        raise


# Fixed version with proper cleanup
async def fixed_background_tasks(num_workers: int = 5, num_items: int = 50):
    """Demonstrate proper task management with the module-level ``worker``.

    Starts ``num_workers`` workers, enqueues ``num_items`` items, waits for
    the queue to drain, then cancels and awaits every worker.
    """
    queue = asyncio.Queue()
    workers = [
        asyncio.create_task(worker(i, queue)) for i in range(num_workers)
    ]

    try:
        for i in range(num_items):
            await queue.put(f"item_{i}")
        await queue.join()
    finally:
        # Always cancel and await the workers, even if the body failed.
        for task in workers:
            task.cancel()
        await asyncio.gather(*workers, return_exceptions=True)


async def process_batch(items):
    """Process a batch of items"""
    await asyncio.sleep(0.1)  # Simulate processing
    print(f"Processed {len(items)} items")

async def main_fixed():
    """Run five managed workers over fifty queued items, cleaning up on exit."""
    queue = asyncio.Queue()
    scope = TaskManager().task_scope()

    async with scope as manager:
        # Start the workers under the manager's control.
        for worker_id in range(5):
            manager.create_task(worker(worker_id, queue))

        # Enqueue the work.
        for i in range(50):
            await queue.put(f"item_{i}")

        # Block until every queued item has been marked as done.
        await queue.join()

        # Leaving the ``async with`` block cancels the tracked tasks.

asyncio.run(main_fixed())


Solution 2: Breaking Reference Cycles

import asyncio
import weakref

class FixedConnection:
    """Connection whose periodic callback reaches it only through a weak reference."""

    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._callbacks = []
        self._data_buffer = []
        self._handle = None
        self._closed = False

        # The callback closes over a weak reference, never over ``self``,
        # so storing it on the instance does not create a cycle.
        owner = weakref.ref(self)

        def callback():
            target = owner()
            if not target or target._closed:
                return
            target.process_data()

        self._callback = callback
        self._schedule_next()

    def _schedule_next(self):
        """Arm the callback to fire in 0.1 s unless the connection is closed."""
        if self._closed:
            return
        self._handle = self._loop.call_later(0.1, self._callback)

    def process_data(self):
        """Add one entry to the bounded buffer and schedule the next run."""
        if self._closed:
            return

        buffer = self._data_buffer
        if len(buffer) >= 100:
            # Full: keep only the newest 50 entries to prevent unbounded growth.
            self._data_buffer = buffer[-50:]
        else:
            buffer.append("data")

        self._schedule_next()

    async def close(self):
        """Mark closed, cancel the pending callback, and drop all held data."""
        self._closed = True

        # Stop the timer that would otherwise fire once more.
        pending = self._handle
        if pending:
            pending.cancel()

        # Release everything the connection was holding.
        self._callbacks.clear()
        self._data_buffer.clear()
        self._callback = None
        self._handle = None

async def connection_pool_example():
    """Acquire five pooled connections, hold them two seconds, always close them."""

    class ConnectionPool:
        """Pool that opens up to ``max_connections`` and then reuses the first one."""

        def __init__(self, max_connections: int = 10):
            self.max_connections = max_connections
            self.connections = []

        async def acquire(self):
            """Return a new connection while below the limit, else the first one."""
            if len(self.connections) >= self.max_connections:
                # Limit reached: reuse an existing connection.
                return self.connections[0]

            fresh = FixedConnection()
            self.connections.append(fresh)
            return fresh

        async def close_all(self):
            """Close every pooled connection and forget them."""
            await asyncio.gather(*[conn.close() for conn in self.connections])
            self.connections.clear()

    pool = ConnectionPool()

    try:
        # Take five connections from the pool.
        connections = []
        for _ in range(5):
            connections.append(await pool.acquire())

        # Pretend to use them for a while.
        await asyncio.sleep(2)

    finally:
        # Runs on success and on failure alike.
        await pool.close_all()
        print("All connections closed properly")

asyncio.run(connection_pool_example())


Solution 3: Async Generator Cleanup

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class AsyncGeneratorManager:
    """Hands out async generators and guarantees they are closed after use."""

    def __init__(self):
        self.generators = []  # generators currently handed out by managed_generator

    async def safe_generator(self, data_size: int) -> AsyncGenerator:
        """Yield ``0 .. data_size - 1``, pausing 1 ms after each item.

        The backing list is cleared in ``finally``, which runs when the
        generator is exhausted or closed.
        """
        large_data = list(range(data_size))

        try:
            for item in large_data:
                yield item
                await asyncio.sleep(0.001)
        finally:
            # Runs on exhaustion and on aclose()
            large_data.clear()
            print(f"Generator cleaned up {data_size} items")

    @asynccontextmanager
    async def managed_generator(self, data_size: int):
        """Async context manager yielding a generator that is closed on exit.

        The generator is tracked in ``self.generators`` while the context
        is active.
        """
        gen = self.safe_generator(data_size)
        self.generators.append(gen)

        try:
            yield gen
        finally:
            try:
                # Ensure generator is closed
                await gen.aclose()
            finally:
                # Untrack even if aclose() raised, so the manager never
                # keeps a dead generator alive.
                self.generators.remove(gen)

async def fixed_generator_usage():
    """Show three ways of consuming an async generator that all close it."""
    manager = AsyncGeneratorManager()

    # Method 1: the context manager closes the generator on exit.
    async with manager.managed_generator(1000) as gen:
        count = 0
        async for _ in gen:
            count += 1
            if count < 10:
                continue
            break  # early exit; cleanup still happens on leaving the block
        print(f"Processed {count} items with automatic cleanup")

    # Method 2: close by hand in a ``finally`` clause.
    gen = manager.safe_generator(1000)
    try:
        count = 0
        async for _ in gen:
            count += 1
            if count < 10:
                continue
            break
    finally:
        await gen.aclose()  # explicit close
        print(f"Manually cleaned up after {count} items")

    # Method 3: bound the iteration time with asyncio.timeout.
    gen = manager.safe_generator(100000)
    try:
        async with asyncio.timeout(1.0):  # at most one second
            async for _ in gen:
                # Items are consumed until the deadline hits.
                pass
    except asyncio.TimeoutError:
        print("Timed out - cleaning up")
    finally:
        await gen.aclose()

asyncio.run(fixed_generator_usage())


Step 5: Advanced Leak Prevention Patterns


Implement these patterns to prevent leaks from occurring:

import asyncio
from functools import wraps
import signal
import sys

def timeout_task(seconds: float):
    """Return a decorator that limits each call of an async function to ``seconds``.

    On expiry a message is printed and ``asyncio.TimeoutError`` is re-raised.
    """
    def decorate(func):
        @wraps(func)
        async def guarded(*args, **kwargs):
            pending = func(*args, **kwargs)
            try:
                outcome = await asyncio.wait_for(pending, timeout=seconds)
            except asyncio.TimeoutError:
                print(f"{func.__name__} timed out after {seconds}s")
                raise
            return outcome
        return guarded
    return decorate

class GracefulShutdown:
    """Coordinates a clean shutdown: signals an event and cancels tracked tasks."""

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks = set()
        # Strong reference to the task started by a signal, so it cannot be
        # garbage-collected before it finishes.
        self._shutdown_task = None

    def setup_handlers(self):
        """Install SIGTERM and SIGINT handlers on the running loop.

        Must be called from inside a running event loop.
        """
        loop = asyncio.get_running_loop()

        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown)

    def _request_shutdown(self):
        """Start ``shutdown()`` once; repeated signals are ignored."""
        if self._shutdown_task is None:
            self._shutdown_task = asyncio.create_task(self.shutdown())

    async def shutdown(self):
        """Set the shutdown event, then cancel and await every tracked task.

        The event loop is left running: under ``asyncio.run()`` it ends
        when the main coroutine returns, and stopping it here would abort
        that coroutine with ``RuntimeError``.
        """
        print("Shutting down gracefully...")
        self.shutdown_event.set()

        # Snapshot the set (done callbacks mutate it) and never cancel the
        # task that is running this method.
        current = asyncio.current_task()
        pending = [task for task in self.tasks if task is not current]
        for task in pending:
            task.cancel()

        # Wait for all tasks to complete
        await asyncio.gather(*pending, return_exceptions=True)

    def track_task(self, coro):
        """Schedule ``coro`` as a task, track it until it finishes, and return it."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

# Complete application with leak prevention
class LeakFreeApplication:
    """Application designed to prevent memory leaks"""

    def __init__(self):
        self.shutdown_handler = GracefulShutdown()
        self.resources = []  # resources acquired and not yet released

    @timeout_task(30.0)  # 30 second timeout
    async def long_running_task(self, task_id: int):
        """Poll once per second until the shutdown event is set.

        Raises:
            asyncio.CancelledError: Re-raised after printing a notice.
        """
        try:
            while not self.shutdown_handler.shutdown_event.is_set():
                await asyncio.sleep(1)
                # Do work here

        except asyncio.CancelledError:
            print(f"Task {task_id} cancelled - cleaning up")
            raise

    async def resource_manager(self):
        """Acquire a resource, use it, and release it exactly once."""
        resource = await self.acquire_resource()
        self.resources.append(resource)

        try:
            # Use resource
            await self.use_resource(resource)
        finally:
            # Always release, unless run() already released it at shutdown.
            await self._release_once(resource)

    async def _release_once(self, resource):
        """Untrack and release ``resource``; do nothing if it is no longer tracked."""
        # Match by identity: two resources may compare equal as dicts.
        for index, tracked in enumerate(self.resources):
            if tracked is resource:
                # Untrack before releasing so a concurrent caller sees it gone.
                del self.resources[index]
                break
        else:
            return
        await self.release_resource(resource)

    async def acquire_resource(self):
        """Acquire a resource (file, connection, etc)"""
        return {"id": len(self.resources), "data": []}

    async def use_resource(self, resource):
        """Use the resource"""
        resource["data"].append("used")
        await asyncio.sleep(0.5)

    async def release_resource(self, resource):
        """Release the resource"""
        resource["data"].clear()
        print(f"Resource {resource['id']} released")

    async def run(self):
        """Start all tasks, wait for shutdown, then release leftover resources."""
        self.shutdown_handler.setup_handlers()

        # Start tasks
        for i in range(5):
            self.shutdown_handler.track_task(
                self.long_running_task(i)
            )

        # Start resource managers
        for i in range(3):
            self.shutdown_handler.track_task(
                self.resource_manager()
            )

        # Wait for shutdown signal
        await self.shutdown_handler.shutdown_event.wait()

        # Release whatever is still tracked. Iterate over a copy because
        # _release_once removes entries, which also stops the cancelled
        # resource_manager tasks from releasing the same resource again.
        for resource in list(self.resources):
            await self._release_once(resource)

async def main():
    """Create the application, run it, and always print a final status line."""
    app = LeakFreeApplication()

    try:
        try:
            await app.run()
        except KeyboardInterrupt:
            print("Interrupted - cleaning up")
    finally:
        # Printed on every exit path.
        print("Application shut down cleanly")

# Run with proper cleanup
if __name__ == "__main__":
    asyncio.run(main())


Additional Tips and Common Pitfalls


Watch out for these subtle leak sources that often go unnoticed:

# Pitfall 1: Global task storage
global_tasks = list()  # module-level list that only ever grows


async def bad_pattern():
    """Start ``some_work()`` and park the task in the global list for good."""
    # The task is appended and never removed, even after it has finished.
    global_tasks.append(asyncio.create_task(some_work()))

# Fix: keep a strong reference only while the task runs, then drop it.
# A WeakSet must never be the only reference to a task: the event loop
# itself holds tasks weakly, so such a task may be garbage-collected
# before it finishes.
import weakref
global_tasks_weak = weakref.WeakSet()  # for observation only
active_tasks = set()  # strong references to tasks that are still running

async def good_pattern():
    """Start ``some_work()`` and track the task until it completes."""
    task = asyncio.create_task(some_work())
    active_tasks.add(task)  # keeps the task alive while it runs
    task.add_done_callback(active_tasks.discard)  # removed when done
    global_tasks_weak.add(task)  # weak view; never keeps the task alive
# Pitfall 2: Event listeners not removed
class EventEmitter:
    """Minimal event registry whose ``on`` hands back an unsubscribe function."""

    def __init__(self):
        self.listeners = dict()  # event name -> list of callbacks

    def on(self, event, callback):
        """Register ``callback`` for ``event`` and return a function that removes it."""
        if event not in self.listeners:
            self.listeners[event] = []
        self.listeners[event].append(callback)

        def remove():
            # Unregister this exact callback from the event's list.
            return self.listeners[event].remove(callback)

        return remove

# Always store and call the removal function
emitter = EventEmitter()
remove_listener = emitter.on("data", handle_data)
# Later: remove_listener() to prevent leak
# Pitfall 3: Infinite queues without bounds
queue = asyncio.Queue()  # Unbounded - dangerous

# Fix: Use maxsize
queue = asyncio.Queue(maxsize=1000)  # Bounded queue


Memory leak prevention in asyncio requires discipline. Always ensure tasks are tracked and cancelled, resources are released in finally blocks, and generators are properly closed. Use the detection tools shown here to catch leaks early in development rather than discovering them in production.


How to Fix Unit Test Failures in Python: Mastering pytest Mocking and Fixtures