How to Implement K-Nearest Neighbors (KNN) from Scratch in Python
The K-Nearest Neighbors (KNN) algorithm is one of the simplest and most intuitive classification algorithms in machine learning. Given a new data point, KNN finds the k closest points in the training set and assigns the class that appears most frequently among those neighbors. This guide walks through a complete implementation from scratch: reading data, calculating distances, classifying new items, and evaluating accuracy.
How KNN Works
The algorithm follows these steps for each new item to classify:
- Calculate the distance between the new item and every item in the training set
- Select the k closest neighbors
- Count which class appears most often among those neighbors
- Assign that class to the new item
New Item: Height=1.74, Weight=67, Age=22
Training Data:
Programmer (distance: 2.3) ← neighbor
Builder (distance: 8.1)
Scientist (distance: 5.7)
Programmer (distance: 1.8) ← neighbor
Builder (distance: 3.5) ← neighbor
k=3 → 2 Programmers, 1 Builder → Classification: Programmer
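The select-and-vote steps above can be sketched directly with `collections.Counter` (the distances are the illustrative values from the example, not computed from real data):

```python
from collections import Counter

# (distance, class) pairs from the illustration above (made-up distances)
distances = [
    (2.3, 'Programmer'), (8.1, 'Builder'), (5.7, 'Scientist'),
    (1.8, 'Programmer'), (3.5, 'Builder'),
]

k = 3
neighbors = sorted(distances)[:k]             # the k closest neighbors
votes = Counter(cls for _, cls in neighbors)  # count classes among them
print(votes.most_common(1)[0][0])             # Programmer
```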
Sample Data Format
Create a file named data.csv with comma-separated features and a class label:
Height,Weight,Age,Class
1.70,65,20,Programmer
1.90,85,33,Builder
1.78,76,31,Builder
1.73,74,24,Programmer
1.81,75,35,Builder
1.73,70,75,Scientist
1.80,71,63,Scientist
1.75,69,25,Programmer
Reading and Parsing the Data
The first step is loading the CSV data into a list of dictionaries, where each dictionary represents one item with its features and class:
from random import shuffle

def read_data(file_name):
    with open(file_name, 'r') as f:
        lines = f.read().splitlines()
    if not lines:
        return []
    # Extract feature names (all columns except the last one)
    features = lines[0].split(',')[:-1]
    items = []
    for line in lines[1:]:
        if not line:
            continue  # Skip empty lines
        values = line.split(',')
        item = {'Class': values[-1]}
        try:
            for j, feature in enumerate(features):
                item[feature] = float(values[j])
            items.append(item)
        except ValueError:
            continue  # Skip lines that can't be parsed
    shuffle(items)
    return items

print(read_data("data.csv"))
Each item becomes a dictionary like:
[
    {'Class': 'Builder', 'Height': 1.78, 'Weight': 76.0, 'Age': 31.0},
    {'Class': 'Builder', 'Height': 1.9, 'Weight': 85.0, 'Age': 33.0},
    {'Class': 'Programmer', 'Height': 1.7, 'Weight': 65.0, 'Age': 20.0},
    {'Class': 'Builder', 'Height': 1.81, 'Weight': 75.0, 'Age': 35.0},
    {'Class': 'Scientist', 'Height': 1.73, 'Weight': 70.0, 'Age': 75.0},
    {'Class': 'Programmer', 'Height': 1.75, 'Weight': 69.0, 'Age': 25.0},
    {'Class': 'Programmer', 'Height': 1.73, 'Weight': 74.0, 'Age': 24.0},
    {'Class': 'Scientist', 'Height': 1.8, 'Weight': 71.0, 'Age': 63.0}
]
Shuffling the data after reading is a safety measure. If the data is ordered by class (e.g., all Programmers first, then all Builders), this ordering could bias the results during cross-validation.
Calculating Euclidean Distance
The Euclidean distance between two points in n-dimensional space is:
d = sqrt((x1 - y1)² + (x2 - y2)² + ... + (xn - yn)²)
import math

def euclidean_distance(x, y):
    """Calculate the Euclidean distance between two feature dictionaries."""
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)
The function only receives feature dictionaries (without the 'Class' key), so it computes distance across all numerical features.
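As a quick check, the distance between the article's new item and the first training row works out as follows (a self-contained snippet that repeats the function above):

```python
import math

def euclidean_distance(x, y):
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)

a = {'Height': 1.74, 'Weight': 67, 'Age': 22}
b = {'Height': 1.70, 'Weight': 65, 'Age': 20}  # first row of data.csv
print(euclidean_distance(a, b))  # sqrt(0.04**2 + 2**2 + 2**2), roughly 2.83
```

Note how the Weight and Age differences dominate the sum; this is exactly the scaling issue revisited in the normalization section below.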
Building the Classifier
Updating the Neighbor List
As we iterate through the training data, we maintain a sorted list of the k closest neighbors:
def update_neighbors(neighbors, item, distance, k):
    if len(neighbors) < k:
        # List not full yet: add and sort
        neighbors.append([distance, item['Class']])
        neighbors.sort()
    elif distance < neighbors[-1][0]:
        # New item is closer than the farthest current neighbor
        neighbors[-1] = [distance, item['Class']]
        neighbors.sort()
    return neighbors
The list is sorted by distance (ascending). Thus, neighbors[-1] always holds the farthest neighbor. When a closer item is found, it replaces the farthest one.
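The standard library offers a more idiomatic alternative: compute all distances and let `heapq.nsmallest` pick the k closest, avoiding the re-sort on every insertion. A sketch (not part of the tutorial's code; the tiny inline training set is for illustration only):

```python
import heapq
import math

def k_nearest(new_item, k, training_data):
    """Return the k (distance, class) pairs closest to new_item."""
    pairs = []
    for item in training_data:
        d = math.sqrt(sum((new_item[f] - item[f]) ** 2
                          for f in new_item if f in item))
        pairs.append((d, item['Class']))
    # nsmallest runs in O(n log k), versus sorting everything at O(n log n)
    return heapq.nsmallest(k, pairs)

training = [
    {'Height': 1.70, 'Weight': 65.0, 'Age': 20.0, 'Class': 'Programmer'},
    {'Height': 1.90, 'Weight': 85.0, 'Age': 33.0, 'Class': 'Builder'},
    {'Height': 1.73, 'Weight': 70.0, 'Age': 75.0, 'Class': 'Scientist'},
]
nearest = k_nearest({'Height': 1.74, 'Weight': 67, 'Age': 22}, 2, training)
print(nearest)
```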
The Main Classification Function
def classify(new_item, k, training_data):
    if k > len(training_data):
        raise ValueError("k exceeds training set size")
    neighbors = []
    for item in training_data:
        # Create a dict of features only (excluding 'Class')
        features = {key: val for key, val in item.items() if key != 'Class'}
        distance = euclidean_distance(new_item, features)
        neighbors = update_neighbors(neighbors, item, distance, k)
    # Count votes among the k nearest neighbors
    counts = {}
    for _, cls in neighbors:
        counts[cls] = counts.get(cls, 0) + 1
    if not counts:
        return None, 0
    best_class = max(counts, key=counts.get)
    return best_class, counts[best_class]
Classifying a New Item
# Load the data
items = read_data('data.csv')

# Define a new item to classify
new_item = {'Height': 1.74, 'Weight': 67, 'Age': 22}

# Classify with k=3
result, confidence = classify(new_item, 3, items)
print(f"Classification: {result} (appeared {confidence} times in 3 neighbors)")
Output:
Classification: Programmer (appeared 3 times in 3 neighbors)
Evaluating Accuracy with K-Fold Cross-Validation
To measure how well the algorithm performs, use K-fold cross-validation: split the data into K groups, train on K-1 of them, test on the remaining group, and rotate so each group serves as the test set exactly once:
def k_fold_validation(K, k, items):
    if K > len(items):
        return -1
    correct = 0
    total = 0
    n = len(items)
    for i in range(K):
        # Compute fold boundaries dynamically so uneven dataset sizes
        # still use every item (n // K would drop the remainder)
        start = int(i * n / K)
        end = int((i + 1) * n / K)
        test_set = items[start:end]
        training_set = items[:start] + items[end:]
        for item in test_set:
            features = {key: val for key, val in item.items() if key != 'Class'}
            # The item's 'Class' label serves as the ground truth
            predicted, _ = classify(features, k, training_set)
            if predicted == item['Class']:
                correct += 1
            total += 1
    return correct / total if total > 0 else 0
def evaluate(K, k, items, iterations=10):
    """Run cross-validation multiple times and return average accuracy."""
    total_accuracy = 0
    for _ in range(iterations):
        shuffle(items)
        total_accuracy += k_fold_validation(K, k, items)
    avg = total_accuracy / iterations
    print(f"Accuracy: {avg:.4f}")
    return avg
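To see why the fold boundaries are computed with `int(i * n / K)` rather than a fixed `n // K` chunk size, consider n=8 items split into K=5 folds:

```python
n, K = 8, 5
bounds = [(int(i * n / K), int((i + 1) * n / K)) for i in range(K)]
print(bounds)                      # [(0, 1), (1, 3), (3, 4), (4, 6), (6, 8)]
print([e - s for s, e in bounds])  # fold sizes [1, 2, 1, 2, 2] -- all 8 items used
```

A fixed chunk of `8 // 5 = 1` would test only 5 of the 8 items; the dynamic boundaries spread the remainder across the folds.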
Complete Code
import math
from random import shuffle

def read_data(file_name):
    with open(file_name, 'r') as f:
        lines = f.read().splitlines()
    if not lines:
        return []
    # Extract feature names (all columns except the last one)
    features = lines[0].split(',')[:-1]
    items = []
    for line in lines[1:]:
        if not line:
            continue  # Skip empty lines
        values = line.split(',')
        item = {'Class': values[-1]}
        try:
            for j, feature in enumerate(features):
                item[feature] = float(values[j])
            items.append(item)
        except ValueError:
            continue  # Skip lines that can't be parsed
    shuffle(items)
    return items

def euclidean_distance(x, y):
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)

def update_neighbors(neighbors, item, distance, k):
    if len(neighbors) < k:
        neighbors.append([distance, item['Class']])
        neighbors.sort()
    elif distance < neighbors[-1][0]:
        # Replace the farthest neighbor only if the new distance is strictly smaller
        neighbors[-1] = [distance, item['Class']]
        neighbors.sort()
    return neighbors

def classify(new_item, k, training_data):
    if k > len(training_data):
        raise ValueError("k exceeds training set size")
    neighbors = []
    for item in training_data:
        # Create a dict of features only (excluding 'Class')
        features = {key: val for key, val in item.items() if key != 'Class'}
        distance = euclidean_distance(new_item, features)
        neighbors = update_neighbors(neighbors, item, distance, k)
    counts = {}
    for _, cls in neighbors:
        counts[cls] = counts.get(cls, 0) + 1
    if not counts:
        return None, 0
    best_class = max(counts, key=counts.get)
    return best_class, counts[best_class]

def k_fold_validation(K, k, items):
    if K > len(items):
        return -1
    correct = 0
    total = 0
    n = len(items)
    for i in range(K):
        # Compute fold boundaries dynamically so uneven dataset sizes
        # still use every item (n // K would drop the remainder)
        start = int(i * n / K)
        end = int((i + 1) * n / K)
        test_set = items[start:end]
        training_set = items[:start] + items[end:]
        for item in test_set:
            features = {key: val for key, val in item.items() if key != 'Class'}
            # The item's 'Class' label serves as the ground truth
            predicted, _ = classify(features, k, training_set)
            if predicted == item['Class']:
                correct += 1
            total += 1
    return correct / total if total > 0 else 0

def evaluate(K, k, items, iterations=100):
    total_accuracy = 0
    for _ in range(iterations):
        shuffle(items)
        total_accuracy += k_fold_validation(K, k, items)
    avg = total_accuracy / iterations
    print(f"Accuracy: {avg:.4f}")
    return avg

if __name__ == '__main__':
    items = read_data('data.csv')
    # Ensure we have enough data to run
    if len(items) > 0:
        # Classify a new item
        new_item = {'Height': 1.74, 'Weight': 67, 'Age': 22}
        result, count = classify(new_item, 3, items)
        print(f"New item classified as: {result}")
        # Evaluate accuracy (K cannot exceed the number of items)
        folds = 5 if len(items) >= 5 else len(items)
        evaluate(folds, 3, items, 100)
    else:
        print("No data loaded. Check data.csv")
Output:
New item classified as: Programmer
Accuracy: 0.6250
The accuracy may vary between runs due to the random shuffling. Running more iterations in evaluate() produces a more stable average. Note that with only 8 data points in our sample dataset, accuracy will be limited.
Common Mistake: Not Normalizing Features
When features have different scales (e.g., Height in meters vs. Weight in kilograms), larger-scale features dominate the distance calculation:
# Height: 1.70 vs 1.90 → difference of 0.20
# Weight: 65 vs 85 → difference of 20.0
# Age: 20 vs 33 → difference of 13.0
Weight and Age differences are 100x and 65x larger than Height, so Height has almost no influence on the classification.
The correct approach is to normalize features to a common scale before calculating distances:
def normalize(items, features):
    """Normalize feature values to the range [0, 1]."""
    min_vals = {f: min(item[f] for item in items) for f in features}
    max_vals = {f: max(item[f] for item in items) for f in features}
    for item in items:
        for f in features:
            if max_vals[f] - min_vals[f] > 0:
                item[f] = (item[f] - min_vals[f]) / (max_vals[f] - min_vals[f])
    return items, min_vals, max_vals
Always normalize or standardize your features before applying KNN. Without normalization, features with larger numeric ranges will disproportionately influence the distance calculation, effectively ignoring smaller-scale features.
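One subtlety: a new item must be scaled with the min/max values learned from the training data, not its own. A sketch using the normalize function above (the scale_item helper and the two-row dataset are ours, added for illustration):

```python
def normalize(items, features):
    """Normalize feature values to the range [0, 1]."""
    min_vals = {f: min(item[f] for item in items) for f in features}
    max_vals = {f: max(item[f] for item in items) for f in features}
    for item in items:
        for f in features:
            if max_vals[f] - min_vals[f] > 0:
                item[f] = (item[f] - min_vals[f]) / (max_vals[f] - min_vals[f])
    return items, min_vals, max_vals

def scale_item(item, min_vals, max_vals):
    """Scale a new item with the training set's min/max (hypothetical helper)."""
    return {f: ((item[f] - min_vals[f]) / (max_vals[f] - min_vals[f])
                if max_vals[f] > min_vals[f] else 0.0)
            for f in min_vals}

training = [
    {'Height': 1.70, 'Weight': 65.0, 'Age': 20.0, 'Class': 'Programmer'},
    {'Height': 1.90, 'Weight': 85.0, 'Age': 33.0, 'Class': 'Builder'},
]
training, mins, maxs = normalize(training, ['Height', 'Weight', 'Age'])
scaled = scale_item({'Height': 1.74, 'Weight': 67, 'Age': 22}, mins, maxs)
print(scaled)  # Height ≈ 0.2, Weight 0.1, Age ≈ 0.154
```

After scaling, each feature contributes on the same [0, 1] scale, so Height is no longer drowned out by Weight and Age.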
Complexity
| Operation | Time Complexity |
|---|---|
| Classifying one item | O(n × d), where n = training set size, d = number of features |
| K-fold validation | O(n² × d), since each of the n items is classified once against roughly n training points |
KNN is computationally expensive for large datasets because it calculates distances to every training point for each classification. For production use, consider optimized libraries like scikit-learn which use spatial data structures (KD-trees, Ball trees) to speed up neighbor searches.
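For comparison, the same classification in scikit-learn takes a few lines (assuming scikit-learn and NumPy are installed; the rows below mirror data.csv):

```python
import numpy as np
from sklearn.neighbors import KNeighborsClassifier

# Height, Weight, Age rows from data.csv, with their class labels
X = np.array([[1.70, 65, 20], [1.90, 85, 33], [1.78, 76, 31], [1.73, 74, 24],
              [1.81, 75, 35], [1.73, 70, 75], [1.80, 71, 63], [1.75, 69, 25]])
y = ['Programmer', 'Builder', 'Builder', 'Programmer',
     'Builder', 'Scientist', 'Scientist', 'Programmer']

# The default algorithm='auto' switches to a KD-tree or Ball tree when it helps
model = KNeighborsClassifier(n_neighbors=3)
model.fit(X, y)
print(model.predict([[1.74, 67, 22]]))  # ['Programmer']
```

Remember that scikit-learn also leaves normalization to you; in practice the classifier is usually wrapped in a `Pipeline` with a scaler.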