How to Implement Queues in Multi-Threading in Python
Combining queues with multi-threading is one of the most powerful patterns in Python for concurrent task processing. Queues provide a thread-safe way to distribute work across multiple threads, ensuring tasks are processed in order without race conditions. This pattern is commonly used in web scraping, file processing, API request handling, data pipelines, and any scenario where work can be parallelized.
In this guide, you will learn how queues and threads work together, understand the architecture behind the pattern, and implement it step by step with practical examples.
Prerequisites
Before diving in, familiarity with these concepts will help:
- Threading basics: creating and starting threads in Python.
- Queue data structure: FIFO (First-In-First-Out) ordering.
- Thread safety: why shared resources need synchronization.
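As a quick refresher on FIFO ordering, `queue.Queue` hands items back in exactly the order they were added, even in single-threaded code:

```python
import queue

q = queue.Queue()
for item in ["first", "second", "third"]:
    q.put(item)

# Items come back out in the order they went in
received = [q.get(), q.get(), q.get()]
print(received)  # ['first', 'second', 'third']
```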
Understanding the Pattern
Think of a restaurant kitchen:
- Orders (Queue): Customer orders arrive and are placed in a queue.
- Chefs (Threads): Multiple chefs work simultaneously, each picking up the next order.
- Cooking (Work Function): Each chef prepares the dish independently.
The queue ensures orders are processed in sequence, while multiple chefs (threads) work in parallel to get everything done faster.
Architecture Overview
```
┌─────────────────────────────────────────────┐
│                    QUEUE                    │
│   [Task 1] [Task 2] [Task 3] ... [Task N]   │
└───────┬───────────────┬─────────────┬───────┘
        │               │             │
    ┌───▼───┐       ┌───▼───┐     ┌───▼───┐
    │Thread1│       │Thread2│     │Thread3│
    │Worker │       │Worker │     │Worker │
    └───┬───┘       └───┬───┘     └───┬───┘
        │               │             │
        ▼               ▼             ▼
    do_work()       do_work()     do_work()
```
Step-by-Step Implementation
The implementation follows four clear steps:
1. Create a queue filled with tasks (work items).
2. Spawn daemon threads pointed at a worker function.
3. Define the worker function that pulls tasks from the queue.
4. Define the work function that performs the actual task.
Basic Example: Parallel Number Processing
```python
import queue
import threading

NUMBER_OF_THREADS = 4

# Step 4: The actual work to be done
def do_work(number):
    """Process a single task: in this case, just print it."""
    print(f"{threading.current_thread().name}: processing {number}")

# Step 3: Worker function that pulls tasks from the queue
def worker(task_queue):
    """Continuously pull tasks from the queue and execute them."""
    while True:
        task = task_queue.get()  # Block until a task is available
        do_work(task)            # Perform the work
        task_queue.task_done()   # Signal that this task is complete

if __name__ == "__main__":
    # Step 1: Create the task queue
    task_queue = queue.Queue()
    for i in range(20):
        task_queue.put(i)

    # Step 2: Spawn worker threads
    for i in range(NUMBER_OF_THREADS):
        thread = threading.Thread(
            name=f"Worker-{i}",
            target=worker,
            args=(task_queue,),
            daemon=True,  # Thread dies when main program exits
        )
        thread.start()
        print(f"Started {thread.name}")

    # Wait for all tasks to complete
    task_queue.join()
    print("\nAll tasks completed!")
```
Output (order varies between runs):
Started Worker-0
Started Worker-1
Started Worker-2
Started Worker-3
Worker-0: processing 0
Worker-1: processing 1
Worker-2: processing 2
Worker-3: processing 3
Worker-0: processing 4
Worker-1: processing 5
...
Worker-2: processing 19
All tasks completed!
Understanding the Key Components
- `queue.Queue()`: A thread-safe FIFO queue. Multiple threads can safely call `.get()` and `.put()` simultaneously without data corruption.
- `task_queue.get()`: Blocks the calling thread until an item is available, so idle threads wait efficiently without consuming CPU.
- `task_queue.task_done()`: Signals that a previously retrieved task has been completed. This is required for `task_queue.join()` to work correctly.
- `task_queue.join()`: Blocks the main thread until all items in the queue have been retrieved and marked as done via `task_done()`.
Why daemon threads? Daemon threads are automatically terminated when the main program exits. Without daemon=True, the worker threads would run indefinitely (because of the while True loop), and the program would never exit. Setting them as daemons means they are killed once task_queue.join() completes and the main thread finishes.
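Daemon threads are the simplest shutdown strategy, but they are killed abruptly. An alternative, sketched below with illustrative names (not part of the example above), uses non-daemon workers and a sentinel value so each thread exits cleanly on its own:

```python
import queue
import threading

SENTINEL = None  # special marker telling a worker to shut down
results = []     # list.append is atomic in CPython, so sharing it is safe here

def worker(task_queue):
    while True:
        task = task_queue.get()
        if task is SENTINEL:
            task_queue.task_done()
            break  # leave the loop instead of spinning forever
        results.append(task * 2)
        task_queue.task_done()

task_queue = queue.Queue()
for i in range(8):
    task_queue.put(i)

# Non-daemon threads: they must exit on their own
threads = [threading.Thread(target=worker, args=(task_queue,)) for _ in range(2)]
for t in threads:
    t.start()

# One sentinel per worker so each thread sees exactly one
for _ in threads:
    task_queue.put(SENTINEL)

for t in threads:
    t.join()  # returns once each worker hits its sentinel

print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14]
```

Because the sentinels are enqueued after the real tasks, FIFO ordering guarantees all work is done before any worker shuts down.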
Practical Example: Concurrent File Writing
Here's a more realistic example that writes data to multiple files simultaneously using threads:
```python
import queue
import threading

# Thread lock to prevent interleaved console output
write_lock = threading.Lock()

def write_to_file(filename, numbers):
    """Write a list of numbers to a file (thread-safe)."""
    thread_name = threading.current_thread().name
    with write_lock:
        print(f"{thread_name}: writing {len(numbers)} numbers to {filename}")
    with open(filename, "w") as f:
        for num in numbers:
            f.write(f"{num}\n")
    with write_lock:
        print(f"{thread_name}: finished writing {filename}")

def worker(task_queue):
    """Pull file-writing tasks from the queue."""
    while True:
        filename, numbers = task_queue.get()
        write_to_file(filename, numbers)
        task_queue.task_done()

if __name__ == "__main__":
    task_queue = queue.Queue()

    # Each task is a (filename, data) tuple
    tasks = [
        ("output_1.txt", list(range(0, 100))),
        ("output_2.txt", list(range(100, 200))),
        ("output_3.txt", list(range(200, 300))),
        ("output_4.txt", list(range(300, 400))),
    ]
    for task in tasks:
        task_queue.put(task)

    # Spawn 4 worker threads
    for i in range(4):
        thread = threading.Thread(
            name=f"FileWriter-{i}",
            target=worker,
            args=(task_queue,),
            daemon=True,
        )
        thread.start()

    task_queue.join()
    print("All files written successfully!")
```
Output:
FileWriter-0: writing 100 numbers to output_1.txt
FileWriter-1: writing 100 numbers to output_2.txt
FileWriter-2: writing 100 numbers to output_3.txt
FileWriter-3: writing 100 numbers to output_4.txt
FileWriter-0: finished writing output_1.txt
FileWriter-1: finished writing output_2.txt
FileWriter-2: finished writing output_3.txt
FileWriter-3: finished writing output_4.txt
All files written successfully!
Thread locks (threading.Lock()) are essential when multiple threads access shared resources (like printing to the console or writing to the same file). Without locks, output can become garbled or data can be corrupted. In this example, each thread writes to a different file, so the lock is primarily used for clean console output.
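To illustrate why a lock matters for shared mutable state, here is a minimal standalone sketch (separate from the file-writing example) where four threads increment one shared counter. Without the `with counter_lock:` block, the read-modify-write could be interrupted mid-update and increments could be lost:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:  # makes the read-modify-write atomic
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000 -- every increment is accounted for
```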
Advanced Example: Producer-Consumer Pattern
A common multi-threading pattern is producer-consumer, where one or more threads generate tasks while others process them:
```python
import queue
import threading
import time
import random

def producer(task_queue, num_tasks):
    """Generate tasks and add them to the queue."""
    for i in range(num_tasks):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"Producer: created {task}")
        time.sleep(random.uniform(0.01, 0.05))  # Simulate task creation time
    print("Producer: all tasks created")

def consumer(task_queue):
    """Process tasks from the queue."""
    while True:
        task = task_queue.get()
        thread_name = threading.current_thread().name
        print(f"  {thread_name}: processing {task}")
        time.sleep(random.uniform(0.02, 0.1))  # Simulate work
        print(f"  {thread_name}: completed {task}")
        task_queue.task_done()

if __name__ == "__main__":
    task_queue = queue.Queue(maxsize=5)  # Limit queue size to 5

    # Start consumer threads
    for i in range(3):
        thread = threading.Thread(
            name=f"Consumer-{i}",
            target=consumer,
            args=(task_queue,),
            daemon=True,
        )
        thread.start()

    # Start producer thread
    producer_thread = threading.Thread(
        target=producer,
        args=(task_queue, 10),
    )
    producer_thread.start()

    producer_thread.join()  # Wait for producer to finish creating tasks
    task_queue.join()       # Wait for all tasks to be processed
    print("\nAll tasks produced and consumed!")
```
Output (lines from different threads may interleave):
Producer: created Task-0 Consumer-0: processing Task-0
Producer: created Task-1 Consumer-1: processing Task-1
Consumer-0: completed Task-0
Producer: created Task-2 Consumer-2: processing Task-2
Consumer-0: processing Task-3
Producer: created Task-3
Producer: created Task-4
Consumer-0: completed Task-3
Consumer-0: processing Task-4
Producer: created Task-5
Consumer-1: completed Task-1
Consumer-1: processing Task-5
Consumer-2: completed Task-2
Producer: created Task-6 Consumer-2: processing Task-6
Consumer-0: completed Task-4
Consumer-0: processing Task-7
Producer: created Task-7
Producer: created Task-8
Consumer-1: completed Task-5
Consumer-1: processing Task-8
Consumer-1: completed Task-8Producer: created Task-9
Consumer-1: processing Task-9
Consumer-2: completed Task-6
Producer: all tasks created
Consumer-0: completed Task-7
Consumer-1: completed Task-9
All tasks produced and consumed!
Setting maxsize=5 on the queue means that put() will block when the queue is full, preventing the producer from getting too far ahead of the consumers. This is called backpressure and helps manage memory usage in data pipelines.
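The blocking behaviour of a bounded queue can be seen directly in a small standalone sketch using the non-blocking `put_nowait()` variant, which raises `queue.Full` where a plain `put()` would simply wait:

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")  # queue is now full

# A non-blocking put raises queue.Full; a plain put() would
# instead block here until a consumer frees a slot with get()
try:
    q.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True
print(overflowed)  # True

q.get()            # a consumer takes one item, freeing a slot
q.put_nowait("c")  # now succeeds
print(q.qsize())   # 2
```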
Error Handling in Threaded Workers
Production code should handle exceptions within worker threads to prevent silent failures:
```python
import queue
import threading

def do_work(task):
    """Simulate work that might fail."""
    if task % 5 == 0:
        raise ValueError(f"Task {task} failed!")
    print(f"{threading.current_thread().name}: completed task {task}")

def worker(task_queue, error_queue):
    """Worker with error handling."""
    while True:
        task = task_queue.get()
        try:
            do_work(task)
        except Exception as e:
            error_queue.put((task, str(e)))
            print(f"{threading.current_thread().name}: ERROR on task {task}: {e}")
        finally:
            task_queue.task_done()

if __name__ == "__main__":
    task_queue = queue.Queue()
    error_queue = queue.Queue()  # Collect errors for later review

    for i in range(15):
        task_queue.put(i)

    for i in range(4):
        thread = threading.Thread(
            target=worker,
            args=(task_queue, error_queue),
            daemon=True,
        )
        thread.start()

    task_queue.join()

    # Review errors
    errors = []
    while not error_queue.empty():
        errors.append(error_queue.get())

    print(f"\nCompleted with {len(errors)} errors:")
    for task, error in errors:
        print(f"  Task {task}: {error}")
```
Output:
Thread-1 (worker): ERROR on task 0: Task 0 failed!
Thread-1 (worker): completed task 1
Thread-1 (worker): completed task 2
Thread-1 (worker): completed task 3
Thread-1 (worker): completed task 4
Thread-1 (worker): ERROR on task 5: Task 5 failed!
Thread-1 (worker): completed task 6
Thread-1 (worker): completed task 7
Thread-1 (worker): completed task 8
Thread-1 (worker): completed task 9
Thread-1 (worker): ERROR on task 10: Task 10 failed!
Thread-1 (worker): completed task 11
Thread-1 (worker): completed task 12
Thread-1 (worker): completed task 13
Thread-1 (worker): completed task 14
Completed with 3 errors:
Task 0: Task 0 failed!
Task 5: Task 5 failed!
Task 10: Task 10 failed!
An uncaught exception terminates only the thread it occurred in: the main program keeps running, and by default Python merely prints a traceback to stderr (via `threading.excepthook`). Worse, if the exception skips past `task_done()`, `task_queue.join()` will hang on the unfinished task. Always wrap worker logic in `try`/`except` and report errors through an error queue or logging.
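One way to handle stray thread exceptions centrally is `threading.excepthook` (Python 3.8+), which lets you install a process-wide handler for any exception that escapes a thread's target function. The handler name below is illustrative:

```python
import threading

caught = []

def log_thread_error(args):
    # args is a threading.ExceptHookArgs with .exc_type, .exc_value, .thread
    caught.append((args.thread.name, args.exc_value))
    print(f"Uncaught in {args.thread.name}: {args.exc_value!r}")

threading.excepthook = log_thread_error  # replaces the default traceback printer

def fail():
    raise RuntimeError("boom")

t = threading.Thread(target=fail, name="Fragile")
t.start()
t.join()
print("main thread is still alive")
```

This complements, rather than replaces, per-task `try`/`except` in the worker: the hook only fires for exceptions that escape the thread entirely.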
Common Mistake: Forgetting task_done()
If you forget to call `task_queue.task_done()`, the `task_queue.join()` call will block forever, because `join()` waits for every item added with `put()` to be matched by a `task_done()` call.
Wrong: missing task_done()
```python
def worker_broken(task_queue):
    while True:
        task = task_queue.get()
        do_work(task)
        # Missing task_queue.task_done()!
        # task_queue.join() will NEVER return: program hangs!
```
Correct: always call task_done(), even on errors
```python
def worker_correct(task_queue):
    while True:
        task = task_queue.get()
        try:
            do_work(task)
        finally:
            task_queue.task_done()  # Always signal completion
```
Key Concepts Summary
| Concept | Purpose |
|---|---|
| `queue.Queue()` | Thread-safe task storage with FIFO ordering |
| `queue.put(item)` | Add a task to the queue (blocks if `maxsize` reached) |
| `queue.get()` | Retrieve and remove the next task (blocks if empty) |
| `queue.task_done()` | Signal that a retrieved task has been completed |
| `queue.join()` | Block until all tasks are completed |
| `daemon=True` | Auto-kill threads when the main program exits |
| `threading.Lock()` | Prevent concurrent access to shared resources |
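As an aside, the standard library's `concurrent.futures.ThreadPoolExecutor` wraps this whole queue-plus-workers pattern in a higher-level API. A rough equivalent of the basic example, minus the manual queue management, might look like this:

```python
from concurrent.futures import ThreadPoolExecutor

def do_work(number):
    return number * 2  # stand-in for real per-task work

# The executor owns an internal task queue and worker threads;
# the context manager waits for all submitted work on exit
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(do_work, range(20)))

print(results[:5])  # [0, 2, 4, 6, 8]
```

`pool.map()` also preserves input order in its results, something a hand-rolled queue of workers does not give you for free.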
Summary
Using queues with multi-threading in Python provides a clean, thread-safe way to parallelize work. Key takeaways:
- Create a `queue.Queue()` to hold tasks: it's inherently thread-safe.
- Spawn daemon threads with a worker function that pulls from the queue.
- Always call `task_done()` after completing each task to prevent `join()` from blocking forever.
- Use `task_queue.join()` in the main thread to wait for all work to finish.
- Handle exceptions inside worker threads explicitly: an uncaught exception kills only that thread and can leave `join()` hanging.
- Use thread locks when multiple threads access shared resources like files or global variables.
- Set `maxsize` on the queue to apply backpressure in producer-consumer patterns.