
How to Apply a Custom Schema to a DataFrame in PySpark

When working with PySpark DataFrames, the schema, which defines column names, data types, and nullability, is typically inferred automatically from the data source. However, automatic inference can produce incorrect types (such as reading numeric IDs as integers when they should be strings), assign unwanted column names, or lack descriptive metadata. In these cases, you need to apply a custom schema to take full control over your DataFrame's structure.

This guide covers three methods for applying custom schemas in PySpark: changing column names, changing column types, and adding metadata to columns. Each method includes complete code examples with outputs, along with common mistakes to watch out for.

What Is a Schema in PySpark?

A schema defines the structure of a DataFrame: the column names, their data types, and whether they accept null values. You can inspect any DataFrame's schema using the printSchema() method:

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

PySpark provides two core classes for building custom schemas:

  • StructType: Represents the overall schema as a collection of fields.
  • StructField: Defines a single column with its name, data type, nullability, and optional metadata.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("column_name", StringType(), True),
    #           ^name          ^type         ^nullable
])
info

Common PySpark data types include StringType(), IntegerType(), FloatType(), DoubleType(), BooleanType(), DateType(), TimestampType(), and ArrayType(). All are available in pyspark.sql.types.

Method 1: Applying a Custom Schema by Changing Column Names

The most common use case is renaming columns while loading data. This is particularly useful when the source file has unclear or inconsistent column headers. You define a StructType schema with your desired column names and pass it to the .schema() method when reading the file.

Example

Suppose you have a CSV file (employee_data.csv) with columns name, age, department, level, and salary. You want to rename them to more descriptive names:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Define a custom schema with new column names
custom_schema = StructType([
    StructField("Employee_Name", StringType(), True),
    StructField("Employee_Age", IntegerType(), True),
    StructField("Employee_Department", StringType(), True),
    StructField("Employee_Level", IntegerType(), True),
    StructField("Employee_Salary", IntegerType(), True),
])

# Load CSV with the custom schema
df = (
    spark.read.format("csv")
    .schema(custom_schema)
    .option("header", True)
    .load("employee_data.csv")
)

df.printSchema()
df.show()

Output:

root
 |-- Employee_Name: string (nullable = true)
 |-- Employee_Age: integer (nullable = true)
 |-- Employee_Department: string (nullable = true)
 |-- Employee_Level: integer (nullable = true)
 |-- Employee_Salary: integer (nullable = true)

+--------------+------------+-------------------+--------------+---------------+
| Employee_Name|Employee_Age|Employee_Department|Employee_Level|Employee_Salary|
+--------------+------------+-------------------+--------------+---------------+
|    John Smith|          32|        Engineering|             3|          95000|
|   Sarah Jones|          28|          Marketing|             2|          72000|
|  Mike Johnson|          35|        Engineering|             4|         110000|
|  Emily Carter|          26|              Sales|             2|          68000|
|David Williams|          31|          Marketing|             3|          85000|
+--------------+------------+-------------------+--------------+---------------+
caution

When using .schema() with .option("header", True), PySpark ignores the header names in the CSV file and uses the names from your custom schema instead. The columns are mapped by position, not by name. Make sure the order and number of fields in your schema match the columns in the file exactly.

Method 2: Applying a Custom Schema by Changing Column Types

Sometimes the column names are fine, but the data types need correction. For example, Spark might infer a column as integer when you need it as float, or a numeric ID column might be better represented as a string. You can change a column's type using the cast() function with withColumn().

Example

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read CSV with automatic schema inference
df = spark.read.csv("employee_data.csv", sep=",", inferSchema=True, header=True)

print("Original schema:")
df.printSchema()

# Change the 'salary' column from Integer to Float
df_updated = df.withColumn("salary", df["salary"].cast("float"))

print("Updated schema:")
df_updated.printSchema()
df_updated.show()

Output:

Original schema:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- salary: integer (nullable = true)

Updated schema:
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- salary: float (nullable = true)

+--------------+---+-----------+-----+--------+
|          name|age| department|level|  salary|
+--------------+---+-----------+-----+--------+
|    John Smith| 32|Engineering|    3| 95000.0|
|   Sarah Jones| 28|  Marketing|    2| 72000.0|
|  Mike Johnson| 35|Engineering|    4|110000.0|
|  Emily Carter| 26|      Sales|    2| 68000.0|
|David Williams| 31|  Marketing|    3| 85000.0|
+--------------+---+-----------+-----+--------+

Casting Multiple Columns at Once

If you need to change the types of several columns, you can drive the casts from a dictionary and a loop to keep the code concise:

type_changes = {
    "age": "string",
    "salary": "double",
    "level": "string",
}

