Load and transform data from Delta Lake using Amazon SageMaker Studio and Apache Spark

Data lakes have become the norm in the industry for storing critical business data. The primary rationale for a data lake is to land all types of data, from raw data to preprocessed and postprocessed data, and it may include both structured and unstructured data formats. Having a centralized store for all types of data allows modern big data applications to load, transform, and process whatever data is needed. Benefits include storing data as is, without the need to first structure or transform it. Most importantly, data lakes allow controlled access to data from many different types of analytics and machine learning (ML) processes in order to guide better decision-making.

Multiple vendors have created data lake architectures, including AWS Lake Formation. In addition, open-source solutions allow companies to access, load, and share data easily. One of the options for storing data in the AWS Cloud is Delta Lake. The Delta Lake library enables reads and writes in the open-source Apache Parquet file format, and provides capabilities like ACID transactions, scalable metadata handling, and unified streaming and batch data processing. Delta Lake offers a storage layer API that you can use to store data on top of an object store like Amazon Simple Storage Service (Amazon S3).

Data is at the heart of ML—training a traditional supervised model is impossible without access to high-quality historical data, which is commonly stored in a data lake. Amazon SageMaker is a fully managed service that provides a versatile workbench for building ML solutions and provides highly tailored tooling for data ingestion, data processing, model training, and model hosting. Apache Spark is a workhorse of modern data processing with an extensive API for loading and manipulating data. SageMaker has the ability to prepare data at petabyte scale using Spark to enable ML workflows that run in a highly distributed manner. This post highlights how you can take advantage of the capabilities offered by Delta Lake using Amazon SageMaker Studio.

Solution overview

In this post, we describe how to use SageMaker Studio notebooks to easily load and transform data stored in the Delta Lake format. We use a standard Jupyter notebook to run Apache Spark commands that read and write table data in CSV and Parquet format. The open-source library delta-spark allows you to directly access this data in its native format. It also lets you take advantage of many API operations to apply data transformations, make schema modifications, and use time-travel or as-of-timestamp queries to pull a particular version of the data.

In our sample notebook, we load raw data into a Spark DataFrame, create a Delta table, query it, display audit history, demonstrate schema evolution, and show various methods for updating the table data. We use the DataFrame API from the PySpark library to ingest and transform the dataset attributes. We use the delta-spark library to read and write data in Delta Lake format and to manipulate the underlying table structure, referred to as the schema.

We use SageMaker Studio, the built-in IDE from SageMaker, to create and run Python code from a Jupyter notebook. We have created a GitHub repository that contains this notebook and other resources to run this sample on your own. The notebook demonstrates exactly how to interact with data stored in Delta Lake format, which permits tables to be accessed in-place without the need to replicate data across disparate datastores.

For this example, we use a publicly available dataset from Lending Club that represents customer loans data. We downloaded the accepted data file (accepted_2007_to_2018Q4.csv.gz) and selected a subset of the original attributes. This dataset is available under the Creative Commons (CC0) license.

Prerequisites

You must install a few prerequisites prior to using the delta-spark functionality. To satisfy required dependencies, we have to install some libraries into our Studio environment, which runs as a Dockerized container and is accessed via a Jupyter Gateway app:

  • OpenJDK for access to Java and associated libraries
  • PySpark (Spark for Python) library
  • Delta Spark open-source library

We can use either conda or pip to install these libraries, which are publicly available from conda-forge, PyPI, or Maven repositories.

This notebook is designed to run within SageMaker Studio. After you launch the notebook within Studio, make sure you choose the Python 3 (Data Science) kernel type. Additionally, we suggest using an instance type with at least 16 GB of RAM (like ml.g4dn.xlarge), which allows PySpark commands to run faster. Use the following commands to install the required dependencies, which make up the first several cells of the notebook:

%conda install openjdk -q -y
%pip install pyspark==3.2.0
%pip install delta-spark==1.1.0
%pip install -U "sagemaker>2.72"

After the installation commands are complete, we’re ready to run the core logic in the notebook.

Implement the solution

To run Apache Spark commands, we need to instantiate a SparkSession object. After we include the necessary import commands, we configure the SparkSession by setting some additional configuration parameters. The parameter with key spark.jars.packages passes the names of additional libraries used by Spark to run delta commands. The following initial lines of code assemble a list of packages, using traditional Maven coordinates (groupId:artifactId:version), to pass these additional packages to the SparkSession.

Additionally, the parameters with key spark.sql.extensions and spark.sql.catalog.spark_catalog enable Spark to properly handle Delta Lake functionality. The final configuration parameter with key fs.s3a.aws.credentials.provider adds the ContainerCredentialsProvider class, which allows Studio to look up the AWS Identity and Access Management (IAM) role permissions made available via the container environment. The code creates a SparkSession object that is properly initialized for the SageMaker Studio environment:

# Configure Spark to use additional library packages to satisfy dependencies

# Build list of packages entries using Maven coordinates (groupId:artifactId:version)
pkg_list = []
pkg_list.append("io.delta:delta-core_2.12:1.1.0")
pkg_list.append("org.apache.hadoop:hadoop-aws:3.2.2")

packages=(",".join(pkg_list))
print('packages: '+packages)

# Instantiate Spark via the builder
# Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("PySparkApp")
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider")
    .getOrCreate())

sc = spark.sparkContext

print('Spark version: '+str(sc.version))

In the next section, we upload a file containing a subset of the Lending Club consumer loans dataset to our default S3 bucket. The original dataset is very large (over 600 MB), so we provide a single representative file (2.6 MB) for use by this notebook. PySpark uses the s3a protocol to enable additional Hadoop library functionality. Therefore, we modify each native S3 URI from the s3 protocol to use s3a in the cells throughout this notebook.
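The upload and URI handling might look like the following sketch (the local file name and S3 prefix here are illustrative assumptions, not the exact names used in the repository):

import boto3
import sagemaker

# Upload the sample CSV to the default SageMaker bucket
# (local file name and key prefix below are assumptions for illustration)
bucket = sagemaker.Session().default_bucket()
key = "delta-lake-demo/raw/loans_subset.csv"
boto3.client("s3").upload_file("data/loans_subset.csv", bucket, key)

# Spark's Hadoop S3 connector expects the s3a scheme rather than s3
s3a_raw_csv = f"s3a://{bucket}/{key}"
print(s3a_raw_csv)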

We use Spark to read in the raw data (with options for both CSV and Parquet files) with the following code, which returns a Spark DataFrame named loans_df:

loans_df = spark.read.csv(s3a_raw_csv, header=True)

The following screenshot shows the first 10 rows from the resulting DataFrame.
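If you're running the notebook rather than viewing the screenshot, you can display the rows directly; the Parquet read path is analogous (s3a_raw_parquet below is a hypothetical URI, not defined in the sample):

# Inspect the first 10 rows and the inferred schema
loans_df.show(10)
loans_df.printSchema()

# Parquet alternative (s3a_raw_parquet is a hypothetical URI pointing to Parquet files)
# loans_df = spark.read.parquet(s3a_raw_parquet)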

We can now write out this DataFrame as a Delta Lake table with a single line of code by specifying .format("delta") and providing the S3 URI location where we want to write the table data:

loans_df.write.format("delta").mode("overwrite").save(s3a_delta_table_uri)

The next few notebook cells show an option for querying the Delta Lake table. We can construct a standard SQL query, specify delta format and the table location, and submit this command using Spark SQL syntax:

sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY loan_amnt'
sql_results = spark.sql(sql_cmd)

The following screenshot shows the results of our SQL query as ordered by loan_amnt.
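Because sql_results is a regular Spark DataFrame, you can reproduce that view in the notebook with a simple display call:

# Display the first rows of the ordered query result
sql_results.show(10)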

Interact with Delta Lake tables

In this section, we showcase the DeltaTable class from the delta-spark library. DeltaTable is the primary class for programmatically interacting with Delta Lake tables. This class includes several static methods for discovering information about a table. For example, the isDeltaTable method returns a Boolean value indicating whether the table is stored in delta format:

# Import DeltaTable and use its static method to determine the table type
from delta.tables import DeltaTable

print(DeltaTable.isDeltaTable(spark, s3a_delta_table_uri))

You can create DeltaTable instances using the path of the Delta table, which in our case is the S3 URI location. In the following code, we retrieve the complete history of table modifications:

deltaTable = DeltaTable.forPath(spark, s3a_delta_table_uri)
history_df = deltaTable.history()
history_df.head(3)

The output indicates the table has six modifications stored in the history, and shows the latest three versions.
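For a more readable listing, you can select a few history columns, and, as mentioned earlier, you can use time travel to read an earlier version of the table. The following is a sketch, not part of the original notebook:

# Show selected columns from the table history
history_df.select("version", "timestamp", "operation").show(3, truncate=False)

# Time travel: read the table as of an earlier version number
v0_df = spark.read.format("delta").option("versionAsOf", 0).load(s3a_delta_table_uri)
print(v0_df.count())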

Schema evolution

In this section, we demonstrate how Delta Lake schema evolution works. By default, delta-spark enforces the existing table schema, rejecting writes whose schema doesn't match. However, by specifying certain options, we can safely modify the schema of the table.

First, let’s read data back in from the Delta table. Because this data was written out as delta format, we need to specify .format("delta") when reading the data, then we provide the S3 URI where the Delta table is located. Second, we write the DataFrame back out to a different S3 location where we demonstrate schema evolution. See the following code:

delta_df = (spark.read.format("delta").load(s3a_delta_table_uri))
delta_df.write.format("delta").mode("overwrite").save(s3a_delta_update_uri)

Now we use the Spark DataFrame API to add two new columns to our dataset. The column names are funding_type and excess_int_rate, and the column values are set to constants using the DataFrame withColumn method. See the following code:

from pyspark.sql.functions import col, lit

funding_type_col = "funding_type"
excess_int_rate_col = "excess_int_rate"

# Add the two new columns with constant values
delta_update_df = (delta_df.withColumn(funding_type_col, lit("NA"))
                           .withColumn(excess_int_rate_col, lit(0.0)))
delta_update_df.dtypes

A quick look at the data types (dtypes) shows the additional columns are part of the DataFrame.
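Before enabling schema evolution, it's worth seeing the enforcement behavior described earlier. The following sketch (not part of the original notebook) attempts the same write without the mergeSchema option, which delta-spark rejects because the DataFrame now carries two columns that the table doesn't have:

# Schema enforcement: without mergeSchema, delta-spark refuses to overwrite the table
# with a DataFrame whose schema differs from the existing table schema
try:
    (delta_update_df.write.format("delta")
        .mode("overwrite")
        .save(s3a_delta_update_uri))
except Exception as err:  # typically an AnalysisException describing the schema mismatch
    print("Write rejected by schema enforcement:", err)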

Now we enable the schema modification, thereby changing the underlying schema of the Delta table, by setting the mergeSchema option to true in the following Spark write command:

(delta_update_df.write.format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true") # option - evolve schema
 .save(s3a_delta_update_uri)
)

Let’s check the modification history of our new table, which shows that the table schema has been modified:

deltaTableUpdate = DeltaTable.forPath(spark, s3a_delta_update_uri)

# Retrieve the table history, which now includes the schema modification
history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

The history listing shows each revision to the metadata.

Conditional table updates

You can use the DeltaTable update method to evaluate a predicate and apply a transformation to each row where the condition evaluates to true. In our case, we write the value FullyFunded to the funding_type column whenever the loan_amnt equals the funded_amnt. This is a powerful mechanism for applying conditional updates to your table data:

deltaTableUpdate.update(condition = col("loan_amnt") == col("funded_amnt"),
    set = { funding_type_col: lit("FullyFunded") } )

The following screenshot shows our results.
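One way to verify the update outside the screenshot is to re-read the table and filter on the new value; a minimal sketch:

from pyspark.sql.functions import col

# Confirm that rows where loan_amnt equals funded_amnt now carry the FullyFunded marker
updated_df = spark.read.format("delta").load(s3a_delta_update_uri)
(updated_df.filter(col("funding_type") == "FullyFunded")
    .select("loan_amnt", "funded_amnt", "funding_type")
    .show(5))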

In the final change to the table data, we show the syntax to pass a function to the update method, which in our case calculates the excess interest rate by subtracting 10.0% from the loan’s int_rate attribute. One more SQL command pulls records that meet our condition, using the WHERE clause to locate records with int_rate greater than 10.0%:

# Function that calculates rate overage (amount over 10.0)
def excess_int_rate(rate):
    return (rate-10.0)

deltaTableUpdate.update(condition = col("int_rate") > 10.0,
 set = { excess_int_rate_col: excess_int_rate(col("int_rate")) } )

The new excess_int_rate column now correctly contains the int_rate minus 10.0%.
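The SQL command referenced above isn't reproduced in the excerpt; it might look like the following sketch, which uses a WHERE clause to locate records with int_rate greater than 10.0:

# Pull records whose interest rate exceeds 10.0 and inspect the computed overage
sql_cmd = f"SELECT loan_amnt, int_rate, excess_int_rate FROM delta.`{s3a_delta_update_uri}` WHERE int_rate > 10.0"
spark.sql(sql_cmd).show(5)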

Our last notebook cell retrieves the history from the Delta table again, this time showing the revisions made after the schema modification:

# Finally, let's retrieve table history AFTER the schema modifications

history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

The following screenshot shows our results.

Conclusion

You can use SageMaker Studio notebooks to interact directly with data stored in the open-source Delta Lake format. In this post, we provided sample code that reads and writes this data using the open source delta-spark library, which allows you to create, update, and manage the dataset as a Delta table. We also demonstrated the power of combining these critical technologies to extract value from preexisting data lakes, and showed how to use the capabilities of Delta Lake on SageMaker.

Our notebook sample provides an end-to-end recipe for installing prerequisites, instantiating Spark data structures, reading and writing DataFrames in Delta Lake format, and using functionalities like schema evolution. You can integrate these technologies to magnify their power to provide transformative business outcomes.


About the Authors

Paul Hargis has focused his efforts on machine learning at several companies, including AWS, Amazon, and Hortonworks. He enjoys building technology solutions and teaching people how to make the most of them. Prior to his role at AWS, he was lead architect for Amazon Exports and Expansions, helping amazon.com improve the experience for international shoppers. Paul likes to help customers expand their machine learning initiatives to solve real-world problems.

Vedant Jain is a Sr. AI/ML Specialist Solutions Architect, helping customers derive value out of the machine learning ecosystem at AWS. Prior to joining AWS, Vedant held ML/Data Science specialist positions at companies such as Databricks, Hortonworks (now Cloudera), and JP Morgan Chase. Outside of work, Vedant is passionate about making music, using science to lead a meaningful life, and exploring delicious vegetarian cuisine from around the world.
