Asyncio’s debugging facilities are essential for diagnosing the complex issues that arise in concurrent programming. The framework provides a dedicated debug mode that surfaces hidden problems and instruments the event loop to provide detailed insights. Understanding how to leverage this mode and recognizing common antipatterns are critical skills for developing robust asynchronous applications.

Enabling and Configuring Debug Mode

Debug mode is enabled by setting the PYTHONASYNCIODEBUG environment variable to 1, by running Python in development mode (-X dev), by passing debug=True to asyncio.run(), or by calling loop.set_debug(True) on the event loop. When active, it performs runtime checks that are too expensive for production but invaluable during development. The simplest method is to set the environment variable when starting your application.

PYTHONASYNCIODEBUG=1 python my_async_app.py

For more precise control, you can enable debug mode programmatically with loop.set_debug(True). When you create the event loop yourself, call it before scheduling any coroutines so that every task and callback is instrumented from the start.

import asyncio
import logging

# Configure logging to see debug output
logging.basicConfig(level=logging.DEBUG)

async def main():
    # Your application code here
    await asyncio.sleep(0.1)

# Create a new event loop with debug enabled
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(True)  # Enable debug mode

try:
    loop.run_until_complete(main())
finally:
    loop.close()
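If you do not need to manage the loop by hand, a shorter route is the debug parameter of asyncio.run(). The following is a minimal sketch, assuming Python 3.7 or later; the variable name debug_active is only illustrative.

```python
import asyncio

async def main():
    # get_debug() reports whether the running loop is in debug mode
    return asyncio.get_running_loop().get_debug()

# asyncio.run() creates the loop, enables debug mode, runs main(), and closes the loop
debug_active = asyncio.run(main(), debug=True)
print(f"Debug mode active: {debug_active}")
```

This avoids the explicit new_event_loop/close bookkeeping while still enabling debug mode before the first coroutine runs.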

When enabled, debug mode instruments the event loop to:

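  • Log slow callbacks: Any callback or task step that runs for longer than a threshold (100 ms by default) without yielding to the event loop is logged. This is crucial for identifying code that blocks the event loop. The threshold, in seconds, can be changed through the loop.slow_callback_duration attribute.
  • Report unawaited coroutines: Python always emits a RuntimeWarning when a coroutine object is discarded without being awaited. In debug mode the warning also includes a traceback showing where the coroutine was created, which makes a forgotten await keyword much easier to locate.
  • Validate thread safety: Non-thread-safe loop methods such as loop.call_soon() and loop.call_at() raise a RuntimeError when called from a thread other than the one running the loop, exposing subtle and hard-to-reproduce concurrency bugs early.
  • Trace where objects were created: Debug mode records the call stack at the point where tasks, futures, and handles are created, so reports such as "Task was destroyed but it is pending!" show where the object came from. Combined with ResourceWarning output (enabled, for example, with -W default), this helps identify unclosed transports and other resource leaks.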
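The slow-callback check is the easiest of these to observe directly. The sketch below lowers the threshold to 50 ms and captures what asyncio logs when a coroutine blocks for about 150 ms. ListHandler, slow_step, and run_demo are hypothetical helper names introduced only for this illustration, and the exact wording of the log message may differ between Python versions.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Hypothetical helper: collects formatted log messages in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def slow_step():
    time.sleep(0.15)  # deliberately blocks the loop for about 150 ms

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # threshold in seconds (default is 0.1)
    await slow_step()

def run_demo():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")  # asyncio reports through this logger
    logger.addHandler(handler)
    try:
        asyncio.run(main(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Keep only the slow-callback reports ("Executing ... took N seconds")
    return [m for m in handler.messages if "took" in m]

slow_reports = run_demo()
for message in slow_reports:
    print(message)
```

In a real application you would normally leave the default handler in place and simply read the warnings in your log output.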

Common Pitfall: Forgetting the await Keyword

This is perhaps the most frequent mistake made by developers new to async/await. Omitting the await keyword before a coroutine call is not a syntax error. Calling a coroutine function merely returns a coroutine object; the code inside it does not run until the object is awaited or scheduled as a task. The result is code that appears to run but does nothing, or worse, fails mysteriously later. The only hint is a "coroutine ... was never awaited" RuntimeWarning, emitted when the unused object is discarded.

import asyncio

async def fetch_data(delay, id):
    print(f"Task {id} starting")
    await asyncio.sleep(delay)
    print(f"Task {id} finished")
    return f"data-{id}"

async def incorrect_main():
    """Common mistake: forgetting 'await'."""
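    # This creates a coroutine object but does not run it.
    # No exception is raised; Python only emits a RuntimeWarning
    # ("coroutine 'fetch_data' was never awaited") when the object is discarded.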
    coro_object = fetch_data(1.0, 1)

    # The print statement executes, but fetch_data does not.
    print("This prints, but fetch_data never runs.")
    # The function returns without completing the work.

async def correct_main():
    """Correct approach: using 'await'."""
    result = await fetch_data(1.0, 1)  # Note the 'await'
    print(f"Got result: {result}")

# Running the incorrect version: notice the lack of output from fetch_data
asyncio.run(incorrect_main())
print("---")
# Running the correct version
asyncio.run(correct_main())
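Because the only symptom is a warning, it can help to capture that warning explicitly, for instance in a test. The following is a rough sketch using the standard warnings module; forgetful_main and find_unawaited are hypothetical names, and the shortened fetch_data here stands in for the one above.

```python
import asyncio
import gc
import warnings

async def fetch_data(delay, id):
    await asyncio.sleep(delay)
    return f"data-{id}"

async def forgetful_main():
    fetch_data(0.01, 1)  # bug: missing 'await', so the coroutine never runs

def find_unawaited():
    # Record every warning raised while the buggy coroutine runs
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        asyncio.run(forgetful_main())
        gc.collect()  # make sure the discarded coroutine object is finalized
    return [str(w.message) for w in caught
            if issubclass(w.category, RuntimeWarning)]

unawaited = find_unawaited()
print(unawaited)
```

A test suite could fail whenever this list is non-empty, turning a silent no-op into a visible error.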

Common Pitfall: Blocking the Event Loop

The golden rule of asyncio is to never block the event loop with synchronous I/O or CPU-bound work. The event loop runs in a single thread, so while one blocking call executes, no other task can make progress and no I/O events are processed.

import asyncio
import time

async def blocking_task():
    """A task that mistakenly uses a blocking call."""
    print("Blocking task started")
    time.sleep(2)  # This blocks the entire event loop for 2 seconds!
    print("Blocking task finished")  # Nothing else runs during the sleep

async def async_task():
    """A proper non-blocking task."""
    print("Async task started")
    await asyncio.sleep(2)  # This yields control back to the event loop
    print("Async task finished")

async def main():
    # Schedule both tasks to run concurrently
    task1 = asyncio.create_task(blocking_task())
    task2 = asyncio.create_task(async_task())
    await asyncio.gather(task1, task2)

# With debug mode on, the step containing time.sleep(2) is logged as a slow callback.
asyncio.run(main())

The correct approach is to offload blocking functions to a thread pool using loop.run_in_executor. Passing None as the first argument selects the loop's default executor.

import asyncio
import time

async def async_task():
    """Same non-blocking task as in the previous example."""
    print("Async task started")
    await asyncio.sleep(2)
    print("Async task finished")

async def non_blocking_task():
    print("Offloaded task started")
    loop = asyncio.get_running_loop()
    # Run the blocking call in the loop's default thread pool (executor=None)
    await loop.run_in_executor(None, time.sleep, 2)
    print("Offloaded task finished")

async def main():
    # Both tasks now overlap, so the total runtime is about 2 seconds, not 4
    await asyncio.gather(non_blocking_task(), async_task())

asyncio.run(main())
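On Python 3.9 and later, asyncio.to_thread offers a more compact way to do the same offloading. The sketch below is illustrative only: blocking_io is a hypothetical stand-in for any synchronous function, and the timing simply shows that the two calls overlap.

```python
import asyncio
import time

def blocking_io(seconds):
    """Hypothetical blocking function, standing in for synchronous I/O."""
    time.sleep(seconds)
    return seconds

async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays responsive
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.3),
        asyncio.to_thread(blocking_io, 0.3),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f"results={results}, elapsed={elapsed:.2f}s")
```

Because the two sleeps run in separate threads, the elapsed time should be close to 0.3 seconds rather than 0.6.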

Common Pitfall: Improper Task Handling and Resource Cleanup

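Tasks are powerful but require careful management. A common error is to fire off a task with asyncio.create_task() without keeping a reference to it. The event loop holds only weak references to tasks, so an unreferenced task can be garbage-collected before it finishes. And if such a task raises an exception, nothing awaits it: the exception is stored on the task object and surfaces only as a "Task exception was never retrieved" log message when the task is destroyed, instead of propagating to the code that started it. This makes the failure easy to miss and hard to debug. Such a task is often called a “ghost task.”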

import asyncio

async def risky_task():
    await asyncio.sleep(0.1)
    raise ValueError("This is a hidden exception!")

async def main_ghost_task():
    # Creating a task without storing a reference to it
    asyncio.create_task(risky_task())
    await asyncio.sleep(0.2)  # main_ghost_task() never sees the ValueError

# Exits normally; the failure typically appears only as a logged
# "Task exception was never retrieved" message.
asyncio.run(main_ghost_task())

The best practice is to explicitly manage your tasks. Use asyncio.gather or asyncio.wait to collect results and handle exceptions, or store task references and await them later to ensure proper completion and exception propagation.

async def main_proper_task_handling():
    # Properly manage the task by storing it
    task = asyncio.create_task(risky_task())
    try:
        # Awaiting the task re-raises any exception it raised
        await task
    except ValueError as e:
        print(f"Caught expected exception: {e}")

# risky_task is the coroutine defined in the previous example
asyncio.run(main_proper_task_handling())
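When a task really is fire-and-forget and awaiting it is not practical, one possible pattern is to keep a strong reference in a set and inspect the outcome in a done callback. This is a sketch under stated assumptions, not a definitive implementation: spawn, _on_done, background_tasks, and failures are hypothetical names, and real code would more likely log the exception than append it to a list.

```python
import asyncio

background_tasks = set()  # strong references keep tasks alive until they finish
failures = []             # exceptions collected from finished tasks

def _on_done(task):
    background_tasks.discard(task)
    if task.cancelled():
        return  # task.exception() would raise CancelledError here
    exc = task.exception()  # retrieving it marks the exception as handled
    if exc is not None:
        failures.append(exc)

def spawn(coro):
    """Schedule coro as a task whose outcome is always inspected."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task

async def risky_task():
    await asyncio.sleep(0.01)
    raise ValueError("This is a hidden exception!")

async def main():
    spawn(risky_task())
    await asyncio.sleep(0.05)  # give the background task time to finish

asyncio.run(main())
print(failures)
```

The set prevents the task from being garbage-collected mid-flight, and the callback ensures the exception is seen rather than left for a destructor to report.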