How to Apply Functions to Each Row in PySpark

PySpark excels at vectorized column operations, but real-world data engineering often requires row-level transformations: applying custom business logic, complex calculations, or conditional processing to individual records. Choosing the right approach for these transformations has a direct and significant impact on performance across distributed clusters.

This guide covers every major method for applying functions to rows in PySpark, from the fastest native operations to the most flexible RDD-based processing. Each approach is explained with complete examples, outputs, and clear guidance on when to use it.

Using Native Spark Functions (Always Try First)

Before writing any custom function, check whether PySpark's built-in functions can express your logic. Native functions run entirely within the JVM, are optimized by Spark's Catalyst query planner, and are always the fastest option:

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, concat, lit, upper

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", 28), ("Bob", 16), ("Charlie", 35), ("Dorothy", 70)]
df = spark.createDataFrame(data, ["name", "age"])

# Conditional logic with when()
df_native = df.withColumn(
    "category",
    when(col("age") < 18, "Minor")
    .when(col("age") >= 65, "Senior")
    .otherwise("Adult")
)

# String manipulation with native functions
df_native = df_native.withColumn(
    "profile",
    concat(upper(col("name")), lit(" - Age: "), col("age"))
)

df_native.show(truncate=False)

Output:

+-------+---+--------+-----------------+
|name   |age|category|profile          |
+-------+---+--------+-----------------+
|Alice  |28 |Adult   |ALICE - Age: 28  |
|Bob    |16 |Minor   |BOB - Age: 16    |
|Charlie|35 |Adult   |CHARLIE - Age: 35|
|Dorothy|70 |Senior  |DOROTHY - Age: 70|
+-------+---+--------+-----------------+
tip

PySpark provides hundreds of built-in functions in pyspark.sql.functions, including when(), concat(), regexp_replace(), date_format(), array(), and many more. Always check the documentation before writing a custom UDF, as there is often a native equivalent that runs significantly faster.

Using User-Defined Functions (UDFs)

When built-in functions cannot express your logic, User-Defined Functions (UDFs) are the standard approach for applying custom Python code to DataFrame columns on a per-row basis:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", 28), ("Bob", 16), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# Define custom row logic
def categorize_age(age):
    if age < 18:
        return "Minor"
    elif age < 65:
        return "Adult"
    else:
        return "Senior"

# Register as UDF with explicit return type
categorize_udf = udf(categorize_age, StringType())

# Apply to DataFrame
df_result = df.withColumn("category", categorize_udf(col("age")))
df_result.show()

Output:

+-------+---+--------+
|   name|age|category|
+-------+---+--------+
|  Alice| 28|   Adult|
|    Bob| 16|   Minor|
|Charlie| 35|   Adult|
+-------+---+--------+
warning

Python UDFs serialize data between the JVM and the Python interpreter for each row. This creates significant overhead and can slow processing by 5 to 10x compared to native Spark functions. Use UDFs only when built-in functions truly cannot handle your logic.
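One pattern that genuinely does require a UDF is a lookup against a Python-side mapping, such as a reference table loaded from a config file. A minimal sketch, using a hypothetical `COUNTRY_NAMES` dict and `country_code` column (both illustrative, not from the example above); when the UDF is shipped to executors, Spark pickles the dict as part of the function's closure:

```python
# Hypothetical lookup table; when referenced from a UDF, Spark pickles it
# as part of the function's closure and ships it to every executor.
COUNTRY_NAMES = {"US": "United States", "DE": "Germany", "JP": "Japan"}

def lookup_country(code):
    # Plain dict lookup with a fallback for unknown or missing codes
    return COUNTRY_NAMES.get(code, "Unknown")

# On a DataFrame this would be wired up as:
#   lookup_udf = udf(lookup_country, StringType())
#   df.withColumn("country", lookup_udf(col("country_code")))
print(lookup_country("US"))   # United States
print(lookup_country("XX"))   # Unknown
```

For large lookup tables, a broadcast variable avoids re-serializing the dict with every task; for small ones, the closure approach above is the simplest option.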

Multi-Column UDFs

A single UDF can accept multiple columns as input, allowing you to combine values from different fields:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", "Smith", 28), ("Bob", "Jones", 16)]
df = spark.createDataFrame(data, ["first_name", "last_name", "age"])

def create_profile(first, last, age):
    return f"{first} {last} (Age: {age})"

profile_udf = udf(create_profile, StringType())

df_profiles = df.withColumn(
    "profile",
    profile_udf(col("first_name"), col("last_name"), col("age"))
)

df_profiles.show(truncate=False)

Output:

+----------+---------+---+---------------------+
|first_name|last_name|age|profile              |
+----------+---------+---+---------------------+
|Alice     |Smith    |28 |Alice Smith (Age: 28)|
|Bob       |Jones    |16 |Bob Jones (Age: 16)  |
+----------+---------+---+---------------------+
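One detail worth knowing with multi-column UDFs: SQL NULLs arrive in the Python function as `None`, and an unguarded f-string would happily render them as the text "None". A null-safe variant of the function above (the `_safe` suffix is just an illustrative name):

```python
def create_profile_safe(first, last, age):
    # SQL NULLs reach a Python UDF as None; guard before formatting,
    # otherwise the literal string "None" ends up in the profile
    if first is None or last is None or age is None:
        return None
    return f"{first} {last} (Age: {age})"

print(create_profile_safe("Alice", "Smith", 28))  # Alice Smith (Age: 28)
print(create_profile_safe(None, "Smith", 28))     # None
```

Returning `None` from the UDF produces a proper NULL in the resulting column.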

Returning Complex Types

UDFs are not limited to simple scalar values. They can return arrays, structs, and other complex types:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import (
    ArrayType, StringType, StructType, StructField, IntegerType
)

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", 28), ("Bob", 16), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

# UDF that returns an array
@udf(ArrayType(StringType()))
def generate_tags(name, age):
    tags = [name.lower()]
    if age < 18:
        tags.append("minor")
    else:
        tags.append("adult")
    return tags

# UDF that returns a struct
info_schema = StructType([
    StructField("category", StringType(), True),
    StructField("birth_year", IntegerType(), True)
])

@udf(info_schema)
def create_info(age):
    from datetime import datetime
    category = "Minor" if age < 18 else "Adult"
    birth_year = datetime.now().year - age
    return (category, birth_year)

df_complex = df.withColumn("tags", generate_tags(col("name"), col("age")))
df_complex = df_complex.withColumn("info", create_info(col("age")))
df_complex.show(truncate=False)

Output:

+-------+---+----------------+-------------+
|name   |age|tags            |info         |
+-------+---+----------------+-------------+
|Alice  |28 |[alice, adult]  |{Adult, 1996}|
|Bob    |16 |[bob, minor]    |{Minor, 2008}|
|Charlie|35 |[charlie, adult]|{Adult, 1989}|
+-------+---+----------------+-------------+

Pandas UDFs for Better Performance

Pandas UDFs (also called Vectorized UDFs) process data in batches using Apache Arrow instead of row by row. This dramatically reduces serialization overhead and can be 10 to 100x faster than regular Python UDFs:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", 28), ("Bob", 16), ("Charlie", 35), ("Dorothy", 70)]
df = spark.createDataFrame(data, ["name", "age"])

@pandas_udf(StringType())
def categorize_vectorized(ages: pd.Series) -> pd.Series:
    return ages.apply(
        lambda x: "Minor" if x < 18 else ("Senior" if x >= 65 else "Adult")
    )

df_vectorized = df.withColumn("category", categorize_vectorized(col("age")))
df_vectorized.show()

Output:

+-------+---+--------+
|   name|age|category|
+-------+---+--------+
|  Alice| 28|   Adult|
|    Bob| 16|   Minor|
|Charlie| 35|   Adult|
|Dorothy| 70|  Senior|
+-------+---+--------+
info

Pandas UDFs are faster than regular Python UDFs because they:

  • Transfer data in columnar Arrow format instead of row-by-row serialization
  • Process entire batches rather than individual rows
  • Minimize the number of Python-JVM boundary crossings

Use Pandas UDFs whenever your custom logic can be expressed as a vectorized operation on a Pandas Series.
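Note that the `ages.apply(lambda ...)` in the example above still invokes the lambda once per element inside each batch, so it benefits from Arrow transfer but not from true vectorization. A fully vectorized alternative for the batch function, using `numpy.select` to evaluate each condition over the whole Series at once (a sketch; decorating it with `@pandas_udf(StringType())` as above would make it usable on a DataFrame):

```python
import numpy as np
import pandas as pd

def categorize_batch(ages: pd.Series) -> pd.Series:
    # np.select checks the conditions in order over the entire batch,
    # avoiding a per-element Python function call
    labels = np.select(
        [ages < 18, ages >= 65],  # conditions, evaluated as boolean arrays
        ["Minor", "Senior"],      # label for the first matching condition
        default="Adult",
    )
    return pd.Series(labels, index=ages.index)

print(categorize_batch(pd.Series([28, 16, 35, 70])).tolist())
```

For large batches, replacing the per-element lambda with array operations like this is where the 10 to 100x speedup actually comes from.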

RDD-Based Row Processing

For complete control over row structure, including adding, removing, or fundamentally restructuring columns, you can drop down to PySpark's RDD API and use .map():

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RowFunctions").getOrCreate()

data = [("Alice", 28), ("Bob", 16), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])

def transform_row(row):
    return (
        row.name.upper(),
        row.age,
        "Adult" if row.age >= 18 else "Minor"
    )

transformed_rdd = df.rdd.map(transform_row)
df_transformed = transformed_rdd.toDF(["upper_name", "age", "category"])
df_transformed.show()

Output:

+----------+---+--------+
|upper_name|age|category|
+----------+---+--------+
|     ALICE| 28|   Adult|
|       BOB| 16|   Minor|
|   CHARLIE| 35|   Adult|
+----------+---+--------+
warning

RDD operations bypass Spark's Catalyst optimizer entirely and require full data serialization between the JVM and Python. This makes them the slowest option. Use RDD processing only when DataFrame operations and UDFs genuinely cannot express your transformation logic.
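If you do work at the RDD level, .mapPartitions() is often a better fit than .map(): it calls your function once per partition rather than once per row, so expensive setup such as opening a database connection or loading a model is paid once per partition. A sketch of the per-partition function, using a namedtuple as a stand-in for pyspark.sql.Row so the logic can be exercised without a cluster:

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row; lets the transform run without a SparkSession
Person = namedtuple("Person", ["name", "age"])

def transform_partition(rows):
    # Any expensive one-time setup (DB connection, model load) would go here,
    # amortized across every row in the partition
    for row in rows:
        yield (row.name.upper(), row.age, "Adult" if row.age >= 18 else "Minor")

# On a DataFrame this would be applied as:
#   df.rdd.mapPartitions(transform_partition).toDF(["upper_name", "age", "category"])
print(list(transform_partition([Person("Alice", 28), Person("Bob", 16)])))
```

Because the function receives an iterator and yields results lazily, it also avoids materializing a whole partition in memory at once.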

Performance Comparison

The following table summarizes the relative performance and ideal use case for each method:

Method                                | Performance | Best Use Case
--------------------------------------|-------------|------------------------------------------------------------------
Native functions (when, concat, etc.) | Excellent   | Simple to moderate logic; always the first choice
Pandas UDF (@pandas_udf)              | Good        | Complex logic that can be vectorized with Pandas
Python UDF (udf())                    | Fair        | Non-vectorizable custom logic that native functions cannot handle
RDD .map()                            | Poor        | Complete row restructuring or highly complex transformations

Decision Guide

When you need to apply a function to each row, follow this priority order:

  1. Check for native Spark functions first. Functions like when(), concat(), regexp_replace(), and coalesce() cover the majority of row-level operations and run entirely within the JVM.
  2. Use Pandas UDFs for complex vectorizable logic. If your operation can be expressed as a function on a Pandas Series, a Pandas UDF gives you custom logic with near-native performance.
  3. Use Python UDFs for non-vectorizable custom logic. When your function depends on conditional branching, external lookups, or other patterns that do not vectorize cleanly, a standard UDF is the right tool.
  4. Use RDD operations as a last resort. Drop to the RDD level only when you need to fundamentally restructure rows in ways that the DataFrame API cannot express.

Summary

PySpark offers multiple ways to apply functions to individual rows, each with different performance characteristics.

  • Native Spark functions should always be your first choice because they execute within the JVM and benefit from Spark's query optimizer.
  • When custom logic is unavoidable, Pandas UDFs provide the best performance by processing data in vectorized batches through Apache Arrow.
  • Standard Python UDFs offer maximum flexibility for non-vectorizable logic but come with significant serialization overhead.
  • RDD-based processing gives you complete control over row structure but bypasses all of Spark's optimizations and should be reserved for cases where no other approach works.