Apache Spark DataKickstart: First PySpark Application

Get hands-on with Python and PySpark to build your first data pipeline. In this video I walk you through how to read, transform, and write the NYC Taxi dataset, which is available on Databricks and Azure Synapse, or can be downloaded from the web to wherever you run Apache Spark. Once you have watched and followed along with this tutorial, go find a free dataset and try to write your own PySpark application. Pro tip: search for the Spark equivalent of functions you use in other programming languages (including SQL). Many exist in the pyspark.sql.functions module.

Example code

The code used in this part of the course can be found at https://github.com/datakickstart/ApacheSparkDataKickstart.

To highlight a few code segments, here is how you can download the NYC Taxi data locally (tested on Windows Subsystem for Linux). NOTE: on macOS you may need to install wget first, which you can do with Homebrew.

mkdir -p /tmp/datasets/nyctaxi/taxizone
cd /tmp/datasets/nyctaxi/taxizone
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
mv taxi+_zone_lookup.csv taxi_zone_lookup.csv

mkdir -p /tmp/datasets/nyctaxi/tables/nyctaxi_yellow
cd /tmp/datasets/nyctaxi/tables/nyctaxi_yellow
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet

Once you have the data locally, you can run these PySpark commands in the pyspark REPL (which gets installed when you run pip install pyspark, or can be set up in other ways).

# Yellow trip read

trip_format = "parquet"

# Read every parquet file in the yellow trip folder
trip_input_df = (spark.read
      .format(trip_format)
      .load("/tmp/datasets/nyctaxi/tables/nyctaxi_yellow"))

trip_input_df.show()

# Zone read: load the lookup CSV, using the first row as column headers
zone_df = (spark.read
  .option("header", "true")
  .csv("/tmp/datasets/nyctaxi/taxizone/taxi_zone_lookup.csv")
)

zone_df.show()

from pyspark.sql.functions import col, substring, to_date, regexp_replace

trip_df = (trip_input_df
    # Year and month of the pickup, e.g. "2019_01"
    .withColumn("year_month", regexp_replace(substring("tpep_pickup_datetime", 1, 7), '-', '_'))
    # Date-only versions of the pickup and dropoff timestamps
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))
    # Tip as a fraction of the total fare
    .withColumn("tip_pct", col("tip_amount") / col("total_amount"))
    .limit(1000)
)

full_df = (trip_df
  .join(zone_df,
        trip_df.PULocationID == zone_df.LocationID,
        how="left")
  .drop("LocationID")
)

# Write the joined result; the format, mode, partition column, and path are examples
(full_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("year_month")
    .save("/tmp/datasets/nyctaxi/output/yellow_trip"))

Conclusion

This page is meant as a resource to go with the YouTube video. I may add to this as questions come up, so stay tuned. And more importantly, keep an eye out for the next videos in this series.
