How to Delete Rows Based on Multiple Conditions in PySpark
PySpark DataFrames are immutable; you cannot modify or delete rows in place. Instead, "deleting" rows means creating a new DataFrame that excludes the unwanted records by filtering them out. This guide demonstrates efficient techniques for removing rows based on complex, multi-condition logic while respecting Spark's distributed architecture.
Filtering with Multiple Conditions
Use .filter() or its alias .where() with the bitwise operators & (AND), | (OR), and ~ (NOT) to combine conditions:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("DeleteRows").getOrCreate()
# Sample inventory data
data = [
(1, "Laptop", 50, "Active"),
(2, "Mouse", 0, "Active"),
(3, "Keyboard", 25, "Discontinued"),
(4, "Monitor", 15, "Active"),
(5, "Webcam", 0, "Discontinued")
]
df = spark.createDataFrame(data, ["id", "product", "stock", "status"])
# "Delete" rows where status is Discontinued OR stock is 0
# Logic: KEEP rows where status is Active AND stock > 0
df_filtered = df.filter(
(col("status") == "Active") & (col("stock") > 0)
)
df_filtered.show()
Output:
+---+-------+-----+------+
| id|product|stock|status|
+---+-------+-----+------+
| 1| Laptop| 50|Active|
| 4|Monitor| 15|Active|
+---+-------+-----+------+
Each condition must be wrapped in parentheses when combining with bitwise operators. Python's & and | bind more tightly than comparison operators like == and >, so the unparenthesized version is misparsed and typically raises an error:
# ❌ Wrong: Missing parentheses
df.filter(col("status") == "Active" & col("stock") > 0)
# ✅ Correct: Each condition wrapped
df.filter((col("status") == "Active") & (col("stock") > 0))
Using NOT Logic for Deletion
Sometimes it's easier to define what to remove, then negate the condition:
from pyspark.sql.functions import col
# Define condition for rows TO DELETE
rows_to_delete = (col("status") == "Discontinued") | (col("stock") == 0)
# Keep everything EXCEPT those rows
df_clean = df.filter(~rows_to_delete)
df_clean.show()
This approach is clearer when deletion logic is complex:
# Remove invalid records (multiple criteria)
invalid_records = (
(col("stock") < 0) | # Negative stock
(col("status") == "Discontinued") | # Discontinued items
(col("product").isNull()) # Missing product name
)
df_valid = df.filter(~invalid_records)
Removing Rows Using isin()
For filtering based on a list of values, use .isin():
from pyspark.sql.functions import col
# IDs to remove
invalid_ids = [2, 3, 5]
# Keep rows NOT in the invalid list
df_filtered = df.filter(~col("id").isin(invalid_ids))
df_filtered.show()
Output:
+---+-------+-----+------+
| id|product|stock|status|
+---+-------+-----+------+
| 1| Laptop| 50|Active|
| 4|Monitor| 15|Active|
+---+-------+-----+------+
To keep only the rows whose value appears in a list (the inverse operation):
# Keep only specific categories
valid_statuses = ["Active", "Pending"]
df_active = df.filter(col("status").isin(valid_statuses))
SQL String Syntax
For complex conditions, SQL syntax can be more readable:
# SQL-style filtering
df_clean = df.filter("stock > 0 AND status = 'Active'")
# With NOT IN
df_filtered = df.filter("id NOT IN (2, 3, 5)")
# Complex conditions
df_valid = df.filter("""
status != 'Discontinued'
AND stock > 0
AND product IS NOT NULL
""")
SQL strings are often clearer for complex logic, while column expressions offer better IDE support (autocomplete, earlier detection of typos in column names and operators). Choose based on your team's preference and use case.
Removing Nulls and Duplicates
Common deletion patterns for data cleaning:
from pyspark.sql.functions import col
# Remove rows with null in specific columns
df_no_nulls = df.filter(
col("product").isNotNull() & col("stock").isNotNull()
)
# Using na.drop() for null removal
df_cleaned = df.na.drop(subset=["product", "stock"])
# Remove duplicate rows
df_unique = df.dropDuplicates()
# Remove duplicates based on specific columns
df_unique_products = df.dropDuplicates(["product"])
Removing Rows Based on Another DataFrame
Exclude rows that exist in a reference DataFrame:
# Products to remove
remove_df = spark.createDataFrame([(2,), (5,)], ["id"])
# Anti-join: keep rows from df NOT in remove_df
df_filtered = df.join(remove_df, on="id", how="left_anti")
df_filtered.show()
Pattern Summary
| Goal | Pattern |
|---|---|
| Multiple AND conditions | df.filter((cond1) & (cond2)) |
| Multiple OR conditions | df.filter((cond1) \| (cond2)) |
| Negate/invert condition | df.filter(~condition) |
| Exclude list of values | df.filter(~col("x").isin([...])) |
| SQL syntax | df.filter("col != 'value' AND ...") |
| Remove nulls | df.na.drop(subset=["col"]) |
| Remove duplicates | df.dropDuplicates(["col"]) |
| Exclude from another DF | df.join(other, "key", "left_anti") |
Practical Example: Data Cleaning Pipeline
from pyspark.sql.functions import col
def clean_inventory(df):
"""Remove invalid inventory records."""
# Define all invalid conditions
invalid_conditions = (
col("stock").isNull() |
(col("stock") < 0) |
(col("status") == "Discontinued") |
col("product").isNull() |
(col("product") == "")
)
# Return cleaned DataFrame
return df.filter(~invalid_conditions)
df_clean = clean_inventory(df)
df_clean.show()
By thinking in terms of "filtering in" rather than "deleting out," you work naturally with Spark's immutable DataFrame model while building efficient, maintainable data transformation pipelines.