
How to Implement K-Nearest Neighbors (KNN) from Scratch in Python

The K-Nearest Neighbors (KNN) algorithm is one of the simplest and most intuitive classification algorithms in machine learning. Given a new data point, KNN finds the k closest points in the training set and assigns the class that appears most frequently among those neighbors. This guide walks through a complete implementation from scratch: reading data, calculating distances, classifying new items, and evaluating accuracy.

How KNN Works

The algorithm follows these steps for each new item to classify:

  1. Calculate the distance between the new item and every item in the training set
  2. Select the k closest neighbors
  3. Count which class appears most often among those neighbors
  4. Assign that class to the new item

New Item: Height=1.74, Weight=67, Age=22

Training Data:
Programmer (distance: 2.3) ← neighbor
Builder (distance: 8.1)
Scientist (distance: 5.7)
Programmer (distance: 1.8) ← neighbor
Builder (distance: 3.5) ← neighbor

k=3 → 2 Programmers, 1 Builder → Classification: Programmer
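The majority vote in step 3 can be sketched with Python's collections.Counter, using the neighbor classes from the trace above:

```python
from collections import Counter

# Classes of the k=3 nearest neighbors from the trace above
neighbor_classes = ['Programmer', 'Programmer', 'Builder']

winner, votes = Counter(neighbor_classes).most_common(1)[0]
print(winner, votes)  # → Programmer 2
```

The full implementation below uses a plain dictionary for the vote count instead, but the result is the same.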

Sample Data Format

Create a file named data.csv with comma-separated features and a class label:

data.csv
Height,Weight,Age,Class
1.70,65,20,Programmer
1.90,85,33,Builder
1.78,76,31,Builder
1.73,74,24,Programmer
1.81,75,35,Builder
1.73,70,75,Scientist
1.80,71,63,Scientist
1.75,69,25,Programmer

Reading and Parsing the Data

The first step is loading the CSV data into a list of dictionaries, where each dictionary represents one item with its features and class:

from random import shuffle


def read_data(file_name):
    with open(file_name, 'r') as f:
        lines = f.read().splitlines()

    if not lines:
        return []

    # Extract feature names (all columns except the last one)
    features = lines[0].split(',')[:-1]
    items = []

    for i in range(1, len(lines)):
        line = lines[i]
        if not line:
            continue  # Skip empty lines

        values = line.split(',')
        item = {'Class': values[-1]}
        try:
            for j in range(len(features)):
                item[features[j]] = float(values[j])
            items.append(item)
        except ValueError:
            continue  # Skip lines that can't be parsed

    shuffle(items)
    return items

print(read_data("data.csv"))

Each item becomes a dictionary like:

[
    {'Class': 'Builder', 'Height': 1.78, 'Weight': 76.0, 'Age': 31.0},
    {'Class': 'Builder', 'Height': 1.9, 'Weight': 85.0, 'Age': 33.0},
    {'Class': 'Programmer', 'Height': 1.7, 'Weight': 65.0, 'Age': 20.0},
    {'Class': 'Builder', 'Height': 1.81, 'Weight': 75.0, 'Age': 35.0},
    {'Class': 'Scientist', 'Height': 1.73, 'Weight': 70.0, 'Age': 75.0},
    {'Class': 'Programmer', 'Height': 1.75, 'Weight': 69.0, 'Age': 25.0},
    {'Class': 'Programmer', 'Height': 1.73, 'Weight': 74.0, 'Age': 24.0},
    {'Class': 'Scientist', 'Height': 1.8, 'Weight': 71.0, 'Age': 63.0}
]
tip

Shuffling the data after reading is a safety measure. If the data is ordered by class (e.g., all Programmers first, then all Builders), this ordering could bias the results during cross-validation.

Calculating Euclidean Distance

The Euclidean distance between two points in n-dimensional space is:

d = sqrt((x1 - y1)² + (x2 - y2)² + ... + (xn - yn)²)

import math


def euclidean_distance(x, y):
    """Calculate the Euclidean distance between two feature dictionaries."""
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)

The function only receives feature dictionaries (without the 'Class' key), so it computes distance across all numerical features.
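As a quick sanity check (the function is repeated here so the snippet runs on its own), the distance between the first row of data.csv and the new item from the introduction works out to about 2.83:

```python
import math

def euclidean_distance(x, y):
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)

a = {'Height': 1.70, 'Weight': 65.0, 'Age': 20.0}  # first row of data.csv
b = {'Height': 1.74, 'Weight': 67.0, 'Age': 22.0}  # new item

# sqrt(0.04² + 2² + 2²) = sqrt(8.0016) ā‰ˆ 2.8287
print(euclidean_distance(a, b))
```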

Building the Classifier

Updating the Neighbor List

As we iterate through the training data, we maintain a sorted list of the k closest neighbors:

def update_neighbors(neighbors, item, distance, k):
    if len(neighbors) < k:
        # List not full yet: add and sort
        neighbors.append([distance, item['Class']])
        neighbors.sort()
    elif distance < neighbors[-1][0]:
        # New item is closer than the farthest current neighbor
        neighbors[-1] = [distance, item['Class']]
        neighbors.sort()
    return neighbors

The list is sorted by distance (ascending). Thus, neighbors[-1] always holds the farthest neighbor. When a closer item is found, it replaces the farthest one.
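Re-sorting on every replacement is fine for small k, but the same bookkeeping can be done in O(log k) per item with a max-heap. A sketch using the standard-library heapq module (distances are negated because heapq is a min-heap); k_nearest is a hypothetical helper, not part of the implementation above, shown here with the distances from the earlier trace:

```python
import heapq

def k_nearest(labelled_distances, k):
    """Return the k smallest (distance, class) pairs, sorted ascending."""
    heap = []  # max-heap via negated distances: root holds the farthest kept neighbor
    for dist, cls in labelled_distances:
        if len(heap) < k:
            heapq.heappush(heap, (-dist, cls))
        elif dist < -heap[0][0]:
            heapq.heapreplace(heap, (-dist, cls))
    return sorted((-d, c) for d, c in heap)

data = [(2.3, 'Programmer'), (8.1, 'Builder'), (5.7, 'Scientist'),
        (1.8, 'Programmer'), (3.5, 'Builder')]
print(k_nearest(data, 3))
# → [(1.8, 'Programmer'), (2.3, 'Programmer'), (3.5, 'Builder')]
```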

The Main Classification Function

def classify(new_item, k, training_data):
    if k > len(training_data):
        raise ValueError("k exceeds training set size")

    neighbors = []
    for item in training_data:
        # Create a dict of features only (excluding 'Class')
        features = {key: val for key, val in item.items() if key != 'Class'}

        distance = euclidean_distance(new_item, features)
        neighbors = update_neighbors(neighbors, item, distance, k)

    counts = {}
    for _, cls in neighbors:
        counts[cls] = counts.get(cls, 0) + 1

    if not counts:
        return None, 0

    best_class = max(counts, key=counts.get)
    return best_class, counts[best_class]

Classifying a New Item

# Load the data
items = read_data('data.csv')

# Define a new item to classify
new_item = {'Height': 1.74, 'Weight': 67, 'Age': 22}

# Classify with k=3
result, confidence = classify(new_item, 3, items)
print(f"Classification: {result} (appeared {confidence} times in 3 neighbors)")

Output:

Classification: Programmer (appeared 2 times in 3 neighbors)

Evaluating Accuracy with K-Fold Cross-Validation

To measure how well the algorithm performs, use K-fold cross-validation: split the data into K groups, then for each group, train on the other K-1 groups and test on the one held out, so every item is used for testing exactly once:

def k_fold_validation(K, k, items):
    if K > len(items):
        return -1

    correct = 0
    total = 0
    n = len(items)

    for i in range(K):
        # Compute fold boundaries dynamically so that no items are
        # dropped when n is not evenly divisible by K
        start = int(i * n / K)
        end = int((i + 1) * n / K)

        test_set = items[start:end]
        training_set = items[:start] + items[end:]

        for item in test_set:
            features = {key: val for key, val in item.items() if key != 'Class'}

            # The test item keeps its true Class label for scoring
            predicted, _ = classify(features, k, training_set)

            if predicted == item['Class']:
                correct += 1
            total += 1

    return correct / total if total > 0 else 0


def evaluate(K, k, items, iterations=10):
    """Run cross-validation multiple times and return average accuracy."""
    total_accuracy = 0
    for _ in range(iterations):
        shuffle(items)
        total_accuracy += k_fold_validation(K, k, items)

    avg = total_accuracy / iterations
    print(f"Accuracy: {avg:.4f}")
    return avg

Complete Code

import math
from random import shuffle

def read_data(file_name):
    with open(file_name, 'r') as f:
        lines = f.read().splitlines()

    if not lines:
        return []

    # Extract feature names (all columns except the last one)
    features = lines[0].split(',')[:-1]
    items = []

    for i in range(1, len(lines)):
        line = lines[i]
        if not line:
            continue  # Skip empty lines

        values = line.split(',')
        item = {'Class': values[-1]}
        try:
            for j in range(len(features)):
                item[features[j]] = float(values[j])
            items.append(item)
        except ValueError:
            continue  # Skip lines that can't be parsed

    shuffle(items)
    return items


def euclidean_distance(x, y):
    squared_sum = 0
    for key in x:
        if key in y:
            squared_sum += (x[key] - y[key]) ** 2
    return math.sqrt(squared_sum)


def update_neighbors(neighbors, item, distance, k):
    if len(neighbors) < k:
        neighbors.append([distance, item['Class']])
        neighbors.sort()
    elif distance < neighbors[-1][0]:
        # Replace only if strictly closer than the farthest neighbor
        neighbors[-1] = [distance, item['Class']]
        neighbors.sort()
    return neighbors


def classify(new_item, k, training_data):
    if k > len(training_data):
        raise ValueError("k exceeds training set size")

    neighbors = []
    for item in training_data:
        # Create a dict of features only (excluding 'Class')
        features = {key: val for key, val in item.items() if key != 'Class'}

        distance = euclidean_distance(new_item, features)
        neighbors = update_neighbors(neighbors, item, distance, k)

    counts = {}
    for _, cls in neighbors:
        counts[cls] = counts.get(cls, 0) + 1

    if not counts:
        return None, 0

    best_class = max(counts, key=counts.get)
    return best_class, counts[best_class]


def k_fold_validation(K, k, items):
    if K > len(items):
        return -1

    correct = 0
    total = 0
    n = len(items)

    for i in range(K):
        # Compute fold boundaries dynamically so that no items are
        # dropped when n is not evenly divisible by K
        start = int(i * n / K)
        end = int((i + 1) * n / K)

        test_set = items[start:end]
        training_set = items[:start] + items[end:]

        for item in test_set:
            features = {key: val for key, val in item.items() if key != 'Class'}

            # The test item keeps its true Class label for scoring
            predicted, _ = classify(features, k, training_set)

            if predicted == item['Class']:
                correct += 1
            total += 1

    return correct / total if total > 0 else 0


def evaluate(K, k, items, iterations=100):
    total_accuracy = 0
    for _ in range(iterations):
        shuffle(items)
        total_accuracy += k_fold_validation(K, k, items)

    avg = total_accuracy / iterations
    print(f"Accuracy: {avg:.4f}")
    return avg


if __name__ == '__main__':
    items = read_data('data.csv')

    # Ensure we have enough data to run
    if len(items) > 0:
        # Classify a new item
        new_item = {'Height': 1.74, 'Weight': 67, 'Age': 22}
        result, count = classify(new_item, 3, items)
        print(f"New item classified as: {result}")

        # Evaluate accuracy (the number of folds cannot exceed the number of items)
        folds = 5 if len(items) >= 5 else len(items)
        evaluate(folds, 3, items, 100)
    else:
        print("No data loaded. Check data.csv")

Output:

New item classified as: Programmer
Accuracy: 0.6250
info

The accuracy may vary between runs due to the random shuffling. Running more iterations in evaluate() produces a more stable average. Note that with only 8 data points in our sample dataset, accuracy will be limited.

Common Mistake: Not Normalizing Features

When features have different scales (e.g., Height in meters vs. Weight in kilograms), larger-scale features dominate the distance calculation:

# Height: 1.70 vs 1.90 → difference of 0.20
# Weight: 65 vs 85 → difference of 20.0
# Age: 20 vs 33 → difference of 13.0

Weight and Age differences are 100x and 65x larger than Height, so Height has almost no influence on the classification.
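A quick calculation confirms this: between the first two rows of the sample data, the squared Weight and Age differences swamp the Height term entirely:

```python
import math

a = {'Height': 1.70, 'Weight': 65.0, 'Age': 20.0}  # first Programmer
b = {'Height': 1.90, 'Weight': 85.0, 'Age': 33.0}  # first Builder

# Per-feature contributions to the squared sum
contributions = {f: (a[f] - b[f]) ** 2 for f in a}
print(contributions)  # Height ā‰ˆ 0.04, Weight = 400.0, Age = 169.0
print(math.sqrt(sum(contributions.values())))  # ā‰ˆ 23.85, almost all from Weight and Age
```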

The correct approach is to normalize features to a common scale before calculating distances:

def normalize(items, features):
    """Normalize feature values to the range [0, 1]."""
    min_vals = {f: min(item[f] for item in items) for f in features}
    max_vals = {f: max(item[f] for item in items) for f in features}

    for item in items:
        for f in features:
            if max_vals[f] - min_vals[f] > 0:
                item[f] = (item[f] - min_vals[f]) / (max_vals[f] - min_vals[f])

    return items, min_vals, max_vals
warning

Always normalize or standardize your features before applying KNN. Without normalization, features with larger numeric ranges will disproportionately influence the distance calculation, effectively ignoring smaller-scale features.
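As a usage sketch (normalize is repeated here so the snippet runs on its own), applied to a two-feature subset of the sample data:

```python
def normalize(items, features):
    """Normalize feature values to the range [0, 1]."""
    min_vals = {f: min(item[f] for item in items) for f in features}
    max_vals = {f: max(item[f] for item in items) for f in features}
    for item in items:
        for f in features:
            if max_vals[f] - min_vals[f] > 0:
                item[f] = (item[f] - min_vals[f]) / (max_vals[f] - min_vals[f])
    return items, min_vals, max_vals

items = [
    {'Class': 'Programmer', 'Height': 1.70, 'Weight': 65.0},
    {'Class': 'Builder', 'Height': 1.90, 'Weight': 85.0},
    {'Class': 'Builder', 'Height': 1.78, 'Weight': 76.0},
]
items, mins, maxs = normalize(items, ['Height', 'Weight'])
print(items[2])  # Height ā‰ˆ 0.4, Weight = 0.55
```

Note that a new item must be scaled with the same min_vals and max_vals returned here before classifying it against the normalized training data.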

Complexity

Operation | Time Complexity
Classifying one item | O(n Ɨ d), where n = training set size and d = number of features
K-fold cross-validation | O(n² Ɨ d) in total: each of the n items is classified once against roughly n training points

KNN is computationally expensive for large datasets because it calculates distances to every training point for each classification. For production use, consider optimized libraries such as scikit-learn, which use spatial data structures (KD-trees, ball trees) to speed up neighbor searches.
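For comparison, a sketch of the same classification with scikit-learn (assuming it is installed; this is not part of the from-scratch implementation above). It uses the same eight sample rows and the same new item:

```python
from sklearn.neighbors import KNeighborsClassifier

# Sample data from data.csv: [Height, Weight, Age] and class labels
X = [[1.70, 65, 20], [1.90, 85, 33], [1.78, 76, 31], [1.73, 74, 24],
     [1.81, 75, 35], [1.73, 70, 75], [1.80, 71, 63], [1.75, 69, 25]]
y = ['Programmer', 'Builder', 'Builder', 'Programmer',
     'Builder', 'Scientist', 'Scientist', 'Programmer']

clf = KNeighborsClassifier(n_neighbors=3)
clf.fit(X, y)
print(clf.predict([[1.74, 67, 22]]))  # → ['Programmer']
```

As with the from-scratch version, the features should be normalized in practice; scikit-learn's StandardScaler or MinMaxScaler can do this in a pipeline.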