
How to Read and Write Files into a DataFrame Using PySpark


Python
# DataFrame reader API
spark.read.format("...") \
          .option("key", "value") \
          .schema(schema_variable) \
          .load()

# DataFrame writer API
df.write.mode("...").format("...") \
          .option("key", "value") \
          .save()

When reading data in PySpark, you can specify how to handle corrupt or malformed records using three modes: failfast, dropmalformed, and permissive. The mode determines the behavior when the data source contains bad records.

Modes for Handling Corrupt Records

1. failfast

  • Behavior: Immediately fails when encountering corrupt records.
  • Use Case: Use this mode when data quality is critical and you want to ensure no bad records are processed.

2. dropmalformed

  • Behavior: Drops all rows containing corrupt records.
  • Use Case: Use this mode when you want to process only clean records and discard any bad ones.

3. permissive (default)

  • Behavior: Parses what it can, sets malformed fields to null, stores the raw bad record in a special column (_corrupt_record), and processes the rest of the data.
  • Use Case: Use this mode when you want to keep track of corrupt records and still process the rest of the data.
Python
.option("mode", "FAILFAST")
# or
.option("mode", "DROPMALFORMED")
# or
.option("mode", "PERMISSIVE")

Spark SQL offers spark.read.json("path") for parsing both single-line and multiline JSON files into a Spark DataFrame. In the other direction, dataframe.write.json("path") stores the DataFrame content in JSON format at the specified path.

Python
dfJson = spark.read.json("/content/drive/MyDrive/MyPC/PySpark/zipcodes.json")
# or
dfJson = spark.read.format("json").load("/content/drive/MyDrive/MyPC/PySpark/zipcodes.json")
dfJson.show(5)

# Read a multiline JSON file

spark.read.option("multiline", "true")  \
.json("/content/drive/MyDrive/MyPC/PySpark/multiline-zipcode.json").show()

Spark Schema defines the structure of the data, in other words, it is the structure of the DataFrame.

Spark SQL provides StructType & StructField classes to programmatically specify the structure to the DataFrame.

Python
from pyspark.sql.types import StructType, StructField, StringType

# DDL-style schema string
JsonSchema = 'City string, Country string'

# or an equivalent programmatic StructType
Schema = StructType([
    StructField("City", StringType(), True),
    StructField("Country", StringType(), True)
])
Python
dfJson = spark.read.schema(JsonSchema).json("/content/drive/MyDrive/MyPC/PySpark/JsonFile/zipcodes.json")
dfJson.show(5)
dfJson.printSchema()
Python
dfJson.write.mode("overwrite").json("/content/drive/MyDrive/MyPC/PySpark/JsonFile/Output/zipcodes.json")

Note: Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists', 'default'.

overwrite – overwrites the file if it already exists (SaveMode.Overwrite in Scala/Java).

append – adds the data to the existing file (SaveMode.Append).

ignore – skips the write operation when the file already exists (SaveMode.Ignore).

error or errorifexists – the default; raises an error when the file already exists (SaveMode.ErrorIfExists).

Reading Multiple Files at a Time: you can also read multiple JSON files from different paths; in PySpark, pass a list of fully qualified file paths to json().

Python
# read multiple files
df2 = spark.read.json([
    "src/main/resources/zipcodes_streaming/zipcode1.json",
    "src/main/resources/zipcodes_streaming/zipcode2.json"])
df2.show(truncate=False)

Reading all Files in a Directory: we can read all JSON files from a directory into a DataFrame just by passing the directory as a path to the json() method. In the snippet below, "zipcodes_streaming" is a folder that contains multiple JSON files.

Python
# read all files from a folder
df3 = spark.read.json("src/main/resources/zipcodes_streaming")
df3.show(truncate=False)

# if the folder contains multiple file formats, use a glob pattern
# to pick up only the JSON files
df3 = spark.read.json("src/main/resources/zipcodes_streaming/*.json")
df3.show(truncate=False)
