How to Read and Write CSV file into DataFrame by using Pyspark

PySpark Read CSV File into DataFrame: reading CSV files from disk using PySpark offers a versatile and efficient approach to data ingestion and processing. In this tutorial, you will learn why specifying options such as schema, delimiter, and header handling matters for accurate DataFrame creation, and how to read a single CSV file, multiple CSV files, and all files in a folder.

Python
# Read a CSV file with a header row, letting Spark infer column types
df = spark.read.csv("/content/drive/MyDrive/MyPC/PySpark/Data/zipcodes.csv", header=True, inferSchema=True)
df.printSchema()

# or equivalently, via the generic format/load API
df = spark.read.format("csv").load("/content/drive/MyDrive/MyPC/PySpark/Data/zipcodes.csv", header=True, inferSchema=True)
df.show(5)
Python
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, FloatType, DoubleType, BooleanType)

# Using a custom schema
Schema = StructType([
    StructField("Xaxis", FloatType(), True),
    StructField("Yaxis", FloatType(), True),
    StructField("Country", StringType(), True)
])

# OR, building the schema incrementally with add()
Schema = StructType() \
      .add("RecordNumber", IntegerType(), True) \
      .add("Zipcode", IntegerType(), True) \
      .add("ZipCodeType", StringType(), True) \
      .add("City", StringType(), True) \
      .add("State", StringType(), True) \
      .add("LocationType", StringType(), True) \
      .add("Lat", DoubleType(), True) \
      .add("Long", DoubleType(), True) \
      .add("Xaxis", IntegerType(), True) \
      .add("Yaxis", DoubleType(), True) \
      .add("Zaxis", DoubleType(), True) \
      .add("WorldRegion", StringType(), True) \
      .add("Country", StringType(), True) \
      .add("LocationText", StringType(), True) \
      .add("Location", StringType(), True) \
      .add("Decommisioned", BooleanType(), True) \
      .add("TaxReturnsFiled", StringType(), True) \
      .add("EstimatedPopulation", IntegerType(), True) \
      .add("TotalWages", IntegerType(), True) \
      .add("Notes", StringType(), True)

df = spark.read.schema(Schema).csv("/content/drive/MyDrive/MyPC/PySpark/Data/zipcodes.csv", header=True)
df.printSchema()

Python
# Read multiple CSV files by passing a list of paths
df = spark.read.csv(["path/file1.csv", "path/file2.csv", "path/file3.csv"])

# Read all CSV files from a directory
df = spark.read.csv("path/to/folder")

Reading CSV File Options: PySpark's CSV reader and writer provide multiple options for working with CSV files. Below are some of the most important options, explained with examples.

Python
# Using the header, inferSchema, and delimiter options together
df3 = spark.read.options(header=True, inferSchema=True, delimiter=',') \
  .csv("/path/zipcodes.csv")
Python
# Save DataFrame to CSV File
df.write.option("header",True) \
 .csv("/tmp/spark_output/zipcodes")

Here are some commonly used options:

  1. header: Specifies whether to include a header row with column names in the CSV file. Example: option("header", "true").
  2. delimiter: Specifies the delimiter to use between fields in the CSV file. Example: option("delimiter", ",").
  3. quote: Specifies the character used for quoting fields in the CSV file. Example: option("quote", "\"").
  4. escape: Specifies the escape character used in the CSV file. Example: option("escape", "\\").
  5. nullValue: Specifies the string to represent null values in the CSV file. Example: option("nullValue", "NA").
  6. dateFormat: Specifies the date format to use for date columns. Example: option("dateFormat", "yyyy-MM-dd").
  7. mode: When writing, specifies the save mode for the output. Options include "overwrite", "append", "ignore", and "error". Note that the save mode is set with the mode() method rather than option(). Example: df.write.mode("overwrite"). (On read, the mode option instead controls how malformed records are handled: "PERMISSIVE", "DROPMALFORMED", or "FAILFAST".)
  8. compression: Specifies the compression codec to use when writing the output file. Example: option("compression", "gzip").
