Deploy variational autoencoders for anomaly detection with TensorFlow Serving on Amazon SageMaker

Anomaly detection is the process of identifying items, events, or occurrences that have different characteristics from the majority of the data. It has many applications, such as fraud detection for credit cards, insurance, or healthcare; network intrusion detection for cybersecurity; KPI metrics monitoring for critical systems; and predictive maintenance for in-service equipment. There are four main categories of techniques to detect anomalies: classification, nearest neighbor, clustering, and statistical. In this post, we focus on a deep learning statistical anomaly detection approach using variational autoencoders.

Deep learning is a sub-field of machine learning (ML) that has grown rapidly in the past few years. Because of their flexible structure and ability to learn non-linear relationships in data, deep learning models have proven to be very powerful in solving a wide range of problems. An autoencoder is a type of neural network that learns a hidden encoding of its input data, which can be used for detecting anomalies. A variational autoencoder is an autoencoder whose training is regularized to avoid overfitting and to ensure that the latent space has good properties, through a probabilistic encoder that enables the generative process.

To enable real-time predictions, you must deploy a trained ML model to an endpoint. Sometimes you may want to deploy more than one model at the same time. A standard practice is to deploy each model to a separate endpoint. With the TensorFlow Serving (TFS) REST API, Amazon SageMaker also lets you deploy multiple models to a single multi-model endpoint. Multi-model endpoints provide a scalable and cost-effective solution for deploying a large number of models. They use a shared TFS container that is enabled to host multiple models. This reduces hosting costs by improving endpoint utilization compared with using single-model endpoints. It also reduces deployment overhead because SageMaker manages loading models in memory and scaling them based on their traffic patterns.

In this post, we discuss the implementation of a variational autoencoder on SageMaker to solve an anomaly detection task. We also include examples of how to deploy multiple trained models to a single TensorFlow Serving multi-model endpoint. You can follow the code in the post to run the pipeline from beginning to end.

Dataset

The MNIST dataset is a large database of handwritten digits. It contains 60,000 training images and 10,000 testing images. Each is a small, 28×28 pixel, grayscale image of a digit from 0–9.

Variational autoencoder

An autoencoder is a type of artificial neural network used to learn efficient data coding in an unsupervised manner. An autoencoder has two connected networks:

  • Encoder – Takes an input and converts it into a compressed knowledge representation in the bottleneck layer
  • Decoder – Converts the compressed representation back to the original input

Standard autoencoders learn to generate compact representations of the input. One problem with autoencoders is overfitting, in which the training data is reconstructed with almost no reconstruction loss, but some points of the latent space produce meaningless content when they’re decoded. Another problem is that the latent space may not be continuous, which might cause the decoder to generate an unrealistic output because it doesn’t know how to deal with a region of latent space it hasn’t seen before.

A variational autoencoder (VAE) provides a probabilistic way of describing an observation in latent space. In contrast to the deterministic mapping used by a standard autoencoder, a VAE’s bottleneck layer describes a Gaussian distribution over hidden vectors by predicting the mean and standard deviation of that distribution. A VAE’s latent space is continuous, allowing random sampling and interpolation. VAEs account for the variability of the latent space, which makes the model robust and able to achieve higher performance when compared with autoencoder-based anomaly detection.

The following diagram illustrates this workflow.
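As a rough illustration of the sampling step described above, the following NumPy sketch draws latent vectors from the Gaussian predicted by an encoder and computes the KL-divergence term that is commonly used to regularize a VAE toward a standard normal prior. It assumes the encoder outputs a mean and a log variance per latent dimension; the function names, the latent size of 2, and the example values are illustrative and are not taken from this post's model_def.

```python
import numpy as np

def sample_latent(z_mean, z_log_var, rng):
    """Reparameterization trick: z = mean + std * eps, with eps ~ N(0, I)."""
    eps = rng.standard_normal(size=z_mean.shape)
    return z_mean + np.exp(0.5 * z_log_var) * eps

def kl_to_standard_normal(z_mean, z_log_var):
    """KL(N(mean, var) || N(0, I)) per sample, summed over latent dimensions."""
    return -0.5 * np.sum(1.0 + z_log_var - z_mean**2 - np.exp(z_log_var), axis=-1)

rng = np.random.default_rng(0)
z_mean = np.array([[0.0, 0.0], [1.0, -2.0]])      # hypothetical encoder means
z_log_var = np.array([[0.0, 0.0], [-1.0, 0.5]])   # hypothetical encoder log variances

z = sample_latent(z_mean, z_log_var, rng)
kl = kl_to_standard_normal(z_mean, z_log_var)
print(z.shape, kl)
```

The first row already matches the standard normal prior, so its KL term is zero; the second row is penalized for drifting away from it.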

Construct the problem

In this post, we use the MNIST dataset to construct an anomaly detection problem. For an anomaly detection problem, we have normal data as well as anomalies: the normal data is the majority and the anomalies are the minority. We train the VAE model on normal data, then test the model on anomalies to observe the reconstruction error. This technique is called semi-supervised because the model has only seen normal data during training. In real-world scenarios, we don’t necessarily have labeled anomalies; under such circumstances the semi-supervised method is especially useful. We can train the model to learn the pattern of normal data, so when anomalies happen, the model can identify the data that doesn’t fall into the pattern.

For our use case, we choose 1 and 4 as normal numbers and train the VAE model on the images from MNIST that contain 1 and 4. We choose 5 as the anomaly number and test the model on images with 5 in them to observe the reconstruction error.

Prepare the data

First, import the required packages and set up the SageMaker role and session. We import two files from the src folder: the config file defines the parameters to be used in the scripts, and the model_def contains the functions defining the VAE model. See the following code:

import boto3
from   IPython                   import display
import matplotlib.pyplot         as plt
import numpy                     as np
import pandas                    as pd
import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.tensorflow import TensorFlowModel, TensorFlowPredictor
from   sklearn.decomposition     import PCA
import tensorflow                as tf
from   tensorflow                import keras
from   tensorflow.keras.datasets import mnist
import tensorflow.keras.backend  as K
import time
from scipy.stats import multivariate_normal
from scipy       import stats
from statistics  import mean
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
import os
import sys
PATH = os.path.abspath('..')
if PATH not in sys.path:
    sys.path.append(PATH)
import src.config as config
from src import model_def
role   = sagemaker.get_execution_role()
region = boto3.Session().region_name
sm     = boto3.Session(region_name=region).client('sagemaker')

Next, let’s load the MNIST dataset from TensorFlow and reshape the data. We use train_x, test_x, train_y, and test_y, whose shapes are (60000, 28, 28, 1), (10000, 28, 28, 1), (60000,), and (10000,), respectively; the labels are one-hot encoded to shapes (60000, 10) and (10000, 10) in a later step. The training dataset has 60,000 images and the testing dataset has 10,000 images. Each image is 28×28 pixels in grayscale. The dataset covers the 10 digits 0–9. See the following code:

# Load MNIST Data
(train_x, train_y), (test_x, test_y) = mnist.load_data()
train_x = train_x.reshape((-1, 28, 28, 1))
test_x = test_x.reshape((-1, 28, 28, 1))

Then we save the data locally for future use. After the data is saved locally, we upload it to the default Amazon Simple Storage Service (Amazon S3) bucket. See the following code:

!mkdir -p ../data/train/
!mkdir -p ../data/test/

np.save('../data/train/train_x', train_x)
np.save('../data/test/test_x', test_x)
np.save('../data/train/train_y', train_y)
np.save('../data/test/test_y', test_y)

s3_prefix = 'VAE'
train_s3_prefix = f'{s3_prefix}/train'
test_s3_prefix = f'{s3_prefix}/test'

train_s3 = sagemaker.Session().upload_data(path = '../data/train', key_prefix = train_s3_prefix)
test_s3 = sagemaker.Session().upload_data(path = '../data/test', key_prefix = test_s3_prefix)

The MNIST dataset contains images with numbers 0-9. We selected 1 and 4 as normal numbers and 5 as the anomaly number. The next step is to separate the data accordingly into the normal and anomaly datasets for training and testing:

# Choose a number to be anomaly number and separate from the rest
anomalyNumber = 5
validNumber = [1,4]
allNumbers = validNumber + [anomalyNumber]
train_validIdxs = np.where(np.isin(train_y, validNumber))[0] 
train_anomalyIdxs = np.where(train_y==anomalyNumber)[0]

test_validIdxs = np.where(np.isin(test_y, validNumber))[0]
test_anomalyIdxs = np.where(test_y==anomalyNumber)[0]

We now have indices for 12,584 normal images for training, 2,117 normal images for testing, and 6,313 anomaly images (training and testing combined).
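To see what the index selection does, here is a small sketch on a toy label array. The array contents are made up for illustration; only np.isin and np.where behave as in the code above.

```python
import numpy as np

labels = np.array([1, 5, 4, 7, 1, 5, 0, 4])   # toy stand-in for train_y
valid_digits = [1, 4]
anomaly_digit = 5

# np.isin builds a boolean mask; np.where(...)[0] turns it into positions
valid_idxs = np.where(np.isin(labels, valid_digits))[0]
anomaly_idxs = np.where(labels == anomaly_digit)[0]

print(valid_idxs)    # positions holding a 1 or a 4
print(anomaly_idxs)  # positions holding a 5
```

Digits that are neither normal nor anomalous (0 and 7 in this toy array) are simply left out of both index sets.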

The next step is to prepare the data for training the model. For input data x, we convert the pixels to float and scale them to be between 0 and 1. For output data y, we one-hot encode the numbers into vectors of 0 and 1, with 1 representing the number. Then we use the index from the previous step to separate anomalies from normal data. See the following code:

# Data preparation
# Convert from integers to float32
train_x = train_x.astype('float32')
test_x = test_x.astype('float32')

# Scale input to be between 0 and 1
train_x = train_x / 255
test_x = test_x / 255

# One hot encoding output variables
train_y_one_hot = tf.keras.utils.to_categorical(train_y)
test_y_one_hot = tf.keras.utils.to_categorical(test_y)

# Prepare normal data and anomalies
train_x_normal = train_x[train_validIdxs]
train_y_normal = train_y[train_validIdxs]
test_x_normal = test_x[test_validIdxs]
test_y_normal = test_y[test_validIdxs]

train_x_anomaly = train_x[train_anomalyIdxs]
train_y_anomaly = train_y[train_anomalyIdxs]
test_x_anomaly = test_x[test_anomalyIdxs]
test_y_anomaly = test_y[test_anomalyIdxs]
x_anomaly = np.concatenate([train_x_anomaly, test_x_anomaly])
y_anomaly = np.concatenate([train_y_anomaly, test_y_anomaly])

print(train_x_normal.shape, train_y_normal.shape, test_x_normal.shape, test_y_normal.shape,
      x_anomaly.shape, y_anomaly.shape)
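The scaling and one-hot steps can be checked on a few hand-picked values. In this sketch, indexing into np.eye is used as a stand-in for tf.keras.utils.to_categorical so that it runs without TensorFlow; the helper name one_hot and the toy inputs are assumptions made for illustration.

```python
import numpy as np

def one_hot(labels, num_classes):
    """Return a (len(labels), num_classes) float32 matrix with a single 1 per row."""
    return np.eye(num_classes, dtype="float32")[labels]

pixels = np.array([[0, 128, 255]], dtype="uint8")  # toy pixel row
scaled = pixels.astype("float32") / 255            # values now lie in [0, 1]

encoded = one_hot(np.array([1, 4, 5]), num_classes=10)
print(scaled)
print(encoded)
```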

Visualize the data

We plot the first 25 images of normal data and anomalies for double-checking:

def generate_original_images(x):    
    plt.figure(figsize=(5,5))
    for i in range(25):
        plt.subplot(5,5,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(x[i], cmap=plt.cm.binary)
    plt.show()
generate_original_images(train_x_normal[:25])

The following image of the normal images shows 1 and 4.

We plot the anomalies with the following code:

generate_original_images(x_anomaly[:25])

The image of the anomalies shows 5.

Train the model on SageMaker

SageMaker Script Mode allows you to train the model with the SageMaker pre-built containers for TensorFlow, PyTorch, and Apache MXNet and other popular frameworks on machines managed by SageMaker. For our use case, we use the TensorFlow 2.0 container provided by SageMaker. SageMaker training requires the data in Amazon S3 or an Amazon Elastic File System (Amazon EFS) or Amazon FSx for Lustre file system. For this post, we keep our data in Amazon S3. The training script (train.py) contains details of the training steps.

First, we set up a TensorFlow estimator object (estimator) for SageMaker hosted training. The key parameters for the estimator include the following:

  • hyperparameters – The hyperparameters for training the model
  • entry_point – The path to the local Python source file, which should be run as the entry point to training
  • instance_type – The type of instances used for training
  • framework_version – The TensorFlow version you want to use for running your model training code
  • py_version – The Python version you want to use for running your model training code

The estimator.fit sends train.py to be run on the TensorFlow container running on SageMaker hosted training instances. See the following code:

model_dir = '/opt/ml/model'
hyperparameters = {'epochs': config.EPOCHS,
                   'batch_size': config.BATCH_SIZE,
                   'learning_rate': config.LEARNING_RATE}

estimator = TensorFlow(
                       entry_point          = config.TRAIN_ENTRY_POINT,
                       source_dir           = config.TRAIN_SOURCE_DIR,
                       model_dir            = model_dir,
                       instance_type        = config.TRAIN_INSTANCE_TYPE,
                       instance_count       = config.TRAIN_INSTANCE_COUNT,
                       hyperparameters      = hyperparameters,
                       role                 = role,
                       base_job_name        = config.TRAIN_BASE_JOB_NAME,
                       framework_version    = config.TRAIN_FRAMEWORK_VERSION,
                       py_version           = config.TRAIN_PY_VERSION,
                       )
                      
inputs = {'train': train_s3,
          'test': test_s3}
          
estimator.fit(inputs)
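The train.py script itself is not shown in this post. As a minimal sketch of how a script mode entry point typically receives these settings, the following code parses hyperparameters from command-line arguments and reads the input channel locations from the SM_CHANNEL_TRAIN and SM_CHANNEL_TEST environment variables that SageMaker sets for the 'train' and 'test' channels. The default values and the function name parse_args are assumptions for illustration, not the contents of the actual script.

```python
import argparse
import os

def parse_args(argv=None):
    """Parse hyperparameters and SageMaker-provided locations for a training script."""
    parser = argparse.ArgumentParser()
    # Hyperparameters arrive as command-line arguments (defaults here are hypothetical)
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument("--learning_rate", type=float, default=1e-3)
    parser.add_argument("--model_dir", type=str, default="/opt/ml/model")
    # Input channels are exposed as environment variables named after the keys in `inputs`
    parser.add_argument("--train", type=str,
                        default=os.environ.get("SM_CHANNEL_TRAIN", "../data/train"))
    parser.add_argument("--test", type=str,
                        default=os.environ.get("SM_CHANNEL_TEST", "../data/test"))
    # parse_known_args ignores any extra arguments that may be appended
    args, _ = parser.parse_known_args(argv)
    return args

args = parse_args(["--epochs", "5", "--batch_size", "64", "--learning_rate", "0.001"])
print(args.epochs, args.batch_size, args.learning_rate, args.train)
```

Falling back to local paths when the environment variables are absent makes the same script runnable outside of a SageMaker training job.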

Download the model artifacts

After the model is trained, the model artifacts are saved in Amazon S3. We download the model artifacts from Amazon S3 to a local folder and extract them:

model_artifacts_s3 = estimator.model_data

version = 'v1'
os.makedirs(f'../model/{version}', exist_ok=True)
!aws s3 cp {model_artifacts_s3} ../model/{version}/model.tar.gz
!tar -xzvf ../model/{version}/model.tar.gz -C ../model/{version}

Deploy trained models to one endpoint

Our VAE has an encoder and a decoder. We use the encoder to get the condensed vector representations from the hidden layer, and the decoder to recreate the input. The encoder gives us the hidden layer distribution, from which we randomly sample condensed vector representations. These vector representations are passed through the decoder to generate the output, which is used to calculate the reconstruction error. In this section, we demonstrate how to deploy the encoder, decoder, as well as the whole VAE model to one single endpoint.

To deploy multiple models to a single TensorFlow Serving endpoint, the model artifacts need to be constructed in the following format:

└── multi
    ├── model1
    │   └── <version number>
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    └── model2
        └── <version number>
            ├── saved_model.pb
            └── variables
                └── …

Each folder in the model artifact contains a saved model and the related variables. They are deployed separately to a single endpoint.
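Before packaging an artifact, it can help to confirm that the folder layout matches what is described above. The following sketch walks a directory and reports, for each model folder, the version subfolders that contain a saved_model.pb file. It assumes numeric version folder names, which is the TensorFlow Serving convention; the helper name find_servable_models and the temporary test tree are hypothetical.

```python
import tempfile
from pathlib import Path

def find_servable_models(root):
    """Map each model folder under `root` to its numeric version folders
    that contain a saved_model.pb file."""
    models = {}
    for model_dir in sorted(Path(root).iterdir()):
        if not model_dir.is_dir():
            continue
        versions = sorted(
            int(v.name) for v in model_dir.iterdir()
            if v.is_dir() and v.name.isdigit() and (v / "saved_model.pb").is_file()
        )
        if versions:
            models[model_dir.name] = versions
    return models

# Build a throwaway directory tree with two hypothetical models to exercise the check
with tempfile.TemporaryDirectory() as tmp:
    for name, version in [("model1", "1"), ("model2", "3")]:
        version_dir = Path(tmp) / name / version
        (version_dir / "variables").mkdir(parents=True)
        (version_dir / "saved_model.pb").write_bytes(b"")
    (Path(tmp) / "train_loss.npy").write_bytes(b"")  # stray files are ignored
    layout = find_servable_models(tmp)

print(layout)
```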

Following the preceding format, we construct our output model artifacts in train.py, which contains five models:

  • Variational autoencoders (model/vae)
  • The model generating the mean of the hidden distributions (model/encoder_mean)
  • The model generating the log variance of the hidden distributions (model/encoder_lgvar)
  • The model generating the random samples from the hidden layer distribution defined by encoder_mean and encoder_lgvar (model/encoder_sampler)
  • The decoder (model/decoder)

The model/encoder_mean, model/encoder_lgvar, and model/encoder_sampler models combined serve as an encoder used to generate hidden vectors.

The following code shows our model structure:

└── model
    ├── vae
    │   └── 1
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    ├── encoder_mean
    │   └── 2
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    ├── encoder_lgvar
    │   └── 3
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    ├── encoder_sampler
    │   └── 4
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    ├── decoder
    │   └── 5
    │       ├── saved_model.pb
    │       └── variables
    │           └── …
    ├── test_loss.npy
    └── train_loss.npy

Next, we use TensorFlow Serving to deploy all the models in the model artifact to a single endpoint. We provide the S3 path, SageMaker execution role, TensorFlow framework version, and the default model name to a TensorFlow model object. Then we deploy the model by calling model.deploy, during which we can set the hosting instance count as well as the instance type.

When model.deploy is called, on each instance, three steps occur:

  1. Start a Docker container optimized for TensorFlow Serving.
  2. Start a TensorFlow Serving process configured to run your model.
  3. Start an HTTP server that provides access to TensorFlow Serving through the SageMaker InvokeEndpoint API.

See the following code:

env = {
    'SAGEMAKER_TFS_DEFAULT_MODEL_NAME': config.SAGEMAKER_TFS_DEFAULT_MODEL_NAME
}

model = TensorFlowModel(model_data = model_artifacts_s3, 
              role = role, 
              framework_version = config.TRAIN_FRAMEWORK_VERSION,
              env = env)

predictor = model.deploy(initial_instance_count = config.INFERENCE_INITIAL_INSTANCE_COUNT,
                         instance_type = config.INFERENCE_INSTANCE_TYPE)

Now that the endpoint is created, we can get the predictor for each model by creating TensorFlow predictors. When creating the predictors, we provide the endpoint as well as the name of the model, which is the name of the folder that contains the model and its variables. The predictor object returned by the deploy function is ready to use to make predictions using the default model (vae in this example). See the following code:

# get the endpoint name from the default predictor
endpoint = predictor.endpoint_name

# get a predictor for each of the non-default models hosted on the same endpoint
encoder_mean_predictor = TensorFlowPredictor(endpoint, model_name = 'encoder_mean')
encoder_lgvar_predictor = TensorFlowPredictor(endpoint, model_name = 'encoder_lgvar')
encoder_sampler_predictor = TensorFlowPredictor(endpoint, model_name = 'encoder_sampler')
decoder_predictor = TensorFlowPredictor(endpoint, model_name = 'decoder')
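The ['predictions'] key used with these predictors reflects the JSON shape of the TensorFlow Serving REST predict API, which accepts a body with an "instances" list and returns an object with a "predictions" list. The following sketch shows that round trip without calling an endpoint. The helper names are hypothetical, and the response is a hand-written stand-in rather than real model output.

```python
import json
import numpy as np

def build_predict_request(batch):
    """Serialize a batch of inputs in the TensorFlow Serving REST 'instances' format."""
    return json.dumps({"instances": np.asarray(batch).tolist()})

def parse_predict_response(body):
    """Extract the 'predictions' field of a TensorFlow Serving REST response as an array."""
    return np.asarray(json.loads(body)["predictions"])

batch = np.zeros((2, 28, 28, 1), dtype="float32")   # two blank MNIST-sized images
request_body = build_predict_request(batch)

# Stand-in for an endpoint reply: a hypothetical 2-dimensional latent vector per image
fake_response = json.dumps({"predictions": [[0.1, -0.3], [0.0, 0.2]]})
latent = parse_predict_response(fake_response)
print(len(json.loads(request_body)["instances"]), latent.shape)
```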

Visualize the predictions

With the trained model, we can plot the prediction results for both normal and anomaly data. See the following code:

def generate_prediction_images(x):
    z_mean = encoder_mean_predictor.predict(x)['predictions']
    z_lgvar = encoder_lgvar_predictor.predict(x)['predictions']
    x_pred = predictor.predict(x)['predictions']
        
    plt.figure(figsize=(5,5))
    for i in range(25):
        plt.subplot(5,5,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(x_pred[i], cmap=plt.cm.binary)
    plt.show()

Generate input and prediction images for normal data with the following code:

generate_original_images(train_x_normal[:25])
generate_prediction_images(train_x_normal[:25])

The following image shows our inputs.

The following image shows the model predictions.

Generate input and prediction images for anomaly data with the following code:

generate_original_images(x_anomaly[:25])
generate_prediction_images(x_anomaly[:25])

The following image shows our inputs.

The following image shows the model predictions.

The results show that the model can recreate normal data very well. For anomaly data, the model reproduced certain features but not completely.

PCA of bottleneck layer vectors

Principal Component Analysis (PCA) is a dimension reduction method used to reduce the dimensionality of large datasets by transforming a large set of variables into a smaller one that still contains most of the information in the large set. The hidden (bottleneck) layer of the model provides the latent representations of the input data. These vectors contain compressed knowledge of the inputs. In the following code, we use PCA to find the principal components of the hidden vectors and visualize them to observe the distribution of the data:


train_x = np.concatenate((train_x_normal[:1400], x_anomaly[:700]), axis=0)
train_y = np.concatenate((train_y_normal[:1400], y_anomaly[:700]))

# PCA on the latent variables
train_x_hidden = encoder_sampler_predictor.predict(train_x)['predictions']
pca_3d = PCA(n_components = 3)
PCA_hidden_3d = pca_3d.fit_transform(train_x_hidden)
pca_2d = PCA(n_components = 2)
PCA_hidden_2d = pca_2d.fit_transform(train_x_hidden)

# Plot the principal components
fig = plt.figure(figsize=(10,10))

ax0 = fig.add_subplot(211, projection='3d')
p0  = ax0.scatter(PCA_hidden_3d[:, 0], PCA_hidden_3d[:, 1],
                  PCA_hidden_3d[:, 2], c=train_y, cmap='tab10', s=1)
plt.legend(handles=p0.legend_elements()[0], labels=allNumbers)
# plt.show() is called once, after the second subplot has been added

# colors = ['yellow', 'gold', 'blue']
ax1 = fig.add_subplot(212)
p1 = ax1.scatter(PCA_hidden_2d[:,0], PCA_hidden_2d[:, 1], c =train_y, 
              cmap='tab10') #matplotlib.colors.ListedColormap(colors))
plt.legend(handles=p1.legend_elements()[0], labels=allNumbers)
plt.show()

The result shows that each number’s vectors cluster together. There is a little overlap between 4 and 5, which explains why some of the predictions of number 5 on the trained model preserve some features from 4.
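For intuition about what the PCA projection computes, here is a minimal NumPy sketch that performs the same kind of projection with a singular value decomposition. It uses synthetic clusters in a hypothetical 8-dimensional latent space instead of the endpoint's hidden vectors, and pca_project is an illustrative helper rather than the scikit-learn implementation.

```python
import numpy as np

def pca_project(x, n_components):
    """Project rows of x onto the top principal components using an SVD."""
    centered = x - x.mean(axis=0)
    # Rows of vt are the principal directions, ordered by decreasing singular value
    _, singular_values, vt = np.linalg.svd(centered, full_matrices=False)
    explained = singular_values**2 / np.sum(singular_values**2)
    return centered @ vt[:n_components].T, explained[:n_components]

rng = np.random.default_rng(0)
# Two synthetic clusters standing in for the latent vectors of two digit classes
cluster_a = rng.normal(loc=0.0, scale=0.5, size=(100, 8))
cluster_b = rng.normal(loc=3.0, scale=0.5, size=(100, 8))
hidden = np.concatenate([cluster_a, cluster_b])

projected, explained_ratio = pca_project(hidden, n_components=2)
print(projected.shape, explained_ratio)
```

Because the two clusters differ mainly along one direction, the first component captures most of the variance and places the clusters on opposite sides of zero.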

Detect anomalies with reconstruction error

Reconstruction error is calculated as the binary cross entropy between the input and the reconstruction, summed over all pixels of each image. It tells us the difference between input images and reconstructed images. If the reconstruction error is high, it means there is a large difference between the input and the reconstructed output. Let’s calculate the reconstruction error for the train and test (normal and anomalies) datasets. In the following code, we take up to 2,000 data points from each dataset for a demonstration:

def compute_reconstruction_error(predictor, x):
    x_pred  = predictor.predict(x)['predictions']
    cross_ent = K.binary_crossentropy(x, x_pred)
    recon = tf.reduce_sum(cross_ent, axis=[1,2,3]) #consolidate at each instance
    return recon

train_normal_recon_loss   = compute_reconstruction_error(predictor, train_x_normal[:2000])
test_normal_recon_loss    = compute_reconstruction_error(predictor, test_x_normal[:2000])
anomaly_recon_loss = compute_reconstruction_error(predictor, x_anomaly[:2000])
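The same per-image quantity can be written in plain NumPy, which makes the formula explicit. This is a sketch under the assumption that pixel values and reconstructions lie in [0, 1]; the clipping constant mirrors the small epsilon that Keras backends commonly use, and the toy images are random rather than MNIST data.

```python
import numpy as np

def reconstruction_error(x, x_pred, eps=1e-7):
    """Binary cross-entropy between inputs and reconstructions, summed per image."""
    x_pred = np.clip(x_pred, eps, 1.0 - eps)   # avoid log(0)
    cross_ent = -(x * np.log(x_pred) + (1.0 - x) * np.log(1.0 - x_pred))
    return cross_ent.sum(axis=(1, 2, 3))

rng = np.random.default_rng(0)
x = (rng.random((4, 28, 28, 1)) > 0.5).astype("float32")   # toy binary images

good = reconstruction_error(x, 0.9 * x + 0.05)   # reconstruction close to the input
bad = reconstruction_error(x, 1.0 - x)           # inverted image
print(good, bad)
```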

Next, we plot the reconstruction error for the first 50 samples of the normal training data, the normal test data, and the anomalies:

plt.plot(train_normal_recon_loss[:50], label = 'train normal')
plt.plot(test_normal_recon_loss[:50], label = 'test normal')
plt.plot(anomaly_recon_loss[:50], label = 'anomalies')
plt.title('Reconstruction Error')
plt.legend()
plt.show()

From the graph, we have two observations:

  1. The reconstruction error for normal train and test data is almost the same.
  2. The reconstruction error for normal data is lower than the error for anomaly data.

Further statistical analysis shows that the average reconstruction loss for anomalies (225.75) is 171.39 higher than that of the normal data (54.36):

print(stats.describe(train_normal_recon_loss))
print(stats.describe(anomaly_recon_loss))

Evaluate the model performance

To evaluate the ability of the model to differentiate between normal data and anomalies, we set a threshold: when the reconstruction error is higher, we assign it as an anomaly, and when it’s lower, we assign it as normal data. To find the threshold, let’s look at statistical properties of the reconstruction error:

print(f'1, 99% Percentile of normal reconstruction loss is {np.percentile(train_normal_recon_loss, 1)}, {np.percentile(train_normal_recon_loss, 99)}')
print(f'4, 99% Percentile of abnormal reconstruction loss is {np.percentile(anomaly_recon_loss, 4)}, {np.percentile(anomaly_recon_loss, 99)}')

For normal data, 99% of the data has a reconstruction error lower than 120. For anomalies, 4% of the data has a reconstruction error lower than 126.94, which means 96% of the data has a reconstruction error higher than 126.94.

In this case, the 99th percentile of the normal data reconstruction errors is a good threshold to use because it separates the anomalies from normal data well:

threshold = np.ceil(np.percentile(train_normal_recon_loss, 99))
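The effect of a percentile threshold can be illustrated on synthetic reconstruction errors. The two distributions below are made up (they only loosely echo the means reported earlier) and are not the values produced by the model in this post.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic stand-ins for per-image reconstruction errors
normal_loss = rng.normal(loc=55.0, scale=15.0, size=2000)
anomaly_loss = rng.normal(loc=225.0, scale=40.0, size=2000)

# Same rule as above: 99th percentile of the normal errors, rounded up
threshold = np.ceil(np.percentile(normal_loss, 99))

false_alarm_rate = np.mean(normal_loss > threshold)    # normal data flagged as anomalous
detection_rate = np.mean(anomaly_loss > threshold)     # anomalies correctly flagged
print(threshold, false_alarm_rate, detection_rate)
```

By construction, at most about 1% of the normal samples exceed the threshold, so the choice of percentile directly sets the false alarm rate on normal data.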

For ground truth data, we label the normal numbers (1 and 4) as False (0) and anomalies (5) as True (1). For prediction labels, when the reconstruction error is higher than the threshold, we mark it as 1 (anomaly), and 0 otherwise. See the following code:

# 1 - anomaly, 0 - normal
test_y_labels = np.concatenate([test_y_normal[:2000], y_anomaly[:2000]], axis=0)
# A single comparison against the anomaly digit avoids order-dependent in-place relabeling
test_y_labels = (test_y_labels == anomalyNumber).astype(int)

test_recon_loss = np.concatenate([test_normal_recon_loss.numpy(), anomaly_recon_loss.numpy()], axis=0)
test_y_pred = np.array([1 if x>threshold else 0 for x in test_recon_loss])

The result shows the model can produce 98.12% accuracy, 98.49% precision, 97.75% recall, 98.12% F1 score, 96.25% Cohen Kappa score, and 98.13% ROC AUC:

# accuracy: (tp + tn) / (p + n)
accuracy = accuracy_score(test_y_labels, test_y_pred)
print('Accuracy: %f' % accuracy, '\n')
# precision tp / (tp + fp)
precision = precision_score(test_y_labels, test_y_pred)
print('Precision: %f' % precision, '\n')
# recall: tp / (tp + fn)
recall = recall_score(test_y_labels, test_y_pred)
print('Recall: %f' % recall, '\n')
# f1: 2 tp / (2 tp + fp + fn)
f1 = f1_score(test_y_labels, test_y_pred)
print('F1 score: %f' % f1, '\n')

# kappa
kappa = cohen_kappa_score(test_y_labels, test_y_pred)
print('Cohens kappa: %f' % kappa, '\n')
# ROC AUC
auc = roc_auc_score(test_y_labels, test_y_pred)
print('ROC AUC: %f' % auc, '\n')
# confusion matrix
matrix = confusion_matrix(test_y_labels, test_y_pred)
print('Confusion Matrix:', '\n', matrix, '\n')
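The formulas in the comments above can be checked by hand on a tiny example. The following sketch computes four of the metrics directly from confusion-matrix counts with NumPy. The helper binary_metrics and the toy labels are illustrative; the sketch assumes at least one predicted and one actual positive, so it does not guard against division by zero.

```python
import numpy as np

def binary_metrics(y_true, y_pred):
    """Compute accuracy, precision, recall, and F1 from confusion-matrix counts."""
    y_true = np.asarray(y_true).astype(bool)
    y_pred = np.asarray(y_pred).astype(bool)
    tp = int(np.sum(y_true & y_pred))     # anomalies flagged as anomalies
    tn = int(np.sum(~y_true & ~y_pred))   # normal data left unflagged
    fp = int(np.sum(~y_true & y_pred))    # normal data flagged as anomalies
    fn = int(np.sum(y_true & ~y_pred))    # anomalies that were missed
    return {
        "accuracy": (tp + tn) / (tp + tn + fp + fn),
        "precision": tp / (tp + fp),
        "recall": tp / (tp + fn),
        "f1": 2 * tp / (2 * tp + fp + fn),
    }

# Toy labels: 1 = anomaly, 0 = normal
y_true = [0, 0, 0, 0, 1, 1, 1, 1]
y_pred = [0, 0, 0, 1, 1, 1, 1, 0]
metrics = binary_metrics(y_true, y_pred)
print(metrics)
```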

Clean up

Now that we have finished the prediction and evaluation, we need to clean up to prevent unnecessary cost. We delete the endpoint with the following code:

# delete the SageMaker endpoint
predictor.delete_endpoint()

Summary

Variational autoencoders are a powerful method for anomaly detection. This post provides an example application of a VAE on SageMaker. SageMaker provides the capability to train ML models quickly, as well as host the trained models behind a REST API. When it comes to hosting more than one model, TensorFlow Serving on SageMaker is a great choice to host multiple models on one endpoint. This post is a peek into the usage of VAEs and SageMaker; we look forward to seeing you apply this knowledge to your own use cases! To learn more about how to use TensorFlow with Amazon SageMaker, refer to the documentation.


About the Author

Yi Xiang is a Data Scientist at the Amazon Machine Learning Solutions Lab, where she helps AWS customers across different industries accelerate their AI and cloud adoption.

View Original Source (aws.amazon.com) Here.


Shared by: AWS Machine Learning
