Define and run Machine Learning pipelines on Step Functions using Python, Workflow Studio, or States Language

You can use various tools to define and run machine learning (ML) pipelines or DAGs (Directed Acyclic Graphs). Some popular options include AWS Step Functions, Apache Airflow, Kubeflow Pipelines (KFP), TensorFlow Extended (TFX), Argo, Luigi, and Amazon SageMaker Pipelines. All these tools help you compose pipelines in various languages (JSON, YAML, Python, and more), and then view and run them using a workflow orchestrator. A deep comparison of these options is out of scope for this post; it would involve selecting and benchmarking tools for your specific use case.

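In this post, we discuss how to author end-to-end ML pipelines in Step Functions using three different methods:

  • The Step Functions Data Science SDK (Python)
  • Step Functions Workflow Studio
  • The Amazon States Language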

Solution overview

In this post, we create a simple workflow that involves a training step, creating a model, configuring an endpoint, and deploying the model.

You can also create more complex workflows involving other steps such as Amazon SageMaker Processing, or automatic model tuning (HPO). You can also use Step Functions to integrate with other AWS services such as AWS Lambda, Amazon DynamoDB, AWS Glue, Amazon EMR, Amazon Athena, Amazon Elastic Kubernetes Service (Amazon EKS), and AWS Fargate. For more information on supported services, see Supported AWS Service Integrations for Step Functions. We provide guidance on other similar pipeline tools later in this post.

In this post, we use the MNIST dataset, which is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28×28 pixel grayscale images of hand-written digits. The dataset is split into 60,000 training images and 10,000 test images. There are 10 classes (one for each of the 10 digits). The code used here closely follows a similar use case where the task is to classify each input image as one of the 10 digits (0–9).

The main training code uses a class from the standard PyTorch example for the model definition:

import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
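
The 320 in nn.Linear(320, 50) follows from the image size and the layer settings. The following is a minimal sketch of that arithmetic in plain Python; the helper names conv_out and flattened_features are ours for illustration and are not part of the PyTorch example:

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output width/height of a square convolution or pooling window."""
    return (size + 2 * padding - kernel) // stride + 1


def flattened_features(image_size=28):
    # conv1: 5x5 kernel, no padding -> 24x24, then 2x2 max pooling -> 12x12
    size = conv_out(image_size, kernel=5)
    size = conv_out(size, kernel=2, stride=2)
    # conv2: 5x5 kernel -> 8x8, then 2x2 max pooling -> 4x4
    size = conv_out(size, kernel=5)
    size = conv_out(size, kernel=2, stride=2)
    # conv2 produces 20 channels, so the flattened vector has 20 * 4 * 4 values
    return 20 * size * size


print(flattened_features())  # 320
```

If you change the input resolution or kernel sizes, the value passed to x.view and fc1 has to change accordingly.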

The main training function works for both single-instance and distributed training, and decides between them by checking its arguments:

is_distributed = len(args.hosts) > 1 and args.backend is not None
if is_distributed:
    # Initialize the distributed environment.
    world_size = len(args.hosts)
    os.environ['WORLD_SIZE'] = str(world_size)
    host_rank = args.hosts.index(args.current_host)
    os.environ['RANK'] = str(host_rank)
    dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
    logger.info("Initialized the distributed environment: '{}' backend on {} nodes. ".format(
        args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
        dist.get_rank(), args.num_gpus))
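
To make the rank logic easier to follow, here is a small sketch that isolates it from PyTorch. The function name resolve_distributed_config and the host names are assumptions for illustration; only the decision rule mirrors the snippet above:

```python
def resolve_distributed_config(hosts, current_host, backend):
    """Return (is_distributed, world_size, rank) for this host."""
    is_distributed = len(hosts) > 1 and backend is not None
    world_size = len(hosts)
    # Each host derives its rank from its position in the shared host list,
    # so every host must see the list in the same order.
    rank = hosts.index(current_host)
    return is_distributed, world_size, rank


# Hypothetical host names for a two-instance and a single-instance job.
print(resolve_distributed_config(["algo-1", "algo-2"], "algo-2", "gloo"))  # (True, 2, 1)
print(resolve_distributed_config(["algo-1"], "algo-1", "gloo"))  # (False, 1, 0)
```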

The list of hosts is conveniently stored as a JSON-encoded string in an Amazon SageMaker environment variable (SM_HOSTS), and can also be passed in as an argument:

parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))
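
One detail worth knowing: with type=list, a value passed explicitly on the command line is split into single characters, because argparse calls list() on the raw string. The sketch below shows a possible alternative that parses an override the same way as the default. It is our suggestion rather than the code in mnist.py, and it sets SM_HOSTS by hand only so that it runs outside a training job:

```python
import argparse
import json
import os

# Simulated value; inside a training job SageMaker sets this variable for you.
os.environ["SM_HOSTS"] = '["algo-1", "algo-2"]'

parser = argparse.ArgumentParser()
# type=json.loads parses a command-line override the same way as the default.
parser.add_argument("--hosts", type=json.loads, default=json.loads(os.environ["SM_HOSTS"]))

default_args = parser.parse_args([])
override_args = parser.parse_args(["--hosts", '["algo-1"]'])

print(default_args.hosts)      # ['algo-1', 'algo-2']
print(override_args.hosts)     # ['algo-1']
print(list('["algo-1"]')[:3])  # what type=list would start with: ['[', '"', 'a']
```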

Next, we load the datasets from the default data directory (os.environ['SM_CHANNEL_TRAINING']):

train_loader = _get_train_data_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
test_loader = _get_test_data_loader(args.test_batch_size, args.data_dir, **kwargs)

We then enter the main training loop:

for epoch in range(1, args.epochs + 1):
    model.train()

Finally, we save the model:

def save_model(model, model_dir):
    logger.info("Saving the model.")
    path = os.path.join(model_dir, 'model.pth')
    # recommended way from http://pytorch.org/docs/master/notes/serialization.html
    torch.save(model.cpu().state_dict(), path)

This code is stored in a file called mnist.py and used in later steps (see the full code on GitHub). The following are two important takeaways with respect to the pipeline:

  • You can use the data input location on Amazon Simple Storage Service (Amazon S3) as a parameter for the training step in a pipeline. This data is delivered to the training container, and its local path is stored in an environment variable (for example, SM_CHANNEL_TRAINING for a channel named training).
  • The model is shown here as being saved locally in model_dir; the local path of the model directory (/opt/ml/model) is stored in an environment variable (SM_MODEL_DIR). At the end of the SageMaker training job, the model is copied to an Amazon S3 location so that model and endpoint related pipeline steps can access the model.
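
The two takeaways above can be sketched as follows. The helper names, the bucket, and the job name are hypothetical; the fallback paths and the output/model.tar.gz suffix reflect the usual SageMaker conventions as we understand them, so verify them against your own training job:

```python
import os


def training_paths(environ=os.environ):
    """Resolve the container-local data and model directories."""
    # The fallbacks are the conventional SageMaker container paths.
    data_dir = environ.get("SM_CHANNEL_TRAINING", "/opt/ml/input/data/training")
    model_dir = environ.get("SM_MODEL_DIR", "/opt/ml/model")
    return data_dir, model_dir


def model_artifact_uri(s3_output_path, training_job_name):
    """S3 location where SageMaker uploads the packaged model after training."""
    return "{}/{}/output/model.tar.gz".format(s3_output_path.rstrip("/"), training_job_name)


print(training_paths({}))
print(model_artifact_uri("s3://my-bucket/models", "trainingjob-demo"))
```

Later pipeline steps read this S3 location from the training job output instead of rebuilding it by hand, which is what the ModelDataUrl parameter does further down in this post.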

Now let’s look at our three methods to author end-to-end ML pipelines in Step Functions.

Use the Step Functions Data Science SDK

The Step Functions Data Science SDK is an open-source library that lets you create workflows entirely in Python. Installing this SDK is as simple as entering the following code:

pip install stepfunctions

The SDK allows you to do the following:

  • Create steps that accomplish tasks
  • Chain those steps together into workflows
  • Branch out to run steps in parallel or based on conditions
  • Include retry, succeed, or fail steps
  • Review a graphical representation and definition for your workflow
  • Create a workflow in Step Functions
  • Start and review runs in Step Functions

We don’t use many of these features in this post, but they are all available in the Step Functions Data Science SDK.

To get started, we first create a PyTorch estimator with the mnist.py file. We configure the estimator with the training script, an AWS Identity and Access Management (IAM) role, the number of training instances, the training instance type, and hyperparameters:

from sagemaker.pytorch import PyTorch

estimator = PyTorch(entry_point='mnist.py',
                    role=sagemaker_execution_role,
                    framework_version='1.2.0',
                    py_version='py3',
                    train_instance_count=2,
                    train_instance_type='ml.c4.xlarge',
                    hyperparameters={
                        'epochs': 6
                    })

The Data Science SDK provides two ways of defining a pipeline. Firstly, you can use individual steps. For example, you can define a training step with the following code:

training_step = steps.TrainingStep("Train Step",estimator=estimator,...)

Then you create a model step:

model_step = steps.ModelStep("Savemodel", model=training_step.get_expected_model(),...)

Finally, you chain all the steps using the following code:

workflow_definition = steps.Chain([training_step, model_step, transform_step, endpoint_config_step, endpoint_step])

For more information, see Build a machine learning workflow using Step Functions and SageMaker.
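
Conceptually, chaining steps means pointing each state at the next one and marking the last state as terminal. The following plain-Python sketch illustrates that idea by producing an Amazon States Language style dictionary. It is not the SDK's implementation, and chain_states is a hypothetical helper:

```python
import json


def chain_states(named_states):
    """Link (name, state) pairs in order and return a state machine definition."""
    names = [name for name, _ in named_states]
    states = {}
    for position, (name, state) in enumerate(named_states):
        linked = dict(state)  # copy so the caller's dictionaries stay unchanged
        if position + 1 < len(names):
            linked["Next"] = names[position + 1]
        else:
            linked["End"] = True
        states[name] = linked
    return {"StartAt": names[0], "States": states}


definition = chain_states([
    ("Train Step", {"Type": "Task", "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync"}),
    ("Savemodel", {"Type": "Task", "Resource": "arn:aws:states:::sagemaker:createModel"}),
])
print(json.dumps(definition, indent=2))
```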

Alternatively, you can use a standard training pipeline class that is built in to the SDK:

pipeline = TrainingPipeline(
                estimator=estimator,
                role=workflow_execution_role,
                inputs=inputs,
                s3_bucket=bucket
            )

The workflow execution role allows you to create and run workflows in Step Functions. The following code renders a graph of the workflow that the pipeline defines:

pipeline.render_graph()

Finally, you can create and run the workflow using pipeline.create() and pipeline.execute().

An example output from the execute() statement looks as follows, and provides you with a link to Step Functions where you can view and monitor your execution:

arn:aws:states:us-east-1:account-number:execution:training-pipeline-generated-date:training-pipeline-generated-time.
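
If you want to log or store the pieces of this ARN, a small parser is enough. This is a sketch under the assumption that the ARN has the colon-separated layout shown above; parse_execution_arn is our own helper and the account number is a placeholder:

```python
def parse_execution_arn(arn):
    """Split a Step Functions execution ARN into its named parts."""
    # maxsplit keeps any further colons inside the execution name.
    parts = arn.split(":", 7)
    if len(parts) != 8 or parts[2] != "states" or parts[5] != "execution":
        raise ValueError("not a Step Functions execution ARN: {}".format(arn))
    return {
        "partition": parts[1],
        "region": parts[3],
        "account": parts[4],
        "state_machine": parts[6],
        "execution": parts[7],
    }


# Placeholder values; a real ARN carries your account number and generated names.
arn = "arn:aws:states:us-east-1:123456789012:execution:training-pipeline-demo:run-1"
info = parse_execution_arn(arn)
print(info["state_machine"], info["execution"])
```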

You can also render the current state of your workflow as it runs from a notebook using the following code:

execution.render_progress()

Use Step Functions Workflow Studio

To use Step Functions Workflow Studio, complete the following steps:

  1. On the Step Functions console, choose Create state machine.
  2. Select Design your workflow visually.
  3. Choose Next.
  4. Search for SageMaker to filter the available steps, then drag and drop the training step (SageMaker CreateTrainingJob) onto the canvas.
  5. In a similar fashion, drag and drop the following states in order:
    1. Create Model
    2. Create Endpoint Config
    3. Create Endpoint

Your workflow should now look like the following diagram.

Now, let’s configure each of these steps.

  1. Choose SageMaker CreateTrainingJob and edit the API parameters in the Form box.
  2. Use the following JSON object if you are following this example:
{
  "AlgorithmSpecification": {
    "TrainingImage": "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.2.0-cpu-py3",
    "TrainingInputMode": "File"
  },
  "HyperParameters": {
    "epochs": "6",
    "sagemaker_submit_directory.$": "$$.Execution.Input.sourcedir",
    "sagemaker_program.$": "$$.Execution.Input.trainfile"
  },
  "InputDataConfig": [
    {
      "ChannelName": "training",
      "DataSource": {
        "S3DataSource": {
          "S3DataType": "S3Prefix",
          "S3Uri": "s3://sagemaker-us-east-1-497456752804/sagemaker/DEMO-pytorch-mnist",
          "S3DataDistributionType": "FullyReplicated"
        }
      }
    }
  ],
  "StoppingCondition": {
    "MaxRuntimeInSeconds": 1000
  },
  "ResourceConfig": {
    "InstanceCount": 2,
    "InstanceType": "ml.c4.xlarge",
    "VolumeSizeInGB": 30
  },
  "OutputDataConfig": {
    "S3OutputPath": "s3://sagemaker-us-east-1-497456752804/stepfunctions-workflow-training-job-v1/models"
  },
  "RoleArn": "arn:aws:iam::497456752804:role/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK",
  "TrainingJobName.$": "States.Format('trainingjob-{}',$$.Execution.Name)"
}
  3. Select Wait for task to complete.
  4. Edit the API parameters for CreateModel:
{
  "ExecutionRoleArn.$": "$.RoleArn",
  "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
  "PrimaryContainer": {
    "Image.$": "$.AlgorithmSpecification.TrainingImage",
    "Environment": {
      "SAGEMAKER_PROGRAM.$": "$$.Execution.Input.trainfile",
      "SAGEMAKER_SUBMIT_DIRECTORY.$": "$$.Execution.Input.sourcedir"
    },
    "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
  }
}
  5. Edit the API parameters for CreateEndpointConfig:
{
  "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
  "ProductionVariants": [
    {
      "InitialInstanceCount": 1,
      "InitialVariantWeight": 1,
      "InstanceType": "ml.m4.xlarge",
      "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
      "VariantName": "AllTraffic"
    }
  ]
}
  6. Edit the API parameters for CreateEndpoint:
{
  "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
  "EndpointName.$": "States.Format('endpoint-{}',$$.Execution.Name)"
}
  7. Choose Next, review the generated code, and choose Next.
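
The States.Format expressions above derive every resource name from the execution name, so a long or unusual execution name can produce names that SageMaker rejects. The sketch below checks this ahead of time. It assumes the commonly documented rule of at most 63 alphanumeric characters and hyphens; confirm the current limits in the SageMaker API reference. The helper names are ours:

```python
import re

# Assumed SageMaker naming rule: alphanumerics and hyphens, at most 63 characters.
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
MAX_LENGTH = 63
PREFIXES = ("trainingjob-", "model-", "config-", "endpoint-")


def derived_names(execution_name):
    """Mimic States.Format('<prefix>{}', $$.Execution.Name) for each resource."""
    return [prefix + execution_name for prefix in PREFIXES]


def invalid_names(execution_name):
    """Return the derived names that fail the assumed naming rule."""
    return [
        name for name in derived_names(execution_name)
        if len(name) > MAX_LENGTH or not NAME_PATTERN.match(name)
    ]


print(invalid_names("demo-run-1"))                # []
print(len(invalid_names("x" * 60)))               # 4, every derived name is too long
print(len(invalid_names("run_with_underscore")))  # 4, underscores are not accepted
```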

Step Functions can look at the resources you use and create a role. However, you may see the following message:

“Step Functions cannot generate an IAM policy if the RoleArn for SageMaker is from a Path. Hardcode the SageMaker RoleArn in your state machine definition, or choose an existing role with the proper permissions for Step Functions to call SageMaker.”

We use a role that we created in the Data Science SDK section instead.

  1. Select Use existing role and use the role StepFunctionsWorkflowExecutionRole.
  2. Choose Create state machine.
  3. When you receive the message that the machine was successfully created, run it with the following input:
{
    "trainfile":"mnist.py",
    "sourcedir":"s3://path/to/sourcedir.tar.gz"
}

Monitor and wait for the run to finish.
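
You can also start the same run programmatically. The following is a minimal sketch that takes a Step Functions client as a parameter, so it can be exercised without AWS access. The function name start_pipeline is ours, while start_execution with stateMachineArn and input is the boto3 Step Functions call:

```python
import json


def start_pipeline(sfn_client, state_machine_arn, trainfile, sourcedir):
    """Start one run of the state machine and return the execution ARN."""
    response = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        # The execution input must be a JSON string, not a dictionary.
        input=json.dumps({"trainfile": trainfile, "sourcedir": sourcedir}),
    )
    return response["executionArn"]


# Usage (requires AWS credentials; substitute your own state machine ARN):
# import boto3
# arn = start_pipeline(boto3.client("stepfunctions"), state_machine_arn,
#                      "mnist.py", "s3://path/to/sourcedir.tar.gz")
```

Passing the client in keeps the function easy to test with a stub and lets you choose the Region and credentials where the client is created.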

Use the Amazon States Language

Both methods we just discussed are great ways to quickly prototype a state machine on Step Functions. When you need to edit the Step Functions definition directly, you can use the Amazon States Language. See the following code:

{
  "Comment": "This is your state machine",
  "StartAt": "SageMaker CreateTrainingJob",
  "States": {
    "SageMaker CreateTrainingJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
      "Parameters": {
        "AlgorithmSpecification": {
          "TrainingImage": "763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.2.0-cpu-py3",
          "TrainingInputMode": "File"
        },
        "HyperParameters": {
          "epochs": "6",
          "sagemaker_submit_directory.$": "$$.Execution.Input.sourcedir",
          "sagemaker_program.$": "$$.Execution.Input.trainfile"
        },
        "InputDataConfig": [
          {
            "ChannelName": "training",
            "DataSource": {
              "S3DataSource": {
                "S3DataType": "S3Prefix",
                "S3Uri": "s3://sagemaker-us-east-1-497456752804/sagemaker/DEMO-pytorch-mnist",
                "S3DataDistributionType": "FullyReplicated"
              }
            }
          }
        ],
        "StoppingCondition": {
          "MaxRuntimeInSeconds": 1000
        },
        "ResourceConfig": {
          "InstanceCount": 2,
          "InstanceType": "ml.c4.xlarge",
          "VolumeSizeInGB": 30
        },
        "OutputDataConfig": {
          "S3OutputPath": "s3://sagemaker-us-east-1-497456752804/stepfunctions-workflow-training-job-v1/models"
        },
        "RoleArn": "arn:aws:iam::497456752804:role/telecomstack-SagemakerExecutionRole-AHSGUPY5EQIK",
        "TrainingJobName.$": "States.Format('trainingjob-{}',$$.Execution.Name)"
      },
      "Next": "SageMaker CreateModel"
    },
    "SageMaker CreateModel": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createModel",
      "Parameters": {
        "ExecutionRoleArn.$": "$.RoleArn",
        "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
        "PrimaryContainer": {
          "Image.$": "$.AlgorithmSpecification.TrainingImage",
          "Environment": {
            "SAGEMAKER_PROGRAM.$": "$$.Execution.Input.trainfile",
            "SAGEMAKER_SUBMIT_DIRECTORY.$": "$$.Execution.Input.sourcedir"
          },
          "ModelDataUrl.$": "$.ModelArtifacts.S3ModelArtifacts"
        }
      },
      "Next": "SageMaker CreateEndpointConfig"
    },
    "SageMaker CreateEndpointConfig": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
      "Parameters": {
        "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
        "ProductionVariants": [
          {
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
            "InstanceType": "ml.m4.xlarge",
            "ModelName.$": "States.Format('model-{}',$$.Execution.Name)",
            "VariantName": "AllTraffic"
          }
        ]
      },
      "Next": "SageMaker CreateEndpoint"
    },
    "SageMaker CreateEndpoint": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sagemaker:createEndpoint",
      "Parameters": {
        "EndpointConfigName.$": "States.Format('config-{}',$$.Execution.Name)",
        "EndpointName.$": "States.Format('endpoint-{}',$$.Execution.Name)"
      },
      "End": true
    }
  }
}

You can create a new Step Functions state machine on the Step Functions console by selecting Write your workflow in code.
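
When you edit a definition by hand, it is easy to mistype a state name in a Next field. The sketch below runs a few structural checks before you paste the definition into the console. It is a simplified illustration with a hypothetical check_definition helper, not a full Amazon States Language validator:

```python
def check_definition(definition):
    """Return a list of structural problems found in a state machine definition."""
    states = definition.get("States", {})
    problems = []
    if definition.get("StartAt") not in states:
        problems.append("StartAt does not name a defined state")
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append("{} points to undefined state {}".format(name, target))
        # Succeed, Fail, and Choice states are allowed to omit both Next and End.
        if target is None and not state.get("End") and state.get("Type") not in ("Succeed", "Fail", "Choice"):
            problems.append("{} has neither Next nor End".format(name))
    return problems


definition = {
    "StartAt": "SageMaker CreateTrainingJob",
    "States": {
        "SageMaker CreateTrainingJob": {"Type": "Task", "Next": "SageMaker CreateModel"},
        "SageMaker CreateModel": {"Type": "Task", "End": True},
    },
}
print(check_definition(definition))  # []
```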

A successful run shows each state in green.

Each state points to resources in SageMaker. In the following screenshot, the link under Resource points to the model created as a result of the CreateModel step.

When to use what?

The following table summarizes the supported service integrations.

Supported Service | Amazon States Language | (New) Step Functions Workflow Studio | AWS Step Functions Data Science SDK
AWS Lambda
AWS Batch
Amazon DynamoDB
Amazon ECS/AWS Fargate
Amazon SNS
Amazon SQS
AWS Glue
Amazon SageMaker
Amazon EMR
Amazon EMR on EKS
AWS CodeBuild
Amazon Athena
Amazon EKS
Amazon API Gateway
AWS Glue DataBrew
Amazon EventBridge
AWS Step Functions

Although most of what you typically need for your pipelines is included in the Step Functions Data Science SDK, you may need to integrate with services that are supported only by the other options shown in the preceding table.

In addition, consider the skill sets of your existing team: teams that are used to working with a particular tool may prefer to stick with it to maximize productivity. This applies to the options within Step Functions explored here, and also to alternatives such as the AWS Cloud Development Kit (AWS CDK), AWS Serverless Application Model (AWS SAM), Airflow, Kubeflow, and SageMaker Pipelines. For SageMaker Pipelines specifically, consider that data scientists and ML engineers may benefit from working on a single platform that lets them not only maintain and run pipelines, but also manage models, endpoints, notebooks, and other features.

Lastly, consider a hybrid set of services for using the right tool for the right job. For example:

Summary

In this post, we looked at three different ways of authoring and running Step Functions pipelines, specifically for end-to-end ML workflows. Choosing a pipelining tool for ML is an important step for a team, and is a decision that needs to consider the context, existing skillsets, connections to various other teams with these skillsets in an organization, available service integration, service limits, and applicable quotas. Contact your AWS team to help guide you through these decisions; we are eager to help!

For further reading, check out the following:


About the Author

Shreyas Subramanian is an AI/ML specialist Solutions Architect who helps customers use machine learning to solve their business challenges on the AWS Cloud.

View Original Source (aws.amazon.com) Here.

Shared by: AWS Machine Learning
