Beyond the basic Lock and RLock, the threading module provides several higher-level synchronization primitives that allow for more complex coordination between threads. These tools—Event, Condition, and Semaphore—enable patterns like signaling, waiting for specific state changes, and controlling access to a limited pool of resources.

The Event Object

An Event is a simple but powerful communication mechanism between threads. It manages an internal flag that can be set to True with set() or reset to False with clear(). Other threads can wait for the flag to be set using wait(). The key feature is that any number of threads blocked on wait() will all be awakened immediately when another thread calls set().

This makes an Event ideal for one-time notifications, such as signaling that a resource is now available, a configuration has been loaded, or that it’s time for all worker threads to shut down.

import threading
import time

# Shared event object
data_loaded_event = threading.Event()

def load_configuration():
    print("Worker thread: Loading large configuration...")
    time.sleep(2)  # Simulate a long I/O operation
    print("Worker thread: Configuration loaded!")
    data_loaded_event.set()  # Signal that the data is ready

def process_data():
    print("Main thread: Waiting for configuration to be loaded...")
    data_loaded_event.wait()  # Block until the event is set
    print("Main thread: Processing data with the new configuration!")

# Create and start the worker thread
loader_thread = threading.Thread(target=load_configuration)
loader_thread.start()

# Block the main thread until the configuration is ready, then process it
process_data()

loader_thread.join()
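
The example above has a single waiter, but the same set() call releases every thread blocked on wait(). The following is a minimal sketch of that broadcast behavior; the names start_event, woken, and worker are made up for illustration.

```python
import threading

start_event = threading.Event()   # hypothetical shared "go" signal
woken = []                        # records which workers got past wait()
woken_lock = threading.Lock()     # protects the woken list

def worker(worker_id):
    start_event.wait()            # blocks until the flag is set
    with woken_lock:
        woken.append(worker_id)

workers = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in workers:
    t.start()

# One call releases every waiter; a worker that reaches wait()
# after this point returns immediately because the flag stays set.
start_event.set()

for t in workers:
    t.join()

print(sorted(woken))  # [0, 1, 2]
```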

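Common Pitfall: A common mistake is reusing an Event without calling clear(). Once an Event is set, it stays set, so every subsequent call to wait() returns immediately until clear() is called. If threads need to be notified repeatedly about changing shared state, a Condition is usually the better tool, because a set() followed quickly by clear() can be missed by a thread that has not yet reached wait().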
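The sticky flag is easy to observe directly. This small sketch relies on wait() returning True when the flag is set and False when the timeout expires; the variable names are illustrative only.

```python
import threading

ready = threading.Event()  # hypothetical event for this sketch

ready.set()
first = ready.wait(timeout=0.1)    # True: the flag is set
second = ready.wait(timeout=0.1)   # True again: the flag stays set

ready.clear()                      # reset the flag to False
third = ready.wait(timeout=0.1)    # False: the wait times out

print(first, second, third)  # True True False
```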

The Condition Object

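A Condition is a more versatile synchronization primitive that pairs a lock (an RLock is created by default if you do not pass one) with wait/notify signaling. It is used when a thread needs to wait for a specific state change in shared data, not just a simple signal. Unlike an Event, a Condition keeps no flag of its own: notify() wakes only threads that are already waiting, so the shared state itself must record what happened.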

The typical pattern involves:

  1. Acquiring the Condition’s lock (typically via a with statement).
  2. Checking some shared state in a loop. If the state isn’t as desired, calling wait(), which releases the lock and blocks the thread.
  3. When another thread changes the state, it calls notify() or notify_all() while holding the lock. This wakes up the waiting thread(s).
  4. The waiting thread re-acquires the lock upon waking and re-checks the condition in the loop, because the state may have changed again or the wakeup may have been spurious.

import threading
import collections

class TaskQueue:
    def __init__(self, max_size):
        self._queue = collections.deque()
        self._lock = threading.RLock()
        self._not_empty = threading.Condition(self._lock)
        self._not_full = threading.Condition(self._lock)
        self._max_size = max_size

    def put(self, task):
        with self._not_full:  # Acquires the shared RLock (self._lock)
            # Wait until the queue is not full
            while len(self._queue) >= self._max_size:
                self._not_full.wait()
            self._queue.append(task)
            self._not_empty.notify()  # Notify a waiting consumer

    def get(self):
        with self._not_empty:  # Acquires the same shared RLock (self._lock)
            # Wait until the queue is not empty
            while not self._queue:
                self._not_empty.wait()
            task = self._queue.popleft()
            self._not_full.notify()  # Notify a waiting producer
            return task

# Usage example with producer and consumer threads
def producer(queue):
    for i in range(10):
        queue.put(f"Task_{i}")
        print(f"Produced: Task_{i}")

def consumer(queue):
    for _ in range(10):
        task = queue.get()
        print(f"Consumed: {task}")

queue = TaskQueue(max_size=2)
prod_thread = threading.Thread(target=producer, args=(queue,))
cons_thread = threading.Thread(target=consumer, args=(queue,))

prod_thread.start()
cons_thread.start()
prod_thread.join()
cons_thread.join()

Why the loop? The condition is checked in a while loop for two reasons. First, a thread may return from wait() without having been notified (a spurious wakeup), which some platforms permit. Second, even after a genuine notify(), another thread may acquire the lock first and change the state, so the condition may no longer hold by the time the waiter runs. Re-checking in a loop ensures the thread only proceeds when the condition it is waiting for is genuinely met.
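The standard library can also run this loop for you: Condition.wait_for(predicate) re-checks the predicate each time the thread wakes and returns only once it is true. Below is a minimal sketch under that assumption; the names items, items_available, and received are hypothetical and not part of the TaskQueue example.

```python
import collections
import threading

items = collections.deque()              # shared state guarded by the condition
items_available = threading.Condition()  # creates its own RLock by default
received = []

def consumer():
    with items_available:
        # Equivalent to: while not items: items_available.wait()
        items_available.wait_for(lambda: len(items) > 0)
        received.append(items.popleft())

def producer():
    with items_available:
        items.append("task")
        items_available.notify()  # wake one waiting consumer, if any

consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)
consumer_thread.start()
producer_thread.start()
consumer_thread.join()
producer_thread.join()

print(received)  # ['task']
```

Because the predicate is evaluated before the first wait, the sketch works whichever thread happens to run first.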

The Semaphore Object

A Semaphore is a counter that controls access to a resource pool of finite size. It is initialized with a value n (1 if omitted). Each call to acquire() decrements the counter; if the counter is already zero, acquire() blocks until another thread calls release(), which increments it. A BoundedSemaphore is a variant that raises a ValueError if release() would push the counter above its initial value, which helps catch bugs where release() is called too many times.
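The difference between the two variants shows up when release() is called once too often. This is a small illustrative sketch; the variable names are invented for the example.

```python
import threading

plain = threading.Semaphore(1)
bounded = threading.BoundedSemaphore(1)

plain.release()  # no error: the counter silently grows to 2

try:
    bounded.release()  # would exceed the initial value of 1
    over_release_detected = False
except ValueError:
    over_release_detected = True

# Count the permits the plain semaphore now hands out without blocking.
plain_permits = 0
while plain.acquire(blocking=False):
    plain_permits += 1

print(over_release_detected, plain_permits)  # True 2
```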

Semaphores are perfect for limiting concurrency, such as controlling the number of threads that can simultaneously access a database connection pool or perform a CPU-intensive operation.

import threading
import time

# A semaphore to limit database connections to 3
db_semaphore = threading.BoundedSemaphore(3)

def database_query(user_id):
    # Acquire a permit. Block if all 3 are in use.
    db_semaphore.acquire()
    try:
        print(f"User {user_id} is querying the database...")
        time.sleep(1)  # Simulate a database operation
        print(f"User {user_id} has finished.")
    finally:
        # Always release the permit in a finally block
        db_semaphore.release()

# Simulate 10 users trying to connect simultaneously
user_threads = []
for user_id in range(10):
    thread = threading.Thread(target=database_query, args=(user_id,))
    thread.start()
    user_threads.append(thread)

for thread in user_threads:
    thread.join()

Best Practice: When you call acquire() explicitly, always pair it with release() in a try...finally block so the permit is returned even if an exception occurs within the critical section. Failure to do so causes a “semaphore leak”: the pool is permanently one slot smaller, and once every permit has leaked, all further acquire() calls block forever. Semaphores also support the with statement, which performs the same acquire and release automatically.
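
As a rough sketch of the with form, the example below runs four tasks against a pool of two permits, makes half of them fail, and then checks that no permit was lost. The names pool_slots, guarded_task, succeeded, and failed are assumptions made for this illustration.

```python
import threading

pool_slots = threading.BoundedSemaphore(2)  # hypothetical pool of 2 permits
succeeded, failed = [], []
results_lock = threading.Lock()             # protects the two result lists

def guarded_task(task_id):
    try:
        with pool_slots:  # acquire on entry, release on exit
            if task_id % 2:
                raise RuntimeError("simulated failure")
            with results_lock:
                succeeded.append(task_id)
    except RuntimeError:
        # The permit has already been released by the with block.
        with results_lock:
            failed.append(task_id)

tasks = [threading.Thread(target=guarded_task, args=(i,)) for i in range(4)]
for t in tasks:
    t.start()
for t in tasks:
    t.join()

# Verify that both permits are still available after the failures.
permits_left = 0
while pool_slots.acquire(blocking=False):
    permits_left += 1
for _ in range(permits_left):
    pool_slots.release()

print(sorted(succeeded), sorted(failed), permits_left)  # [0, 2] [1, 3] 2
```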