
Sagemaker Pipelines using step decorators

AWS has announced a new feature, the @step decorator, which lets users easily convert existing Python functions into SageMaker steps for execution in a SageMaker Pipeline.

Published Jan 29, 2024
SageMaker Pipelines comes with classical built-in steps based on the different stages of the ML lifecycle, e.g. Processing, Training, RegisterModel, etc. [1]. This is useful when working within the SageMaker constructs, but it has limitations when users want to directly lift and shift Python code into a SageMaker Pipeline.
During re:Invent 2023, AWS announced a new feature, the @step decorator, which allows developers to convert Python functions into SageMaker pipeline steps [2]. This provides a low-code experience for data scientists to convert Machine Learning (ML) development code into repeatable and reusable workflow steps of Amazon SageMaker Pipelines. Developers get the flexibility of running their ML pipelines locally as normal Python code, then converting functions into steps by decorating them with the @step decorator, creating dependencies between the steps, and finally creating a SageMaker pipeline with the steps passed as an argument.

Prerequisites

In this example, we will use a dataset that is already available in Amazon SageMaker Canvas [4]; you can also download the shipping logs CSV directly from [5]. This dataset needs to be stored in an S3 bucket, which will be referenced as an input parameter in the code below (replace the input_path value with the path to the CSV in your S3 bucket).
We will need the following dependencies in a requirements.txt file
sagemaker
pandas
catboost
scikit-learn
fsspec
s3fs
ipywidgets
In the same directory, add a config.yaml that references the path of the requirements.txt file created above. This will install the requirements in the SageMaker image when the local code is run remotely in SageMaker containers.
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        Dependencies: ./requirements.txt
        IncludeLocalWorkDir: true
        CustomFileFilter:
          IgnoreNamePatterns:
          - "*.ipynb" # all notebook files
We need to point the step decorator at the directory containing config.yaml so that it can pick up these settings. To do this, we set the SAGEMAKER_USER_CONFIG_OVERRIDE environment variable to the current working directory, where config.yaml is stored.
import os

# Set path to config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()
For executing this pipeline in SageMaker, we first need to create a PipelineSession context (which inherits from SageMaker Session) to initialise the pipeline, and also get the default bucket that comes with the session, for the outputs of the pipeline runs.
The input_path variable sets the path to the CSV input dataset downloaded earlier. Change this path depending on the bucket and prefix you have stored it in.
We will also specify input pipeline parameters of various data types, set at runtime, using ParameterInteger and ParameterString from the sagemaker.workflow.parameters module.
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker import get_execution_role
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

pipeline_session = PipelineSession()
region = pipeline_session.boto_region_name
default_bucket = pipeline_session.default_bucket()
role = get_execution_role()
input_path = f"s3://{default_bucket}/canvas/sample_dataset/canvas-sample-shipping-logs.csv"

categorical_features_names = ['ShippingPriority', 'ShippingOrigin', 'InBulkOrder', 'Carrier']

instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1,
)

instance_type = ParameterString(
    name="InstanceType",
    default_value='ml.m5.large',
)

Local Pipeline Execution

We will first execute a simple workflow where we take the dataset, split it into train, test, and validation dataframes, and train a CatBoost regression model to predict the expected number of shipping days. Finally, we will have a function to evaluate the performance of this model on the test dataset and compute the mean squared error.
!pip install -r ./requirements.txt

import pandas as pd
import numpy as np
import os
from catboost import CatBoostRegressor, Pool
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import sagemaker

def preprocess(raw_data):
    df = pd.read_csv(raw_data)
    df.drop(['ProductId', 'OrderID', 'OnTimeDelivery', 'OrderDate'], axis=1, inplace=True)
    train, test = train_test_split(df, test_size=0.2)
    train, validation = train_test_split(train, test_size=0.2)
    print("Completed running the processing job")
    return pd.DataFrame(train), pd.DataFrame(validation), pd.DataFrame(test)

def train(
    train_df,
    validation_df,
    categorical_features_names,
    target="ExpectedShippingDays",
    learning_rate=0.01,
    n_estimators=4000,
):
    y_train = train_df.loc[:, target]
    train_df.drop([target], axis=1, inplace=True)
    y_validation = validation_df.loc[:, target]
    validation_df.drop([target], axis=1, inplace=True)
    train_pool = Pool(train_df, label=y_train, cat_features=categorical_features_names)
    val_pool = Pool(validation_df, label=y_validation, cat_features=categorical_features_names)
    model = CatBoostRegressor(custom_metric=['R2', 'RMSE'], learning_rate=learning_rate, n_estimators=n_estimators)
    model.fit(train_pool, eval_set=val_pool, verbose=2000, plot=True)
    return model

def evaluate(model, test_df, target="ExpectedShippingDays"):
    y_test = test_df.loc[:, target]
    test_df.drop([target], axis=1, inplace=True)
    predictions = model.predict(test_df)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }
    return report_dict
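As a side note, the two-stage split in preprocess yields roughly 64/16/20 train/validation/test proportions (80/20, then 80/20 again on the training portion). A quick standalone sketch confirms this; the toy dataframe here is illustrative only, standing in for the shipping logs:

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Toy dataframe standing in for the shipping logs dataset
df = pd.DataFrame({"x": range(100)})

# Same two-stage split as in preprocess()
train, test = train_test_split(df, test_size=0.2, random_state=0)
train, validation = train_test_split(train, test_size=0.2, random_state=0)

print(len(train), len(validation), len(test))  # 64 16 20
```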
We can call the functions above in a normal Python script, as below, to check that the script runs end to end as expected. Before doing this, make sure the dependencies in the requirements.txt file are installed: either pip install -r requirements.txt inside a virtual environment such as conda or venv, or, if using a notebook, add !pip install -r ./requirements.txt in a preceding cell.

categorical_features_names = ['ShippingPriority', 'ShippingOrigin', 'InBulkOrder', 'Carrier']

train_df, val_df, test_df = preprocess(input_path)
model = train(train_df, val_df, categorical_features_names)
report = evaluate(model, test_df)
print(f"evaluation report: {report}")
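As a quick check of the shape of the evaluation report, the same metrics can be reproduced on a few synthetic predictions (the numbers below are made up purely for illustration):

```python
import numpy as np
from sklearn.metrics import mean_squared_error

# Synthetic targets and predictions, for illustration only
y_test = np.array([3.0, 5.0, 2.0, 7.0])
predictions = np.array([2.5, 5.5, 2.0, 8.0])

# Same metric computation as in evaluate()
mse = mean_squared_error(y_test, predictions)
std = np.std(y_test - predictions)
report_dict = {
    "regression_metrics": {
        "mse": {"value": mse, "standard_deviation": std},
    },
}
print(report_dict["regression_metrics"]["mse"]["value"])  # 0.375
```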

Remote Execution in Sagemaker

Next, we want to convert the Python functions we executed locally into SageMaker steps to enable end-to-end execution in a SageMaker pipeline. We can do this by simply passing the custom functions (i.e. the preprocess, train and evaluate functions above) as input to the step decorator, as below. Alternatively, you could decorate each function with @step [3].
The return value of the step decorator is a DelayedReturn object. This means that the function is not executed immediately; execution is delayed until pipeline run time, when the step is running. The DelayedReturn object from each step can be used as the input of subsequent steps [3]. This way, one can chain a series of functions with the step decorator and later pass the array of steps as an argument when creating the SageMaker pipeline for execution.
from sagemaker.workflow.function_step import step

delayed_data = step(preprocess, name="ShippingPreprocess")(input_path)
delayed_model = step(train, name="ShippingTrain")(
    train_df=delayed_data[0],
    validation_df=delayed_data[1],
    categorical_features_names=categorical_features_names,
)
delayed_evaluation_result = step(evaluate, name="ShippingEval")(
    model=delayed_model,
    test_df=delayed_data[2],
)

steps = [delayed_evaluation_result]
After defining all the steps, we can group them into a pipeline named ShippingPipeline, as below, along with the parameters defined previously (these will be used to set the instance count and type for each step's training job).
Note: if running from a SageMaker notebook, you may need to run !sudo chmod 777 lost+found, otherwise you may get a permission denied error.
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="ShippingPipeline",
    parameters=[
        instance_count,
        instance_type,
    ],
    steps=steps,
    sagemaker_session=pipeline_session,
)
pipeline.upsert(role_arn=role)
execution = pipeline.start()
Once the pipeline is started, you can monitor the progress of each step via its training job, which should include the name of the step in the job name. If all steps run successfully, you should see each job's status as Completed.
We can also list the execution steps. Once the pipeline completes successfully, we should see all the steps with their status and timestamps, as below.
execution.list_steps()
We can retrieve the output of a step as follows, once the pipeline completes. For example, the preprocess step should return a tuple of dataframes, and the evaluation step should return a dictionary with the metrics report.
execution.result(step_name="ShippingPreprocess")
Navigating to SageMaker Studio, we can get more information about the pipeline, for example the visual DAG, logs, InputDataConfig, and the output of each step.

 Conclusion

This blog has demonstrated the use of the step decorator to enable practitioners to more easily convert existing Python functions into SageMaker steps for execution in SageMaker. This reduces the amount of refactoring and boilerplate code required. Each step is executed as an isolated training job, allowing Python functions to be chained together into a pipeline.
This gives developers the flexibility of using custom steps rather than just the classical steps available in SageMaker.

 References

  1. SageMaker built-in steps: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html
  2. AWS simplified AI/ML workflow announcement: https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sagemaker-pipelines-developer-ai-ml/
  3. Lift-and-shift step decorator: https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-step-decorator.html
  4. SageMaker Canvas sample datasets: https://docs.aws.amazon.com/sagemaker/latest/dg/canvas-sample-datasets.html
  5. Shipping data download: https://catalog.us-east-1.prod.workshops.aws/workshops/80ba0ea5-7cf9-4b8c-9d3f-1cd988b6c071/en-US/1-use-cases/7-supply-chain#loading-the-dataset
     