for col_name, new_type in type_changes.items():
    df_updated = df_updated.withColumn(col_name, df_updated[col_name].cast(new_type))

df_updated.printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- department: string (nullable = true)
 |-- level: string (nullable = true)
 |-- salary: double (nullable = true)

Common Mistake: Invalid Casts Producing Silent Nulls

If you cast a string column that contains non-numeric text to an integer type, PySpark silently replaces those values with null instead of raising an error. This can lead to data loss that goes unnoticed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("Alice", "twenty"), ("Bob", "30")],
    ["name", "age"],
)

# Casting non-numeric strings to integer produces nulls silently
df_cast = df.withColumn("age", df["age"].cast("integer"))
df_cast.show()

Output:

+-----+----+
| name| age|
+-----+----+
|Alice|null|
|  Bob|  30|
+-----+----+

The value "twenty" could not be converted to an integer, so it became null without any warning.

caution

PySpark does not raise an error for invalid casts. It silently returns null. Always validate your data after type casting, especially when working with messy or user-generated input. You can check for unexpected nulls using:

from pyspark.sql.functions import col
df_cast.filter(col("age").isNull()).show()

Method 3: Applying a Custom Schema with Metadata

Beyond column names and types, StructField supports an optional metadata parameter: a dictionary that can store descriptions, units, data lineage information, or any other key-value pairs. This is useful for documentation, data governance, and building self-describing DataFrames.

Example

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Define schema with metadata for each column
schema = StructType([
    StructField("Employee_Name", StringType(), True,
                metadata={"desc": "Full legal name of the employee"}),
    StructField("Employee_Age", IntegerType(), True,
                metadata={"desc": "Age in years", "unit": "years"}),
    StructField("Employee_Department", StringType(), True,
                metadata={"desc": "Department the employee belongs to"}),
    StructField("Employee_Level", IntegerType(), True,
                metadata={"desc": "Job level or seniority grade"}),
    StructField("Employee_Salary", IntegerType(), True,
                metadata={"desc": "Annual base salary", "currency": "USD"}),
])

# Load CSV with the custom schema
df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", True)
    .load("employee_data.csv")
)

df.printSchema()

# Access and display metadata for each column
print("Column Metadata:")
for field in df.schema.fields:
    desc = field.metadata.get("desc", "No description")
    print(f"  {field.name}: {desc}")

Output:

root
 |-- Employee_Name: string (nullable = true)
 |-- Employee_Age: integer (nullable = true)
 |-- Employee_Department: string (nullable = true)
 |-- Employee_Level: integer (nullable = true)
 |-- Employee_Salary: integer (nullable = true)

Column Metadata:
  Employee_Name: Full legal name of the employee
  Employee_Age: Age in years
  Employee_Department: Department the employee belongs to
  Employee_Level: Job level or seniority grade
  Employee_Salary: Annual base salary
tip

Metadata travels with the schema through most column-preserving transformations and can be accessed programmatically, making it valuable for:

  • Data catalogs: attaching descriptions and tags to columns
  • ML pipelines: storing feature engineering details
  • Data validation: embedding expected ranges or formats

Bonus: Defining Schema Using DDL Strings

For a more concise approach, PySpark allows you to define schemas using DDL (Data Definition Language) strings, similar to SQL CREATE TABLE syntax:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Define schema using a DDL string
ddl_schema = (
    "Employee_Name STRING, Employee_Age INT, "
    "Employee_Department STRING, Employee_Level INT, "
    "Employee_Salary DOUBLE"
)

df = (
    spark.read.format("csv")
    .schema(ddl_schema)
    .option("header", True)
    .load("employee_data.csv")
)

df.printSchema()

Output:

root
 |-- Employee_Name: string (nullable = true)
 |-- Employee_Age: integer (nullable = true)
 |-- Employee_Department: string (nullable = true)
 |-- Employee_Level: integer (nullable = true)
 |-- Employee_Salary: double (nullable = true)

This is a quick and readable alternative when you do not need metadata or fine-grained nullability control.

Summary

Method              | Use Case                                 | Approach
--------------------|------------------------------------------|----------------------------------------------------
Change column names | Rename columns during file loading       | Define a StructType schema and pass it to .schema()
Change column types | Fix or convert data types after loading  | Use .withColumn() with .cast()
Add metadata        | Attach descriptions or tags to columns   | Include a metadata dict in StructField
DDL string          | Quick, concise schema definition         | Pass a DDL string to .schema()
  • Use StructType with StructField for full control over names, types, nullability, and metadata.
  • Use .cast() on existing DataFrames to change individual column types after loading.
  • Use DDL strings for a concise, SQL-like schema definition when metadata is not needed.
  • Always validate data after casting to catch silent null conversions from invalid values.