How to Check if a Thread Started and Synchronize in Python

Multi-threaded applications are non-deterministic: you cannot predict exactly when a thread will start or finish. To build reliable concurrent programs, you need synchronization primitives rather than relying on sleep() or simple status checks.

This guide covers thread status monitoring, proper synchronization techniques, and common patterns for coordinating threads.

Checking Thread Status with is_alive()

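Use is_alive() to check whether a thread has been started and has not yet finished. It returns True from just before the thread's run() method begins until just after run() returns.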

import threading
import time

def worker():
    print("Worker running...")
    time.sleep(1)

t = threading.Thread(target=worker)

print(f"Before start: {t.is_alive()}") # False
t.start()
print(f"After start: {t.is_alive()}") # True (usually)
t.join()
print(f"After join: {t.is_alive()}") # False

Warning: is_alive() is only a status check. It does not pause your code or guarantee the thread has reached a specific point in its execution. For coordination, use proper synchronization primitives.
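
As a rough sketch of the monitoring role that is_alive() is suited to, the snippet below records which threads are alive before they start, while they are blocked, and after they have been joined. The helper alive_report() and the worker names are hypothetical, introduced only for this illustration.

```python
import threading

def alive_report(threads):
    """Return a {thread name: is_alive()} snapshot for logging or debugging."""
    return {t.name: t.is_alive() for t in threads}

# Hypothetical workers: each one blocks until the main thread releases it.
release = threading.Event()
workers = [
    threading.Thread(target=release.wait, name=f"worker-{i}")
    for i in range(3)
]

before = alive_report(workers)   # nothing has been started yet
for t in workers:
    t.start()
during = alive_report(workers)   # every worker is blocked in release.wait()

release.set()                    # let every worker finish
for t in workers:
    t.join()
after = alive_report(workers)    # all workers have terminated

print(before)
print(during)
print(after)
```

The snapshot is only a report: by the time it is printed, a thread's state may already have changed, which is why it should not drive coordination logic.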

Synchronization with threading.Event

To make one thread wait until another thread signals readiness, use an Event. An Event is a thread-safe boolean flag: set() turns it on, and wait() blocks until it is on.

from threading import Thread, Event
import time

def database_connection(ready_flag):
    print("Connecting to database...")
    time.sleep(2)  # Simulate connection time
    print("Database connected!")
    ready_flag.set()  # Signal that connection is ready

# Create the event flag
db_ready = Event()

# Start the connection thread
connection_thread = Thread(target=database_connection, args=(db_ready,))
connection_thread.start()

# Main thread waits here until set() is called
print("Application waiting for database...")
db_ready.wait()
print("Application starting with database!")
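
An Event is not limited to a single waiter. The sketch below, which uses hypothetical names (config_loaded, worker, started), shows several threads blocked on the same flag and all released by one set() call.

```python
from threading import Thread, Event, Lock

config_loaded = Event()
started = []            # names of workers that got past the wait
started_lock = Lock()   # protects the shared list

def worker(name):
    config_loaded.wait()          # every worker blocks on the same flag
    with started_lock:
        started.append(name)

workers = [Thread(target=worker, args=(f"worker-{i}",)) for i in range(4)]
for t in workers:
    t.start()

# No worker can have passed wait() yet because the flag is still unset.
passed_before_set = len(started)
print(f"Past the wait before set(): {passed_before_set}")  # 0

config_loaded.set()               # one call wakes all four waiters
for t in workers:
    t.join()

print(f"Past the wait after set(): {sorted(started)}")
```

Because the flag stays set until clear() is called, a worker that reaches wait() after set() passes straight through instead of blocking.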

Event with Timeout

You can specify a maximum wait time to prevent indefinite blocking:

from threading import Thread, Event
import time

def slow_service(ready_flag):
    time.sleep(5)
    ready_flag.set()

ready = Event()
Thread(target=slow_service, args=(ready,)).start()

# Wait maximum 2 seconds
if ready.wait(timeout=2):
    print("Service is ready")
else:
    print("Timeout: Service took too long")

Resettable Events

Events can be reset and reused for recurring signals:

from threading import Thread, Event
import time

def periodic_worker(stop_flag, tick_event):
    count = 0
    while not stop_flag.is_set():
        count += 1
        print(f"Tick {count}")
        tick_event.set()  # Signal that a tick occurred
        time.sleep(1)

stop = Event()
tick = Event()

worker = Thread(target=periodic_worker, args=(stop, tick))
worker.start()

# Wait for 3 ticks
for i in range(3):
    tick.wait()
    # Reset on the waiting side, after the tick has been observed.
    # Calling clear() right after set() in the worker would let a tick
    # slip past a waiter that is not yet blocked in wait().
    tick.clear()
    print(f"Main thread received tick {i + 1}")

stop.set()
worker.join()
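
The worker above only notices the stop flag after its full sleep has elapsed. One possible refinement, sketched here under the assumption that the pause itself can be interrupted, is to replace time.sleep() with stop_flag.wait(timeout=...), which returns as soon as the flag is set. The 10-second interval and the ticks list are illustrative choices.

```python
from threading import Thread, Event
import time

def periodic_worker(stop_flag, interval, ticks):
    # wait() returns False on timeout (time for another tick) and True
    # once stop_flag is set, so the loop ends without finishing the interval.
    while not stop_flag.wait(timeout=interval):
        ticks.append(time.monotonic())

stop = Event()
ticks = []
worker = Thread(target=periodic_worker, args=(stop, 10.0, ticks))
worker.start()

started = time.monotonic()
stop.set()        # request shutdown in the middle of the 10-second interval
worker.join()     # returns promptly instead of waiting out the interval
elapsed = time.monotonic() - started

print(f"Worker stopped after {elapsed:.3f}s")
```

With time.sleep(10) in the loop, the same join() could block for up to ten seconds.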

Coordinating Multiple Threads with Barrier

When multiple threads must all reach a synchronization point before any can proceed, use a Barrier.

from threading import Barrier, Thread
import time
import random

def racer(barrier, name):
    prep_time = random.uniform(0.5, 2.0)
    print(f"{name} preparing... ({prep_time:.1f}s)")
    time.sleep(prep_time)

    print(f"{name} ready at starting line")
    barrier.wait()  # Wait for all racers

    print(f"{name} GO!")

# Create barrier for 3 threads
start_line = Barrier(3)

threads = [
    Thread(target=racer, args=(start_line, "Alice")),
    Thread(target=racer, args=(start_line, "Bob")),
    Thread(target=racer, args=(start_line, "Charlie")),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Tip: Barriers are reusable. After all threads pass the barrier, it resets automatically and can synchronize the same threads again.
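
To illustrate that reuse, here is a minimal sketch in which the same Barrier separates three rounds of work. The constants ROUNDS and N_WORKERS, the log list, and the worker names are assumptions made for the example.

```python
from threading import Barrier, Thread, Lock

ROUNDS = 3
N_WORKERS = 3

barrier = Barrier(N_WORKERS)
log = []             # (round number, worker name) in the order work was done
log_lock = Lock()    # protects the shared log

def worker(name):
    for round_no in range(ROUNDS):
        with log_lock:
            log.append((round_no, name))   # this round's "work"
        barrier.wait()   # nobody starts the next round until all are done

threads = [Thread(target=worker, args=(f"w{i}",)) for i in range(N_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

rounds_in_order = [round_no for round_no, _ in log]
print(rounds_in_order)  # [0, 0, 0, 1, 1, 1, 2, 2, 2]
```

The order of names within a round varies from run to run, but the rounds themselves never interleave, because each wait() call holds the early finishers back until the last worker arrives.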

Protecting Shared Resources with Lock

When multiple threads access shared data, use a Lock to prevent race conditions.

from threading import Thread, Lock
import time

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def increment(self):
        with self.lock:  # Only one thread can execute this block
            current = self.value
            time.sleep(0.001)  # Simulate processing
            self.value = current + 1

counter = Counter()

def worker(n):
    for _ in range(100):
        counter.increment()

threads = [Thread(target=worker, args=(i,)) for i in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final count: {counter.value}") # Always 1000
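
The with statement blocks for as long as it takes to obtain the lock. When a thread should give up instead, Lock.acquire() accepts a timeout and returns False if the lock could not be obtained in time. The sketch below shows that variant; try_update() and the results list are hypothetical names.

```python
from threading import Thread, Lock

lock = Lock()
results = []

def try_update(timeout):
    # acquire() returns True if the lock was obtained, False on timeout.
    if lock.acquire(timeout=timeout):
        try:
            results.append("updated")
        finally:
            lock.release()   # always release, even if the update fails
    else:
        results.append("gave up")

lock.acquire()                       # main thread holds the lock
t = Thread(target=try_update, args=(0.1,))
t.start()
t.join()                             # worker times out while the lock is held
lock.release()

t = Thread(target=try_update, args=(0.1,))
t.start()
t.join()                             # lock is free now, so this succeeds

print(results)  # ['gave up', 'updated']
```

The try/finally pair does by hand what the with statement does automatically, so prefer with whenever a timeout is not needed.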

Waiting for Thread Completion with join()

The join() method blocks until a thread terminates:

from threading import Thread
import time

def task(name, duration):
    print(f"{name} starting")
    time.sleep(duration)
    print(f"{name} finished")

threads = [
    Thread(target=task, args=("Fast", 1)),
    Thread(target=task, args=("Slow", 3)),
]

for t in threads:
    t.start()

print("Waiting for all threads...")
for t in threads:
    t.join()

print("All threads completed")
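
join() always returns None, so a thread's result has to be handed back some other way. One common approach, sketched here with hypothetical names (square, values, results), is to let each thread write into its own slot of a shared list and read the list only after every join() has returned.

```python
from threading import Thread

def square(index, value, results):
    # Each thread writes only to its own slot, so no lock is needed here.
    results[index] = value * value

values = [0, 1, 2, 3, 4]
results = [None] * len(values)

threads = [
    Thread(target=square, args=(i, v, results))
    for i, v in enumerate(values)
]
for t in threads:
    t.start()
for t in threads:
    t.join()   # once every join() returns, all slots are filled

print(results)  # [0, 1, 4, 9, 16]
```

Reading results before the joins complete could show None in some slots, which is exactly the kind of timing bug that join() removes.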

Join with Timeout

join() accepts a timeout in seconds. It always returns None, so call is_alive() afterwards to tell whether the thread finished or the wait timed out:

from threading import Thread
import time

def long_task():
    time.sleep(10)

t = Thread(target=long_task)
t.start()

t.join(timeout=2)

if t.is_alive():
    print("Thread still running after timeout")
else:
    print("Thread completed")
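
A timed-out join() leaves the thread running. If the task can be written to check a stop flag, one way to follow up is sketched below: wait briefly, then ask the thread to stop and join again. The stop Event and the reworked long_task() signature are assumptions for this example, not part of the code above.

```python
from threading import Thread, Event

def long_task(stop_flag):
    # Work in small steps and check the flag between them.
    steps = 0
    while not stop_flag.wait(timeout=0.01):
        steps += 1   # stand-in for one unit of real work

stop = Event()
t = Thread(target=long_task, args=(stop,))
t.start()

t.join(timeout=0.1)           # give the task a chance to finish by itself
timed_out = t.is_alive()      # join() returns None, so check explicitly
if timed_out:
    stop.set()                # ask the task to stop...
    t.join()                  # ...and wait until it has done so

print(f"Timed out: {timed_out}, alive now: {t.is_alive()}")
# Timed out: True, alive now: False
```

Python offers no way to kill a thread from outside, so this kind of cooperative shutdown only works when the task itself checks the flag.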

Common Anti-Patterns

Avoid Using sleep() for Synchronization

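Using time.sleep() to wait for a thread is unreliable and wasteful: if the delay is too short, the thread is not ready yet, and if it is too long, the program idles for no reason.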

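import time
from threading import Thread, Event

def do_something():
    print("Continuing in the main thread")  # Placeholder for real work

def long_task(ready=None):
    time.sleep(1)  # Simulate setup work of unknown duration
    if ready is not None:
        ready.set()  # Signal that setup is complete

# ⛔️ BAD: Unreliable and slow
t = Thread(target=long_task)
t.start()
time.sleep(2)  # Hope the thread is ready by now
do_something()
t.join()

# ✅ GOOD: Proper synchronization
ready = Event()
t = Thread(target=long_task, args=(ready,))  # The thread receives the flag
t.start()
ready.wait()  # Blocks until thread signals
do_something()
t.join()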

Summary

| Tool | Purpose | Use Case |
|------|---------|----------|
| is_alive() | Status polling | Logging, debugging, monitoring |
| Event | One-time or resettable signal | "Wait until ready" patterns |
| Lock | Mutual exclusion | Protecting shared resources |
| Barrier | Group synchronization | "Wait for everyone" patterns |
| join() | Wait for completion | Ensuring threads finish before continuing |

Best Practice: Never use time.sleep() to coordinate threads. Use threading.Event for signaling between threads, Lock for protecting shared data, Barrier for group synchronization points, and join() for waiting on thread completion. Reserve is_alive() for status monitoring only.