PySpark: How to Append Data to an Empty DataFrame in Python
PySpark DataFrames are immutable. Once created, they cannot be modified in place. This fundamental characteristic means that "appending" data does not work the way it does with a Python list or a Pandas DataFrame. Instead, you create a new DataFrame by combining the original with new records using union operations.
Understanding this pattern is essential for building accumulator workflows, batch processing pipelines, and incremental data loading in Spark.
This guide walks you through creating an empty DataFrame with a defined schema, appending data to it using different union strategies, and avoiding common performance pitfalls.
Creating an Empty DataFrame with a Schema
Before you can append anything, you need an empty DataFrame that defines the structure your data will follow. Always define the schema explicitly using StructType so that Spark knows the column names, data types, and nullability constraints from the start:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("DataAppend").getOrCreate()
# Define the schema explicitly
schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
# Create an empty DataFrame with the defined schema
empty_df = spark.createDataFrame([], schema)
empty_df.printSchema()
empty_df.show()
Output:
root
 |-- product_id: integer (nullable = false)
 |-- product_name: string (nullable = true)
 |-- quantity: integer (nullable = true)
+----------+------------+--------+
|product_id|product_name|quantity|
+----------+------------+--------+
+----------+------------+--------+
The DataFrame has the correct structure with zero rows. This serves as the starting point for appending data.
Appending Data with union()
The .union() method combines two DataFrames by stacking the rows of one on top of the other. It matches columns by position, so both DataFrames must have the same number of columns in the same order with compatible data types:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("DataAppend").getOrCreate()
schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
empty_df = spark.createDataFrame([], schema)
# Create new data to append
new_data = [
    (1, "Laptop", 50),
    (2, "Mouse", 200),
    (3, "Keyboard", 150)
]
new_df = spark.createDataFrame(new_data, schema)
# Append by creating a union
result_df = empty_df.union(new_df)
result_df.show()
Output:
+----------+------------+--------+
|product_id|product_name|quantity|
+----------+------------+--------+
| 1| Laptop| 50|
| 2| Mouse| 200|
| 3| Keyboard| 150|
+----------+------------+--------+
The original empty_df is unchanged. The result_df is a completely new DataFrame containing the combined rows.
The .union() operation requires both DataFrames to have identical schemas: the same column count, the same data types, and the same column order. Mismatched schemas cause runtime errors.
Using unionByName() for Flexible Merging
When the column order might differ between DataFrames, .unionByName() matches columns by name instead of by position. This is safer and more resilient to schema variations:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
spark = SparkSession.builder.appName("DataAppend").getOrCreate()
schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
result_df = spark.createDataFrame([
    (1, "Laptop", 50),
    (2, "Mouse", 200),
    (3, "Keyboard", 150)
], schema)
# DataFrame with different column order
reordered_data = spark.createDataFrame(
    [(4, 75, "Monitor")],
    ["product_id", "quantity", "product_name"]
)
# unionByName matches by column name, not position
combined_df = result_df.unionByName(reordered_data)
combined_df.show()
Output:
+----------+------------+--------+
|product_id|product_name|quantity|
+----------+------------+--------+
| 1| Laptop| 50|
| 2| Mouse| 200|
| 3| Keyboard| 150|
| 4| Monitor| 75|
+----------+------------+--------+
If you had used .union() here, the quantity and product_name values for the Monitor row would have been swapped because .union() matches by position.
In Spark 3.1 and later, you can use unionByName(other_df, allowMissingColumns=True) to merge DataFrames that have different column sets. Missing columns are automatically filled with null.
Building an Accumulator Pattern for Batch Processing
A common real-world scenario is processing multiple batches of data and accumulating the results into a single DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
spark = SparkSession.builder.appName("DataAppend").getOrCreate()
sales_schema = StructType([
    StructField("date", DateType(), False),
    StructField("region", StringType(), True),
    StructField("sales", IntegerType(), True)
])
# Initialize the accumulator as an empty DataFrame
all_sales = spark.createDataFrame([], sales_schema)
# Simulate three daily batches
batch_data = [
    [(date(2024, 1, 1), "North", 1000), (date(2024, 1, 1), "South", 1500)],
    [(date(2024, 1, 2), "North", 1100), (date(2024, 1, 2), "South", 1400)],
    [(date(2024, 1, 3), "North", 1200), (date(2024, 1, 3), "South", 1600)]
]
# Process each batch and accumulate
for batch in batch_data:
    batch_df = spark.createDataFrame(batch, sales_schema)
    all_sales = all_sales.union(batch_df)
