Apache Spark DataKickstart: Read and Write with PySpark

Every Spark pipeline starts by reading data from a data source or table, and as data engineers we usually end the pipeline by writing the transformed data back out. In this tutorial we walk through some of the most common formats and cloud storage locations for reading and writing with Spark. We'll save some of the advanced Delta Lake capabilities for another tutorial.

The code used in this part of the course can be found at https://github.com/datakickstart/ApacheSparkDataKickstart.

Reads

Comma Separated Values (CSV)

First let's look at reading from a Comma Separated Values file, or CSV for short, a file format that has been popular since I started my career. In this format the value for each column is separated by a comma, or possibly another separator such as a tab or pipe character. Each row is on its own line, though occasionally a custom row delimiter is used as well.

csv_df = (
  spark.read
    .option("header", "true")       # first line of the file contains column names
    .option("inferSchema", "true")  # scan the data to guess column types
    .csv("/databricks-datasets/nyctaxi/taxizone/taxi_zone_lookup.csv")
)

display(csv_df.limit(10))
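
If your file uses a different separator, point the reader at it with the sep option. Here is a minimal sketch, assuming a hypothetical pipe-delimited file:

pipe_df = (
  spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", "|")  # use "\t" for tab-delimited files
    .csv("/tmp/demo/pipe_delimited_lookup.csv")  # hypothetical path
)

display(pipe_df.limit(10))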

JSON

JSON is a popular format for storing application data or sending data between services. It's popular because it can store complex nested structures while remaining human readable (though a whole file is a bit much to read). For better performance you would want to define the schema up front, but inferring the schema is easier to get started with and fine when working with smaller files.

json_df = (
  spark.read
    .option("inferSchema", "true")
    .json("s3://amazon-berkeley-objects/listings/metadata/")
 )

display(json_df.limit(10))
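
If you want to skip inference, define the schema up front and pass it to the reader. A minimal sketch with a hypothetical subset of fields (the real listings files contain many more columns):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical subset of fields; an explicit schema avoids an extra pass over the data to infer types
listing_schema = StructType([
    StructField("item_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("marketplace", StringType(), True),
])

json_df = (
  spark.read
    .schema(listing_schema)
    .json("s3://amazon-berkeley-objects/listings/metadata/")
)

display(json_df.limit(10))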

Parquet

Next, let's read from a Parquet file. This is a popular format for analytics because of its good read performance for analytical queries; it uses columnar storage, compression, and file metadata to enable efficient data skipping.

The Parquet format was the best we had for years, but newer formats have been developed that add another layer on top of Parquet for better performance and capabilities. One popular example is the Delta Lake format. It still uses Parquet under the covers but adds a transaction log and some optimization capabilities. Let's look at how easy it is to switch to reading from a Delta Lake table, right after the Parquet example below.

parquet_df = (
  spark.read
    .parquet("wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow/puYear=2018/puMonth=*/*.parquet")
)

display(parquet_df.limit(10))
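
As a quick sketch of that switch (the path here is a hypothetical DBFS location; a full example reading Delta from ADLS appears in the Azure Storage section below), only the format changes:

delta_df = (
  spark.read
    .format("delta")
    .load("dbfs:/demo/read_examples/yellow_delta")  # hypothetical Delta table location
)

display(delta_df.limit(10))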

Azure Storage

To read data from Azure Data Lake Storage (ADLS) or Microsoft OneLake you need to set up some credentials. Here is an example where we store credentials in environment variables and set the proper Spark configs to be able to read from ADLS.

def adls_authenticate(client_id, credential, directory_id):
  """
  Dependencies:
    - Service principal is created and assigned permission
    - Secret scope created using Azure Key Vault or Databricks Secret Scope
    - Key values are added to the secret scope so that references from dbutils.secrets.get work properly
  """
  spark.conf.set("fs.azure.account.auth.type", "OAuth")
  spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
  spark.conf.set("fs.azure.account.oauth2.client.id", client_id)
  spark.conf.set("fs.azure.account.oauth2.client.secret", credential)
  spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/{0}/oauth2/token".format(directory_id))

import os

client_id = os.getenv("ADLS_CLIENT_ID")
credential = os.getenv("ADLS_CREDENTIAL")
directory_id = os.getenv("ADLS_DIRECTORY_ID")

adls_authenticate(client_id, credential, directory_id)

delta_df = (
  spark.read
    .format("delta")
    .load("abfss://demo@datakickstartadls.dfs.core.windows.net/nyctaxi/tripdata/yellow_delta")
)

display(delta_df.limit(10))

For OneLake, the path would look a little different: “abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/nyctaxi/tripdata/yellow_delta”
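
A minimal sketch of a OneLake read using that path pattern, assuming the same service principal also has access to the Fabric workspace (the <workspace_id> placeholder is carried over from the path above):

onelake_path = "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/nyctaxi/tripdata/yellow_delta"

onelake_df = (
  spark.read
    .format("delta")
    .load(onelake_path)
)

display(onelake_df.limit(10))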

AWS S3

When reading from (or writing to) AWS S3 you have several options. For environments that support instance profiles, that is the best way to set permissions, though be aware that anyone who uses the cluster will be able to run with those permissions.

Option 1 (best): Assign an instance profile to your cluster

Option 2: Set environment variables (works for Databricks)

AWS_SECRET_ACCESS_KEY={{secrets/scope/aws_secret_access_key}}
AWS_ACCESS_KEY_ID={{secrets/scope/aws_access_key_id}}

Option 3: Use S3A and Spark properties; you may have to add the correct package for this to work (see the sketch after this list)

spark.hadoop.fs.s3a.aws.credentials.provider <aws-credentials-provider-class>
spark.hadoop.fs.s3a.endpoint <aws-endpoint>
spark.hadoop.fs.s3a.server-side-encryption-algorithm SSE-KMS
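
As a minimal sketch of option 3 for when you build the SparkSession yourself (for example a standalone PySpark job; on Databricks you would set these as cluster Spark configs instead), assuming static access keys in environment variables and that the hadoop-aws package is available:

import os
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("s3a-read-example")
    # Static keys are just one choice of credentials provider; pick the one that fits your environment
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
    .getOrCreate()
)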

Once one of these is configured, you can read from an S3 bucket that the role or credentials can access.

parquet_df = (
  spark.read
    .parquet("s3://dvannoy-private/nyc_taxi_yellow_parquet")
)
display(parquet_df.limit(10))

Google Cloud Storage

To read from Google Cloud Storage you can set Spark configs, similar to what you do for ADLS above. Let's look at some example code.

def gcs_authenticate(project_id, email, private_key_id, private_key):
  """
  Dependencies:
    - Service account is created and assigned permission
  """
  spark.conf.set("google.cloud.auth.service.account.enable", "true")
  spark.conf.set("fs.gs.auth.service.account.email", email)
  spark.conf.set("fs.gs.project.id", project_id)
  spark.conf.set("fs.gs.auth.service.account.private.key", private_key)
  spark.conf.set("fs.gs.auth.service.account.private.key.id", private_key_id)

project_id = os.getenv("GCS_PROJECT_ID")
email = os.getenv("GCS_EMAIL")
private_key_id = os.getenv("GCS_PRIVATE_KEY_ID")
private_key = os.getenv("GCS_PRIVATE_KEY")

gcs_authenticate(project_id, email, private_key_id, private_key)

parquet_df = (
  spark.read
    .parquet("gs://datakickstart-demo/nyctaxi/tripdata/yellow_parquet/yellow_tripdata_2019-01.parquet")
)
display(parquet_df.limit(10))

Read with JDBC, MSSQL, or Snowflake connectors

A common way to connect to a database is JDBC, though many cloud databases have their own dedicated connectors as well. Here are a few examples of reads: a plain JDBC read, a read through the Microsoft SQL Server connector, and a read from Snowflake.

import os

database = "StackOverflow2010"
db_host_name = "sandbox-2-sqlserver.database.windows.net"
db_url = f"jdbc:sqlserver://{db_host_name};databaseName={database}"
db_user = "dv_admin"
db_password = os.getenv("SQL_PWD")
  
table = "Users"

df = (
    spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", db_user)
    .option("password", db_password)
    .load()
)

display(df.limit(10))
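
For larger tables a single JDBC connection can be a bottleneck. Spark can split the read across parallel connections if you give it a numeric column and its rough bounds; here is a minimal sketch assuming a hypothetical numeric Id column:

df_parallel = (
    spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", db_user)
    .option("password", db_password)
    .option("partitionColumn", "Id")  # hypothetical numeric column to split on
    .option("lowerBound", "1")        # approximate min of the column
    .option("upperBound", "1000000")  # approximate max of the column
    .option("numPartitions", "8")     # number of parallel connections
    .load()
)

display(df_parallel.limit(10))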

# SQL Server connector: on Databricks, add the library com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 and set secrets
database = "StackOverflow2010"
db_host_name = "sandbox-2-sqlserver.database.windows.net"
db_url = f"jdbc:sqlserver://{db_host_name};databaseName={database}"
db_user = "dv_admin"
db_password = os.getenv("SQL_PWD")

table = "Users"

sql = f"select Location, count(1) as record_count from {table} group by Location having count(1) > 50"

df = (
    spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", db_url)
    .option("query", sql)  # use either query or dbtable, not both
    .option("user", db_user)
    .option("password", db_password)
    .load()
)

display(df.limit(10))

# Snowflake read using the Snowflake connector for Spark
user = "datakickstart"
url = os.getenv("SNOWFLAKE_URL")
password = os.getenv("SNOWFLAKE_PWD")
snowflake_database = "DATAKICKSTART"
snowflake_schema = "PUBLIC"
snowflake_cluster = "COMPUTE_WH"

# snowflake connection options
options = {
  "sfUrl": url,
  "sfUser": user,
  "sfPassword": password,
  "sfDatabase": snowflake_database,
  "sfSchema": snowflake_schema,
  "sfWarehouse": snowflake_cluster
}

df = spark.read \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "trips_dev") \
  .load()

display(df.limit(10))
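
The Snowflake connector also supports pushing a query down instead of reading a whole table. A minimal sketch reusing the options above (the column names in the query are hypothetical):

trips_summary_df = (
    spark.read
    .format("snowflake")
    .options(**options)
    .option("query", "select start_station_id, count(1) as trip_count from trips_dev group by start_station_id")  # hypothetical columns
    .load()
)

display(trips_summary_df.limit(10))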

Writes

Writing follows the same pattern as reading: call df.write with a format, a save mode, and a destination.

Delta

delta_path = "dbfs:/demo/write_examples/items_delta"

(
  df.write
    .format("delta")
    .mode("overwrite")
    .save(delta_path)
)

CSV

csv_path = "dbfs:/demo/write_examples/items_csv"
(
  df.write
    .mode("overwrite")
    .option("header", "true")
    .csv(csv_path)
)

Parquet

parquet_path = "dbfs:/demo/write_examples/items_parquet"

(
  df.write
    .mode("overwrite")
    .parquet(parquet_path)
)
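
A common variation when writing Parquet (or Delta) is to partition the output by a column that downstream queries filter on. A minimal sketch, assuming df has a hypothetical item_type column:

partitioned_path = "dbfs:/demo/write_examples/items_parquet_partitioned"

(
  df.write
    .mode("overwrite")
    .partitionBy("item_type")  # hypothetical column; one folder is written per distinct value
    .parquet(partitioned_path)
)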

JDBC

import os

database = "StackOverflow2010"
db_host_name = "sandbox-2-sqlserver.database.windows.net"
db_url = f"jdbc:sqlserver://{db_host_name};databaseName={database}"
db_user = "dv_admin"
db_password = os.getenv("SQL_PWD")
  
table = "items_test"

(
  df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", db_user)
    .option("password", db_password)
    .save()
)
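
One thing to watch with JDBC writes: overwrite mode drops and recreates the table by default. If you want to keep the existing table definition, the truncate option tells Spark to truncate it instead. A minimal sketch:

(
  df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", db_user)
    .option("password", db_password)
    .option("truncate", "true")  # truncate the existing table rather than dropping it
    .save()
)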

Snowflake

user = "datakickstart"
url = os.getenv("SNOWFLAKE_URL")
password = os.getenv("SNOWFLAKE_PWD")
snowflake_database = "DATAKICKSTART"
snowflake_schema = "PUBLIC"
snowflake_cluster = "COMPUTE_WH"

# snowflake connection options
options = {
  "sfUrl": url,
  "sfUser": user,
  "sfPassword": password,
  "sfDatabase": snowflake_database,
  "sfSchema": snowflake_schema,
  "sfWarehouse": snowflake_cluster
}

df.write \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "ITEMS_TEST") \
  .save()
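
Like other writers, this fails if the target table already exists unless you pass a save mode. A minimal sketch appending to the same table:

df.write \
  .format("snowflake") \
  .options(**options) \
  .option("dbtable", "ITEMS_TEST") \
  .mode("append") \
  .save()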

Conclusion

Hopefully the video and written summary give you a good start on reading and writing with Spark. You can see that the pattern is similar across sources, but many of the options vary by format and destination. Usually the documentation for your specific source or destination will be the most useful, but a quick search often turns up examples to help you along the way.

References

Databricks read from OneLake by Abe Pabbathi

Integrate OneLake with Azure Databricks

Connect to Azure Storage

Connect to AWS S3

Connect to Google Cloud Storage
