How to Check if a Thread Started and Synchronize in Python
Multi-threaded applications are non-deterministic: you cannot predict exactly when a thread will start or finish. To build reliable concurrent programs, you need synchronization primitives rather than relying on sleep() or simple status checks.
This guide covers thread status monitoring, proper synchronization techniques, and common patterns for coordinating threads.
Checking Thread Status with is_alive()
Use is_alive() to check whether a thread is currently running. It returns True from just before the thread's run() method starts until just after that method finishes.
import threading
import time

def worker():
    print("Worker running...")
    time.sleep(1)

t = threading.Thread(target=worker)
print(f"Before start: {t.is_alive()}")  # False
t.start()
print(f"After start: {t.is_alive()}")  # True (usually)
t.join()
print(f"After join: {t.is_alive()}")  # False
is_alive() is only a status check. It does not pause your code or guarantee the thread has reached a specific point in its execution. For coordination, use proper synchronization primitives.
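For the monitoring use case, a small helper can collect the status of several threads at once. The sketch below is illustrative only: `snapshot` and the thread names are hypothetical, and each value can be stale the moment it is read.

```python
import threading
import time


def worker(delay):
    time.sleep(delay)


def snapshot(threads):
    """Return {thread name: is_alive()}; hypothetical helper for logging only."""
    return {t.name: t.is_alive() for t in threads}


threads = [
    threading.Thread(target=worker, args=(0.5,), name="short"),
    threading.Thread(target=worker, args=(1.0,), name="long"),
]

before = snapshot(threads)   # nothing has been started yet
for t in threads:
    t.start()
during = snapshot(threads)   # both workers are still sleeping
for t in threads:
    t.join()
after = snapshot(threads)    # both workers have finished

print(before, during, after, sep="\n")
```

A snapshot like this is fine for a log line or a health check, but nothing in it tells you how far each thread has progressed.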
Synchronization with threading.Event
To make one thread wait until another thread signals readiness, use an Event. This is a thread-safe boolean flag.
from threading import Thread, Event
import time

def database_connection(ready_flag):
    print("Connecting to database...")
    time.sleep(2)  # Simulate connection time
    print("Database connected!")
    ready_flag.set()  # Signal that connection is ready

# Create the event flag
db_ready = Event()

# Start the connection thread
connection_thread = Thread(target=database_connection, args=(db_ready,))
connection_thread.start()

# Main thread waits here until set() is called
print("Application waiting for database...")
db_ready.wait()
print("Application starting with database!")
Event with Timeout
You can specify a maximum wait time to prevent indefinite blocking:
from threading import Thread, Event
import time

def slow_service(ready_flag):
    time.sleep(5)
    ready_flag.set()

ready = Event()
Thread(target=slow_service, args=(ready,)).start()

# Wait maximum 2 seconds
if ready.wait(timeout=2):
    print("Service is ready")
else:
    print("Timeout: Service took too long")
Resettable Events
Events can be reset with clear() and reused for recurring signals. Let the waiting thread clear the flag after it wakes up. If the signaling thread calls set() and clear() back to back, a waiter that is busy at that moment never sees the signal:
from threading import Thread, Event
import time

def periodic_worker(stop_flag, tick_event):
    count = 0
    while not stop_flag.is_set():
        count += 1
        print(f"Tick {count}")
        tick_event.set()  # Signal tick occurred
        time.sleep(1)

stop = Event()
tick = Event()
worker = Thread(target=periodic_worker, args=(stop, tick))
worker.start()

# Wait for 3 ticks
for i in range(3):
    tick.wait()
    tick.clear()  # Reset for next tick (done by the waiter, after waking)
    print(f"Main thread received tick {i + 1}")

stop.set()
worker.join()
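An Event records only that a signal happened, not how many times. If two ticks fire while the consumer is busy, it sees one. Where every tick has to be counted, one possible alternative is threading.Semaphore, which this guide does not otherwise cover. The sketch below assumes a hypothetical `ticker` function and deliberately keeps the consumer busy while all ticks fire.

```python
from threading import Thread, Semaphore
import time


def ticker(ticks, count, interval):
    """Release the semaphore once per tick; releases pile up if nobody is waiting."""
    for _ in range(count):
        time.sleep(interval)
        ticks.release()


ticks = Semaphore(0)   # starts with zero pending ticks
worker = Thread(target=ticker, args=(ticks, 3, 0.05))
worker.start()

time.sleep(0.3)        # simulate a busy consumer while the ticks fire

received = 0
for _ in range(3):
    # acquire() consumes one pending tick and blocks only if none is available
    if ticks.acquire(timeout=1):
        received += 1

worker.join()
print(f"Received {received} ticks")
```

Because the semaphore keeps a count, the ticks that fired while the consumer was busy are still there to be consumed afterwards.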
Coordinating Multiple Threads with Barrier
When multiple threads must all reach a synchronization point before any can proceed, use a Barrier.
from threading import Barrier, Thread
import time
import random

def racer(barrier, name):
    prep_time = random.uniform(0.5, 2.0)
    print(f"{name} preparing... ({prep_time:.1f}s)")
    time.sleep(prep_time)
    print(f"{name} ready at starting line")
    barrier.wait()  # Wait for all racers
    print(f"{name} GO!")

# Create barrier for 3 threads
start_line = Barrier(3)

threads = [
    Thread(target=racer, args=(start_line, "Alice")),
    Thread(target=racer, args=(start_line, "Bob")),
    Thread(target=racer, args=(start_line, "Charlie")),
]

for t in threads:
    t.start()
for t in threads:
    t.join()
Barriers are reusable. After all threads pass the barrier, it resets automatically and can synchronize the same threads again.
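To illustrate that reuse, here is a minimal sketch in which the same Barrier separates two rounds of work. The names `phased_worker`, `NUM_WORKERS`, and `NUM_ROUNDS` are made up for this example.

```python
from threading import Barrier, Lock, Thread

NUM_WORKERS = 3
NUM_ROUNDS = 2

barrier = Barrier(NUM_WORKERS)
log = []            # (round, worker) entries in arrival order
log_lock = Lock()   # protects the shared list


def phased_worker(worker_id):
    for round_no in range(NUM_ROUNDS):
        with log_lock:
            log.append((round_no, worker_id))
        # Nobody starts the next round until everyone has logged this one
        barrier.wait()


threads = [Thread(target=phased_worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

rounds = [round_no for round_no, _ in log]
print(rounds)  # [0, 0, 0, 1, 1, 1]
```

The order of workers within a round varies from run to run, but every round 0 entry is logged before any round 1 entry.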
Protecting Shared Resources with Lock
When multiple threads access shared data, use a Lock to prevent race conditions.
from threading import Thread, Lock
import time

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def increment(self):
        with self.lock:  # Only one thread can execute this block
            current = self.value
            time.sleep(0.001)  # Simulate processing
            self.value = current + 1

counter = Counter()

def worker(n):
    for _ in range(100):
        counter.increment()

threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final count: {counter.value}")  # Always 1000
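To see what the lock prevents, it can help to run the same read-modify-write sequence with and without it. The following is a sketch under assumptions: `run_counter` is a hypothetical helper, and the short sleep exists only to widen the window in which threads can interleave.

```python
from threading import Thread, Lock
import time


def run_counter(use_lock, n_threads=5, n_increments=20):
    """Return the final count after all threads finish (hypothetical helper)."""
    state = {"value": 0}
    lock = Lock()

    def increment():
        current = state["value"]
        time.sleep(0.0005)            # widen the gap between read and write
        state["value"] = current + 1

    def worker():
        for _ in range(n_increments):
            if use_lock:
                with lock:
                    increment()
            else:
                increment()           # unprotected read-modify-write

    threads = [Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["value"]


expected = 5 * 20
unlocked = run_counter(use_lock=False)
locked = run_counter(use_lock=True)
print(f"Without lock: {unlocked} (expected {expected})")
print(f"With lock:    {locked} (expected {expected})")
```

The locked run always reaches the expected total. The unlocked run typically falls well short of it, because threads overwrite each other's updates, though the exact number differs between runs.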
Waiting for Thread Completion with join()
The join() method blocks until a thread terminates:
from threading import Thread
import time

def task(name, duration):
    print(f"{name} starting")
    time.sleep(duration)
    print(f"{name} finished")

threads = [
    Thread(target=task, args=("Fast", 1)),
    Thread(target=task, args=("Slow", 3)),
]

for t in threads:
    t.start()

print("Waiting for all threads...")
for t in threads:
    t.join()
print("All threads completed")
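A common reason to join is that the main thread needs what the workers produced. As a minimal sketch, assuming a hypothetical `square_into` function and a pre-sized results list, each thread writes to its own slot and the main thread reads the list only after every join() has returned.

```python
from threading import Thread


def square_into(results, index, value):
    # Each thread writes only to its own slot, so no lock is needed here
    results[index] = value * value


values = [2, 3, 4]
results = [None] * len(values)

threads = [
    Thread(target=square_into, args=(results, i, v))
    for i, v in enumerate(values)
]
for t in threads:
    t.start()
for t in threads:
    t.join()   # once this returns, that thread's write is complete

print(results)  # [4, 9, 16]
```

Reading results before the joins would be a race: some slots could still be None.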
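Join with Timeout
join() accepts a timeout in seconds. It always returns None, so call is_alive() afterwards to find out whether the thread finished or the timeout expired: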
from threading import Thread
import time

def long_task():
    time.sleep(10)

t = Thread(target=long_task)
t.start()

t.join(timeout=2)
if t.is_alive():
    print("Thread still running after timeout")
else:
    print("Thread completed")
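A timed-out join() does not stop the thread; it only stops waiting for it. One way to handle that, sketched here with a hypothetical `stop` Event and a reworked `long_task` that checks it, is to ask the thread to exit and then join again.

```python
from threading import Thread, Event


def long_task(stop_flag):
    # Event.wait() doubles as an interruptible sleep: it returns True as soon
    # as stop_flag is set, or False when the timeout elapses
    while not stop_flag.wait(timeout=0.05):
        pass  # one unit of real work per loop would go here


stop = Event()
t = Thread(target=long_task, args=(stop,))
t.start()

t.join(timeout=0.2)          # returns None whether or not the thread finished
timed_out = t.is_alive()     # so is_alive() is the way to tell

if timed_out:
    stop.set()               # ask the thread to exit
    t.join()                 # now wait for it to actually finish

print(f"Timed out: {timed_out}, still alive: {t.is_alive()}")
```

This only works if the thread cooperates by checking the flag regularly; Python offers no safe way to kill a thread from outside.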
Common Anti-Patterns
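Using time.sleep() to wait for another thread is unreliable and wasteful: if the sleep is too short, you proceed before the thread is ready; if it is too long, you wait for nothing.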
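import time
from threading import Thread, Event

def long_task(ready=None):
    time.sleep(0.5)  # Simulate startup work
    if ready is not None:
        ready.set()  # Signal that startup is complete

def do_something():
    print("Proceeding")

# ⛔️ BAD: Unreliable and slow
t = Thread(target=long_task)
t.start()
time.sleep(2)  # Hope the thread is ready by now
do_something()
t.join()

# ✅ GOOD: Proper synchronization
ready = Event()
t = Thread(target=long_task, args=(ready,))
t.start()
ready.wait()  # Blocks until thread signals
do_something()
t.join()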
Summary
| Tool | Purpose | Use Case |
|---|---|---|
| is_alive() | Status polling | Logging, debugging, monitoring |
| Event | One-time or resettable signal | "Wait until ready" patterns |
| Lock | Mutual exclusion | Protecting shared resources |
| Barrier | Group synchronization | "Wait for everyone" patterns |
| join() | Wait for completion | Ensuring threads finish before continuing |
Never use time.sleep() to coordinate threads. Use threading.Event for signaling between threads, Lock for protecting shared data, Barrier for making a group of threads wait for each other, and join() for waiting on thread completion. Reserve is_alive() for status monitoring only.