
How to Implement Queues in Multi-Threading in Python

Combining queues with multi-threading is one of the most powerful patterns in Python for concurrent task processing. Queues provide a thread-safe way to distribute work across multiple threads, ensuring tasks are processed in order without race conditions. This pattern is commonly used in web scraping, file processing, API request handling, data pipelines, and any scenario where work can be parallelized.

In this guide, you will learn how queues and threads work together, understand the architecture behind the pattern, and implement it step by step with practical examples.

Prerequisites

Before diving in, familiarity with these concepts will help:

  • Threading basics: creating and starting threads in Python.
  • Queue data structure: FIFO (First-In-First-Out) ordering.
  • Thread safety: why shared resources need synchronization.

Understanding the Pattern

Think of a restaurant kitchen:

  1. Orders (Queue): Customer orders arrive and are placed in a queue.
  2. Chefs (Threads): Multiple chefs work simultaneously, each picking up the next order.
  3. Cooking (Work Function): Each chef prepares the dish independently.

The queue ensures orders are processed in sequence, while multiple chefs (threads) work in parallel to get everything done faster.

Architecture Overview

┌─────────────────────────────────────────────┐
│ QUEUE │
│ [Task 1] [Task 2] [Task 3] ... [Task N] │
└─────────┬──────────┬──────────┬─────────────┘
│ │ │
┌────▼──┐ ┌────▼──┐ ┌───▼───┐
│Thread1│ │Thread2│ │Thread3│
│Worker │ │Worker │ │Worker │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
do_work() do_work() do_work()

Step-by-Step Implementation

The implementation follows four clear steps:

  1. Create a queue filled with tasks (work items).
  2. Spawn daemon threads pointed at a worker function.
  3. Define the worker function that pulls tasks from the queue.
  4. Define the work function that performs the actual task.

Basic Example: Parallel Number Processing

import queue
import threading

NUMBER_OF_THREADS = 4


# Step 4: The actual work to be done
def do_work(number):
    """Process a single task: in this case, just print it."""
    print(f"{threading.current_thread().name}: processing {number}")


# Step 3: Worker function that pulls tasks from the queue
def worker(task_queue):
    """Continuously pull tasks from the queue and execute them."""
    while True:
        task = task_queue.get()   # Block until a task is available
        do_work(task)             # Perform the work
        task_queue.task_done()    # Signal that this task is complete


if __name__ == "__main__":
    # Step 1: Create the task queue
    task_queue = queue.Queue()

    for i in range(20):
        task_queue.put(i)

    # Step 2: Spawn worker threads
    for i in range(NUMBER_OF_THREADS):
        thread = threading.Thread(
            name=f"Worker-{i}",
            target=worker,
            args=(task_queue,),
            daemon=True,  # Thread dies when main program exits
        )
        thread.start()
        print(f"Started {thread.name}")

    # Wait for all tasks to complete
    task_queue.join()
    print("\nAll tasks completed!")

Output (order varies between runs):

Started Worker-0
Started Worker-1
Started Worker-2
Started Worker-3
Worker-0: processing 0
Worker-1: processing 1
Worker-2: processing 2
Worker-3: processing 3
Worker-0: processing 4
Worker-1: processing 5
...
Worker-2: processing 19

All tasks completed!

Understanding the Key Components

  • queue.Queue(): A thread-safe FIFO queue. Multiple threads can safely call .get() and .put() simultaneously without data corruption.

  • task_queue.get(): Blocks the calling thread until an item is available. This means idle threads wait efficiently without consuming CPU.

  • task_queue.task_done(): Signals that a previously retrieved task has been completed. This is required for task_queue.join() to work correctly.

  • task_queue.join(): Blocks the main thread until all items in the queue have been retrieved and marked as done via task_done().
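The interaction between these calls is easiest to see in a minimal, self-contained demo (separate from the examples in this guide): `join()` returns only once every item that was `put()` has been matched by a `task_done()` call.

```python
import queue
import threading

q = queue.Queue()
for i in range(3):
    q.put(i)

results = []

def drain():
    while True:
        item = q.get()       # blocks until an item is available
        results.append(item)
        q.task_done()        # match every get() with a task_done()

threading.Thread(target=drain, daemon=True).start()

q.join()                     # returns only after three task_done() calls
print(results)               # [0, 1, 2]
```

If the `task_done()` line were removed, `q.join()` would block forever even though all three items had been retrieved.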

info

Why daemon threads? Daemon threads are automatically terminated when the main program exits. Without daemon=True, the worker threads would run indefinitely (because of the while True loop), and the program would never exit. Setting them as daemons means they are killed once task_queue.join() completes and the main thread finishes.
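An alternative to daemon threads, not used in this guide's examples, is a sentinel value: the main thread puts one `None` per worker, and each worker exits its loop when it sees one. This lets ordinary (non-daemon) threads shut down cleanly and be `join()`ed. A minimal sketch:

```python
import queue
import threading

NUM_WORKERS = 2
q = queue.Queue()
processed = []

def worker():
    while True:
        task = q.get()
        if task is None:          # sentinel: time to exit the loop
            q.task_done()
            break
        processed.append(task)
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for i in range(5):
    q.put(i)
for _ in range(NUM_WORKERS):
    q.put(None)                   # one sentinel per worker

for t in threads:
    t.join()                      # workers exit on their own; no daemon flag needed
print(sorted(processed))          # [0, 1, 2, 3, 4]
```

The trade-off: sentinels require you to know how many workers to stop, while daemon threads are simpler but are terminated abruptly when the main thread exits.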

Practical Example: Concurrent File Writing

Here's a more realistic example that writes data to multiple files simultaneously using threads:

import queue
import threading

# Thread lock to keep console output from interleaving
write_lock = threading.Lock()


def write_to_file(filename, numbers):
    """Write a list of numbers to a file (thread-safe)."""
    thread_name = threading.current_thread().name

    with write_lock:
        print(f"{thread_name}: writing {len(numbers)} numbers to {filename}")

    with open(filename, "w") as f:
        for num in numbers:
            f.write(f"{num}\n")

    with write_lock:
        print(f"{thread_name}: finished writing {filename}")


def worker(task_queue):
    """Pull file-writing tasks from the queue."""
    while True:
        filename, numbers = task_queue.get()
        write_to_file(filename, numbers)
        task_queue.task_done()


if __name__ == "__main__":
    task_queue = queue.Queue()

    # Each task is a (filename, data) tuple
    tasks = [
        ("output_1.txt", list(range(0, 100))),
        ("output_2.txt", list(range(100, 200))),
        ("output_3.txt", list(range(200, 300))),
        ("output_4.txt", list(range(300, 400))),
    ]

    for task in tasks:
        task_queue.put(task)

    # Spawn 4 worker threads
    for i in range(4):
        thread = threading.Thread(
            name=f"FileWriter-{i}",
            target=worker,
            args=(task_queue,),
            daemon=True,
        )
        thread.start()

    task_queue.join()
    print("All files written successfully!")

Output:

FileWriter-0: writing 100 numbers to output_1.txt
FileWriter-1: writing 100 numbers to output_2.txt
FileWriter-2: writing 100 numbers to output_3.txt
FileWriter-3: writing 100 numbers to output_4.txt
FileWriter-0: finished writing output_1.txt
FileWriter-1: finished writing output_2.txt
FileWriter-2: finished writing output_3.txt
FileWriter-3: finished writing output_4.txt
All files written successfully!
caution

Thread locks (threading.Lock()) are essential when multiple threads access shared resources (like printing to the console or writing to the same file). Without locks, output can become garbled or data can be corrupted. In this example, each thread writes to a different file, so the lock is primarily used for clean console output.
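When the lock exists only to keep console output readable, the standard logging module is a common alternative (a sketch, not part of the example above): each handler serializes record emission with an internal lock, so lines from different threads don't interleave mid-line.

```python
import logging
import threading

logging.basicConfig(
    level=logging.INFO,
    format="%(threadName)s: %(message)s",
)

def report(n):
    # Each logging.info call emits one atomic line: the handler
    # acquires its internal lock while writing the record.
    logging.info("finished item %d", n)

threads = [
    threading.Thread(name=f"Reporter-{i}", target=report, args=(i,))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

This also buys you timestamps, severity levels, and log-file output without any extra synchronization code.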

Advanced Example: Producer-Consumer Pattern

A common multi-threading pattern is producer-consumer, where one or more threads generate tasks while others process them:

import queue
import threading
import time
import random


def producer(task_queue, num_tasks):
    """Generate tasks and add them to the queue."""
    for i in range(num_tasks):
        task = f"Task-{i}"
        task_queue.put(task)
        print(f"Producer: created {task}")
        time.sleep(random.uniform(0.01, 0.05))  # Simulate task creation time

    print("Producer: all tasks created")


def consumer(task_queue):
    """Process tasks from the queue."""
    while True:
        task = task_queue.get()
        thread_name = threading.current_thread().name
        print(f"  {thread_name}: processing {task}")
        time.sleep(random.uniform(0.02, 0.1))  # Simulate work
        print(f"  {thread_name}: completed {task}")
        task_queue.task_done()


if __name__ == "__main__":
    task_queue = queue.Queue(maxsize=5)  # Limit queue size to 5

    # Start consumer threads
    for i in range(3):
        thread = threading.Thread(
            name=f"Consumer-{i}",
            target=consumer,
            args=(task_queue,),
            daemon=True,
        )
        thread.start()

    # Start producer thread
    producer_thread = threading.Thread(
        target=producer,
        args=(task_queue, 10),
    )
    producer_thread.start()
    producer_thread.join()  # Wait for producer to finish creating tasks

    task_queue.join()  # Wait for all tasks to be processed
    print("\nAll tasks produced and consumed!")

Output (order varies between runs; note how unsynchronized print calls from different threads interleave):

Producer: created Task-0  Consumer-0: processing Task-0

Producer: created Task-1 Consumer-1: processing Task-1

Consumer-0: completed Task-0
Producer: created Task-2 Consumer-2: processing Task-2

Consumer-0: processing Task-3
Producer: created Task-3
Producer: created Task-4
Consumer-0: completed Task-3
Consumer-0: processing Task-4
Producer: created Task-5
Consumer-1: completed Task-1
Consumer-1: processing Task-5
Consumer-2: completed Task-2
Producer: created Task-6 Consumer-2: processing Task-6

Consumer-0: completed Task-4
Consumer-0: processing Task-7
Producer: created Task-7
Producer: created Task-8
Consumer-1: completed Task-5
Consumer-1: processing Task-8
Consumer-1: completed Task-8Producer: created Task-9

Consumer-1: processing Task-9
Consumer-2: completed Task-6
Producer: all tasks created
Consumer-0: completed Task-7
Consumer-1: completed Task-9

All tasks produced and consumed!
info

Setting maxsize=5 on the queue means that put() will block when the queue is full, preventing the producer from getting too far ahead of the consumers. This is called backpressure and helps manage memory usage in data pipelines.
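The blocking behavior of a bounded queue can be demonstrated in isolation (a small sketch, separate from the example above). A non-blocking `put_nowait()` on a full queue raises `queue.Full`, which is the same condition under which a blocking `put()` would wait:

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")              # queue is now full

try:
    q.put_nowait("c")   # non-blocking put on a full queue raises queue.Full
except queue.Full:
    print("queue full: producer must wait")

q.get()                 # a consumer frees one slot...
q.put("c")              # ...so the producer can continue
print(q.qsize())        # 2
```

In the producer-consumer example, it is the blocking `put()` inside `producer()` that applies this backpressure automatically.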

Error Handling in Threaded Workers

Production code should handle exceptions within worker threads to prevent silent failures:

import queue
import threading


def do_work(task):
    """Simulate work that might fail."""
    if task % 5 == 0:
        raise ValueError(f"Task {task} failed!")
    print(f"{threading.current_thread().name}: completed task {task}")


def worker(task_queue, error_queue):
    """Worker with error handling."""
    while True:
        task = task_queue.get()
        try:
            do_work(task)
        except Exception as e:
            error_queue.put((task, str(e)))
            print(f"{threading.current_thread().name}: ERROR on task {task}: {e}")
        finally:
            task_queue.task_done()


if __name__ == "__main__":
    task_queue = queue.Queue()
    error_queue = queue.Queue()  # Collect errors for later review

    for i in range(15):
        task_queue.put(i)

    for i in range(4):
        thread = threading.Thread(
            target=worker,
            args=(task_queue, error_queue),
            daemon=True,
        )
        thread.start()

    task_queue.join()

    # Review errors
    errors = []
    while not error_queue.empty():
        errors.append(error_queue.get())

    print(f"\nCompleted with {len(errors)} errors:")
    for task, error in errors:
        print(f"  Task {task}: {error}")

Output:

Thread-1 (worker): ERROR on task 0: Task 0 failed!
Thread-1 (worker): completed task 1
Thread-1 (worker): completed task 2
Thread-1 (worker): completed task 3
Thread-1 (worker): completed task 4
Thread-1 (worker): ERROR on task 5: Task 5 failed!
Thread-1 (worker): completed task 6
Thread-1 (worker): completed task 7
Thread-1 (worker): completed task 8
Thread-1 (worker): completed task 9
Thread-1 (worker): ERROR on task 10: Task 10 failed!
Thread-1 (worker): completed task 11
Thread-1 (worker): completed task 12
Thread-1 (worker): completed task 13
Thread-1 (worker): completed task 14

Completed with 3 errors:
Task 0: Task 0 failed!
Task 5: Task 5 failed!
Task 10: Task 10 failed!
caution

An uncaught exception in a worker thread never crashes the main program: by default the thread prints a traceback to stderr and dies quietly, and because task_done() is never called for the failed task, task_queue.join() blocks forever. Always wrap worker logic in try/except (with task_done() in a finally block) and report errors through an error queue or logging.
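For program-wide visibility, Python 3.8+ also provides threading.excepthook, a hook invoked whenever a thread leaves an exception uncaught. A minimal sketch (this complements, but does not replace, the error-queue approach: the hook alone won't keep join() from hanging when task_done() is skipped):

```python
import threading

errors = []

def log_thread_error(args):
    # args carries exc_type, exc_value, exc_traceback, and the thread
    errors.append((args.thread.name, args.exc_value))

threading.excepthook = log_thread_error  # Python 3.8+

def boom():
    raise RuntimeError("worker failed")

t = threading.Thread(name="Worker-X", target=boom)
t.start()
t.join()

# errors now holds one ("Worker-X", RuntimeError) entry
```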

Common Mistake: Forgetting task_done()

If you forget to call task_queue.task_done(), the task_queue.join() call will block forever, because join() waits for every put() to be matched by a task_done().

Wrong: missing task_done()

def worker_broken(task_queue):
    while True:
        task = task_queue.get()
        do_work(task)
        # Missing task_queue.task_done()!

# task_queue.join() will NEVER return: program hangs!

Correct: always call task_done(), even on errors

def worker_correct(task_queue):
    while True:
        task = task_queue.get()
        try:
            do_work(task)
        finally:
            task_queue.task_done()  # Always signal completion

Key Concepts Summary

| Concept | Purpose |
| --- | --- |
| queue.Queue() | Thread-safe task storage with FIFO ordering |
| queue.put(item) | Add a task to the queue (blocks if maxsize reached) |
| queue.get() | Retrieve and remove the next task (blocks if empty) |
| queue.task_done() | Signal that a retrieved task has been completed |
| queue.join() | Block until all tasks are completed |
| daemon=True | Auto-kill threads when the main program exits |
| threading.Lock() | Prevent concurrent access to shared resources |

Summary

Using queues with multi-threading in Python provides a clean, thread-safe way to parallelize work. Key takeaways:

  1. Create a queue.Queue() to hold tasks: it's inherently thread-safe.
  2. Spawn daemon threads with a worker function that pulls from the queue.
  3. Always call task_done() after completing each task to prevent join() from blocking forever.
  4. Use task_queue.join() in the main thread to wait for all work to finish.
  5. Handle exceptions in worker threads explicitly: they are silently ignored otherwise.
  6. Use thread locks when multiple threads access shared resources like files or global variables.
  7. Set maxsize on the queue to apply backpressure in producer-consumer patterns.