47.2 The multiprocessing Module: Process, Queue, Pipe
The multiprocessing module is the cornerstone of process-based parallelism in Python. It sidesteps the Global Interpreter Lock (GIL) by spawning new operating system processes, each with its own private memory space and a dedicated Python interpreter. This allows for true parallel execution on multi-core systems. The Process class provides the fundamental building block for creating and managing these processes, while Queue and Pipe are the primary mechanisms for safe communication and data exchange between them.
The Process Class
The Process class creates and controls a single child process. The target function, along with its arguments, is executed in the new process’s memory space. The key methods are start(), which launches the process and arranges for run() to be called inside it, and join(), which blocks the caller until that process terminates.
import multiprocessing
import os
import time

def worker(name, delay):
    """A simple worker function that runs in a separate process."""
    print(f"{name} (PID: {os.getpid()}) started. Sleeping for {delay} seconds.")
    time.sleep(delay)  # Simulate some work by sleeping
    print(f"{name} finished.")

if __name__ == '__main__':  # Required under the spawn start method (Windows, macOS)
    # Create process objects
    p1 = multiprocessing.Process(target=worker, args=('Process-1', 2))
    p2 = multiprocessing.Process(target=worker, args=('Process-2', 1))
    # Start the processes
    p1.start()
    p2.start()
    # Wait for both processes to complete
    p1.join()
    p2.join()
    print("All processes have finished execution.")
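Why this works: Each Process object, once started, runs in a new OS process. The os.getpid() calls confirm that the workers have different process IDs. The join() calls make the main process wait until its children have finished, so the final message is printed last and the children are reaped rather than left behind as zombies. The if __name__ == '__main__': guard is essential wherever the spawn start method is used (always on Windows, and by default on macOS since Python 3.8): spawn starts a fresh interpreter that imports the main module, and without the guard that import would try to create the processes again. Current Python versions detect this situation and raise a RuntimeError rather than recursing without bound.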
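The difference between start() and a direct call to run() can be checked by comparing process IDs. The following is a minimal sketch rather than part of the example above: report_pid and pid_seen_by are hypothetical helper names, and a one-way Pipe (covered later in this section) is used only to carry the PID back to the caller.

```python
import multiprocessing
import os


def report_pid(conn):
    """Send the PID of whichever process executes this function."""
    conn.send(os.getpid())


def pid_seen_by(method):
    """Execute report_pid via run() or start() and return the PID it reported."""
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=report_pid, args=(send_end,))
    if method == "run":
        p.run()    # Plain method call: the target runs in the current process
    else:
        p.start()  # A new process is created and run() is called inside it
        p.join()
    # Close our copy of the sending end so that an empty pipe raises EOFError
    # instead of blocking forever; data already sent stays readable.
    send_end.close()
    pid = recv_end.recv()
    recv_end.close()
    return pid


if __name__ == "__main__":
    print("run() stays in this process:", pid_seen_by("run") == os.getpid())
    print("start() uses a new process:", pid_seen_by("start") != os.getpid())
```

Both comparisons should print True. Calling run() directly is occasionally useful for debugging a target function, but it provides no parallelism.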
Inter-Process Communication with Queue
Since processes do not share memory, they cannot simply use global variables to communicate. The multiprocessing.Queue is a FIFO (First-In, First-Out) data structure implemented using a pipe and locks/semaphores. It handles the serialization (pickling) and deserialization of objects, making it a thread- and process-safe way to pass data between processes.
import multiprocessing

def producer(queue, items):
    """Produce items and put them into the queue."""
    for item in items:
        print(f"Producing {item}")
        queue.put(item)  # This blocks if the queue is full
    queue.put(None)  # Sentinel value to signal completion

def consumer(queue):
    """Consume items from the queue until a sentinel is found."""
    while True:
        item = queue.get()  # This blocks if the queue is empty
        if item is None:
            break  # Exit on sentinel value
        print(f"Consuming {item}")
    print("Consumer done.")

if __name__ == '__main__':
    # Create a shared queue
    shared_queue = multiprocessing.Queue(maxsize=3)  # Limit size to demonstrate blocking
    # Create processes
    prod = multiprocessing.Process(target=producer, args=(shared_queue, [1, 2, 3, 4, 5]))
    cons = multiprocessing.Process(target=consumer, args=(shared_queue,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    print("Producer-consumer example finished.")
Why this works: The Queue object is created in the main process, together with its underlying pipe and locks. When it is passed as an argument to the child processes, the multiprocessing module gives each child access to that same pipe so that objects can be transferred. The get() and put() methods handle blocking and synchronization automatically. The sentinel value (None) is a common pattern for cleanly signaling the end of a data stream; with several consumers, each one needs its own sentinel.
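Blocking forever is not the only option: put() and get() both accept a timeout and raise queue.Full or queue.Empty (from the standard queue module) when it expires. The sketch below illustrates this under stated assumptions; try_put, drain, and the SENTINEL constant are hypothetical names introduced for the illustration, and everything runs in one process so the behavior is easy to observe.

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Empty and queue.Full

SENTINEL = None  # Same end-of-stream marker as in the example above


def try_put(q, item, timeout=0.5):
    """Return True if the item was enqueued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


def drain(q, timeout=5.0):
    """Collect items until the sentinel arrives or nothing shows up within timeout."""
    items = []
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            break  # Give up instead of waiting forever
        if item is SENTINEL:
            break
        items.append(item)
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=3)
    for item in ("a", "b", SENTINEL):
        try_put(q, item)
    print(try_put(q, "c", timeout=0.1))  # False: the queue already holds maxsize items
    print(drain(q))                      # ['a', 'b']
```

A timeout turns a silent hang into a condition the program can handle, at the cost of having to choose a sensible waiting period.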
Low-Level Communication with Pipe
A Pipe() returns a pair of connection objects connected by a pipe which by default is duplex (two-way). It offers a more lightweight and flexible communication channel than Queue but requires the programmer to manage the communication protocol (e.g., who sends and who receives) and synchronization more explicitly.
import multiprocessing

def sender(conn, messages):
    """Send messages through the connection, then close it."""
    for message in messages:
        conn.send(message)
        print(f"Sent: {message}")
    conn.close()  # Important: close the sender's end so the receiver sees EOF

def receiver(conn, unused_conn):
    """Receive messages from the connection until the other end is closed."""
    # Under the fork start method the child inherits a copy of the parent's end.
    # Close that copy, or recv() will never raise EOFError.
    unused_conn.close()
    while True:
        try:
            message = conn.recv()  # Blocks until data is available
            print(f"Received: {message}")
        except EOFError:  # Raised once every handle to the other end is closed
            print("No more data. Exiting.")
            break

if __name__ == '__main__':
    # Create a duplex pipe; Pipe() returns its two ends
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=receiver, args=(child_conn, parent_conn))
    p.start()
    child_conn.close()  # The parent does not use the child's end
    # Use the parent connection in the main process to send data
    sender(parent_conn, ['Hello', 42, {'key': 'value'}])  # sender() closes parent_conn
    p.join()
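Why this works: Pipe() creates an OS-level channel between the two connection objects. Data sent with conn.send() is pickled, transmitted through the pipe, and unpickled on the other side by conn.recv(). recv() raises EOFError once there is nothing left to read and every handle to the sending end has been closed, which provides a clean termination mechanism. Every handle includes copies inherited by child processes: under the fork start method a child receives both ends of the pipe, and an end it never closes keeps the receiver waiting. Forgetting to close connections is therefore a common pitfall that can leave processes hanging.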
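When data flows in one direction only, Pipe(duplex=False) returns a receive-only end and a send-only end, and poll() lets the receiver wait for a bounded time instead of blocking in recv(). The following is a minimal single-process sketch; recv_all and recv_or_default are hypothetical helper names, not part of the multiprocessing API.

```python
import multiprocessing


def recv_all(conn):
    """Collect messages until the sending end is closed (EOFError)."""
    messages = []
    while True:
        try:
            messages.append(conn.recv())
        except EOFError:
            return messages


def recv_or_default(conn, timeout, default=None):
    """Return the next message, or default if none arrives within timeout seconds."""
    if conn.poll(timeout):  # True as soon as there is something to read
        return conn.recv()
    return default


if __name__ == "__main__":
    # With duplex=False the first connection can only receive, the second only send
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    print(recv_or_default(recv_end, timeout=0.1, default="nothing yet"))
    for message in ["Hello", 42, {"key": "value"}]:
        send_end.send(message)
    send_end.close()  # The only handle to the sending end, so EOF follows the data
    print(recv_all(recv_end))
```

The demonstration keeps both ends in one process, which works here because the messages are small enough to sit in the pipe's buffer. Between processes, the same helpers apply unchanged to whichever end each process holds.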
Common Pitfalls and Best Practices
- Avoid Shared State: Prefer passing messages to sharing state, and use Queue and Pipe for communication. If you must share state, use Value or Array from the multiprocessing module, which use shared memory, and be prepared to use locks to synchronize access.
- Joining Zombie Processes: Always call join() on your processes (or set daemon=True and accept that they may be terminated abruptly when the parent exits). Joining lets the operating system release the child's resources and prevents “zombie” processes.
- Picklability: Objects passed through Queue or Pipe must be picklable. Lambdas, nested functions, and objects of certain types (for example, open file objects) cannot be pickled and will cause errors.
- Deadlocks: A major risk with queues. A put() on a full queue (when maxsize is set) or a get() on an empty queue blocks until another process makes room or supplies an item; if none ever does, the call blocks forever. Design your program so that production and consumption run concurrently in separate processes or threads, and pass a timeout where an indefinite wait is unacceptable.
- The if __name__ == '__main__' Guard: This is required whenever the spawn start method is in use (always on Windows, and by default on macOS since Python 3.8) and is considered a best practice on all platforms. It ensures that the process creation code runs only in the main module and not again when each child process imports that module.
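The shared-state advice above can be illustrated with a counter held in a multiprocessing.Value. This is a sketch under stated assumptions: add_many is a hypothetical helper, and the worker count and iteration count are arbitrary. The lock returned by get_lock() makes each read-modify-write step atomic; without it, concurrent increments can be lost.

```python
import multiprocessing


def add_many(counter, n):
    """Increment a shared counter n times, holding its lock for each update."""
    for _ in range(n):
        with counter.get_lock():  # counter.value += 1 alone is not atomic
            counter.value += 1


if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)  # "i": C signed int in shared memory
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, 1000))
        for _ in range(4)
    ]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 4000 when all four workers ran to completion
```

Taking the lock on every increment serializes the workers, so this pattern suits occasional updates. For heavy traffic, having each worker return a partial result through a Queue is usually the better design.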