Build machine learning-ready datasets from the Amazon SageMaker offline Feature Store using the Amazon SageMaker Python SDK
Amazon SageMaker Feature Store is a purpose-built service to store and retrieve feature data for use by machine learning (ML) models. Feature Store provides an online store capable of low-latency, high-throughput reads and writes, and an offline store that provides bulk access to all historical record data. Feature Store handles the synchronization of data between the online and offline stores.
Because model development is an iterative process, customers will frequently query the offline store and build various datasets for model training. Currently, there are several ways to access features in the offline store, including running SQL queries with Amazon Athena or using Spark SQL in Apache Spark. However, these patterns require writing ad hoc (and sometimes complex) SQL statements, which isn’t always suitable for the data scientist persona.
Feature Store recently extended the SageMaker Python SDK to make it easier to create datasets from the offline store. With this release, you can use a new set of methods in the SDK to create datasets without writing SQL queries. These new methods support common operations such as time travel, filtering duplicate records, and joining multiple feature groups while ensuring point-in-time accuracy.
In this post, we demonstrate how to use the SageMaker Python SDK to build ML-ready datasets without writing any SQL statements.
Solution overview
To demonstrate the new functionality, we work with two datasets: leads and web marketing metrics. These datasets can be used to build a model that predicts if a lead will convert into a sale given marketing activities and metrics captured for that lead.
The leads data contains information on prospective customers, who are identified using Lead_ProspectID. The features for a lead (for example, LeadSource) can be updated over time, which results in a new record for that lead. The Lead_EventTime represents the time at which each record is created. The following screenshot shows an example of this data.
The web marketing metrics data tracks the engagement metrics for a lead, where each lead is identified using Web_ProspectID. The Web_EventTime represents the time at which the record was created. Unlike the leads feature group, there is only one record per lead in this feature group. The following screenshot shows an example of this data.
We walk through the key parts of the sagemaker-feature-store-offline-sdk.ipynb notebook, which demonstrates the following steps:
- Create a dataset from a feature group.
- Join multiple feature groups.
- Create a point-in-time join between a feature group and a dataset based on a set of events at specific timestamps.
- Retrieve feature history within a specific time range.
- Retrieve features as of a specific timestamp.
Prerequisites
You need the following prerequisites:
- An AWS account.
- A SageMaker Jupyter notebook instance. Access the code from the GitHub repository and upload it to your notebook instance.
- You can also run the notebook in an Amazon SageMaker Studio environment, which is an IDE for ML development. You can clone the GitHub repo via a terminal inside the Studio environment using the following command:
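The command takes the following form; the URL shown here is a placeholder for the repository linked in this post:

```
git clone <github-repo-url>
```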
We assume a feature group for the leads data has been created using the existing FeatureGroup.create method, and can be referenced using the variable base_fg. For more information on feature groups, refer to Create Feature Groups.
Create a dataset from a feature group
To create a dataset using the SageMaker SDK, we use the new FeatureStore class, which contains the create_dataset method. This method accepts a base feature group that may be joined with other feature groups or DataFrames. We start by providing the leads feature group as the base and an Amazon Simple Storage Service (Amazon S3) path to store the dataset:
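The following sketch shows this step; the sagemaker_session and s3_bucket_name variables are assumptions standing in for the session and bucket set up earlier in the notebook:

```python
from sagemaker.feature_store.feature_store import FeatureStore

# Instantiate the FeatureStore client from the current SageMaker session
# (sagemaker_session is assumed to be defined earlier in the notebook)
feature_store = FeatureStore(sagemaker_session=sagemaker_session)

# Use the leads feature group as the base and store the output in S3
base_dataset_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset",
)
```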
The create_dataset method returns a DatasetBuilder object, which can be used to generate a dataset from one or multiple feature groups (which we demonstrate in the next section). To create a simple dataset consisting of only the leads features, we invoke the to_csv_file method. This runs a query in Athena to retrieve the features from the offline store and saves the results to the specified S3 path.
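A minimal sketch of that call, continuing from the builder above:

```python
# Run the Athena query and write the results as a CSV file under the
# output_path; returns the S3 URI of the file and the SQL that ran
csv_file, query_string = base_dataset_builder.to_csv_file()
print(f"Dataset stored at: {csv_file}")
```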
Join multiple feature groups
With the SageMaker SDK, you can easily join multiple feature groups to build a dataset. You can also join an existing Pandas DataFrame with one or more feature groups. The base feature group is an important concept for joins: it is the feature group that has other feature groups or the Pandas DataFrame joined to it.
While creating the dataset using the create_dataset function, we use the with_feature_group method, which performs an inner join between the base feature group and another feature group using the record identifier and the target feature name in the base feature group. In our example, the base feature group is the leads feature group, and the target feature group is the web marketing feature group. The with_feature_group method accepts the following arguments:
- feature_group – This is the feature group we are joining with. In our code sample, the target feature group is created by using the web marketing dataset.
- target_feature_name_in_base – The name of the feature in the base feature group that we’re using as a key in the join. We use Lead_ProspectID as the record identifier for the base feature group.
- included_feature_names – This is the list of the feature names of the base feature group. We use this field to specify the features that we want to include in the dataset.
The following code shows an example of creating a dataset by joining the base feature group with the target feature group:
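Here, target_fg is assumed to reference the web marketing feature group created earlier:

```python
# Inner join: leads (base) joined with web marketing metrics (target)
# on the Lead_ProspectID record identifier
dataset_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset",
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
    included_feature_names=["Lead_ProspectID", "LeadSource", "Lead_EventTime"],
)

# Materialize the joined dataset as a Pandas DataFrame
dataset_df, query = dataset_builder.to_dataframe()
```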
You can extend the join operations to include multiple feature groups by adding the with_feature_group method at the end of the preceding code example and defining the required arguments for the new feature group. You can also perform join operations with an existing DataFrame by defining the base to be your existing Pandas DataFrame and joining it with the feature groups of interest. The following code sample shows how to create a dataset with an existing Pandas DataFrame and an existing feature group:
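A sketch of that pattern follows, assuming leads_df is an existing Pandas DataFrame holding the leads data:

```python
# When the base is a DataFrame (not a feature group), the record
# identifier and event time column names must be supplied explicitly
df_dataset_builder = feature_store.create_dataset(
    base=leads_df,
    record_identifier_feature_name="Lead_ProspectID",
    event_time_identifier_feature_name="Lead_EventTime",
    output_path=f"s3://{s3_bucket_name}/dataset",
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
)
```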
For more examples on these various configurations, refer to Create a Dataset from your Feature Groups.
Create a point-in-time join
One of the most powerful capabilities of this enhancement is the ability to perform point-in-time joins simply, without the need to write complex SQL code. When building ML models, data scientists need to avoid data leakage or target leakage, which is accidentally using data during model training that wouldn’t be available at the time of prediction. For instance, if we’re trying to predict credit card fraud, we should exclude transactions that arrive after the fraudulent charge we’re trying to predict; otherwise, the model could train on this post-fraud information, making it generalize less well.
Retrieval of point-in-time accurate feature data requires you to supply an entity DataFrame that provides a set of record IDs (or primary key) and corresponding event times that serve as the cutoff time for the event. This retrieval mechanism is sometimes referred to as row-level time travel, because it allows a different time constraint to be applied for each row key. To perform point-in-time joins with the SageMaker SDK, we use the Dataset Builder class and provide the entity DataFrame as the base argument to the constructor.
In the following code, we create a simple entity DataFrame with two records. We set the event times, used to indicate the cutoff time, near the middle of the time series data (mid-January 2023):
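A minimal sketch, where the prospect IDs are illustrative placeholders:

```python
import pandas as pd

# Entity DataFrame: one row per record ID, each with a cutoff event time
entity_df = pd.DataFrame(
    {
        "Lead_ProspectID": ["prospect-id-1", "prospect-id-2"],
        "Lead_EventTime": pd.to_datetime(
            ["2023-01-15 00:00:00", "2023-01-15 00:00:00"]
        ),
    }
)
```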
When we use the point_in_time_accurate_join functionality with the create_dataset call, the internal query excludes all records with timestamps later than the cutoff times supplied, returning the latest feature values that would have been available at the time of the event:
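A sketch of the full call chain, continuing with the assumed variable names from the earlier snippets:

```python
# Row-level time travel: join the entity DataFrame against the leads
# feature group, keeping only records dated before each cutoff time
pit_builder = feature_store.create_dataset(
    base=entity_df,
    record_identifier_feature_name="Lead_ProspectID",
    event_time_identifier_feature_name="Lead_EventTime",
    output_path=f"s3://{s3_bucket_name}/dataset",
).with_feature_group(
    feature_group=base_fg,
    target_feature_name_in_base="Lead_ProspectID",
).point_in_time_accurate_join(
).with_number_of_recent_records_by_record_identifier(1)

pit_df, pit_query = pit_builder.to_dataframe()
```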
Notice that there are only two records in the DataFrame returned by the point-in-time join. This is because we only submitted two record IDs in the entity DataFrame, one for each Lead_ProspectID we want to retrieve. The point-in-time criterion specifies that a record’s event time (stored in the Lead_EventTime field) must contain a value that is less than the cutoff time.
Additionally, we instruct the query to retrieve only the latest record that meets this criterion, because we have applied the with_number_of_recent_records_by_record_identifier method. When used in conjunction with the point_in_time_accurate_join method, this allows the caller to specify how many records to return from those that meet the point-in-time join criteria.
Compare point-in-time join results with Athena query results
To verify the output returned by the SageMaker SDK point_in_time_accurate_join function, we compare it to the result of an Athena query. First, we create a standard Athena query using a SELECT statement tied to the specific table created by the Feature Store runtime. This table name can be found by referencing the table_name field after instantiating the athena_query from the FeatureGroup API:
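A sketch of the Athena query construction; record_id is an assumed variable holding the Lead_ProspectID to look up:

```python
# Get an AthenaQuery object for the leads feature group; its table_name
# field identifies the offline store table created by Feature Store
athena_query = base_fg.athena_query()
table_name = athena_query.table_name

# Plain SELECT with no point-in-time semantics
query_string = (
    f'SELECT * FROM "{table_name}" '
    f"WHERE Lead_ProspectID = '{record_id}'"
)
athena_query.run(
    query_string=query_string,
    output_location=f"s3://{s3_bucket_name}/athena-results",
)
athena_query.wait()
athena_df = athena_query.as_dataframe()
```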
The Athena query doesn’t contain any point-in-time join semantics, so it returns all records that match the specified record_id (Lead_ProspectID).
Next, we use the Pandas library to sort the Athena results by event times for easy comparison. The records with timestamps later than the event times specified in the entity DataFrame (for example, 2023-01-15T00:00:00Z) submitted to the point_in_time_accurate_join don’t show up in the point-in-time results. Because we additionally specified that we only want a single record from the preceding create_dataset code, we only get the latest record prior to the cutoff time. By comparing the SageMaker SDK results with the Athena query results, we see that the point-in-time join function returned the proper records.
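The sorting step looks like the following, assuming athena_df holds the Athena results from the previous snippet (note that Athena lowercases column names):

```python
# Sort by event time (newest first) for side-by-side comparison with
# the point-in-time join output
athena_df_sorted = athena_df.sort_values(by="lead_eventtime", ascending=False)
```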
Therefore, we have confidence that we can use the SageMaker SDK to perform row-level time travel and avoid target leakage. Furthermore, this capability works across multiple feature groups that may be refreshed on completely different timelines.
Retrieve feature history within a specific time range
We also want to demonstrate the use of specifying a time range window when joining the feature groups to form a dataset. The time window is defined using with_event_time_range, which accepts two inputs, starting_timestamp and ending_timestamp, and returns a dataset builder object. In our code sample, we set the retrieval time window to 1 full day, from 2022-07-01 00:00:00 until 2022-07-02 00:00:00.
The following code shows how to create a dataset with the specified event time window while joining the base feature group with the target feature group:
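A sketch under the same assumptions as the earlier snippets:

```python
from datetime import datetime, timezone

# Join the two feature groups, restricted to a 1-day event-time window
window_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset",
).with_feature_group(
    feature_group=target_fg,
    target_feature_name_in_base="Lead_ProspectID",
).with_event_time_range(
    starting_timestamp=datetime(2022, 7, 1, 0, 0, 0, tzinfo=timezone.utc),
    ending_timestamp=datetime(2022, 7, 2, 0, 0, 0, tzinfo=timezone.utc),
)
```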
We also confirm the difference between the sizes of the dataset created using with_event_time_range by exporting to a Pandas DataFrame with the to_dataframe() method and displaying the data. Notice how the result set has only a fraction of the original 10,020 records, because it only retrieves records whose event_time is within the 1-day time period.
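Continuing from the window_builder sketch above:

```python
# Export to a DataFrame; the windowed result should contain only a
# fraction of the full record count
window_df, window_query = window_builder.to_dataframe()
print(f"Records in the 1-day window: {len(window_df)}")
```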
Retrieve features as of a specific timestamp
The DatasetBuilder as_of method retrieves features from a dataset that meet a timestamp-based constraint, which the caller provides as an argument to the function. This mechanism is useful for scenarios such as rerunning experiments on previously collected data, backtesting time series models, or building a dataset from a previous state of the offline store for data auditing purposes. This functionality is sometimes referred to as time travel because it essentially rolls back the data store to an earlier date and time. This time constraint is also referred to as the cutoff timestamp.
In our sample code, we first create the cutoff timestamp by reading the write_time value for the last record written to the Feature Store, the one written with put_record. Then we provide this cutoff timestamp to the DatasetBuilder as an argument to the as_of method:
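A sketch of this step; the cutoff value shown is a hypothetical stand-in for the write_time read from the last record:

```python
from datetime import datetime

# Hypothetical cutoff: in the notebook this is read from the write_time
# of the last record written with put_record
cutoff_timestamp = datetime(2023, 1, 16, 12, 0, 0)

# Roll the offline store back to its state as of the cutoff
as_of_builder = feature_store.create_dataset(
    base=base_fg,
    output_path=f"s3://{s3_bucket_name}/dataset",
).as_of(cutoff_timestamp)

as_of_df, as_of_query = as_of_builder.to_dataframe()
```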
It’s important to note that the as_of method applies the time constraint to the internal write_time field, which is automatically generated by Feature Store. The write_time field represents the actual timestamp when the record is written to the data store. This is different from methods like point_in_time_accurate_join and with_event_time_range, which use the client-provided event_time field as a comparator.
Clean up
Be sure to delete all the resources created as part of this example to avoid incurring ongoing charges. This includes the feature groups and the S3 bucket containing the offline store data.
SageMaker Python SDK experience vs. writing SQL
The new methods in the SageMaker Python SDK allow you to quickly create datasets and move on to the training step of the ML lifecycle. To show the time and effort that can be saved, let’s examine a use case where we need to join two feature groups while retrieving the features within a specified time frame. The following figure compares a Python query on the offline Feature Store with the equivalent SQL query used to create the same dataset.
As you can see, the same operation of joining two feature groups requires you to create a long, complex SQL query, whereas it can be accomplished using just the with_feature_group and with_event_time_range methods in the SageMaker Python SDK.
Conclusion
The new offline store methods in the SageMaker Python SDK allow you to query your offline features without having to write complex SQL statements. This provides a seamless experience for customers who are accustomed to writing Python code during model development. For more information about feature groups, refer to Create a Dataset From Your Feature Groups and Feature Store APIs: Feature Group.
The full example in this post can be found in the GitHub repository. Give it a try and let us know your feedback in the comments.
About the Authors
Paul Hargis has focused his efforts on machine learning at several companies, including AWS, Amazon, and Hortonworks. He enjoys building technology solutions and teaching people how to leverage them. Paul likes to help customers expand their machine learning initiatives to solve real-world problems. Prior to his role at AWS, he was lead architect for Amazon Exports and Expansions, helping amazon.com improve the experience for international shoppers.
Mecit Gungor is an AI/ML Specialist Solution Architect at AWS helping customers design and build AI/ML solutions at scale. He covers a wide range of AI/ML use cases for Telecommunication customers and currently focuses on Generative AI, LLMs, and training and inference optimization. He can often be found hiking in the wilderness or playing board games with his friends in his free time.
Tony Chen is a Machine Learning Solutions Architect at AWS, helping customers design scalable and robust machine learning capabilities in the cloud. As a former data scientist and data engineer, he leverages his experience to help tackle some of the most challenging problems organizations face with operationalizing machine learning.
Sovik Kumar Nath is an AI/ML solution architect with AWS. He has extensive experience in end-to-end designs and solutions for machine learning; business analytics within financial, operational, and marketing analytics; healthcare; supply chain; and IoT. Outside work, Sovik enjoys traveling and watching movies.