
How to Delete Rows Based on Multiple Conditions in PySpark

PySpark DataFrames are immutable: you cannot modify or delete rows in place. Instead, "deleting" rows means creating a new DataFrame that excludes the unwanted records by filtering them out. This guide demonstrates efficient techniques for removing rows based on complex, multi-condition logic while respecting Spark's distributed architecture.

Filtering with Multiple Conditions

Use .filter() or its alias .where() with the bitwise operators & (AND), | (OR), and ~ (NOT) to combine conditions:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("DeleteRows").getOrCreate()

# Sample inventory data
data = [
    (1, "Laptop", 50, "Active"),
    (2, "Mouse", 0, "Active"),
    (3, "Keyboard", 25, "Discontinued"),
    (4, "Monitor", 15, "Active"),
    (5, "Webcam", 0, "Discontinued")
]
df = spark.createDataFrame(data, ["id", "product", "stock", "status"])

# "Delete" rows where status is Discontinued OR stock is 0
# Logic: KEEP rows where status is Active AND stock > 0
df_filtered = df.filter(
    (col("status") == "Active") & (col("stock") > 0)
)

df_filtered.show()

Output:

+---+-------+-----+------+
| id|product|stock|status|
+---+-------+-----+------+
| 1| Laptop| 50|Active|
| 4|Monitor| 15|Active|
+---+-------+-----+------+
Parentheses Are Mandatory

Each condition must be wrapped in parentheses when combined with bitwise operators. Python gives &, |, and ~ higher precedence than comparisons such as == and >, so missing parentheses either raise an error or silently produce the wrong logic:

# ❌ Wrong: Missing parentheses
df.filter(col("status") == "Active" & col("stock") > 0)

# ✅ Correct: Each condition wrapped
df.filter((col("status") == "Active") & (col("stock") > 0))

Using NOT Logic for Deletion

Sometimes it's easier to define what to remove, then negate the condition:

from pyspark.sql.functions import col

# Define condition for rows TO DELETE
rows_to_delete = (col("status") == "Discontinued") | (col("stock") == 0)

# Keep everything EXCEPT those rows
df_clean = df.filter(~rows_to_delete)

df_clean.show()

This approach is clearer when deletion logic is complex:

# Remove invalid records (multiple criteria)
invalid_records = (
    (col("stock") < 0) |                   # Negative stock
    (col("status") == "Discontinued") |    # Discontinued items
    (col("product").isNull())              # Missing product name
)

df_valid = df.filter(~invalid_records)

Removing Rows Using isin()

For filtering based on a list of values, use .isin():

from pyspark.sql.functions import col

# IDs to remove
invalid_ids = [2, 3, 5]

# Keep rows NOT in the invalid list
df_filtered = df.filter(~col("id").isin(invalid_ids))

df_filtered.show()

Output:

+---+-------+-----+------+
| id|product|stock|status|
+---+-------+-----+------+
| 1| Laptop| 50|Active|
| 4|Monitor| 15|Active|
+---+-------+-----+------+

To instead keep only rows whose value appears in a list (the inverse operation):

# Keep only specific categories
valid_statuses = ["Active", "Pending"]
df_active = df.filter(col("status").isin(valid_statuses))

SQL String Syntax

For complex conditions, SQL syntax can be more readable:

# SQL-style filtering
df_clean = df.filter("stock > 0 AND status = 'Active'")

# With NOT IN
df_filtered = df.filter("id NOT IN (2, 3, 5)")

# Complex conditions
df_valid = df.filter("""
    status != 'Discontinued'
    AND stock > 0
    AND product IS NOT NULL
""")
SQL vs Column Expressions

SQL strings are often clearer for complex logic, while column expressions offer better IDE support (autocompletion, refactoring) and surface some mistakes before the query runs. Choose based on your team's preference and use case.

Removing Nulls and Duplicates

Common deletion patterns for data cleaning:

from pyspark.sql.functions import col

# Remove rows with null in specific columns
df_no_nulls = df.filter(
    col("product").isNotNull() & col("stock").isNotNull()
)

# Using na.drop() for null removal
df_cleaned = df.na.drop(subset=["product", "stock"])

# Remove duplicate rows
df_unique = df.dropDuplicates()

# Remove duplicates based on specific columns
df_unique_products = df.dropDuplicates(["product"])

Removing Rows Based on Another DataFrame

Exclude rows that exist in a reference DataFrame:

# Products to remove
remove_df = spark.createDataFrame([(2,), (5,)], ["id"])

# Anti-join: keep rows from df NOT in remove_df
df_filtered = df.join(remove_df, on="id", how="left_anti")

df_filtered.show()

Pattern Summary

Goal                         Pattern
Multiple AND conditions      df.filter((cond1) & (cond2))
Multiple OR conditions       df.filter((cond1) | (cond2))
Negate/invert condition      df.filter(~condition)
Exclude list of values       df.filter(~col("x").isin([...]))
SQL syntax                   df.filter("col != 'value' AND ...")
Remove nulls                 df.na.drop(subset=["col"])
Remove duplicates            df.dropDuplicates(["col"])
Exclude from another DF      df.join(other, "key", "left_anti")

Practical Example: Data Cleaning Pipeline

from pyspark.sql.functions import col

def clean_inventory(df):
    """Remove invalid inventory records."""

    # Define all invalid conditions
    invalid_conditions = (
        col("stock").isNull() |
        (col("stock") < 0) |
        (col("status") == "Discontinued") |
        col("product").isNull() |
        (col("product") == "")
    )

    # Return the cleaned DataFrame
    return df.filter(~invalid_conditions)

df_clean = clean_inventory(df)
df_clean.show()

By thinking in terms of "filtering in" rather than "deleting out," you work naturally with Spark's immutable DataFrame model while building efficient, maintainable data transformation pipelines.