One of the challenges in data science is getting access to operational or real-time data, which is often stored in operational database systems. Being able to connect data science tools to operational data easily and efficiently unleashes enormous potential for gaining insights from real-time data. In this post, we explore using Amazon SageMaker to analyze data stored in Amazon DocumentDB (with MongoDB compatibility).
For illustrative purposes, we use public event data from the GitHub API, which has a complex nested JSON format, and is well-suited for a document database such as Amazon DocumentDB. We use SageMaker to analyze this data, conduct descriptive analysis, and build a simple machine learning (ML) model to predict whether a pull request will close within 24 hours, before writing prediction results back into the database.
SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy ML models quickly. SageMaker removes the heavy lifting from each step of the ML process to make it easier to develop high-quality models.
Amazon DocumentDB is a fast, scalable, highly available, and fully managed document database service that supports MongoDB workloads. You can use the same MongoDB 3.6 application code, drivers, and tools to run, manage, and scale workloads on Amazon DocumentDB without having to worry about managing the underlying infrastructure. As a document database, Amazon DocumentDB makes it easy to store, query, and index JSON data.
Solution overview
In this post, we analyze GitHub events, examples of which include issues, forks, and pull requests. Each event is represented by the GitHub API as a complex, nested JSON object, which is a format well-suited for Amazon DocumentDB.
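The following is an abbreviated, hypothetical sketch of a pull request event, limited to the fields referenced later in this post; all values are illustrative, and the actual API output contains many more fields:

{
  "type": "PullRequestEvent",
  "actor": {"login": "octocat"},
  "repo": {"name": "octocat/example-repo"},
  "payload": {
    "action": "closed",
    "pull_request": {
      "html_url": "https://github.com/octocat/example-repo/pull/1",
      "state": "closed",
      "created_at": "2015-02-10T08:15:00Z",
      "updated_at": "2015-02-10T21:30:00Z",
      "base": {
        "repo": {
          "size": 1024,
          "stargazers_count": 120,
          "forks_count": 80,
          "open_issues_count": 5,
          "has_downloads": true,
          "has_wiki": true,
          "has_pages": false
        }
      }
    }
  },
  "created_at": "2015-02-10T21:30:00Z"
}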
Amazon DocumentDB stores each JSON event as a document. Multiple documents are stored in a collection, and multiple collections are stored in a database. Borrowing terminology from relational databases, documents are analogous to rows, and collections are analogous to tables. The following table summarizes these terms.
Document Database Concepts | SQL Concepts
Document                   | Row
Collection                 | Table
Database                   | Database
Field                      | Column
We implement the following steps using SageMaker: connect to Amazon DocumentDB, ingest the GitHub event data, query the data for descriptive statistics, train a model to predict pull request outcomes, and write the predictions back to the database. To provision the required infrastructure, we use an AWS CloudFormation template.
The CloudFormation stack provisions the following:
A VPC with three private subnets and one public subnet.
An Amazon DocumentDB cluster with three nodes, one in each private subnet. When creating an Amazon DocumentDB cluster in a VPC, its subnet group should have subnets in at least two Availability Zones in a given Region.
An AWS Secrets Manager secret to store login credentials for Amazon DocumentDB. This allows us to avoid storing plaintext credentials in our SageMaker instance.
A SageMaker role to retrieve the Amazon DocumentDB login credentials, allowing connections to the Amazon DocumentDB cluster from a SageMaker notebook.
A SageMaker instance to run queries and analysis.
A SageMaker instance lifecycle configuration that runs a bash script every time the instance boots up. The script downloads a certificate bundle for creating TLS connections to Amazon DocumentDB, as well as a Jupyter notebook containing the code for this tutorial. It also installs required Python libraries (such as pymongo for database methods and xgboost for ML modeling), so that we don't need to install these libraries from the notebook.
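With the stack in place, we connect to the Amazon DocumentDB cluster from the notebook. The following is a minimal sketch of retrieving the login credentials from Secrets Manager and opening a TLS connection with pymongo; the secret name (DocDBSecret) and its key names are assumptions for illustration, and the actual values are created by the CloudFormation stack.

import json
import boto3
import pymongo

# Retrieve the Amazon DocumentDB login credentials from AWS Secrets Manager
# (the secret name and key names below are hypothetical)
secrets_client = boto3.client("secretsmanager")
secret = json.loads(
    secrets_client.get_secret_value(SecretId="DocDBSecret")["SecretString"]
)

# Open a TLS connection to the cluster, using the certificate bundle
# downloaded by the lifecycle configuration script
client = pymongo.MongoClient(
    host=secret["host"],
    port=int(secret["port"]),
    username=secret["username"],
    password=secret["password"],
    tls=True,
    tlsCAFile="rds-combined-ca-bundle.pem",
    replicaSet="rs0",
    readPreference="secondaryPreferred",
    retryWrites=False,
)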
After we establish the connection to our Amazon DocumentDB cluster, we create a database and collection to store our GitHub event data. For this post, we name our database gharchive, and our collection events:
db_name = "gharchive" # name the database
collection_name = "events" # name the collection
db = client[db_name] # create a database
events = db[collection_name] # create a collection
Next, we need to download the data from gharchive.org, which has been aggregated into hourly archives with the following naming format:
https://data.gharchive.org/YYYY-MM-DD-H.json.gz
The aim of this analysis is to predict whether a pull request closes within 24 hours. For simplicity, we limit the analysis to a two-day period: February 10–11, 2015. Across these two days, there were over 1 million GitHub events.
The following code downloads the relevant hourly archives, then formats and ingests the data into your Amazon DocumentDB database. It takes about 7 minutes to run on an ml.t3.xlarge instance.
import json
import zlib

import requests

# Specify target date and time range for GitHub events
year = 2015
month = 2
days = [10, 11]
hours = range(0, 24)

# Download data from gharchive.org and insert into Amazon DocumentDB
for day in days:
    for hour in hours:
        print(f"Processing events for {year}-{month}-{day}, {hour} hr.")

        # zero-pad month and day values
        month_ = str(month).zfill(2)
        day_ = str(day).zfill(2)

        # download data
        url = f"https://data.gharchive.org/{year}-{month_}-{day_}-{hour}.json.gz"
        response = requests.get(url, stream=True)

        # decompress data
        respdata = zlib.decompress(response.content, zlib.MAX_WBITS | 32)

        # format data: one JSON document per line
        stringdata = respdata.split(b'\n')
        data = [json.loads(x) for x in stringdata if 0 < len(x)]

        # ingest data
        events.insert_many(data, ordered=False, bypass_document_validation=True)
Setting ordered=False allows the documents to be ingested out of order, and bypass_document_validation=True allows the write to skip validating the JSON input, which is safe because we already validated the JSON structure with json.loads() before inserting. Both options expedite the data ingestion process.
Generating descriptive statistics
As a common first step in data science, we explore the data to get some general descriptive statistics. We can use database operations to calculate many of these directly.
To get a count of the number of GitHub events, we use the count_documents() command:
events.count_documents({})
> 1174157
The count_documents() command gets the number of documents in a collection. Each GitHub event is recorded as a document, and events is what we had named our collection earlier.
The 1,174,157 documents comprise different types of GitHub events. To see the frequency of each type of event occurring in the dataset, we query the database using the aggregate command:
import pandas as pd

# Frequency of event types
event_types_query = events.aggregate([
    # Group by the type attribute and count
    {"$group": {"_id": "$type", "count": {"$sum": 1}}},
    # Reformat the data
    {"$project": {"_id": 0, "Type": "$_id", "count": "$count"}},
    # Sort by the count in descending order
    {"$sort": {"count": -1}}
])
df_event_types = pd.DataFrame(event_types_query)
The preceding query groups the events by type, runs a count, and sorts the results in descending order of count. Finally, we wrap the output in pd.DataFrame() to convert the results to a DataFrame. This allows us to generate visualizations such as the following.
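A minimal plotting sketch follows (the original post displays a similar bar chart of event counts; the exact styling may differ):

import matplotlib.pyplot as plt

# Bar chart of GitHub event counts by type
df_event_types.plot.barh(x="Type", y="count", legend=False)
plt.xlabel("Number of events")
plt.tight_layout()
plt.show()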
From the plot, we can see that push events were the most frequent, numbering close to 600,000.
Returning to our goal to predict if a pull request closes within 24 hours, we implement another query to include only pull request events, using the database match operation, and then count the number of such events per pull request URL:
# Frequency of PullRequestEvent actions by URL
action_query = events.aggregate([
    # Keep only PullRequestEvent types
    {"$match": {"type": "PullRequestEvent"}},
    # Group by HTML URL and count
    {"$group": {"_id": "$payload.pull_request.html_url", "count": {"$sum": 1}}},
    # Reformat the data
    {"$project": {"_id": 0, "url": "$_id", "count": "$count"}},
    # Sort by the count in descending order
    {"$sort": {"count": -1}}
])
df_action = pd.DataFrame(action_query)
From the result, we can see that a single URL could have multiple pull request events, such as those shown in the following screenshot.
One of the attributes of a pull request event is the state of the pull request after the event. Therefore, to determine whether a pull request was open or closed within the 24-hour window, we're interested in the latest event before the end of that window. We show how to run this query later in this post, but continue now with a discussion of descriptive statistics.
Apart from counts, we can also have the database calculate the mean, maximum, and minimum values for us. In the following query, we do this for potential predictors of a pull request open/close status, specifically the number of stars, forks, and open issues, as well as repository size. We also calculate the time elapsed (in milliseconds) of a pull request event since its creation. For each pull request, there could be multiple pull request events (comments), and this descriptive query spans across all these events:
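A minimal sketch of such a descriptive aggregation, assuming the same field names used elsewhere in this post (the exact stages in the original notebook may differ):

# Descriptive statistics (mean, max, min) across all pull request events
descriptive_query = events.aggregate([
    # Keep only pull request events
    {"$match": {"type": "PullRequestEvent"}},
    # Extract candidate predictors and time elapsed (ms) since PR creation
    {"$project": {
        "forks_count": "$payload.pull_request.base.repo.forks_count",
        "open_issues_count": "$payload.pull_request.base.repo.open_issues_count",
        "stargazers_count": "$payload.pull_request.base.repo.stargazers_count",
        "repo_size": "$payload.pull_request.base.repo.size",
        "time_since_created": {"$subtract": [
            {"$dateFromString": {"dateString": "$payload.pull_request.updated_at"}},
            {"$dateFromString": {"dateString": "$payload.pull_request.created_at"}}]},
    }},
    # Aggregate the statistics over all events
    {"$group": {
        "_id": None,
        "avg_forks": {"$avg": "$forks_count"},
        "max_forks": {"$max": "$forks_count"},
        "min_forks": {"$min": "$forks_count"},
        "avg_open_issues": {"$avg": "$open_issues_count"},
        "avg_stars": {"$avg": "$stargazers_count"},
        "avg_repo_size": {"$avg": "$repo_size"},
        "avg_time_since_created": {"$avg": "$time_since_created"},
    }},
])
pd.DataFrame(descriptive_query)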
Before we can begin building our prediction model, we need to select relevant features to include, and also engineer new features. In the following query, we select pull request events from non-empty repositories with more than 50 forks. We select possible predictors including number of forks (forks_count) and number of open issues (open_issues_count), and engineer new predictors by normalizing those counts by the size of the repository (repo.size). Finally, we shortlist the pull request events that fall within our period of evaluation, and record the latest pull request status (open or close), which is the outcome of our predictive model.
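The query below references prediction_window, date_start, and date_end, which are defined earlier in the notebook. Plausible definitions, assumed here from the 24-hour window and the February 10–11, 2015 period described above, are:

from datetime import datetime

# Hypothetical definitions consistent with the period and window described above
prediction_window = 24 * 60 * 60 * 1000   # 24 hours, in milliseconds
date_start = datetime(2015, 2, 10)        # pull requests must be created on or after this date
date_end = datetime(2015, 2, 11)          # and before this date, so a full 24-hour window falls within the data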
df = list(events.aggregate([
    # Filter on just PullRequestEvents
    {"$match": {
        "type": "PullRequestEvent",  # focus on pull requests
        "payload.pull_request.base.repo.forks_count": {"$gt": 50},  # focus on popular repos
        "payload.pull_request.base.repo.size": {"$gt": 0}  # exclude empty repos
    }},
    # Project only features of interest
    {"$project": {
        "type": 1,
        "payload.pull_request.base.repo.size": 1,
        "payload.pull_request.base.repo.stargazers_count": 1,
        "payload.pull_request.base.repo.has_downloads": 1,
        "payload.pull_request.base.repo.has_wiki": 1,
        "payload.pull_request.base.repo.has_pages": 1,
        "payload.pull_request.base.repo.forks_count": 1,
        "payload.pull_request.base.repo.open_issues_count": 1,
        "payload.pull_request.html_url": 1,
        "payload.pull_request.created_at": 1,
        "payload.pull_request.updated_at": 1,
        "payload.pull_request.state": 1,
        # calculate no. of open issues normalized by repo size
        "issues_per_repo_size": {"$divide": ["$payload.pull_request.base.repo.open_issues_count",
                                             "$payload.pull_request.base.repo.size"]},
        # calculate no. of forks normalized by repo size
        "forks_per_repo_size": {"$divide": ["$payload.pull_request.base.repo.forks_count",
                                            "$payload.pull_request.base.repo.size"]},
        # format datetime variables
        "created_time": {"$dateFromString": {"dateString": "$payload.pull_request.created_at"}},
        "updated_time": {"$dateFromString": {"dateString": "$payload.pull_request.updated_at"}},
        # calculate time elapsed since PR creation
        "time_since_created": {"$subtract": [{"$dateFromString": {"dateString": "$payload.pull_request.updated_at"}},
                                             {"$dateFromString": {"dateString": "$payload.pull_request.created_at"}}]}
    }},
    # Keep only events within the window (24 hrs) since the pull request was created
    # Keep only pull requests that were created on or after the start and before the end of the period
    {"$match": {
        "time_since_created": {"$lte": prediction_window},
        "created_time": {"$gte": date_start, "$lt": date_end}
    }},
    # Sort by the html_url and then by the updated_time
    {"$sort": {
        "payload.pull_request.html_url": 1,
        "updated_time": 1
    }},
    # Group by html_url, keeping the repo information from the first event in each group
    # plus the state from the last (latest) event in each group
    {"$group": {
        "_id": "$payload.pull_request.html_url",
        "repo_size": {"$first": "$payload.pull_request.base.repo.size"},
        "stargazers_count": {"$first": "$payload.pull_request.base.repo.stargazers_count"},
        "has_downloads": {"$first": "$payload.pull_request.base.repo.has_downloads"},
        "has_wiki": {"$first": "$payload.pull_request.base.repo.has_wiki"},
        "has_pages": {"$first": "$payload.pull_request.base.repo.has_pages"},
        "forks_count": {"$first": "$payload.pull_request.base.repo.forks_count"},
        "open_issues_count": {"$first": "$payload.pull_request.base.repo.open_issues_count"},
        "issues_per_repo_size": {"$first": "$issues_per_repo_size"},
        "forks_per_repo_size": {"$first": "$forks_per_repo_size"},
        "state": {"$last": "$payload.pull_request.state"}
    }}
]))
df = pd.DataFrame(df)
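The modeling code below expects a binary state_open column and uses the pull request URL (_id) as the DataFrame index. A plausible preprocessing step, assumed here rather than taken from the original notebook, is:

df = df.set_index("_id")                                # index rows by pull request URL
df["state_open"] = (df["state"] == "open").astype(int)  # binary target: 1 if the pull request is still open
df = df.drop(["state"], axis=1)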
Generating predictions
Before building our model, we split our data into two sets for training and testing:
from sklearn.model_selection import train_test_split

X = df.drop(['state_open'], axis=1)
y = df['state_open']
X_train, X_test, y_train, y_test = train_test_split(X, y,
                                                    test_size=0.3,
                                                    stratify=y,
                                                    random_state=42,
                                                    )
For this post, we use 70% of the documents for training the model, and the remaining 30% for testing the model’s predictions against the actual pull request status. We use the XGBoost algorithm to train a binary:logistic model evaluated with area under the curve (AUC) over 20 iterations. The seed is specified to enable reproducibility of results. The other parameters are left as default values. See the following code:
import xgboost as xgb

# Format data
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

# Specify model parameters
param = {
    'objective': 'binary:logistic',
    'eval_metric': 'auc',
    'seed': 42,
}

# Train model
num_round = 20
bst = xgb.train(param, dtrain, num_round)
Next, we use the trained model to generate predictions for the test dataset and to calculate and plot the AUC:
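A minimal sketch of this step follows (the exact plotting code in the original notebook may differ):

from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt

# Predicted probability for the state_open target on the test set
preds = bst.predict(dtest)

# Calculate and plot the AUC
auc = roc_auc_score(y_test, preds)
fpr, tpr, _ = roc_curve(y_test, preds)
plt.plot(fpr, tpr, label=f"AUC = {auc:.3f}")
plt.plot([0, 1], [0, 1], linestyle="--")
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.legend()
plt.show()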
Predictor importance can be defined in different ways. For this post, we use weight, which is the number of times a predictor appears in the XGBoost trees. The top predictor is the number of open issues normalized by the repository size. Using a box plot, we compare the spread of values for this predictor between closed and still-open pull requests.
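A short sketch of inspecting importance by weight with the built-in XGBoost plotting helper:

import xgboost as xgb
import matplotlib.pyplot as plt

# Plot feature importance by weight (number of times a feature is used to split)
xgb.plot_importance(bst, importance_type="weight")
plt.show()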
After we examine the results and are satisfied with the model performance, we can write predictions back into Amazon DocumentDB.
Storing prediction results
The final step is to store the model predictions back into Amazon DocumentDB. First, we create a new Amazon DocumentDB collection to hold our results, called predictions:
predictions = db['predictions']
Then we change the generated predictions to type float, to be accepted by Amazon DocumentDB:
preds = preds.astype(float)
We need to associate these predictions with their respective pull request events. Therefore, we use the pull request URL as each document’s ID. We match each prediction to its respective pull request URL and consolidate them in a list:
urls = y_test.index

def gen_preds(url, pred):
    """
    Generate document with prediction of whether pull request will close in 24 hours.
    ID is pull request URL.
    """
    doc = {
        "_id": url,
        "close_24hr_prediction": pred}
    return doc

documents = [gen_preds(url, pred) for url, pred in zip(urls, preds)]
Finally, we use the insert_many command to write the documents to Amazon DocumentDB:
predictions.insert_many(documents, ordered=False)
We can query a sample of five documents in the predictions collection to verify that the results were inserted correctly:
pd.DataFrame(predictions.find({}).limit(5))
The following screenshot shows our results.
Cleaning up resources
To save cost, delete the CloudFormation stack you created. This removes all the resources you provisioned using the CloudFormation template, including the VPC, Amazon DocumentDB cluster, and SageMaker instance. For instructions, see Deleting a stack on the AWS CloudFormation console.
Summary
We used SageMaker to analyze data stored in Amazon DocumentDB, conduct descriptive analysis, and build a simple ML model to make predictions, before writing prediction results back into the database.
Annalyn Ng is a Senior Data Scientist with AWS Professional Services, where she develops and deploys machine learning solutions for customers. Annalyn graduated with an MPhil from the University of Cambridge, and blogs about machine learning at algobeans.com. Her book, 'Numsense! Data Science for the Layman', has been translated into over five languages and is used as a reference text in top universities.
Brian Hess is a Senior Solution Architect Specialist for Amazon DocumentDB (with MongoDB compatibility) at AWS. He has been in the data and analytics space for over 20 years and has extensive experience with relational and NoSQL databases.