How to Build a Multi-threaded Port Scanner in Python

Port scanning is a fundamental technique used by system administrators to verify security policies and by security professionals to identify open services on a network. While tools like Nmap exist, building your own scanner in Python is an excellent way to understand TCP/IP networking, socket programming, and concurrency.

This guide covers how to build a command-line port scanner that uses multi-threading to scan ranges of ports efficiently.

Understanding the Mechanism

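A port scanner works by attempting to complete a TCP three-way handshake with a specific port on a target IP. The response reveals the port's state:

  1. Open Port: The target responds with a SYN-ACK packet. The connection succeeds.
  2. Closed Port: The target responds with an RST (reset) packet. The connection is refused immediately.
  3. Filtered Port: A firewall drops the packet, so no reply arrives and the attempt ends in a timeout.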
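
The outcomes listed above can be told apart from the value that connect_ex returns. The following is a minimal sketch, not part of the scanner built in this guide: classify_port is a hypothetical helper name, and treating every error other than "connection refused" as filtered is a simplifying assumption, since the exact code returned on a timeout varies by operating system.

```python
import errno
from socket import AF_INET, SOCK_STREAM, socket


def classify_port(ip: str, port: int, timeout: float = 1.0) -> str:
    """Return 'open', 'closed', or 'filtered' for one TCP port (illustrative helper)."""
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        code = sock.connect_ex((ip, port))

    if code == 0:
        return "open"      # the handshake completed
    if code == errno.ECONNREFUSED:
        return "closed"    # the target answered with RST
    # Assumption: timeouts and unreachable-network errors are grouped
    # together here because their error codes differ between platforms.
    return "filtered"
```

The scanner in this guide only needs the open/not-open distinction, so it compares the return value against 0 and ignores the rest.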

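Scanning ports one at a time is slow because every filtered port costs a full timeout: with a 1-second timeout, a range of 4,000 unresponsive ports can take over an hour. To make our tool practical, we will use threads to probe many ports concurrently.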

Step 1: Establishing TCP Connections

We use Python's built-in socket module. The key is to use connect_ex instead of connect: connect_ex returns an error code when the connection fails, whereas connect raises an exception.

The Connection Logic:

from socket import AF_INET, SOCK_STREAM, socket

def basic_check(ip, port):
    sock = socket(AF_INET, SOCK_STREAM)

    # ⛔️ Incorrect: connect() raises an exception on failure,
    # making logic messy for scanning. It also sets no timeout.
    try:
        sock.connect((ip, port))
        return True
    except OSError:
        return False
    finally:
        sock.close()

def robust_check(ip, port):
    # ✅ Correct: Using context manager and connect_ex
    with socket(AF_INET, SOCK_STREAM) as sock:
        # Set a timeout so we don't hang on filtered ports
        sock.settimeout(1)

        # connect_ex returns 0 if successful, error code otherwise
        result = sock.connect_ex((ip, port))
        return result == 0

Note: Setting sock.settimeout(1) is crucial. Without it, each attempt on a firewall-blocked port blocks until the operating system gives up on the connection, which can take tens of seconds or longer, making the scan extremely slow.
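
To see how the timeout bounds the cost of a single probe, the sketch below times one connection attempt. timed_check is a hypothetical helper introduced only for illustration; it is not part of the scanner.

```python
import time
from socket import AF_INET, SOCK_STREAM, socket


def timed_check(ip: str, port: int, timeout: float) -> tuple[bool, float]:
    """Return (is_open, seconds_elapsed) for one connection attempt."""
    start = time.monotonic()
    with socket(AF_INET, SOCK_STREAM) as sock:
        # The attempt gives up after `timeout` seconds at the latest
        sock.settimeout(timeout)
        is_open = sock.connect_ex((ip, port)) == 0
    return is_open, time.monotonic() - start
```

Called against a port that never answers, this should return after roughly the timeout you pass in rather than after the operating system's longer default. Lowering the timeout speeds up scans but risks missing slow hosts, so the right value depends on the network being scanned.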

Step 2: Implementing Concurrency with Queues

To handle threading safely, we use a queue.Queue. The main thread fills the queue with port numbers, and worker threads consume them.

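from queue import Empty, Queue
import threading

def worker(target_ip, queue):
    while True:
        # get_nowait() avoids the race where another thread takes the
        # last item between an empty() check and a blocking get()
        try:
            port = queue.get_nowait()
        except Empty:
            return

        # Call the check function defined in Step 1
        if robust_check(target_ip, port):
            print(f"Opened Port: {port}")

        queue.task_done()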
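
The worker is only half of the pattern: the main thread still has to fill the queue, start the threads, and wait for them. A self-contained sketch of the whole arrangement might look like the following. The names is_open and scan_ports are illustrative, and collecting results in a lock-protected list (instead of printing from each thread) is an assumption made here so the function can return a sorted result.

```python
import threading
from queue import Empty, Queue
from socket import AF_INET, SOCK_STREAM, socket


def is_open(ip: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (ip, port) succeeds."""
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((ip, port)) == 0


def scan_ports(ip: str, ports, num_threads: int = 100) -> list[int]:
    """Scan the given ports with a pool of worker threads (illustrative)."""
    jobs: Queue = Queue()
    for port in ports:
        jobs.put(port)

    open_ports: list[int] = []
    lock = threading.Lock()  # guards open_ports across threads

    def worker() -> None:
        while True:
            try:
                port = jobs.get_nowait()
            except Empty:
                return  # no work left, let the thread finish
            try:
                if is_open(ip, port):
                    with lock:
                        open_ports.append(port)
            finally:
                # Always mark the job done so jobs.join() cannot hang
                jobs.task_done()

    for _ in range(num_threads):
        threading.Thread(target=worker, daemon=True).start()

    jobs.join()  # blocks until every queued port is marked done
    return sorted(open_ports)
```

For example, scan_ports("127.0.0.1", range(8000, 8101)) would return the open ports in that range on your own machine as a sorted list.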

Step 3: Handling Command Line Arguments

We want to run the tool like this: python port_scanner.py 127.0.0.1 5000-9000. We use argparse to handle inputs cleanly.

import argparse

# ✅ Correct: defining arguments for host and port range
parser = argparse.ArgumentParser(description='TCP Port Scanner')
parser.add_argument('host', help='Target Host IP')
parser.add_argument('ports', help='Port range (e.g., 50-100)')
args = parser.parse_args()

# Parsing "50-100" into integers
start_port, end_port = map(int, args.ports.split('-'))
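
The one-line parse accepts inputs such as 100-50 or 0-70000 without complaint. A slightly stricter parser, sketched here under the assumption that valid ports run from 1 to 65535 and that the range must be ascending, could look like this (parse_port_range is a hypothetical name):

```python
def parse_port_range(text: str) -> tuple[int, int]:
    """Parse 'start-end' into a validated (start, end) pair of ports."""
    start_text, sep, end_text = text.partition("-")
    if not sep:
        raise ValueError(f"expected start-end, got {text!r}")

    # int() raises ValueError on its own for non-numeric parts
    start, end = int(start_text), int(end_text)

    # Assumption: ports are limited to 1..65535 and start must not exceed end
    if not (1 <= start <= end <= 65535):
        raise ValueError(f"ports must satisfy 1 <= start <= end <= 65535, got {text!r}")
    return start, end
```

Because argparse reports a ValueError raised by a type callable as a usage error, a function like this could be passed as type=parse_port_range to add_argument.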

Complete Source Code

Here is the complete port_scanner.py script combining sockets, threading, and argument parsing.

import argparse
import threading
from queue import Empty, Queue
from socket import AF_INET, gethostbyname, socket, SOCK_STREAM

def tcp_test(port: int, target_ip: str) -> bool:
    """
    Attempts to connect to a specific port.
    Returns True if the port is open, False otherwise.
    """
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.settimeout(1)
        result = sock.connect_ex((target_ip, port))
        return result == 0

def worker(target_ip: str, queue: Queue) -> None:
    """
    Thread worker function to process ports from the queue.
    """
    while True:
        # get_nowait() avoids blocking forever if another thread
        # takes the last port between an empty() check and get()
        try:
            port = queue.get_nowait()
        except Empty:
            return
        try:
            if tcp_test(port, target_ip):
                print(f"Opened Port: {port}")
        finally:
            # Mark the port done even if the check raised,
            # so queue.join() in main() cannot hang
            queue.task_done()

def main(host: str, start_port: int, end_port: int) -> None:
    print(f"Scanning {host} from port {start_port} to {end_port}...")

    try:
        target_ip = gethostbyname(host)
    except OSError as e:
        print(f"Error resolving host: {e}")
        return

    queue = Queue()

    # Fill the queue with the range of ports
    for port in range(start_port, end_port + 1):
        queue.put(port)

    # Spawn threads
    # 100 threads allows scanning 100 ports simultaneously
    thread_list = []
    for _ in range(100):
        t = threading.Thread(target=worker, args=(target_ip, queue))
        t.daemon = True
        t.start()
        thread_list.append(t)

    # Wait until every queued port has been marked done
    queue.join()
    print("Scanning completed.")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Multi-threaded TCP Port Scanner')
    parser.add_argument('host', help='Host to scan (e.g., 127.0.0.1)')
    parser.add_argument('ports', help='Port range to scan (e.g., 5000-9000)')

    args = parser.parse_args()

    try:
        start_port, end_port = map(int, args.ports.split('-'))
    except ValueError:
        print("Invalid port format. Use format: start-end (e.g., 20-80)")
    else:
        main(args.host, start_port, end_port)
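
As an alternative to managing the queue and threads by hand, the standard library's concurrent.futures.ThreadPoolExecutor provides the same worker-pool behavior. This is a different technique from the one used in the script above, shown only as a sketch; scan_with_pool and its max_workers default are assumptions chosen to mirror the 100 threads used above.

```python
from concurrent.futures import ThreadPoolExecutor
from socket import AF_INET, SOCK_STREAM, socket


def tcp_test(port: int, target_ip: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the port succeeds."""
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((target_ip, port)) == 0


def scan_with_pool(target_ip: str, start_port: int, end_port: int,
                   max_workers: int = 100) -> list[int]:
    """Scan an inclusive port range using a thread pool (illustrative)."""
    ports = range(start_port, end_port + 1)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so they line up with ports
        results = pool.map(lambda p: tcp_test(p, target_ip), ports)
        return [port for port, is_open in zip(ports, results) if is_open]
```

The executor handles thread startup, work distribution, and shutdown, and returning a list keeps the open ports in ascending order instead of the order in which threads happen to finish.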

Running the Scanner

To run the scanner, execute the following command in your terminal:

python port_scanner.py 127.0.0.1 8000-8100

Output:

Scanning 127.0.0.1 from port 8000 to 8100...
Opened Port: 8081
Scanning completed.
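
If nothing on your machine is listening in the scanned range, the scanner reports no open ports. One way to get a known open port for testing is to start a throwaway listener first. The helper below is a hypothetical sketch (open_test_listener is not part of the scanner); a listening socket completes incoming handshakes even if accept() is never called, which is all the scanner needs.

```python
from socket import AF_INET, SOCK_STREAM, socket


def open_test_listener(port: int, host: str = "127.0.0.1") -> socket:
    """Open a TCP listener on localhost so a scan has something to find."""
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((host, port))
    server.listen()
    # The caller is responsible for calling close() when finished
    return server
```

For instance, calling open_test_listener(8081) in a separate Python session and leaving that session running should make port 8081 appear in a scan of 8000-8100.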

Warning: Scanning ports on servers you do not own or do not have explicit permission to audit is illegal in many jurisdictions. Only use this tool on your own localhost or authorized infrastructure.

Conclusion

You have successfully built a multi-threaded network tool.

  1. Sockets: You used connect_ex for non-raising connection tests.
  2. Concurrency: You implemented a Queue-based worker system so that many ports are probed at once instead of one after another.
  3. CLI: You created a user-friendly interface using argparse.