49.5 Chaining Futures and Callbacks
While concurrent.futures provides a straightforward path to parallelism, its true power emerges when you orchestrate complex workflows by chaining operations and reacting to results as they complete. This is achieved through the Future.add_done_callback() method and the composition of Future objects, enabling a reactive, event-driven style of concurrent programming.
The add_done_callback() Mechanism
A Future object represents a computation that may not have finished yet. The add_done_callback() method registers a callable that is invoked once the Future is done, meaning it has returned a result, raised an exception, or been cancelled. The callback is called with exactly one argument: the Future object itself.
from concurrent.futures import ThreadPoolExecutor
import time


def simulate_fetch(url):
    """Simulate a long-running network request."""
    time.sleep(2)  # Simulate network delay
    return f"Data from {url}"


def process_data(future):
    """Callback function to process the fetched data."""
    try:
        data = future.result()  # This will not block; the result is ready.
        print(f"Processing: {data.upper()}")
    except Exception as exc:
        print(f"Fetching data generated an exception: {exc}")


# Create a ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    # Submit the task to the executor, which returns a Future
    future = executor.submit(simulate_fetch, "http://example.com")
    # Register the callback to be run upon completion
    future.add_done_callback(process_data)
    # The main thread can continue doing other work while waiting.
    print("Main thread can continue executing...")
    time.sleep(3)  # Simulate other work

# Output (the second line appears after ~2 seconds):
# Main thread can continue executing...
# Processing: DATA FROM HTTP://EXAMPLE.COM
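The key insight here is that future.result() inside the callback does not block. Because the callback is invoked only after the Future is done, the result (or the exception) is already available. Keep in mind that the callback normally runs on the worker thread that completed the task, so it suits short post-processing; a slow callback ties up that worker.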
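The claim that the Future is already done when its callback runs can be checked directly. The following is a minimal sketch, not a definitive test: the names gated_task, inspect, gate, and observed are made up for illustration, and the thread_name_prefix value "worker" is an arbitrary choice. An Event holds the task back until the callback has been registered, so the callback is guaranteed to fire from the worker thread rather than from the caller.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # hypothetical helper: holds the task until we are ready
observed = {}             # filled in by the callback


def gated_task():
    # Wait until the main thread has registered the callback.
    gate.wait(timeout=5)
    return "payload"


def inspect(future):
    # Record what the callback sees at the moment it is invoked.
    observed["done"] = future.done()
    observed["result"] = future.result()  # returns at once; no blocking
    observed["thread"] = threading.current_thread().name


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    future = executor.submit(gated_task)
    future.add_done_callback(inspect)
    gate.set()  # let the task finish now that the callback is attached

# Leaving the with block waits for the worker, so the callback has run.
print(observed)
```

The recorded thread name should start with the chosen prefix, which indicates that the callback ran on the pool's worker rather than on the thread that registered it.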
Chaining Futures for Complex Workflows
Callbacks are powerful, but nesting them can lead to the infamous “callback hell.” A more elegant and Pythonic approach is to chain Future objects together. You can submit a new task from within a callback that depends on the result of the previous one. This creates a chain of operations executed in the background.
from concurrent.futures import ThreadPoolExecutor
import time


def fetch_data(url):
    time.sleep(1)
    return f"Raw data from {url}"


def parse_data(raw_data):
    time.sleep(0.5)
    return f"Parsed: [{raw_data}]"


def save_data(parsed_data):
    time.sleep(0.5)
    print(f"Saved: {parsed_data}")
    return "Save successful"


def chain_futures(executor, url):
    """Demonstrates chaining three async operations: fetch, parse, save."""

    def start_save(future):
        # This callback is attached to the 'parse' future.
        parsed_result = future.result()
        # Submit the final stage. add_done_callback() ignores this return
        # value, so the caller never sees the resulting Future.
        return executor.submit(save_data, parsed_result)

    def start_parse(future):
        # This callback is attached to the 'fetch' future.
        fetch_result = future.result()
        # Submit the next stage and add a callback to *its* result.
        parse_future = executor.submit(parse_data, fetch_result)
        parse_future.add_done_callback(start_save)
        return parse_future  # Also ignored by add_done_callback().

    # Kick off the entire chain
    fetch_future = executor.submit(fetch_data, url)
    fetch_future.add_done_callback(start_parse)
    return fetch_future


with ThreadPoolExecutor(max_workers=3) as executor:
    future = chain_futures(executor, "http://api.example.com")
    # We can wait for the final future in the chain if needed.
    final_result = future.result()  # This waits for the entire chain to complete?
    print(f"Final result: {final_result}")  # Oops, this won't be what we expect!
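Important Note: The above code has two pitfalls. First, chain_futures returns the initial fetch_future, and add_done_callback() discards whatever a callback returns, so the Future created by executor.submit(save_data, ...) never reaches the caller. The main thread therefore receives the raw fetched data from fetch_future.result(), not the final “Save successful” message. Second, as soon as that result arrives the with block exits and shuts the executor down, so a submit() call made from a later callback can fail with a RuntimeError. That error is logged and ignored like any other exception raised in a callback, which means the save step may never run. To handle this properly, the chain must expose a Future that represents its last stage, and the caller must wait on that Future before the executor shuts down.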
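One way to return a Future for the end of the chain is to create a separate Future and resolve it by hand when the last stage finishes. The following is a sketch under stated assumptions, not the only or the canonical approach: the helper chain() and its advance() function are hypothetical names, the stage functions are simplified versions without sleeps, and the concurrent.futures documentation describes creating Future objects directly and calling set_result() or set_exception() as intended mainly for executor implementations and tests.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fetch_data(url):
    return f"Raw data from {url}"


def parse_data(raw_data):
    return f"Parsed: [{raw_data}]"


def save_data(parsed_data):
    return f"Saved: {parsed_data}"


def chain(executor, first_future, *stages):
    """Hypothetical helper: return a Future for the result of the last stage."""
    final = Future()  # resolved manually below (an assumption of this sketch)

    def advance(done, remaining):
        try:
            value = done.result()  # re-raises if the previous stage failed
            if not remaining:
                final.set_result(value)  # end of the chain
                return
            next_future = executor.submit(remaining[0], value)
        except Exception as exc:
            # A failed stage, or a failed submit(), ends the chain with an error.
            final.set_exception(exc)
            return
        next_future.add_done_callback(lambda f: advance(f, remaining[1:]))

    first_future.add_done_callback(lambda f: advance(f, stages))
    return final


with ThreadPoolExecutor(max_workers=3) as executor:
    fetch_future = executor.submit(fetch_data, "http://api.example.com")
    final_future = chain(executor, fetch_future, parse_data, save_data)
    # Wait inside the with block so later stages can still be submitted.
    final_result = final_future.result(timeout=10)

print(final_result)
```

Because the wait on final_future happens inside the with block, the executor is still accepting work when each callback submits the next stage. An error in any stage is passed to final_future instead of being lost in a callback.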
Exception Handling in Callbacks
A critical best practice is to wrap the future.result() call inside your callback in a try/except block. If the wrapped function raised an exception, result() re-raises it inside the callback. If the callback lets that exception escape, the executor machinery logs it and otherwise ignores it, so the failure can pass unnoticed.
from concurrent.futures import ThreadPoolExecutor


def faulty_task():
    raise ValueError("Something went terribly wrong!")


def callback_that_handles(future):
    try:
        result = future.result()
        print(f"Success: {result}")
    except ValueError as e:
        print(f"Handled error in callback: {e}")
    except Exception as e:
        print(f"Handled a different error: {e}")


with ThreadPoolExecutor() as executor:
    future = executor.submit(faulty_task)
    future.add_done_callback(callback_that_handles)

# Output:
# Handled error in callback: Something went terribly wrong!
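To see what "logged but otherwise ignored" means in practice, the sketch below lets a callback raise on purpose and captures the resulting log record. It rests on assumptions: ListHandler and careless_callback are illustrative names, and the logger name "concurrent.futures" is the one CPython uses internally for these messages, which is an implementation detail rather than a documented guarantee.

```python
import logging
from concurrent.futures import ThreadPoolExecutor


class ListHandler(logging.Handler):
    """Collect log records in memory so they can be inspected afterwards."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
# Assumption: CPython reports callback failures on this logger name.
logger = logging.getLogger("concurrent.futures")
logger.addHandler(handler)


def careless_callback(future):
    # No try/except here: the exception escapes from the callback.
    raise RuntimeError(f"could not handle {future.result()!r}")


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 10)
    future.add_done_callback(careless_callback)

logger.removeHandler(handler)

# The task itself is unaffected by the failing callback.
task_result = future.result()
logged_messages = [record.getMessage() for record in handler.records]
print(task_result)
print(logged_messages)
```

The task's own result stays intact and nothing propagates to the main thread; the only trace of the failure is the log record, which is easy to miss if logging is not configured.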
Pitfalls and Best Practices
State Capture in Callbacks: Be cautious of closures. A callback defined inside a loop or function captures the surrounding variables themselves, not their values at definition time, and looks them up only when it runs. If a variable changes after the callback is defined but before it executes, the callback sees the new value. Use default arguments to bind values immediately.

# POTENTIAL BUG: every lambda looks up 'i' when it runs, not when it is created.
for i in range(5):
    future = executor.submit(task)
    future.add_done_callback(lambda f: print(f"Done with task {i}"))
# Callbacks that run after the loop has finished all print 4.

# FIX: Use a default argument to capture the value of 'i' immediately.
for i in range(5):
    future = executor.submit(task)
    future.add_done_callback(lambda f, idx=i: print(f"Done with task {idx}"))

Keeping Executors Alive: Callbacks normally run in the thread that set the Future’s result, which for a ThreadPoolExecutor is usually one of its worker threads. A callback that only reads the result runs whether or not the executor is still accepting work, but a callback that submits a follow-up task needs the executor to be open: once shutdown() has been called, submit() raises a RuntimeError. Leaving a with block calls shutdown(wait=True), which waits for every already-submitted Future to complete but rejects new submissions, so wait for the end of a chain inside the with block.

Order of Execution: Callbacks are called in the order they were added. However, if you add a callback to a Future that is already done, the callback is called immediately in the current thread.

For CPU-bound Tasks: While these patterns work with ProcessPoolExecutor, the callbacks themselves run in the parent process, and the overhead of pickling and unpickling data between processes for each step in a chain can be significant. This approach is generally more effective for I/O-bound tasks where the data being passed is relatively small.
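The ordering rule and the "already done" case can be observed together. This is a small sketch with illustrative names (make_callback, calls, caller); it registers two callbacks on a Future only after the task has finished and the executor has shut down, then records which thread ran each one.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []  # (label, thread name) pairs, in the order the callbacks ran


def make_callback(label):
    """Build a callback that records its label and the thread it ran on."""
    def callback(future):
        calls.append((label, threading.current_thread().name))
    return callback


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    future = executor.submit(sum, [1, 2, 3])
    future.result()  # make sure the Future is done before going on

# The Future is already done, so each callback runs right here, in order,
# on the calling thread and without any help from the executor.
future.add_done_callback(make_callback("first"))
future.add_done_callback(make_callback("second"))

caller = threading.current_thread().name
print(calls)
```

Both entries carry the caller's thread name rather than a worker's, which is worth remembering when a callback touches state that is not thread-safe: the thread it runs on depends on whether the Future had already finished when the callback was added.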