Tasks are one of the most powerful and commonly used high-level abstractions in asyncio. They are used to schedule coroutines to run concurrently on the event loop. Think of a Task as a wrapper around a coroutine that manages its execution state (pending, running, done, or cancelled) and provides the mechanisms to interact with it, such as waiting for its completion or requesting its cancellation.

Creating Tasks with asyncio.create_task()

The primary method for creating a Task is asyncio.create_task(coro, *, name=None). This function takes a coroutine object, schedules it to run on the event loop as soon as possible, and returns a Task object. The optional name argument, available in Python 3.8+, is invaluable for debugging, as it allows you to identify the task in logs and traces.

It is crucial to understand that create_task does not pause your current coroutine to wait for the task to finish. It merely schedules it. The task will run concurrently, and its result will be available whenever you explicitly await it or use a tool like gather.

import asyncio

async def background_work(n):
    print(f"Task {n}: starting")
    await asyncio.sleep(n)  # Simulate some I/O-bound work
    print(f"Task {n}: finished after {n}s")
    return n * 10

async def main():
    # Create two tasks; each starts running the next time main() yields to the event loop
    task1 = asyncio.create_task(background_work(1), name="Fast Task")
    task2 = asyncio.create_task(background_work(2), name="Slow Task")

    print("Main coroutine: Tasks are scheduled, now I can do other work...")
    await asyncio.sleep(0.5)  # The main coroutine does its own thing
    print("Main coroutine: Done with my work, now I'll wait for the tasks.")

    # Await the tasks to retrieve their results and ensure they are finished
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

Output:

Main coroutine: Tasks are scheduled, now I can do other work...
Task 1: starting
Task 2: starting
Main coroutine: Done with my work, now I'll wait for the tasks.
Task 1: finished after 1s
Task 2: finished after 2s
Results: 10, 20
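
The states and the name argument described above can be inspected directly on the Task object. The following is a minimal sketch, not part of the original example; compute and inspect_task are hypothetical names chosen for illustration. It uses the Task methods get_name(), done(), cancelled(), and result().

```python
import asyncio

async def compute(x):
    await asyncio.sleep(0.1)  # stand-in for I/O-bound work
    return x * 2

async def inspect_task():
    task = asyncio.create_task(compute(21), name="doubler")
    snapshot = {
        "name": task.get_name(),
        # False here: the task has been scheduled but has not had a chance to run yet
        "done_before": task.done(),
    }
    await task  # suspend until the task completes
    snapshot["done_after"] = task.done()
    snapshot["cancelled"] = task.cancelled()
    snapshot["result"] = task.result()  # the same value that `await task` returned
    return snapshot

print(asyncio.run(inspect_task()))
```

Calling result() on a task that is not yet done raises an exception, so check done() first or simply await the task.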

Gathering Multiple Tasks with asyncio.gather()

When you need to run multiple awaitable objects (like Tasks or coroutines) concurrently and wait for all of them to finish, asyncio.gather(*aws, return_exceptions=False) is the ideal tool. It returns a single future that aggregates the results from all the given awaitables into a list, preserving the order of the input objects, not the order of completion.

The return_exceptions parameter is critical for error handling. If False (the default), the first exception raised by any awaitable is immediately propagated to the code awaiting gather. The other awaitables are not cancelled; they keep running in the background, but their results are no longer delivered through that gather call. If True, exceptions are treated the same as successful results and returned in the results list, so gather waits for every awaitable to finish and you can inspect each outcome individually.

import asyncio

async def fetch_data(id, delay, should_fail=False):
    await asyncio.sleep(delay)
    if should_fail:
        raise RuntimeError(f"Failed to fetch data {id}!")
    return f"Data from {id}"

async def main():
    # Run three tasks concurrently
    results = await asyncio.gather(
        fetch_data("A", 3),
        fetch_data("B", 1),
        fetch_data("C", 2),
    )
    print("All successful:", results)

    # Demonstrate error handling with return_exceptions=True
    mixed_results = await asyncio.gather(
        fetch_data("D", 1),
        fetch_data("E", 2, should_fail=True),
        fetch_data("F", 3),
        return_exceptions=True,  # Crucial for handling the error gracefully
    )
    print("With exceptions handled:", mixed_results)

asyncio.run(main())
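
With the default return_exceptions=False, it is easy to assume that a failure stops the remaining work. The sketch below suggests otherwise: the sibling task is still pending after gather raises and later completes normally. The worker coroutine and the finished list are hypothetical names used only for this illustration.

```python
import asyncio

finished = []  # records which hypothetical workers ran to completion

async def worker(label, delay, should_fail=False):
    await asyncio.sleep(delay)
    if should_fail:
        raise RuntimeError(f"{label} failed")
    finished.append(label)
    return label

async def main():
    finished.clear()
    slow = asyncio.create_task(worker("slow", 0.2), name="slow")
    try:
        # "bad" fails first; the exception propagates out of gather right away
        await asyncio.gather(worker("bad", 0.05, should_fail=True), slow)
    except RuntimeError as exc:
        print("gather raised:", exc)

    # The failure did not cancel the sibling task; it is still pending here
    state_after_failure = (slow.done(), slow.cancelled())
    await slow  # let it finish
    return state_after_failure, list(finished)

print(asyncio.run(main()))
```

If the remaining work should stop on the first failure, cancel the surviving tasks explicitly in the except block.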

Cancelling Tasks

Tasks can be cancelled using their cancel() method. This does not forcibly stop the task; instead, it requests cancellation. The event loop arranges for a CancelledError exception to be thrown into the wrapped coroutine at the point where it is currently suspended on an await. The coroutine then has a chance to clean up resources by handling this exception (e.g., using try/finally or async with) before exiting. If the coroutine catches the CancelledError and suppresses it, the cancellation request is effectively ignored and the task is not marked as cancelled. Note that since Python 3.8, CancelledError derives from BaseException, so a plain except Exception block does not catch it.

import asyncio

async def cancellable_work():
    try:
        print("Task started. Now sleeping for 10 seconds.")
        await asyncio.sleep(10)
        print("Task finished successfully.")  # This won't be printed if cancelled
    except asyncio.CancelledError:
        print("Task received cancellation signal! Cleaning up...")
        # Perform any necessary cleanup here
        raise  # It's best practice to re-raise the exception unless you intend to ignore the cancellation
    finally:
        print("This 'finally' block always runs, making it great for cleanup.")

async def main():
    task = asyncio.create_task(cancellable_work())
    await asyncio.sleep(1)  # Let the task run for a bit

    print("Main: Requesting task cancellation.")
    task.cancel()

    try:
        await task  # Awaiting a cancelled task will raise CancelledError
    except asyncio.CancelledError:
        print("Main: Confirmed that the task was cancelled.")

asyncio.run(main())
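
To make the effect of suppressing CancelledError concrete, here is a small sketch contrasting two hypothetical coroutines: stubborn swallows the exception, while well_behaved lets it propagate through a finally block. The names are illustrative assumptions, not part of the example above.

```python
import asyncio

async def stubborn():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # No re-raise: the cancellation request is discarded
        return "ignored the cancel request"

async def well_behaved():
    try:
        await asyncio.sleep(10)
    finally:
        pass  # cleanup would go here; CancelledError keeps propagating

async def main():
    t1 = asyncio.create_task(stubborn())
    t2 = asyncio.create_task(well_behaved())
    await asyncio.sleep(0.01)  # let both tasks reach their first await

    t1.cancel()
    t2.cancel()

    outcome = await t1  # returns normally because the exception was swallowed
    try:
        await t2
    except asyncio.CancelledError:
        pass  # expected: this task honored the request

    return t1.cancelled(), t2.cancelled(), outcome

print(asyncio.run(main()))
```

Only the task whose coroutine let CancelledError propagate reports cancelled() as True; the other one finishes with an ordinary result.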

Best Practices and Common Pitfalls

  1. Hold a Reference: Always store the Task object returned by create_task, even for “fire-and-forget” work. The event loop keeps only weak references to tasks, so a task that nothing else references can be garbage-collected before it finishes. A stored reference is also what lets you await or cancel the task later.
  2. Cancellation is Cooperative: Task cancellation is not pre-emptive. CancelledError is delivered only while the coroutine is suspended at an await, so long-running CPU-bound code without await statements will not be interrupted. A coroutine does not need to catch CancelledError for cancellation to work, but if it catches it for cleanup, it should re-raise it.
  3. Create Tasks Only While an Event Loop Is Running: create_task must be called from code running inside the event loop, typically an async function or a regular function called from one. Calling it at the top level of a script (outside of asyncio.run) raises RuntimeError because no event loop is running.
  4. Name Your Tasks: Always use the name parameter for easier debugging. When an exception occurs or when inspecting running tasks, a descriptive name is immensely helpful.
  5. Understand gather Behavior: Be aware of the return_exceptions flag. With the default (False), the first exception propagates to the caller so errors are not silently ignored, but the remaining awaitables keep running in the background. Set it to True when partial failures are acceptable, and check each entry in the results list for exception instances.
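
Several of the practices above can be combined in a few lines. The sketch below is one possible approach rather than a definitive pattern: spawn is a hypothetical helper that names each task and keeps a strong reference in a module-level set until the task finishes, and create_without_loop shows the RuntimeError raised when no event loop is running.

```python
import asyncio

background_tasks = set()  # strong references keep pending tasks alive

def spawn(coro, name):
    """Hypothetical helper: create a named task and hold a reference until it is done."""
    task = asyncio.create_task(coro, name=name)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)  # drop the reference on completion
    return task

async def job(n):
    await asyncio.sleep(0.01)
    return n

async def main():
    # spawn() is a regular function, but it is called while the loop is running
    tasks = [spawn(job(i), name=f"job-{i}") for i in range(3)]
    held_while_running = len(background_tasks)
    results = await asyncio.gather(*tasks)
    await asyncio.sleep(0)  # give any remaining done callbacks a chance to run
    return held_while_running, results, len(background_tasks)

def create_without_loop():
    coro = job(0)
    try:
        asyncio.create_task(coro)  # no loop is running here
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)

print(asyncio.run(main()))
print(create_without_loop())
```

The done callback removes each task from the set once it finishes, so the set does not grow without bound in a long-running program.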