How to Apply Transformations to Multiple Columns in PySpark

Enterprise data engineering frequently requires applying identical transformations (such as trimming whitespace, normalizing case, handling nulls, or rounding numbers) across dozens or hundreds of columns. Writing repetitive code for each column is error-prone and hard to maintain. PySpark provides efficient patterns for bulk column transformations that produce optimized execution plans and scale effectively.

The most performant approach generates transformation expressions for all columns and applies them in a single .select() call:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim

spark = SparkSession.builder.appName("BulkTransform").getOrCreate()

# Sample data with messy strings
data = [(" ALICE ", " NEW YORK "), (" BOB ", " LOS ANGELES ")]
df = spark.createDataFrame(data, ["name", "city"])

# Columns to transform
string_columns = ["name", "city"]

# Build transformation expressions
transformed_cols = [
    lower(trim(col(c))).alias(c) if c in string_columns else col(c)
    for c in df.columns
]

# Apply all transformations in single pass
df_clean = df.select(*transformed_cols)

df_clean.show()

Output:

+-----+-----------+
| name|       city|
+-----+-----------+
|alice|   new york|
|  bob|los angeles|
+-----+-----------+

Optimized Execution Plan

This approach creates a single projection in Spark's logical plan, allowing the Catalyst optimizer to process all transformations efficiently. Iterative approaches create nested plans that complicate optimization.

Dynamic Column Selection by Type

Apply transformations to columns based on their data type:

from pyspark.sql.functions import col, trim, round as spark_round
from pyspark.sql.types import StringType, DoubleType

# Sample data with mixed types
data = [(" Alice ", 25, 1234.5678), (" Bob ", 30, 9876.5432)]
df = spark.createDataFrame(data, ["name", "age", "salary"])

# Get columns by type
string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DoubleType)]

# Build type-specific transformations
transformed = []
for c in df.columns:
    if c in string_cols:
        transformed.append(trim(col(c)).alias(c))
    elif c in numeric_cols:
        transformed.append(spark_round(col(c), 2).alias(c))
    else:
        transformed.append(col(c))

df_result = df.select(*transformed)
df_result.show()

Using reduce for Chained Transformations

The functional reduce pattern chains transformations cleanly:

from functools import reduce
from pyspark.sql.functions import col, upper

data = [("alice", "new york"), ("bob", "chicago")]
df = spark.createDataFrame(data, ["name", "city"])

columns_to_transform = ["name", "city"]

# Chain withColumn calls using reduce
df_upper = reduce(
    lambda temp_df, column: temp_df.withColumn(column, upper(col(column))),
    columns_to_transform,
    df
)

df_upper.show()

When to Use reduce

The reduce pattern is cleaner for dynamic pipeline building where transformations are determined at runtime. For static transformations, prefer the select approach for better performance.

Applying Multiple Transformations Per Column

Chain multiple operations for comprehensive cleaning:

from pyspark.sql.functions import col, lower, trim, regexp_replace

data = [(" ALICE! ", " NEW@YORK "), (" BOB# ", " LA$ ")]
df = spark.createDataFrame(data, ["name", "city"])

def clean_string_column(column_name):
    """Apply multiple cleaning operations to a string column."""
    return (
        regexp_replace(
            lower(trim(col(column_name))),
            "[^a-z\\s]",  # Remove everything except lowercase letters and whitespace
            ""
        ).alias(column_name)
    )

string_cols = ["name", "city"]

cleaned_cols = [
    clean_string_column(c) if c in string_cols else col(c)
    for c in df.columns
]

df_clean = df.select(*cleaned_cols)
df_clean.show()

Null Handling Across Columns

Apply null replacement or coalescing to multiple columns:

from pyspark.sql.functions import col, coalesce, lit

data = [(None, "NYC"), ("Bob", None), (None, None)]
df = spark.createDataFrame(data, ["name", "city"])

# Replace nulls with default values
defaults = {"name": "Unknown", "city": "N/A"}

null_handled = [
    coalesce(col(c), lit(defaults[c])).alias(c) if c in defaults else col(c)
    for c in df.columns
]

df_filled = df.select(*null_handled)
df_filled.show()

Reusable Transformation Function

Create a utility function for common bulk operations:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from typing import List, Callable

def transform_columns(
    df: DataFrame,
    columns: List[str],
    transformation: Callable
) -> DataFrame:
    """
    Apply a transformation function to specified columns.

    Args:
        df: Source DataFrame
        columns: List of column names to transform
        transformation: Function that takes a column and returns the transformed column
    """
    transformed = [
        transformation(col(c)).alias(c) if c in columns else col(c)
        for c in df.columns
    ]
    return df.select(*transformed)


# Usage examples
from pyspark.sql.functions import upper, trim

df_upper = transform_columns(df, ["name", "city"], upper)
df_trimmed = transform_columns(df, ["name"], trim)

Method Comparison

| Approach               | Plan Complexity            | Performance | Use Case               |
|------------------------|----------------------------|-------------|------------------------|
| select + comprehension | O(1), single projection    | Excellent   | Production ETL         |
| reduce + withColumn    | O(N), chained projections  | Good        | Dynamic pipelines      |
| for loop + withColumn  | O(N), nested projections   | Poor        | Avoid for many columns |

Avoid Iterative withColumn

Calling .withColumn() in a loop creates deeply nested logical plans:

# ❌ Avoid: Creates complex nested plan
for c in columns:
    df = df.withColumn(c, transform(col(c)))

# ✅ Prefer: Single optimized projection
df = df.select(*[transform(col(c)).alias(c) for c in columns])

With 100+ columns, nested plans can cause StackOverflowError during query planning.

By leveraging these bulk transformation patterns, you write maintainable, performant PySpark code that scales effectively across enterprise data volumes.