# How to Apply Transformations to Multiple Columns in PySpark
Enterprise data engineering frequently requires applying identical transformations (such as trimming whitespace, normalizing case, handling nulls, or rounding numbers) across dozens or hundreds of columns. Writing repetitive code for each column is error-prone and unmaintainable. PySpark provides efficient patterns for bulk column transformations that produce optimized execution plans and scale effectively.
## Using select with a List Comprehension (Recommended)
The most performant approach generates transformation expressions for all columns and applies them in a single .select() call:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim

spark = SparkSession.builder.appName("BulkTransform").getOrCreate()

# Sample data with messy strings
data = [(" ALICE ", " NEW YORK "), (" BOB ", " LOS ANGELES ")]
df = spark.createDataFrame(data, ["name", "city"])

# Columns to transform
string_columns = ["name", "city"]

# Build transformation expressions
transformed_cols = [
    lower(trim(col(c))).alias(c) if c in string_columns else col(c)
    for c in df.columns
]

# Apply all transformations in a single pass
df_clean = df.select(*transformed_cols)
df_clean.show()
```
Output:

```
+-----+-----------+
| name|       city|
+-----+-----------+
|alice|   new york|
|  bob|los angeles|
+-----+-----------+
```
This approach creates a single projection in Spark's logical plan, allowing the Catalyst optimizer to process all transformations efficiently. Iterative approaches create nested plans that complicate optimization.
## Dynamic Column Selection by Type
Apply transformations to columns based on their data type:
```python
from pyspark.sql.functions import col, trim, round as spark_round
from pyspark.sql.types import DoubleType, StringType

# Sample data with mixed types
data = [(" Alice ", 25, 1234.5678), (" Bob ", 30, 9876.5432)]
df = spark.createDataFrame(data, ["name", "age", "salary"])

# Get columns by type
string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DoubleType)]

# Build type-specific transformations
transformed = []
for c in df.columns:
    if c in string_cols:
        transformed.append(trim(col(c)).alias(c))
    elif c in numeric_cols:
        transformed.append(spark_round(col(c), 2).alias(c))
    else:
        transformed.append(col(c))

df_result = df.select(*transformed)
df_result.show()
```
## Using reduce for Chained Transformations

The functional `reduce` pattern chains transformations cleanly:
```python
from functools import reduce

from pyspark.sql.functions import col, upper

data = [("alice", "new york"), ("bob", "chicago")]
df = spark.createDataFrame(data, ["name", "city"])

columns_to_transform = ["name", "city"]

# Chain withColumn calls using reduce
df_upper = reduce(
    lambda temp_df, column: temp_df.withColumn(column, upper(col(column))),
    columns_to_transform,
    df,
)
df_upper.show()
```
The `reduce` pattern is convenient for dynamic pipeline building where transformations are determined at runtime, but each `withColumn` call still adds a projection to the plan. For static transformations, prefer the `select` approach for better performance.
## Applying Multiple Transformations Per Column
Chain multiple operations for comprehensive cleaning:
```python
from pyspark.sql.functions import col, lower, regexp_replace, trim

data = [(" ALICE! ", " NEW@YORK "), (" BOB# ", " LA$ ")]
df = spark.createDataFrame(data, ["name", "city"])

def clean_string_column(column_name):
    """Apply multiple cleaning operations to a string column."""
    return (
        regexp_replace(
            lower(trim(col(column_name))),
            "[^a-z\\s]",  # Keep only lowercase letters and whitespace
            "",
        ).alias(column_name)
    )

string_cols = ["name", "city"]
cleaned_cols = [
    clean_string_column(c) if c in string_cols else col(c)
    for c in df.columns
]

df_clean = df.select(*cleaned_cols)
df_clean.show()
```
## Null Handling Across Columns
Apply null replacement or coalescing to multiple columns:
```python
from pyspark.sql.functions import coalesce, col, lit

data = [(None, "NYC"), ("Bob", None), (None, None)]
df = spark.createDataFrame(data, ["name", "city"])

# Replace nulls with default values
defaults = {"name": "Unknown", "city": "N/A"}

null_handled = [
    coalesce(col(c), lit(defaults.get(c, ""))).alias(c) if c in defaults else col(c)
    for c in df.columns
]

df_filled = df.select(*null_handled)
df_filled.show()
```
## Reusable Transformation Function
Create a utility function for common bulk operations:
```python
from typing import Callable, List

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def transform_columns(
    df: DataFrame,
    columns: List[str],
    transformation: Callable,
) -> DataFrame:
    """
    Apply a transformation function to specified columns.

    Args:
        df: Source DataFrame
        columns: List of column names to transform
        transformation: Function that takes a Column and returns a transformed Column
    """
    transformed = [
        transformation(col(c)).alias(c) if c in columns else col(c)
        for c in df.columns
    ]
    return df.select(*transformed)

# Usage examples
from pyspark.sql.functions import trim, upper

df_upper = transform_columns(df, ["name", "city"], upper)
df_trimmed = transform_columns(df, ["name"], trim)
```
## Method Comparison
| Approach | Plan Complexity | Performance | Use Case |
|---|---|---|---|
| `select` + comprehension | O(1), single projection | Excellent | Production ETL |
| `reduce` + `withColumn` | O(N), chained projections | Good | Dynamic pipelines |
| For loop + `withColumn` | O(N), nested projections | Poor | Avoid for many columns |
Calling .withColumn() in a loop creates deeply nested logical plans:
```python
# ❌ Avoid: creates a deeply nested plan
for c in columns:
    df = df.withColumn(c, transform(col(c)))

# ✅ Prefer: single optimized projection
df = df.select(*[transform(col(c)).alias(c) for c in columns])
```
With 100+ columns, nested plans can cause StackOverflowError during query planning.
By leveraging these bulk transformation patterns, you write maintainable, performant PySpark code that scales effectively across enterprise data volumes.