# DataFrame reader API
spark.read.format("") \
    .option("key", "value") \
    .schema(schemavariable) \
    .load("path")
# DataFrame writer API (called on a DataFrame, ends with save(), takes no schema)
df.write.mode("overwrite") \
    .format("") \
    .option("key", "value") \
    .save("path")
When reading data in PySpark, you can specify how to handle corrupt or malformed records using different modes: failfast, dropmalformed, and permissive. These modes determine the behaviour when encountering bad records in the data source.
Modes for Handling Corrupt Records
1. failfast
- Behavior: Fails immediately when a corrupt record is encountered.
- Use Case: Use this mode when data quality is critical and no bad records may be processed.
2. dropmalformed
- Behavior: Drops all rows containing corrupt records.
- Use Case: Use this mode when you want to process only clean records and discard the bad ones.
3. permissive (default)
- Behavior: Places corrupt records in a special column (_corrupt_record) and processes the rest of the data.
- Use Case: Use this mode when you want to keep track of corrupt records while still processing the rest of the data.
.option("mode", "FAILFAST")
# or
.option("mode", "DROPMALFORMED")
# or
.option("mode", "PERMISSIVE")
Spark SQL offers spark.read.json("path") for parsing JSON files into Spark DataFrames, both line-delimited (the default) and multiline. Likewise, you can use dataframe.write.json("path") to store DataFrame content as JSON at the specified path.
dfJson = spark.read.json("/content/drive/MyDrive/MyPC/PySpark/zipcodes.json")
# or...
dfJson = spark.read.format("json").load("/content/drive/MyDrive/MyPC/PySpark/zipcodes.json")
dfJson.show(5)
# Read a multiline JSON file
spark.read.option("multiline", "true") \
    .json("/content/drive/MyDrive/MyPC/PySpark/multiline-zipcode.json").show()
Spark Schema defines the structure of the data, in other words, it is the structure of the DataFrame.
Spark SQL provides the StructType and StructField classes to programmatically specify the structure of a DataFrame.
JsonSchema = 'City string, Country string'
# or...
from pyspark.sql.types import StructType, StructField, StringType

Schema = StructType([
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True)
])
dfJson = spark.read.schema(JsonSchema).json("/content/drive/MyDrive/MyPC/PySpark/JsonFile/zipcodes.json")
dfJson.show(5)
dfJson.printSchema()
dfJson.write.mode("overwrite").json("/content/drive/MyDrive/MyPC/PySpark/JsonFile/Ouput/zipcodes.json")
Note: Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists', and 'default'.
overwrite – Overwrites the existing file.
append – Appends the data to the existing file.
ignore – Skips the write operation when the file already exists.
errorifexists or error – Raises an error when the file already exists; this is the default behavior ('default' maps to it).
Reading Multiple Files at a Time: you can also read multiple JSON files from different paths; just pass a list of fully qualified file paths.
# read multiple files
df2 = spark.read.json([
    "src/main/resources/zipcodes_streaming/zipcode1.json",
    "src/main/resources/zipcodes_streaming/zipcode2.json"])
df2.show(truncate=False)
Reading All Files in a Directory: we can read all JSON files in a directory into a DataFrame by passing the directory itself as the path to the json() method. In the snippet below, "zipcodes_streaming" is a folder that contains multiple JSON files.
# read all files from a folder
df3 = spark.read.json("src/main/resources/zipcodes_streaming")
df3.show(truncate=False)
# if the folder also contains files in other formats, match only JSON with *.json
df3 = spark.read.json("src/main/resources/zipcodes_streaming/*.json")
df3.show(truncate=False)