Step 1: Understanding AsyncIO Memory Leaks
Memory leaks in asyncio applications occur when coroutines, tasks, or resources aren't properly cleaned up. These leaks gradually consume more memory until your application crashes or becomes unresponsive.
Here's a typical scenario that creates a memory leak:
import asyncio
import weakref
# Weak registry of the tasks created below. A WeakSet holds no strong
# references, so it lets the demo count tasks without keeping them alive itself.
running_tasks = weakref.WeakSet()
async def leaky_background_task(data):
    """Loop forever over ``data``; there is no exit condition and no cleanup.

    The leak is intentional: the coroutine only stops when something
    cancels it from outside.
    """
    while True:
        # Stand-in for one unit of real work per second.
        await asyncio.sleep(1)
        # The running frame keeps ``data`` referenced on every iteration.
        doubled = data * 2
async def create_leaky_tasks():
    """Spawn 100 never-ending background tasks and print how many are tracked.

    Nothing cancels or awaits the tasks, which is the leak being shown.
    """
    for i in range(100):
        # The only bookkeeping is the weak registry; no owner is responsible
        # for stopping the task.
        spawned = asyncio.create_task(leaky_background_task(f"data_{i}"))
        running_tasks.add(spawned)

    # This coroutine returns after two seconds while every task is still looping.
    await asyncio.sleep(2)
    print(f"Active tasks: {len(running_tasks)}")
# Entry point for the first demo: drive create_leaky_tasks() on an event loop.
# Run this and watch memory usage grow.
asyncio.run(create_leaky_tasks())
Running this code shows tasks accumulating in memory. The main coroutine returns while all 100 background tasks are still unfinished, which can produce warnings such as:
$ python leak_example.py
Active tasks: 100
sys:1: RuntimeWarning: coroutine 'leaky_background_task' was never awaited
Step 2: Identifying Common Memory Leak Patterns
AsyncIO memory leaks typically follow these patterns. Let's examine each with detection code:
Pattern 1: Uncancelled Background Tasks
import asyncio
import tracemalloc
import gc
# Start memory tracking at import time so that monitor_memory() below has
# traced allocations to report.
tracemalloc.start()
async def monitor_memory():
    """Print the currently traced memory (in MB) every two seconds, forever."""
    while True:
        # get_traced_memory() returns (current, peak); only current is shown.
        current, _peak = tracemalloc.get_traced_memory()
        print(f"Current memory: {current / 1024 / 1024:.2f} MB")
        await asyncio.sleep(2)
async def leaky_worker(queue):
    """Consume ``queue`` forever while remembering every item ever received.

    The leak is intentional: the per-worker list is only appended to.
    """
    local_cache = []  # never trimmed, so it grows with each item handled
    while True:
        try:
            local_cache.append(await queue.get())  # Never cleared
            # Process item...
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # A well-behaved worker would release its state here; this one
            # simply lets the cancellation propagate.
            raise
async def main_with_leak():
    """Run ten leaky workers over 100 queued items, then exit without cleanup.

    Only the memory monitor is cancelled; the workers are left running, which
    is the leak pattern being demonstrated.
    """
    queue = asyncio.Queue()

    # The monitor prints traced memory while the workers run.
    monitor_task = asyncio.create_task(monitor_memory())

    # The workers are collected in a list, but nothing ever uses it to stop them.
    workers = [asyncio.create_task(leaky_worker(queue)) for _ in range(10)]

    # Feed the queue.
    for i in range(100):
        await queue.put(f"item_{i}")

    # Let the workers chew on the items for a while.
    await asyncio.sleep(5)

    # This causes memory leak - the worker tasks aren't cancelled.
    monitor_task.cancel()
# Entry point for the Pattern 1 demo; the monitor output will show growing
# memory usage while the workers run.
asyncio.run(main_with_leak())
Pattern 2: Event Loop Reference Cycles
import asyncio
import sys
class LeakyConnection:
    """Fake connection that keeps rescheduling itself and buffering data.

    The instance stores a callback that calls back into the instance, and the
    same callback is handed to the event loop. This is the reference cycle the
    example is about.
    """

    def __init__(self, loop):
        self.data_buffer = []
        self.callbacks = []
        self.loop = loop

        # Creating circular reference: the instance owns ``handler`` and
        # ``handler`` closes over the instance.
        def handler():
            return self.process_data()

        self.handler = handler
        self.loop.call_soon(self.handler)

    def process_data(self):
        """Append a 4000-character chunk and schedule the next run in 0.1s."""
        chunk = "data" * 1000
        self.data_buffer.append(chunk)
        # Re-schedule itself, creating more references
        self.loop.call_later(0.1, self.handler)

    async def close(self):
        """Drop the handler, the buffered data and the callbacks list contents."""
        self.handler = None
        self.data_buffer.clear()
        self.callbacks.clear()
async def create_leak_via_references():
    """Create 50 LeakyConnection objects, never close them, and report on them."""
    loop = asyncio.get_running_loop()
    connections = [LeakyConnection(loop) for _ in range(50)]

    # Give the self-rescheduling callbacks three seconds to fill their buffers.
    await asyncio.sleep(3)

    # close() is deliberately not called on any connection, so the
    # references set up in __init__ remain in memory.
    print(f"Created {len(connections)} leaky connections")
    print(f"Reference count: {sys.getrefcount(connections[0])}")
# Entry point for the Pattern 2 demo.
asyncio.run(create_leak_via_references())
Pattern 3: Unfinished Async Generators
import asyncio
async def leaky_generator():
    """Yield the integers 0..999999 one at a time, pausing 1 ms after each.

    The whole list is built up front and stays referenced by the generator
    frame for as long as the generator is neither exhausted nor closed.
    """
    large_data = list(range(1000000))  # 1M integers
    try:
        for value in large_data:
            yield value
            await asyncio.sleep(0.001)
    finally:
        # Runs only once the generator is exhausted or closed.
        print("Generator cleanup called")
async def consume_partially():
    """Read ten items from leaky_generator() and abandon the generator."""
    gen = leaky_generator()
    count = 0
    async for _ in gen:
        count += 1
        if count < 10:
            continue
        # Stop early: the generator is left suspended, not exhausted.
        break

    # ``gen`` is never closed explicitly, so it still references large_data.
    print(f"Consumed {count} items")
# Entry point for the Pattern 3 demo. The generator is abandoned part-way, so
# its memory isn't freed properly.
asyncio.run(consume_partially())
Step 3: Implementing Detection Tools
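Before fixing leaks, you need to detect them. Here's a comprehensive detection setup (it uses the third-party `psutil` package for process-level memory figures, alongside the standard-library `tracemalloc`, `gc`, and `weakref` modules):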
import asyncio
import gc
import tracemalloc
import weakref
from typing import Dict, Set
import psutil
import os
class MemoryLeakDetector:
    """Collect tracemalloc snapshots and watch tasks and RSS for leak symptoms."""

    def __init__(self):
        tracemalloc.start()
        self.snapshots = []  # list of (label, tracemalloc snapshot) pairs
        self.task_refs = weakref.WeakSet()  # tasks registered via track_task()
        self.baseline_memory = 0  # RSS in MB, filled in by monitor_loop()

    def take_snapshot(self, label: str):
        """Record a labelled tracemalloc snapshot and return it with RSS/VMS in MB."""
        gc.collect()  # drop collectable garbage before measuring
        snapshot = tracemalloc.take_snapshot()
        self.snapshots.append((label, snapshot))

        # Process-level figures come from psutil.
        mem_info = psutil.Process(os.getpid()).memory_info()
        report = {'label': label}
        report['rss'] = mem_info.rss / 1024 / 1024  # MB
        report['vms'] = mem_info.vms / 1024 / 1024  # MB
        report['snapshot'] = snapshot
        return report

    def compare_snapshots(self, earlier_label: str, later_label: str):
        """Print the ten largest per-line differences between two snapshots.

        Returns the full list of differences, or ``None`` when either label
        has no recorded snapshot.
        """

        def find(wanted):
            # The first snapshot recorded under a label wins.
            for label, snap in self.snapshots:
                if label == wanted:
                    return snap
            return None

        earlier = find(earlier_label)
        later = find(later_label)
        if not (earlier and later):
            return None

        top_stats = later.compare_to(earlier, 'lineno')
        print(f"\n=== Memory changes from '{earlier_label}' to '{later_label}' ===")
        for position, stat in enumerate(top_stats):
            if position == 10:
                break
            print(stat)
        return top_stats

    def track_task(self, task):
        """Register ``task`` in the weak task registry."""
        self.task_refs.add(task)

    def get_active_tasks(self):
        """Return the registered tasks whose ``done()`` is still false."""
        active = []
        for task in self.task_refs:
            if task.done():
                continue
            active.append(task)
        return active

    async def monitor_loop(self, interval: float = 1.0):
        """Every ``interval`` seconds, warn about task count and RSS growth."""
        while True:
            active_tasks = self.get_active_tasks()

            # More than 100 unfinished tasks is treated as suspicious.
            if len(active_tasks) > 100:
                print(f"WARNING: {len(active_tasks)} active tasks!")

            # The first measurement becomes the baseline; later ones are
            # compared against 1.5x that value.
            current_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
            if self.baseline_memory == 0:
                self.baseline_memory = current_memory
            elif current_memory > self.baseline_memory * 1.5:
                print(f"WARNING: Memory grown by 50%! Current: {current_memory:.2f} MB")

            await asyncio.sleep(interval)
# Example usage of detector
async def test_with_detector():
    """Demonstrate leak detection with MemoryLeakDetector.

    Creates 50 long-sleeping tasks, snapshots memory before and after,
    cancels half of them, snapshots again and prints both comparisons.

    Cleanup runs in a ``finally`` block so the monitor and the sleeper tasks
    are always cancelled *and awaited*, even if a step in the middle raises.
    """
    detector = MemoryLeakDetector()

    # Start monitoring
    monitor = asyncio.create_task(detector.monitor_loop())
    tasks = []
    try:
        # Take initial snapshot
        detector.take_snapshot("start")

        # Create potential leak
        for _ in range(50):
            task = asyncio.create_task(asyncio.sleep(100))
            detector.track_task(task)
            tasks.append(task)

        await asyncio.sleep(2)
        detector.take_snapshot("after_creating_tasks")

        # Cancel some tasks
        for task in tasks[:25]:
            task.cancel()

        await asyncio.sleep(1)
        detector.take_snapshot("after_partial_cleanup")

        # Compare snapshots
        detector.compare_snapshots("start", "after_creating_tasks")
        detector.compare_snapshots("after_creating_tasks", "after_partial_cleanup")
    finally:
        # Cleanup
        monitor.cancel()
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each task's CancelledError as a
        # result, so no blanket ``except CancelledError`` is needed here; that
        # would also swallow a cancellation aimed at this coroutine itself.
        await asyncio.gather(monitor, *tasks, return_exceptions=True)
# Entry point for the detector demo.
asyncio.run(test_with_detector())
Step 4: Fixing Memory Leaks - Complete Solutions
Here are proven solutions for each leak pattern:
Solution 1: Proper Task Management
import asyncio
from contextlib import asynccontextmanager
from typing import Set
class TaskManager:
    """Manages async tasks with proper cleanup.

    Tasks created through :meth:`create_task` are held in ``self.tasks`` (a
    strong reference) until they finish, and can all be cancelled together
    with :meth:`cancel_all` or by leaving a :meth:`task_scope` block.
    """

    def __init__(self):
        self.tasks: Set[asyncio.Task] = set()

    def create_task(self, coro):
        """Schedule ``coro`` as a task, track it, and return the task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        # Remove task when done so the set only holds unfinished tasks.
        task.add_done_callback(self.tasks.discard)
        return task

    async def cancel_all(self):
        """Cancel every tracked task and wait until all of them have finished.

        A task that ends with an exception other than ``CancelledError`` is
        reported on stdout. The loop repeats in case new tasks were
        registered while waiting.
        """
        while self.tasks:
            # Work on a snapshot: the done-callbacks remove tasks from
            # ``self.tasks`` while we wait, so the live set can't be paired
            # with the results afterwards.
            pending = list(self.tasks)
            for task in pending:
                task.cancel()

            # Wait for cancellation to complete
            results = await asyncio.gather(*pending, return_exceptions=True)

            for result in results:
                if isinstance(result, asyncio.CancelledError):
                    continue  # Expected
                if isinstance(result, Exception):
                    print(f"Task failed with: {result}")

            # Normally a no-op (callbacks already discarded them); guarantees
            # the loop terminates.
            self.tasks.difference_update(pending)

    @asynccontextmanager
    async def task_scope(self):
        """Context manager for automatic cleanup: cancels all tasks on exit."""
        try:
            yield self
        finally:
            await self.cancel_all()
# Fixed version with proper cleanup
async def process_batch(items):
    """Process a batch of items"""
    await asyncio.sleep(0.1)  # Simulate processing
    print(f"Processed {len(items)} items")


async def _flush_batch(batch, queue):
    """Process ``batch``, acknowledge each item on ``queue``, then empty it."""
    if not batch:
        return
    await process_batch(batch)
    # One task_done() per item taken from the queue; without these calls
    # queue.join() can never return.
    for _ in range(len(batch)):
        queue.task_done()
    batch.clear()  # Clear to prevent growth


async def worker(worker_id: int, queue: asyncio.Queue):
    """Worker with proper cleanup.

    Items are collected into a batch that is processed when it reaches ten
    items or when the queue is momentarily empty, so a partial batch never
    sits unacknowledged. On cancellation the remaining batch is flushed
    before the ``CancelledError`` is re-raised.
    """
    local_data = []
    try:
        while True:
            try:
                # Use timeout to avoid infinite wait
                item = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            local_data.append(item)
            if len(local_data) >= 10 or queue.empty():
                await _flush_batch(local_data, queue)
    except asyncio.CancelledError:
        # Proper cleanup on cancellation
        await _flush_batch(local_data, queue)
        print(f"Worker {worker_id} cleaned up")
        raise


async def main_fixed():
    """Main function with proper cleanup"""
    queue = asyncio.Queue()
    async with TaskManager().task_scope() as manager:
        # Create workers
        for i in range(5):
            manager.create_task(worker(i, queue))

        # Add work items
        for i in range(50):
            await queue.put(f"item_{i}")

        # Wait for queue to be processed (relies on the workers calling
        # task_done() for every item).
        await queue.join()
    # Tasks automatically cleaned up when exiting context


async def fixed_background_tasks():
    """Demonstrates proper task management"""
    await main_fixed()
# Entry point for the Solution 1 demo.
asyncio.run(main_fixed())
Solution 2: Breaking Reference Cycles
import asyncio
import weakref
class FixedConnection:
    """Self-rescheduling connection that can be shut down cleanly.

    The scheduled callback only holds a weak reference to the instance, the
    buffer is capped, and ``close()`` cancels the pending timer.
    Must be constructed while an event loop is running.
    """

    def __init__(self):
        self._closed = False
        self._handle = None
        self._data_buffer = []
        self._callbacks = []
        self._loop = asyncio.get_running_loop()

        # Use weak reference to avoid cycle
        weak_self = weakref.ref(self)

        def callback():
            owner = weak_self()
            if not owner or owner._closed:
                return
            owner.process_data()

        self._callback = callback
        self._schedule_next()

    def _schedule_next(self):
        """Arm a 0.1s timer for the callback unless the connection is closed."""
        if self._closed:
            return
        self._handle = self._loop.call_later(0.1, self._callback)

    def process_data(self):
        """Append one entry, or trim to the newest 50 once 100 are buffered."""
        if self._closed:
            return

        if len(self._data_buffer) >= 100:
            # Clear old data to prevent unbounded growth
            self._data_buffer = self._data_buffer[-50:]
        else:
            self._data_buffer.append("data")

        self._schedule_next()

    async def close(self):
        """Mark closed, cancel the pending timer and drop held references."""
        self._closed = True

        # Cancel pending callback
        if self._handle:
            self._handle.cancel()
        self._handle = None

        # Clear all references
        self._callback = None
        self._data_buffer.clear()
        self._callbacks.clear()
async def connection_pool_example():
    """Acquire five pooled connections, use them briefly, then close them all."""

    class ConnectionPool:
        """Minimal pool that creates connections up to ``max_connections``."""

        def __init__(self, max_connections: int = 10):
            self.max_connections = max_connections
            self.connections = []

        async def acquire(self):
            """Get a connection from pool"""
            if len(self.connections) >= self.max_connections:
                # Pool is full: reuse existing connection
                return self.connections[0]
            conn = FixedConnection()
            self.connections.append(conn)
            return conn

        async def close_all(self):
            """Close all connections properly"""
            await asyncio.gather(*(conn.close() for conn in self.connections))
            self.connections.clear()

    # Use the pool
    pool = ConnectionPool()
    try:
        # Acquire connections
        connections = [await pool.acquire() for _ in range(5)]

        # Use connections
        await asyncio.sleep(2)
    finally:
        # Always cleanup
        await pool.close_all()
        print("All connections closed properly")
# Entry point for the Solution 2 demo.
asyncio.run(connection_pool_example())
Solution 3: Async Generator Cleanup
import asyncio
from typing import AsyncGenerator
class AsyncGeneratorManager:
    """Manages async generators with guaranteed cleanup.

    ``self.generators`` lists the generators currently handed out by
    :meth:`managed_generator`.
    """

    def __init__(self):
        self.generators = []

    async def safe_generator(self, data_size: int) -> AsyncGenerator:
        """Yield ``0 .. data_size - 1``, pausing 1 ms after each item.

        The backing list is cleared in a ``finally`` block, which runs when
        the generator is exhausted or closed (e.g. via ``aclose()``).
        """
        large_data = list(range(data_size))
        try:
            for item in large_data:
                yield item
                await asyncio.sleep(0.001)
        finally:
            large_data.clear()
            print(f"Generator cleaned up {data_size} items")

    @asynccontextmanager
    async def managed_generator(self, data_size: int):
        """Context-managed generator: closed and untracked on exit.

        Yields the async generator. On leaving the ``async with`` block the
        generator is closed with ``aclose()`` and removed from
        ``self.generators``. Removal happens even if ``aclose()`` raises, so
        a failing cleanup cannot leave a stale entry behind.
        """
        gen = self.safe_generator(data_size)
        self.generators.append(gen)
        try:
            yield gen
        finally:
            try:
                # Ensure generator is closed
                await gen.aclose()
            finally:
                self.generators.remove(gen)
async def fixed_generator_usage():
    """Show three ways of consuming an async generator without leaking it."""
    manager = AsyncGeneratorManager()

    # Method 1: Using async with for automatic cleanup
    async with manager.managed_generator(1000) as gen:
        count = 0
        async for _ in gen:
            count += 1
            if count < 10:
                continue
            break  # Generator still cleaned up properly
    print(f"Processed {count} items with automatic cleanup")

    # Method 2: Manual cleanup with try/finally
    gen = manager.safe_generator(1000)
    try:
        count = 0
        async for _ in gen:
            count += 1
            if count < 10:
                continue
            break
    finally:
        await gen.aclose()  # Explicit cleanup
        print(f"Manually cleaned up after {count} items")

    # Method 3: Using asyncio.timeout for bounded iteration
    gen = manager.safe_generator(100000)
    try:
        async with asyncio.timeout(1.0):  # Max 1 second
            async for _ in gen:
                # Process items for limited time
                pass
    except asyncio.TimeoutError:
        print("Timed out - cleaning up")
    finally:
        await gen.aclose()
# Entry point for the Solution 3 demo.
asyncio.run(fixed_generator_usage())
Step 5: Advanced Leak Prevention Patterns
Implement these patterns to prevent leaks from occurring:
import asyncio
from functools import wraps
import signal
import sys
def timeout_task(seconds: float):
    """Build a decorator that limits an async function to ``seconds`` per call.

    When the limit is exceeded a message is printed and the
    ``asyncio.TimeoutError`` from ``asyncio.wait_for`` is re-raised.
    """

    def apply_timeout(func):
        @wraps(func)
        async def guarded(*args, **kwargs):
            try:
                outcome = await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds,
                )
            except asyncio.TimeoutError:
                print(f"{func.__name__} timed out after {seconds}s")
                raise
            return outcome

        return guarded

    return apply_timeout
class GracefulShutdown:
    """Handle shutdown gracefully to prevent leaks.

    Tracks tasks created via :meth:`track_task`. On SIGTERM/SIGINT (after
    :meth:`setup_handlers`) or a direct call to :meth:`shutdown`, it sets
    ``shutdown_event`` and cancels and awaits every tracked task.
    """

    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks = set()
        # Strong reference to the task running shutdown(), so it cannot be
        # garbage-collected mid-flight and is started at most once.
        self._shutdown_task = None

    def setup_handlers(self):
        """Install SIGTERM and SIGINT handlers on the running event loop."""
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown)

    def _request_shutdown(self):
        """Signal callback: start shutdown() once; repeated signals are ignored."""
        if self._shutdown_task is None:
            self._shutdown_task = asyncio.create_task(self.shutdown())

    async def shutdown(self):
        """Set the shutdown event, then cancel and await all tracked tasks.

        The event loop is deliberately *not* stopped here: stopping it while
        ``asyncio.run()`` is still waiting for the main coroutine makes
        ``run_until_complete`` raise ``RuntimeError``. Callers waiting on
        ``shutdown_event`` return normally and let the loop end on its own.
        """
        print("Shutting down gracefully...")
        self.shutdown_event.set()

        # Snapshot the set (done-callbacks mutate it while we await) and skip
        # the calling task so shutdown() never cancels or awaits itself.
        current = asyncio.current_task()
        pending = [task for task in self.tasks if task is not current]

        # Cancel all tracked tasks
        for task in pending:
            task.cancel()

        # Wait for all tasks to complete
        await asyncio.gather(*pending, return_exceptions=True)

    def track_task(self, coro):
        """Create and track a task; it is untracked automatically when done."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task
# Complete application with leak prevention
class LeakFreeApplication:
    """Application designed to prevent memory leaks.

    Background work is started through ``self.shutdown_handler`` so that it
    is tracked and cancelled on shutdown; resources in use are listed in
    ``self.resources``.
    """

    def __init__(self):
        self.shutdown_handler = GracefulShutdown()
        self.resources = []

    @timeout_task(30.0)  # 30 second timeout
    async def long_running_task(self, task_id: int):
        """Poll once per second until the shutdown event is set."""
        try:
            while not self.shutdown_handler.shutdown_event.is_set():
                await asyncio.sleep(1)
                # Do work here
        except asyncio.CancelledError:
            print(f"Task {task_id} cancelled - cleaning up")
            raise

    async def resource_manager(self):
        """Acquire a resource, use it, and always release and untrack it."""
        resource = await self.acquire_resource()
        self.resources.append(resource)
        try:
            # Use resource
            await self.use_resource(resource)
        finally:
            # Always release
            await self.release_resource(resource)
            self.resources.remove(resource)

    async def acquire_resource(self):
        """Acquire a resource (file, connection, etc)"""
        return {"id": len(self.resources), "data": []}

    async def use_resource(self, resource):
        """Use the resource"""
        resource["data"].append("used")
        await asyncio.sleep(0.5)

    async def release_resource(self, resource):
        """Release the resource"""
        resource["data"].clear()
        print(f"Resource {resource['id']} released")

    async def run(self):
        """Main application loop: start work, wait for shutdown, release leftovers."""
        self.shutdown_handler.setup_handlers()

        # Start tasks
        for i in range(5):
            self.shutdown_handler.track_task(self.long_running_task(i))

        # Start resource managers
        for _ in range(3):
            self.shutdown_handler.track_task(self.resource_manager())

        # Wait for shutdown signal
        await self.shutdown_handler.shutdown_event.wait()

        # Cleanup all resources. Iterate over a snapshot: resource_manager()
        # tasks remove entries from self.resources in their finally blocks,
        # and mutating a list while iterating it across awaits skips elements.
        for resource in list(self.resources):
            await self.release_resource(resource)
async def main():
    """Create a LeakFreeApplication, run it, and always print a final message."""
    application = LeakFreeApplication()
    try:
        await application.run()
    except KeyboardInterrupt:
        print("Interrupted - cleaning up")
    finally:
        # Printed on every exit path, including after an exception.
        print("Application shut down cleanly")
# Run with proper cleanup: start the application only when this file is
# executed as a script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
Additional Tips and Common Pitfalls
Watch out for these subtle leak sources that often go unnoticed:
# Pitfall 1: Global task storage
global_tasks = []  # Module-level list; entries are only ever appended


async def bad_pattern():
    """Start some_work() as a task and park it in the global list for good."""
    # Finished tasks are never removed, so the list grows indefinitely.
    global_tasks.append(asyncio.create_task(some_work()))
# Fix: hold a strong reference only while the task is running, and drop it
# automatically when the task finishes.
import weakref

# Weak view of the tasks, kept for inspection/debugging. A WeakSet does NOT
# keep tasks alive, and entries vanish when a task is garbage-collected, not
# when it completes.
global_tasks_weak = weakref.WeakSet()

# Strong references to unfinished tasks. The event loop itself only keeps weak
# references to tasks, so without this a task may be collected mid-execution.
_live_tasks = set()


async def good_pattern():
    """Start some_work() as a task that is tracked exactly as long as it runs."""
    task = asyncio.create_task(some_work())
    _live_tasks.add(task)
    # Discard on completion so the set never accumulates finished tasks.
    task.add_done_callback(_live_tasks.discard)
    global_tasks_weak.add(task)
# Pitfall 2: Event listeners not removed
class EventEmitter:
    """Registry mapping event names to lists of callbacks."""

    def __init__(self):
        self.listeners = {}

    def on(self, event, callback):
        """Register ``callback`` for ``event`` and return a function that removes it."""
        if event not in self.listeners:
            self.listeners[event] = []
        self.listeners[event].append(callback)

        # Return removal function; it looks the list up again when called.
        def remove():
            return self.listeners[event].remove(callback)

        return remove
# Always store and call the removal function that on() returns.
# NOTE(review): handle_data is not defined in this snippet; presumably it is
# the application's own callback.
emitter = EventEmitter()
remove_listener = emitter.on("data", handle_data)
# Later: call remove_listener() so the emitter stops referencing the callback
# and the leak is prevented.
# Pitfall 3: Infinite queues without bounds
# With no maxsize the queue accepts items without limit, so a slow consumer
# lets it grow indefinitely.
queue = asyncio.Queue() # Unbounded - dangerous
# Fix: Use maxsize so that put() waits once 1000 items are queued.
queue = asyncio.Queue(maxsize=1000) # Bounded queue
Memory leak prevention in asyncio requires discipline. Always ensure tasks are tracked and cancelled, resources are released in finally blocks, and generators are properly closed. Use the detection tools shown here to catch leaks early in development rather than discovering them in production.