47.3 Pool: Map, Starmap, and Apply Async
The multiprocessing.Pool class provides a high-level interface for distributing work across multiple processes, abstracting away much of the complexity of manual process management. It starts a fixed set of worker processes, by default one per CPU core, that stay alive for the lifetime of the pool and pull tasks from a shared task queue. Because the same workers are reused for every task, the pool avoids the overhead of repeatedly creating and destroying processes, which makes it especially efficient for embarrassingly parallel problems: tasks that are independent and require no communication between processes.
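One way to observe this reuse is to run more tasks than there are workers and record which process handled each one. The sketch below assumes a pool of 2 workers; the function name which_worker is hypothetical, and only os.getpid() and the documented Pool.map call are used.

```python
import multiprocessing as mp
import os


def which_worker(x):
    # Return the input together with the PID of the worker that ran it
    return x, os.getpid()


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        # 20 tasks, but only 2 long-lived worker processes to run them
        pairs = pool.map(which_worker, range(20), chunksize=1)

    pids = {pid for _, pid in pairs}
    print(f"{len(pairs)} tasks ran in {len(pids)} worker process(es)")
```

All twenty tasks run in at most two distinct process IDs, none of which is the parent's.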
Initializing a Pool and Understanding processes, maxtasksperchild, and initializer
When you instantiate a Pool, the processes argument defines the number of worker processes to create. If None (the default), os.cpu_count() is used, which is generally optimal for CPU-bound tasks. For I/O-bound tasks, you might use a larger pool to keep CPUs busy while processes wait on I/O.
The maxtasksperchild argument helps contain resource leaks. It sets the maximum number of tasks a worker process executes before it exits and is replaced with a fresh one; the default, None, keeps workers alive for the lifetime of the pool. This matters in long-running applications where third-party libraries or your own code might inadvertently leak memory or other resources: periodically recycling workers returns those resources to the operating system. Note that map and starmap submit each chunk of the iterable as one task, so the limit counts chunks rather than individual items.
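The recycling becomes visible if each task reports the PID of the worker that ran it. In this sketch (report_pid is a hypothetical helper), each of the 2 workers is allowed 2 tasks, so 12 single-item tasks must be spread over at least 6 different processes.

```python
import multiprocessing as mp
import os


def report_pid(_):
    # Identify the worker process that executed this task
    return os.getpid()


if __name__ == "__main__":
    # Each worker exits after 2 tasks and the pool starts a replacement.
    # chunksize=1 makes every item its own task, so the limit counts items here.
    with mp.Pool(processes=2, maxtasksperchild=2) as pool:
        pids = pool.map(report_pid, range(12), chunksize=1)

    print(f"12 tasks were handled by {len(set(pids))} distinct worker processes")
```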
The initializer and initargs arguments let you specify a function and a tuple of arguments; each worker calls initializer(*initargs) once when it starts, including replacement workers created because of maxtasksperchild. This is the natural place to load large, read-only data, establish database connections, or configure process-wide state, so that the expensive setup runs once per process rather than once per task.
import multiprocessing as mp
import numpy as np

large_data = None  # populated in each worker by init_worker


def init_worker(shape, dtype):
    # Runs once in each worker process when it starts
    print(f"Worker {mp.current_process().name} initializing...")
    # Build this worker's own copy of the data (stands in for loading a large
    # read-only dataset); this is a per-process copy, not shared memory
    global large_data
    large_data = np.arange(shape[0], dtype=dtype)


def task_function(index):
    # Use the data prepared by the initializer instead of receiving it per task
    return float(large_data[index] * index)


if __name__ == '__main__':
    # 4 workers; each calls init_worker((1000,), np.float64) once at startup
    with mp.Pool(processes=4, initializer=init_worker,
                 initargs=((1000,), np.float64), maxtasksperchild=100) as pool:
        results = pool.map(task_function, range(10))
    print(results)
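The example above gives every worker its own private copy of the data. When workers need to write to the same memory, one common pattern is to allocate a multiprocessing.Array in the parent and pass it to each worker through initargs, since synchronized shared objects can be handed over when a process is created but not as ordinary task arguments. This is a minimal sketch; init_shared, fill_slot, and shared_arr are hypothetical names.

```python
import multiprocessing as mp

shared_arr = None  # set in each worker by init_shared


def init_shared(arr):
    # Runs once per worker: keep a module-level reference to the shared array
    global shared_arr
    shared_arr = arr


def fill_slot(i):
    # Each task writes to a distinct index, so no two tasks touch the same slot
    shared_arr[i] = i * i


if __name__ == "__main__":
    # 'd' = C double; lock=True (the default) wraps the array in a synchronized object
    arr = mp.Array("d", 8)
    with mp.Pool(processes=2, initializer=init_shared, initargs=(arr,)) as pool:
        pool.map(fill_slot, range(len(arr)))
    values = arr[:]  # slicing copies the shared contents into a list
    print(values)
```

Because each task writes to a different index, no explicit locking is needed here; read-modify-write updates to a shared slot would need to hold arr.get_lock().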
map, starmap, and Their Async Counterparts
The map(func, iterable) method is the direct parallel analog of the built-in map(). It splits the iterable into chunks, sends each chunk to a worker process as a separate task, and blocks until all results are ready, returning them in the same order as the input. This simplicity is its greatest strength for straightforward applications.
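The ordering guarantee holds even when tasks finish out of order. In this sketch, the hypothetical delayed_identity sleeps less for larger inputs, so later items tend to complete first, yet map still returns the results in input order.

```python
import multiprocessing as mp
import time


def delayed_identity(x):
    # Larger inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (5 - x))
    return x


if __name__ == "__main__":
    with mp.Pool(processes=5) as pool:
        ordered = pool.map(delayed_identity, range(5), chunksize=1)
    print(ordered)  # [0, 1, 2, 3, 4]: input order, not completion order
```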
starmap(func, iterable) is designed for functions that accept multiple arguments. Instead of each element in the iterable being a single argument, it expects each element to be an iterable of arguments (like a tuple) which are unpacked (*) when calling func. For example, starmap(func, [(1,2), (3,4)]) executes func(1,2) and func(3,4).
Their asynchronous counterparts, map_async and starmap_async, are non-blocking: they submit the tasks to the pool and immediately return an AsyncResult object, so the main program can keep executing other code while the workers are busy. Call AsyncResult.get() to retrieve the results; it blocks until they are available. get() accepts an optional timeout and raises multiprocessing.TimeoutError if the results are not ready in time, although the tasks themselves keep running. The async methods also accept callback and error_callback arguments; for map_async and starmap_async, callback is called once, in the parent process, with the complete list of results.
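The sketch below combines the timeout and callback behavior with map_async; slow_square, on_done, and the timing values are illustrative assumptions. A deliberately short timeout raises multiprocessing.TimeoutError without cancelling the job, and a second get() then waits for the same results.

```python
import multiprocessing as mp
import time

received = []  # filled by on_done in the parent process


def slow_square(x):
    time.sleep(0.2)  # simulate work
    return x * x


def on_done(results):
    # For map_async the callback receives the whole result list, once
    received.append(results)


if __name__ == "__main__":
    timed_out = False
    with mp.Pool(processes=2) as pool:
        async_result = pool.map_async(slow_square, [1, 2, 3, 4], callback=on_done)

        try:
            # Far too short: get() raises TimeoutError but the tasks keep running
            async_result.get(timeout=0.01)
        except mp.TimeoutError:
            timed_out = True
            print("not ready yet")

        squares = async_result.get(timeout=5)  # wait again for the same job
    print(squares)
```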
import functools
import multiprocessing as mp
import time


def power(x, n):
    time.sleep(0.1)  # Simulate work
    return x ** n


if __name__ == '__main__':
    with mp.Pool(4) as pool:
        # map passes exactly one argument per item, so fix n=2 with functools.partial
        square = functools.partial(power, n=2)
        result_map = pool.map(square, [2, 3, 4], chunksize=2)  # chunksize: items per batch
        print("Map result:", result_map)  # [4, 9, 16]

        # starmap unpacks each tuple: power(2, 3), power(3, 2), power(4, 1)
        args = [(2, 3), (3, 2), (4, 1)]
        result_starmap = pool.starmap(power, args)
        print("Starmap result:", result_starmap)  # [8, 9, 4]

        # Async version returns an AsyncResult immediately (non-blocking)
        async_result = pool.starmap_async(power, args)
        print("Main process can do other work here...")

        # Eventually, block until ready; raises TimeoutError after 2 seconds
        result_async = async_result.get(timeout=2)
        print("Async result:", result_async)
apply and apply_async for Individual Tasks
While map parallelizes a function call over an iterable, apply(func, args=(), kwds={}) runs a single function call in one of the pool's workers and blocks until the result is ready. It is rarely useful on its own: because each call waits for the previous one to finish, only one worker is busy at a time and the rest of the pool sits idle.
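For completeness, the call shape of apply is sketched below: positional arguments go in args and keyword arguments in kwds. The function scale is hypothetical.

```python
import multiprocessing as mp


def scale(value, factor=1):
    return value * factor


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        # Blocks until this single call has finished in a worker
        single = pool.apply(scale, args=(7,), kwds={"factor": 3})
    print(single)  # 21
```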
apply_async(func, args=(), kwds={}, callback=None, error_callback=None) is far more useful. It submits a single task to the pool without blocking and immediately returns an AsyncResult object. This makes it the building block for custom, dynamic task queues, where tasks are submitted in response to events or other conditions rather than from a predefined list. If callback is provided, it is called in the parent process with the task's result as its single argument as soon as that result is ready, enabling a reactive programming style; error_callback is called the same way with the exception if the task fails. Both run on the pool's result-handling thread, so they should return quickly to avoid delaying other results.
import multiprocessing as mp
import time


def slow_task(name, duration):
    print(f"{name} starting")
    time.sleep(duration)
    return f"{name} finished after {duration}s"


def result_callback(result):
    # Called in the parent process as each task completes
    print(f"Callback received: {result}")


if __name__ == '__main__':
    with mp.Pool(2) as pool:
        # Submit several tasks asynchronously
        jobs = []
        for i, dur in enumerate([2, 1, 3], 1):
            # apply_async returns at once, so tasks can be submitted dynamically
            async_job = pool.apply_async(slow_task, (f"Task{i}", dur), callback=result_callback)
            jobs.append(async_job)
        print("All tasks submitted. Main process is free.")

        # We can also wait for a specific job by calling get() on its AsyncResult
        result_of_second_job = jobs[1].get()
        print(f"Manually retrieved: {result_of_second_job}")

        # Leaving the with block calls terminate(), which would kill the tasks
        # still running, so stop accepting work and wait for them to finish
        pool.close()
        pool.join()
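apply_async's error_callback is called with the exception when a task fails, which keeps a failing job from going unnoticed when you never call get() on it. A minimal sketch, with risky_divide, on_result, and on_error as hypothetical names; AsyncResult.wait() is used because, unlike get(), it blocks without re-raising the task's exception.

```python
import multiprocessing as mp

results = []  # filled by on_result in the parent process
errors = []   # filled by on_error in the parent process


def risky_divide(a, b):
    return a / b


def on_result(value):
    results.append(value)


def on_error(exc):
    # Receives the exception raised in the worker, re-created in the parent
    errors.append(repr(exc))


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        jobs = [
            pool.apply_async(risky_divide, (a, b),
                             callback=on_result, error_callback=on_error)
            for a, b in [(6, 3), (1, 0), (9, 3)]
        ]
        for job in jobs:
            job.wait()  # blocks until done; never raises the task's exception
    print("results:", sorted(results))  # [2.0, 3.0]
    print("errors:", errors)
```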
Best Practices, Chunksize, and Common Pitfalls
- Chunksize Optimization: The chunksize parameter in map and starmap determines how many items from the iterable are sent to a worker in one batch. A larger chunksize reduces inter-process communication overhead (serialization, deserialization, and queueing) but can lead to poor load balancing if task times vary significantly. When chunksize is omitted, the input is split into roughly four chunks per worker, which is a good starting point; for very long iterables of short tasks, experimenting with larger values can yield performance gains.
- Avoid Large Data Transfer: Transferring large amounts of data between processes is often the biggest performance killer, because every argument and result is pickled and sent through a pipe. Use initializer to load read-only data once per worker. For mutable data that must be shared, use multiprocessing.Array or multiprocessing.Value instead of passing copies back and forth, and hand these objects to workers through initializer and initargs: they can only be shared by inheritance, and passing them as task arguments raises a RuntimeError.
- Exception Handling: An exception raised in a worker process is sent back to the parent and re-raised when you call get() on the AsyncResult object; the blocking map, starmap, and apply re-raise it directly. Wrap these calls in try-except blocks to handle worker errors gracefully.
- The if __name__ == '__main__': Guard: This guard is mandatory with the spawn and forkserver start methods (spawn is the default on Windows and macOS) and good practice on every platform. Under these methods each child process imports the main module; without the guard, that import would run the pool-creation code again, and multiprocessing raises a RuntimeError instead of starting processes recursively.
- Pool with Context Manager: Instantiate your Pool in a with statement so the workers are cleaned up even if an error occurs, preventing orphaned processes. Leaving the block calls pool.terminate(), not pool.close(), which stops workers immediately; collect every result, or call pool.close() followed by pool.join(), before the block ends.
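The exception-handling advice can be sketched as follows: a worker raises ValueError for one bad input, and the parent catches it at get(). parse_int is a hypothetical helper. With map_async, a single failing item causes get() to raise, and the results of the successful items are not returned.

```python
import multiprocessing as mp


def parse_int(text):
    # Raises ValueError in the worker for non-numeric input
    return int(text)


if __name__ == "__main__":
    caught = None
    with mp.Pool(processes=2) as pool:
        async_result = pool.map_async(parse_int, ["1", "2", "oops"])
        try:
            values = async_result.get(timeout=5)
        except ValueError as exc:
            # The worker's exception is re-raised here, in the parent
            values = None
            caught = exc
    print("values:", values)
    print("caught:", repr(caught))
```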