Get hands-on with Python and PySpark to build your first data pipeline. In this video I walk you through how to read, transform, and write the NYC Taxi dataset, which is available on Databricks and Azure Synapse, or can be downloaded from the web to wherever you run Apache Spark. Once you have watched and followed along with this tutorial, find a free dataset and try writing your own PySpark application. Pro tip: search for the Spark equivalent of functions you use in other programming languages (including SQL). Many exist in the pyspark.sql.functions module.
The code used in this part of the course can be found at https://github.com/datakickstart/ApacheSparkDataKickstart.
To highlight a few code segments, here is how you can download the NYC Taxi data locally (tested on Windows Subsystem for Linux). NOTE: On macOS you may need to install wget first, which you can do with Homebrew.
```
mkdir -p /tmp/datasets/nyctaxi/taxizone
cd /tmp/datasets/nyctaxi/taxizone
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
mv taxi+_zone_lookup.csv taxi_zone_lookup.csv

mkdir -p /tmp/datasets/nyctaxi/tables/nyctaxi_yellow
cd /tmp/datasets/nyctaxi/tables/nyctaxi_yellow
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet
```
Once you have the data locally, you can run these PySpark commands in the pyspark REPL (which is installed when you run pip install pyspark, though it can be set up in other ways).
```
# Yellow trip read
trip_format = "parquet"
trip_input_df = (spark.read
    .format(trip_format)
    .load("/tmp/datasets/nyctaxi/tables/nyctaxi_yellow"))
trip_input_df.show()

# Zone read
zone_df = (spark.read
    .option("header", "true")
    .csv("/tmp/datasets/nyctaxi/taxizone/taxi_zone_lookup.csv")
)
zone_df.show()

from pyspark.sql.functions import col, substring, to_date, regexp_replace

trip_df = (trip_input_df
    .withColumn("year_month", regexp_replace(substring("tpep_pickup_datetime", 1, 7), "-", "_"))
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("tip_pct", col("tip_amount") / col("total_amount"))
    .limit(1000)
)

full_df = (trip_df
    .join(zone_df, trip_df.PULocationID == zone_df.LocationID, how="left")
    .drop("LocationID")
)

# The snippet on this page ends mid-statement here; one possible
# completion (the output path is just an example):
(full_df.write
    .format("parquet")
    .mode("overwrite")
    .save("/tmp/datasets/nyctaxi/output/nyctaxi_yellow_enriched"))
```
This page is meant as a resource to go with the YouTube video. I may add to this as questions come up, so stay tuned. And more importantly, keep an eye out for the next videos in this series.