A schema in PySpark (and generally in data processing) defines the structure of a DataFrame, including the names and data types of each column. It serves as a blueprint for the DataFrame, ensuring that data is organized and processed consistently.
Key Concepts of a Schema
- Structure: A schema specifies the number of columns and their respective data types.
- Data Types: It defines the type of data each column can hold (e.g., integer, string, boolean).
- Column Names: It includes the names of the columns in the DataFrame.
- Nullability: It indicates whether a column can contain null values.
Importance of a Schema
- Data Integrity: Ensures that the data conforms to the expected format.
- Performance: Improves performance by avoiding the need to infer the schema at runtime.
- Data Quality: Helps in validating and cleaning the data by specifying constraints.
Common Data Types in PySpark
- StructType: Represents a structure containing multiple fields.
- StructField: Represents a single field within a StructType (its name, data type, and nullability).
- StringType: Represents string data.
- IntegerType: Represents integer data.
- FloatType: Represents floating-point data.
- DoubleType: Represents double-precision floating-point data.
- BooleanType: Represents boolean data.
- TimestampType: Represents timestamp data.
- DateType: Represents date data.
- ArrayType: Represents an array of elements.
# Using StructType and StructField
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])
# Define the schema using DDL
schema_ddl = "name STRING, age INT, city STRING"
# Inferring the Schema from Data
# Sample data
data = [("Alice", 29, True), ("Bob", 31, False), ("Cathy", 25, True)]
# Create a DataFrame and infer the schema from the data
# (without explicit names, the columns default to _1, _2, _3)
df = spark.createDataFrame(data)
# Using JSON Schema
import json
from pyspark.sql.types import StructType
# JSON schema string
json_schema_str = '''
{
    "type": "struct",
    "fields": [
        {"name": "name", "type": "string", "nullable": true, "metadata": {}},
        {"name": "age", "type": "integer", "nullable": true, "metadata": {}},
        {"name": "is_student", "type": "boolean", "nullable": true, "metadata": {}}
    ]
}
'''
# Parse the JSON string to create the schema
schema_json = StructType.fromJson(json.loads(json_schema_str))
Explanation of the Example
- Explicit Schema: Clearly defines the schema using StructType and StructField.
- DDL Schema: Uses a DDL-formatted string for schema definition.
- Inferred Schema: Infers the schema directly from the data.
- JSON Schema: Defines the schema using a JSON string.
Summary
A schema in PySpark defines the structure of a DataFrame, including column names, data types, and nullability. It ensures data integrity, improves performance, and aids in data quality. Schemas can be created explicitly using StructType and StructField, inferred from data, defined using DDL strings, or specified in JSON format.
https://spark.apache.org/docs/latest/sql-programming-guide.html
Handling corrupt data in PySpark involves detecting, managing, and potentially cleaning data that does not conform to the expected schema or contains errors. This can be done using different modes while reading data and applying further transformations to manage the corrupt records.
Modes for Handling Corrupt Data
- FAILFAST: Immediately fails when encountering corrupt records.
- DROPMALFORMED: Drops all rows containing corrupt records.
- PERMISSIVE: Includes corrupt records in a special column (_corrupt_record) and processes the rest of the data.
Using PERMISSIVE Mode to Handle Corrupt Data
The PERMISSIVE mode allows you to load all data, including corrupt rows, which are stored in a special column (_corrupt_record). You can then filter out or inspect these corrupt rows to clean the data.
Example: Handling Corrupt Data in CSV File
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

schema = StructType() \
    .add(StructField("station_id", IntegerType(), nullable=True)) \
    .add(StructField("name", StringType(), nullable=True)) \
    .add(StructField("lat", FloatType(), nullable=True)) \
    .add(StructField("long", FloatType(), nullable=True)) \
    .add(StructField("dockcount", StringType(), nullable=True)) \
    .add(StructField("landmark", StringType(), nullable=True)) \
    .add(StructField("installation", StringType(), nullable=True)) \
    .add(StructField("_corrupt_record", StringType(), nullable=True))
df = spark.read.format("csv") \
.schema(schema) \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.load("/FileStore/tables/201508_station_data.csv")
df.display()
from pyspark.sql.functions import col, isnull
# Filter out corrupt records
df_clean = df.filter(isnull(col("_corrupt_record")))
# Show the cleaned DataFrame
print("Cleaned DataFrame without corrupt records:")
df_clean.show(truncate=False)
# Extract corrupt records for inspection
# (Spark disallows queries that reference only the corrupt record column
# on an uncached DataFrame, so cache it before selecting _corrupt_record)
df.cache()
df_corrupt = df.filter(col("_corrupt_record").isNotNull())
# Show the corrupt records
print("Corrupt records:")
df_corrupt.select("_corrupt_record").show(truncate=False)
Using the badRecordsPath option (available on Databricks runtimes) allows you to automatically redirect corrupt or malformed records to a specified path when reading data with Spark. This is particularly useful for large datasets, as it segregates bad records from good ones without extensive custom filtering and handling.
# Path to the data
path_to_csv = "path/to/your/file.csv"
# Path to store bad records
bad_records_path = "path/to/store/bad_records"
# Read the CSV file with PERMISSIVE mode and badRecordsPath
df = spark.read.format("csv") \
.option("header", "true") \
.option("mode", "PERMISSIVE") \
.option("badRecordsPath", bad_records_path) \
.schema(schema) \
.load(path_to_csv)
# Show the DataFrame
df.show(truncate=False)
# Read bad records from the badRecordsPath
# (bad records are stored in JSON format)
bad_records_df = spark.read.format("json").load(bad_records_path)
# Show the bad records DataFrame
print("Bad records:")
bad_records_df.show(truncate=False)