49.2 ProcessPoolExecutor: CPU-Bound Parallelism
The ProcessPoolExecutor class in the concurrent.futures module is a high-level abstraction for running CPU-bound tasks in parallel. It manages a pool of worker processes and distributes tasks (callables) among them so that the work can use multiple CPU cores. This is fundamentally different from ThreadPoolExecutor, which uses threads. Because of the Global Interpreter Lock (GIL) in standard CPython builds, only one thread at a time can execute Python bytecode, which makes threads a poor fit for CPU-intensive pure-Python work. ProcessPoolExecutor sidesteps the GIL by running separate Python interpreter processes, each with its own memory space and its own GIL, which allows true parallel execution on multi-core systems.
The primary trade-off for this parallelism is the overhead of process creation and inter-process communication (IPC). Data exchanged between the main process and the worker processes must be serialized (pickled), sent through a queue, and then deserialized. This makes ProcessPoolExecutor ideal for scenarios where the computational workload (the task itself) is significantly heavier than the cost of transferring the input data and results.
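As a rough illustration of this trade-off, the sketch below batches many tiny tasks with the chunksize argument of map(), so that each pickled message carries several inputs instead of one. The names cheap_task and sum_squares_parallel and the chunk size of 1000 are illustrative assumptions, not tuned recommendations.

```python
from concurrent.futures import ProcessPoolExecutor


def cheap_task(n):
    """A deliberately tiny task: IPC cost dominates if inputs are sent one by one."""
    return n * n


def sum_squares_parallel(values, chunksize=1000):
    """Sum cheap_task over values, shipping inputs to the workers in batches."""
    with ProcessPoolExecutor() as executor:
        # chunksize groups inputs so each IPC round trip carries many items.
        return sum(executor.map(cheap_task, values, chunksize=chunksize))


if __name__ == "__main__":
    print(sum_squares_parallel(range(100_000)))
```

For a task this small, a plain loop in the main process is likely to be faster than any pool; batching only reduces the per-item overhead, it does not remove it.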
Initialization and the max_workers Parameter
When creating a ProcessPoolExecutor, the most important decision is the number of worker processes, specified by the max_workers parameter. If max_workers is None or not given, it defaults to the number of CPUs reported by os.cpu_count() (from Python 3.13, os.process_cpu_count(), which counts only the CPUs the current process is allowed to use). On Windows, max_workers must be 61 or less. The default is generally sensible, as it allows up to one worker per logical CPU. However, the optimal number can vary. For tasks that are purely CPU-bound and spend little time waiting, one worker per CPU is usually the best choice. For tasks that also involve some I/O (e.g., reading from a disk cache), you might benefit from slightly oversubscribing the CPU (e.g., max_workers=os.cpu_count() + 2). Oversubscribing a purely CPU-bound workload, however, adds context-switching overhead and can make performance worse.
from concurrent.futures import ProcessPoolExecutor
import os

# Use the default (number of CPUs)
with ProcessPoolExecutor() as executor:
    pass  # ... submit tasks

# Explicitly set the number of workers
with ProcessPoolExecutor(max_workers=4) as executor:
    pass  # ... submit tasks

print(f"Number of available CPUs: {os.cpu_count()}")
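One way to encode the sizing guidance above is a small helper. The function name choose_max_workers and its io_headroom parameter are hypothetical, and the default headroom of 2 simply mirrors the example in the text rather than a measured optimum.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def choose_max_workers(cpu_bound=True, io_headroom=2):
    """Return one worker per CPU, plus some headroom for tasks that mix in I/O."""
    cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    return cpus if cpu_bound else cpus + io_headroom


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=choose_max_workers()) as executor:
        # pow is a built-in, so it can be pickled and sent to a worker.
        print(executor.submit(pow, 2, 10).result())
```

Any such rule of thumb is worth checking against a benchmark of the real workload.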
Submitting Tasks and Retrieving Results
The submit() method schedules a callable to be executed and immediately returns a Future object. This Future is a handle, a promise that you can use to check the status of the task (future.running(), future.done()) or retrieve its result (or exception) once it completes. The result() method is blocking; it will wait until the task is finished and then return its output or re-raise any exception that occurred in the worker process. The map() method provides a simpler interface for applying the same function to an iterable of arguments, yielding results in the order the tasks were submitted.
import math
from concurrent.futures import ProcessPoolExecutor

def is_prime(n):
    """Trial division by odd numbers up to the integer square root of n."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = math.isqrt(n)  # exact integer square root, no float rounding
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    # Using submit() and Futures
    with ProcessPoolExecutor() as executor:
        future = executor.submit(is_prime, 112272535095293)
        # ... other work can be done here while the prime check runs
        result = future.result()  # Blocks until the result is available
        print(f"Is prime? {result}")

    # Using map() for multiple inputs
    numbers = [112272535095293, 112582705942171, 115280095190773, 115797848077099]
    with ProcessPoolExecutor() as executor:
        for number, result in zip(numbers, executor.map(is_prime, numbers)):
            print(f"{number} is prime: {result}")
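When results should be handled as soon as each task finishes rather than in submission order, the futures returned by submit() can be passed to concurrent.futures.as_completed(). The sketch below keeps a dictionary from each Future back to its input; the names square and squares_by_completion are illustrative.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def square(n):
    return n * n


def squares_by_completion(values):
    """Return {input: result}, collecting futures in the order they finish."""
    results = {}
    with ProcessPoolExecutor() as executor:
        # Remember which input each Future belongs to.
        future_to_input = {executor.submit(square, v): v for v in values}
        for future in as_completed(future_to_input):
            results[future_to_input[future]] = future.result()
    return results


if __name__ == "__main__":
    print(squares_by_completion([3, 1, 2]))
```

The completion order depends on scheduling, so only the mapping from input to result is stable between runs.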
Pitfalls: Serialization and Shared State
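A common pitfall involves the serialization of function arguments and return values. Everything passed to submit() or map(), and everything returned from a task, must be picklable. This includes the callable itself: functions are pickled by reference to their importable name, so lambdas and functions defined inside other functions cannot be sent to a worker, while functions defined at the top level of a module can. A related requirement is the if __name__ == '__main__': guard. Under the spawn and forkserver start methods, each worker imports the main module, so any code that creates the pool must sit behind the guard to avoid being re-executed in every worker. Furthermore, each worker process operates in its own memory space. Modifying a global variable in a worker will not affect the global variable in the main process or in other workers. This lack of shared state rules out data races on ordinary Python objects, but it also means you cannot use global variables for communication or aggregation.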
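The picklability requirement can be checked without starting a pool at all. The helper below, whose name is_picklable is hypothetical, simply tries pickle.dumps() on an object and reports whether it succeeded.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, as the executor requires."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print(is_picklable(math.sqrt))        # importable by name
    print(is_picklable(lambda x: x * 2))  # a lambda has no importable name
```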
# INCORRECT: This will not work as intended.
from concurrent.futures import ProcessPoolExecutor

global_counter = 0

def task(data):
    global global_counter
    # ... process data ...
    global_counter += 1  # This modifies the worker's COPY of global_counter

if __name__ == '__main__':
    data_chunks = [ [...] ]
    with ProcessPoolExecutor() as executor:
        executor.map(task, data_chunks)
    print(global_counter)  # This will still be 0, not the number of chunks.
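A common alternative to the pattern above is to have each task return a partial result and let the main process do the aggregation. The following is a minimal sketch of that idea; count_items and total_items are illustrative names, and counting list elements stands in for real per-chunk work.

```python
from concurrent.futures import ProcessPoolExecutor


def count_items(chunk):
    """Process one chunk and return a partial result instead of mutating a global."""
    return len(chunk)


def total_items(chunks):
    with ProcessPoolExecutor() as executor:
        # Partial results travel back to the parent, which does the aggregation.
        return sum(executor.map(count_items, chunks))


if __name__ == "__main__":
    print(total_items([[1, 2, 3], [4, 5], [6]]))  # 6 items in total
```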
Best Practices and the Context Manager
ProcessPoolExecutor is best used as a context manager (the with statement). On exit, the with block calls executor.shutdown(wait=True), which waits for all outstanding futures to complete and then frees the worker processes and their resources. Skipping shutdown can leave worker processes alive longer than intended. For long-running services or interactive environments, you can instantiate the executor directly and call shutdown() yourself. Note that shutdown(wait=False) only returns without blocking: tasks that were already submitted still run to completion, and the interpreter will not exit until they do. To discard tasks that have not started yet, pass cancel_futures=True (Python 3.9+); tasks that are already running are not interrupted.
# Preferred method: using a context manager
# (expensive_function and large_list are placeholders defined elsewhere)
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(expensive_function, large_list))
# The pool is now cleanly shut down. You cannot submit more tasks.
# executor.submit(...)  # This would raise a RuntimeError
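For cases where the executor is managed by hand, the sketch below submits several short tasks to a single worker and then calls shutdown() with cancel_futures=True (available from Python 3.9). Queued tasks that have not started are cancelled, while tasks already handed to the worker finish normally. The names slow_task and run_then_cancel, the 0.05 second sleep, and the task count are all illustrative.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_task(n):
    time.sleep(0.05)  # stand-in for real work
    return n


def run_then_cancel(task_count=10):
    """Submit tasks to one worker, then shut down and cancel what has not started."""
    executor = ProcessPoolExecutor(max_workers=1)
    futures = [executor.submit(slow_task, i) for i in range(task_count)]
    # Queued tasks are cancelled; tasks already running are allowed to finish.
    executor.shutdown(wait=True, cancel_futures=True)
    cancelled = sum(f.cancelled() for f in futures)
    return cancelled, len(futures) - cancelled


if __name__ == "__main__":
    cancelled, finished = run_then_cancel()
    print(f"cancelled={cancelled}, finished={finished}")
```

How many tasks end up cancelled depends on timing, so the two counts can differ between runs.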
Handling Exceptions in Worker Processes
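Exceptions raised within a worker process are not propagated immediately to the main program. Instead, they are captured, pickled, and attached to the Future object. When you call future.result(), the exception is re-raised in the main process, allowing you to handle it with a standard try...except block. With map(), the exception is raised when iteration reaches the failed task's result. It is crucial to retrieve results or check future.exception(); if you never do, the error is never surfaced and that task fails silently. A task that raises does not kill its worker. If a worker process itself dies abruptly, the affected futures raise BrokenProcessPool instead.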
from concurrent.futures import ProcessPoolExecutor

def task_that_might_fail(x):
    if x == 0:
        raise ValueError("Division by zero is imminent!")
    return 10 / x

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(task_that_might_fail, 0)
        try:
            result = future.result()
            print(result)
        except ValueError as e:
            # The worker itself survives; only this task failed.
            print(f"Task raised an error in the worker: {e}")
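When many tasks are submitted, it can help to record each outcome separately so that one failure does not hide the other results. The sketch below uses future.exception(), which blocks until the task is done and returns None on success. The names reciprocal and collect_outcomes are illustrative.

```python
from concurrent.futures import ProcessPoolExecutor


def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError for x == 0


def collect_outcomes(values):
    """Return (results, errors) so one failing task does not hide the others."""
    results, errors = {}, {}
    with ProcessPoolExecutor() as executor:
        futures = {v: executor.submit(reciprocal, v) for v in values}
        for value, future in futures.items():
            exc = future.exception()  # blocks until done; None on success
            if exc is None:
                results[value] = future.result()
            else:
                errors[value] = exc
    return results, errors


if __name__ == "__main__":
    results, errors = collect_outcomes([1, 0, 4])
    print(results)
    print({value: repr(exc) for value, exc in errors.items()})
```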