all_sales.show()
Output:
+----------+------+-----+
| date|region|sales|
+----------+------+-----+
|2024-01-01| North| 1000|
|2024-01-01| South| 1500|
|2024-01-02| North| 1100|
|2024-01-02| South| 1400|
|2024-01-03| North| 1200|
|2024-01-03| South| 1600|
+----------+------+-----+
This works correctly, but iterative unions can become a performance problem at scale. The next section explains why and how to avoid it.
Avoiding Deep Union Chains
Calling .union() repeatedly in a loop creates a deeply nested execution plan. Each union adds a layer to the logical plan, and Spark's Catalyst optimizer must analyze the entire tree before executing. With many iterations, this leads to slow planning, excessive driver memory usage, and even a StackOverflowError.
The Inefficient Way
# Avoid this pattern with many batches
result = empty_df
for batch in batch_data:
    batch_df = spark.createDataFrame(batch, schema)
    result = result.union(batch_df)
The Better Way
Collect all DataFrames first, then combine them in a single operation using reduce:
from functools import reduce
from pyspark.sql import DataFrame
# Build all DataFrames first
dataframes = [spark.createDataFrame(batch, sales_schema) for batch in batch_data]
# Combine in one step
if dataframes:
    result = reduce(DataFrame.union, dataframes)
    result.show()
Output:
+----------+------+-----+
| date|region|sales|
+----------+------+-----+
|2024-01-01| North| 1000|
|2024-01-01| South| 1500|
|2024-01-02| North| 1100|
|2024-01-02| South| 1400|
|2024-01-03| North| 1200|
|2024-01-03| South| 1600|
+----------+------+-----+
Deeply nested union chains from iterative loops can cause:
- StackOverflowError during plan analysis with hundreds of iterations
- Significantly longer query optimization time
- Increased memory pressure on the driver node
When possible, collect all your data first and create a single DataFrame, or use reduce() to combine a list of DataFrames more cleanly.
Handling Duplicate Rows
An important detail to understand is that PySpark's .union() behaves like SQL's UNION ALL, meaning it preserves duplicate rows. It does not automatically deduplicate:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataAppend").getOrCreate()
df1 = spark.createDataFrame([(1, "A")], ["id", "value"])
df2 = spark.createDataFrame([(1, "A")], ["id", "value"])
# Union preserves all rows, including duplicates
with_dupes = df1.union(df2)
print("With duplicates:")
with_dupes.show()
# Use .distinct() to remove duplicates if needed
unique_df = df1.union(df2).distinct()
print("After deduplication:")
unique_df.show()
Output:
With duplicates:
+---+-----+
| id|value|
+---+-----+
| 1| A|
| 1| A|
+---+-----+
After deduplication:
+---+-----+
| id|value|
+---+-----+
| 1| A|
+---+-----+
If your workflow requires unique rows, chain .distinct() after the union. Be aware that .distinct() triggers a shuffle operation, so use it intentionally rather than as a default.
Method Comparison
| Method | Column Matching | Best Use Case |
|---|---|---|
| .union() | By position | Guaranteed identical schemas in the same column order |
| .unionByName() | By name | DataFrames with potentially different column orders |
| .unionByName(..., allowMissingColumns=True) | By name, fills nulls | Evolving schemas where columns may be added over time |
Summary
Appending data in PySpark always means creating a new DataFrame through a union operation, because PySpark DataFrames are immutable.
Start with an empty DataFrame that has an explicitly defined schema, then use .union() when both DataFrames share the exact same column order, or .unionByName() when column order might differ.
For batch processing, avoid iterative union loops that create deeply nested execution plans.
Instead, collect your DataFrames into a list and combine them with reduce() for a cleaner and more efficient approach.
Finally, remember that .union() behaves like UNION ALL and preserves duplicates. Use .distinct() afterward if deduplication is needed.