How to Apply a Custom Schema to a DataFrame in PySpark
When working with PySpark DataFrames, the schema, which defines column names, data types, and nullability, is typically inferred automatically from the data source. However, automatic inference can produce incorrect types (such as reading numeric IDs as integers when they should be strings), assign unwanted column names, or lack descriptive metadata. In these cases, you need to apply a custom schema to take full control over your DataFrame's structure.
This guide covers three methods for applying custom schemas in PySpark: changing column names, changing column types, and adding metadata to columns. Each method includes complete code examples with outputs, along with common mistakes to watch out for.
What Is a Schema in PySpark?
A schema defines the structure of a DataFrame: the column names, their data types, and whether they accept null values. You can inspect any DataFrame's schema using the printSchema() method:
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: integer (nullable = true)
PySpark provides two core classes for building custom schemas:
- StructType: Represents the overall schema as a collection of fields.
- StructField: Defines a single column with its name, data type, nullability, and optional metadata.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("column_name", StringType(), True),
    #           ^name          ^type        ^nullable
])
Common PySpark data types include StringType(), IntegerType(), FloatType(), DoubleType(), BooleanType(), DateType(), TimestampType(), and ArrayType(). All are available in pyspark.sql.types.
Method 1: Applying a Custom Schema by Changing Column Names
The most common use case is renaming columns while loading data. This is particularly useful when the source file has unclear or inconsistent column headers. You define a StructType schema with your desired column names and pass it to the .schema() method when reading the file.
Example
Suppose you have a CSV file (employee_data.csv) with columns name, age, department, level, and salary. You want to rename them to more descriptive names:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()
# Define a custom schema with new column names
custom_schema = StructType([
    StructField("Employee_Name", StringType(), True),
    StructField("Employee_Age", IntegerType(), True),
    StructField("Employee_Department", StringType(), True),
    StructField("Employee_Level", IntegerType(), True),
    StructField("Employee_Salary", IntegerType(), True),
])
# Load CSV with the custom schema
df = (
    spark.read.format("csv")
    .schema(custom_schema)
    .option("header", True)
    .load("employee_data.csv")
)
df.printSchema()
df.show()
Output:
root
|-- Employee_Name: string (nullable = true)
|-- Employee_Age: integer (nullable = true)
|-- Employee_Department: string (nullable = true)
|-- Employee_Level: integer (nullable = true)
|-- Employee_Salary: integer (nullable = true)
+--------------+------------+-------------------+--------------+---------------+
| Employee_Name|Employee_Age|Employee_Department|Employee_Level|Employee_Salary|
+--------------+------------+-------------------+--------------+---------------+
|    John Smith|          32|        Engineering|             3|          95000|
|   Sarah Jones|          28|          Marketing|             2|          72000|
|  Mike Johnson|          35|        Engineering|             4|         110000|
|  Emily Carter|          26|              Sales|             2|          68000|
|David Williams|          31|          Marketing|             3|          85000|
+--------------+------------+-------------------+--------------+---------------+
When using .schema() with .option("header", True), PySpark ignores the header names in the CSV file and uses the names from your custom schema instead. The columns are mapped by position, not by name. Make sure the order and number of fields in your schema match the columns in the file exactly.
Method 2: Applying a Custom Schema by Changing Column Types
Sometimes the column names are fine, but the data types need correction. For example, Spark might infer a column as integer when you need it as float, or a numeric ID column might be better represented as a string. You can change a column's type using the cast() method together with withColumn().
Example
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read CSV with automatic schema inference
df = spark.read.csv("employee_data.csv", sep=",", inferSchema=True, header=True)
print("Original schema:")
df.printSchema()
# Change the 'salary' column from Integer to Float
df_updated = df.withColumn("salary", df["salary"].cast("float"))
print("Updated schema:")
df_updated.printSchema()
df_updated.show()
Output:
Original schema:
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- department: string (nullable = true)
|-- level: integer (nullable = true)
|-- salary: integer (nullable = true)
Updated schema:
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- department: string (nullable = true)
|-- level: integer (nullable = true)
|-- salary: float (nullable = true)
+--------------+---+-----------+-----+--------+
| name|age| department|level| salary|
+--------------+---+-----------+-----+--------+
| John Smith| 32|Engineering| 3| 95000.0|
| Sarah Jones| 28| Marketing| 2| 72000.0|
| Mike Johnson| 35|Engineering| 4|110000.0|
| Emily Carter| 26| Sales| 2| 68000.0|
|David Williams| 31| Marketing| 3| 85000.0|
+--------------+---+-----------+-----+--------+
Casting Multiple Columns at Once
If you need to change the types of several columns, you can use a dictionary and a loop to apply all the changes concisely:
type_changes = {
    "age": "string",
    "salary": "double",
    "level": "string",
}
for col_name, new_type in type_changes.items():
    df_updated = df_updated.withColumn(col_name, df_updated[col_name].cast(new_type))
df_updated.printSchema()
Output:
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- department: string (nullable = true)
|-- level: string (nullable = true)
|-- salary: double (nullable = true)
Common Mistake: Invalid Casts Producing Silent Nulls
If you cast a string column that contains non-numeric text to an integer type, PySpark silently replaces those values with null instead of raising an error. This can lead to data loss that goes unnoticed:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Alice", "twenty"), ("Bob", "30")],
    ["name", "age"]
)
# Casting non-numeric strings to integer produces nulls silently
df_cast = df.withColumn("age", df["age"].cast("integer"))
df_cast.show()
Output:
+-----+----+
| name| age|
+-----+----+
|Alice|null|
| Bob| 30|
+-----+----+
The value "twenty" could not be converted to an integer, so it became null. PySpark never raises an error for an invalid cast; it silently substitutes null. Always validate your data after type casting, especially when working with messy or user-generated input. You can check for unexpected nulls using:
from pyspark.sql.functions import col
df_cast.filter(col("age").isNull()).show()
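On messy input it can also help to spot-check raw values before casting at all. As a rough sketch (the sample values are invented), a regex flags strings that an integer cast would turn into null:

```python
import re

# Hypothetical sample of raw string values from the 'age' column
raw_ages = ["32", "twenty", "28", "", "35"]

# Strings that do not look like integers will become null after cast("integer")
integer_like = re.compile(r"^\s*[+-]?\d+\s*$")
will_be_null = [v for v in raw_ages if not integer_like.match(v)]
print(will_be_null)  # → ['twenty', '']
```

Running a check like this on a sample of the raw column tells you up front how much data an integer cast would silently drop.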
Method 3: Applying a Custom Schema with Metadata
Beyond column names and types, StructField supports an optional metadata parameter: a dictionary that can store descriptions, units, data lineage information, or any other key-value pairs. This is useful for documentation, data governance, and building self-describing DataFrames.
Example
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()
# Define schema with metadata for each column
schema = StructType([
    StructField("Employee_Name", StringType(), True,
                metadata={"desc": "Full legal name of the employee"}),
    StructField("Employee_Age", IntegerType(), True,
                metadata={"desc": "Age in years", "unit": "years"}),
    StructField("Employee_Department", StringType(), True,
                metadata={"desc": "Department the employee belongs to"}),
    StructField("Employee_Level", IntegerType(), True,
                metadata={"desc": "Job level or seniority grade"}),
    StructField("Employee_Salary", IntegerType(), True,
                metadata={"desc": "Annual base salary", "currency": "USD"}),
])
# Load CSV with the custom schema
df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", True)
    .load("employee_data.csv")
)
df.printSchema()
# Access and display metadata for each column
print("Column Metadata:")
for field in df.schema.fields:
    desc = field.metadata.get("desc", "No description")
    print(f" {field.name}: {desc}")
Output:
root
|-- Employee_Name: string (nullable = true)
|-- Employee_Age: integer (nullable = true)
|-- Employee_Department: string (nullable = true)
|-- Employee_Level: integer (nullable = true)
|-- Employee_Salary: integer (nullable = true)
Column Metadata:
Employee_Name: Full legal name of the employee
Employee_Age: Age in years
Employee_Department: Department the employee belongs to
Employee_Level: Job level or seniority grade
Employee_Salary: Annual base salary
Metadata travels with the column through most transformations and can be accessed programmatically, making it valuable for:
- Data catalogs: attaching descriptions and tags to columns
- ML pipelines: storing feature engineering details
- Data validation: embedding expected ranges or formats
Bonus: Defining Schema Using DDL Strings
For a more concise approach, PySpark allows you to define schemas using DDL (Data Definition Language) strings, similar to SQL CREATE TABLE syntax:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Define schema using a DDL string
ddl_schema = (
    "Employee_Name STRING, Employee_Age INT, "
    "Employee_Department STRING, Employee_Level INT, "
    "Employee_Salary DOUBLE"
)
df = (
    spark.read.format("csv")
    .schema(ddl_schema)
    .option("header", True)
    .load("employee_data.csv")
)
df.printSchema()
Output:
root
|-- Employee_Name: string (nullable = true)
|-- Employee_Age: integer (nullable = true)
|-- Employee_Department: string (nullable = true)
|-- Employee_Level: integer (nullable = true)
|-- Employee_Salary: double (nullable = true)
This is a quick and readable alternative when you do not need metadata or fine-grained nullability control.
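Since the DDL form is just a string, it is also easy to generate programmatically. A small sketch (the helper name is invented) builds one from a plain dict mapping column names to Spark SQL type names:

```python
# Hypothetical helper: build a DDL schema string from a dict mapping
# column names to Spark SQL type names
def to_ddl(columns):
    return ", ".join(f"{name} {dtype}" for name, dtype in columns.items())

ddl_schema = to_ddl({
    "Employee_Name": "STRING",
    "Employee_Age": "INT",
    "Employee_Salary": "DOUBLE",
})
print(ddl_schema)  # → Employee_Name STRING, Employee_Age INT, Employee_Salary DOUBLE
```

The resulting string can be passed to .schema() exactly like a StructType.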
Summary
| Method | Use Case | Approach |
|---|---|---|
| Change column names | Rename columns during file loading | Define StructType schema and pass to .schema() |
| Change column types | Fix or convert data types after loading | Use .withColumn() with .cast() |
| Add metadata | Attach descriptions or tags to columns | Include metadata dict in StructField |
| DDL string | Quick, concise schema definition | Pass DDL string to .schema() |
- Use StructType with StructField for full control over names, types, nullability, and metadata.
- Use .cast() on existing DataFrames to change individual column types after loading.
- Use DDL strings for a concise, SQL-like schema definition when metadata is not needed.
- Always validate data after casting to catch silent null conversions from invalid values.