Customized model monitoring for near real-time batch inference with Amazon SageMaker

Real-world applications vary in inference requirements for their artificial intelligence and machine learning (AI/ML) solutions to optimize performance and reduce costs. Examples include financial systems processing transaction data streams, recommendation engines processing user activity data, and computer vision models processing video frames. In these scenarios, customized model monitoring for near real-time batch inference is essential so that the quality of predictions is continuously tracked and any deviations are promptly detected.

In this post, we present a framework to customize the use of Amazon SageMaker Model Monitor for handling multi-payload inference requests for near real-time inference scenarios. SageMaker Model Monitor monitors the quality of SageMaker ML models in production. Early and proactive detection of deviations in model quality enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing quality issues without having to monitor models manually or build additional tooling. SageMaker Model Monitor provides monitoring capabilities for data quality, model quality, bias drift in a model’s predictions, and drift in feature attribution. SageMaker Model Monitor adapts well to common AI/ML use cases and provides advanced capabilities given edge case requirements such as monitoring custom metrics, handling ground truth data, or processing inference data capture.

You can deploy your ML model to SageMaker hosting services and get a SageMaker endpoint for real-time inference. Your client applications invoke this endpoint to get inferences from the model. To reduce the number of invocations and meet custom business objectives, AI/ML developers can customize inference code to send multiple inference records in one payload to the endpoint for near real-time model predictions. Rather than using a SageMaker Model Monitor schedule with native configurations, a SageMaker Model Monitor Bring Your Own Container (BYOC) approach meets these custom requirements. Although this advanced BYOC topic can appear overwhelming to AI/ML developers, the right framework can accelerate SageMaker Model Monitor BYOC development for customized model monitoring requirements.

In this post, we provide a BYOC framework with SageMaker Model Monitor to enable customized payload handling (such as multi-payload requests) from SageMaker endpoint data capture, use ground truth data, and output custom business metrics for model quality.

Overview of solution

SageMaker Model Monitor uses a pre-built SageMaker image based on Deequ (which runs on Apache Spark), which accelerates the adoption of model monitoring. Using this pre-built image occasionally becomes problematic when customization is required. For example, the pre-built image expects one inference payload per invocation (request to a SageMaker endpoint). If you send multiple payloads in one invocation to reduce the number of invocations and want to set up model monitoring with SageMaker Model Monitor, you need to explore additional capabilities within SageMaker Model Monitor.

A preprocessor script is a capability of SageMaker Model Monitor to preprocess SageMaker endpoint data capture before creating metrics for model quality. However, even with a preprocessor script, you still face a mismatch in the designed behavior of SageMaker Model Monitor, which expects one inference payload per request.
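For reference, a preprocessor script is a small Python module that defines the documented preprocess_handler hook. The following is a minimal sketch, assuming the endpoint captures JSON; it also illustrates the limitation, because whatever the hook returns is still treated as a single inference record.

import json

def preprocess_handler(inference_record):
    """Minimal sketch of a SageMaker Model Monitor preprocessor script.

    inference_record exposes the captured request (endpoint_input.data) and
    response (endpoint_output.data) for one invocation; this sketch assumes
    both were captured as JSON.
    """
    input_data = json.loads(inference_record.endpoint_input.data)
    output_data = json.loads(inference_record.endpoint_output.data)

    # Model Monitor treats the returned value as ONE inference record, so an
    # invocation that carried several payloads cannot be expanded into several
    # records by this hook alone.
    return {"input": input_data, "output": output_data}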

Given these requirements, we create the BYOC framework shown in the following diagram. In this example, we demonstrate setting up a SageMaker Model Monitor job for monitoring model quality.

The workflow includes the following steps:

  1.  Before and after training an AI/ML model, an AI/ML developer creates baseline and validation data that is used downstream for monitoring model quality. For example, users can save the accuracy score of a model, or create custom metrics, to validate model quality.
  2. An AI/ML developer creates a SageMaker endpoint including custom inference scripts. Data capture must be enabled for the SageMaker endpoint to save real-time inference data to Amazon Simple Storage Service (Amazon S3) and support downstream SageMaker Model Monitor.
  3. A user or application sends a request including multiple inference payloads. If you have a large volume of inference records, SageMaker batch transform may be a suitable option for your use case.
  4. The SageMaker endpoint (which includes the custom inference code to preprocess the multi-payload request) passes the inference data to the ML model, postprocesses the predictions, and sends a response to the user or application. The information pertaining to the request and response is stored in Amazon S3.
  5. Independent of calling the SageMaker endpoint, the user or application generates ground truth for the predictions returned by the SageMaker endpoint.
  6. A custom image (BYOC) is pushed to Amazon Elastic Container Registry (Amazon ECR) that contains code to perform the following actions:
    • Read input and output contracts required for SageMaker Model Monitor.
    • Read ground truth data.
    • Optionally, read any baseline constraint or validation data (such as accuracy score threshold).
    • Process data capture stored in Amazon S3 from the SageMaker endpoint.
    • Compare real-time data with ground truth and create model quality metrics.
    • Publish metrics to Amazon CloudWatch Logs and output a model quality report.
  7. The AI/ML developer creates a SageMaker Model Monitor schedule that references the custom image (BYOC) by its image URI.

This post uses code provided in the following GitHub repo to demonstrate the solution. The process includes the following steps:

  1. Train a multi-classification XGBoost model using the public forest coverage dataset.
  2. Create an inference script for the SageMaker endpoint for custom inference logic.
  3. Create a SageMaker endpoint with data capture enabled.
  4. Create a constraint file that contains metrics used to determine if model quality alerts should be generated.
  5. Create a custom Docker image for SageMaker Model Monitor by using the SageMaker Docker Build CLI and push it to Amazon ECR.
  6. Create a SageMaker Model Monitor schedule with the BYOC image.
  7. View the custom model quality report generated by the SageMaker Model Monitor job.

Prerequisites

To follow along with this walkthrough, make sure you have the following prerequisites:

Train the model

In the SageMaker Studio environment, launch a SageMaker training job to train a multi-classification model and output model artifacts to Amazon S3:


from sagemaker.xgboost.estimator import XGBoost
from sagemaker.estimator import Estimator

hyperparameters = {
    "max_depth": 5,
    "eta": 0.36,
    "gamma": 2.88,
    "min_child_weight": 9.89,
    "subsample": 0.77,
    "objective": "multi:softprob",
    "num_class": 7,
    "num_round": 50
}

xgb_estimator = XGBoost(
    entry_point="./src/train.py",
    hyperparameters=hyperparameters,
    role=role,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    framework_version="1.5-1",
    output_path=f's3://{bucket}/{prefix_name}/models'
)

xgb_estimator.fit(
    {
        "train": train_data_path,
        "validation": validation_data_path
    },
    wait=True,
    logs=True
)
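
The training entry point itself lives in the repository. As a rough illustration only, a script like src/train.py typically parses the hyperparameters that SageMaker passes as command-line arguments, reads the train and validation channels from the SM_CHANNEL_* environment variables, and saves the booster to SM_MODEL_DIR. The file names and label column position below are assumptions, not the repository's actual layout.

# Illustrative sketch of src/train.py -- see the GitHub repository for the real script.
import argparse
import os
import pickle

import pandas as pd
import xgboost as xgb

if __name__ == "__main__":
    # SageMaker passes estimator hyperparameters as command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.3)
    parser.add_argument("--gamma", type=float, default=0.0)
    parser.add_argument("--min_child_weight", type=float, default=1.0)
    parser.add_argument("--subsample", type=float, default=1.0)
    parser.add_argument("--objective", type=str, default="multi:softprob")
    parser.add_argument("--num_class", type=int, default=7)
    parser.add_argument("--num_round", type=int, default=50)
    args = parser.parse_args()

    # Channels configured in estimator.fit() are mounted locally by SageMaker.
    train_df = pd.read_csv(os.path.join(os.environ["SM_CHANNEL_TRAIN"], "train.csv"))
    valid_df = pd.read_csv(os.path.join(os.environ["SM_CHANNEL_VALIDATION"], "validation.csv"))

    # Assumes the label is the first column; adjust to your dataset layout.
    dtrain = xgb.DMatrix(train_df.iloc[:, 1:], label=train_df.iloc[:, 0])
    dvalid = xgb.DMatrix(valid_df.iloc[:, 1:], label=valid_df.iloc[:, 0])

    params = {
        "max_depth": args.max_depth, "eta": args.eta, "gamma": args.gamma,
        "min_child_weight": args.min_child_weight, "subsample": args.subsample,
        "objective": args.objective, "num_class": args.num_class,
    }
    booster = xgb.train(params, dtrain, num_boost_round=args.num_round,
                        evals=[(dtrain, "train"), (dvalid, "validation")])

    # Persist the booster where SageMaker expects model artifacts.
    with open(os.path.join(os.environ["SM_MODEL_DIR"], "xgboost-model"), "wb") as f:
        pickle.dump(booster, f)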

Create inference code

Before you deploy the SageMaker endpoint, create an inference script (inference.py) that contains a function to preprocess the request with multiple payloads, invoke the model, and postprocess results.

For output_fn, a payload index is created for each inference record found in the request. This enables you to merge ground truth records with data capture within the SageMaker Model Monitor job.

See the following code:

import json

import numpy as np
import pandas as pd
import xgboost as xgb

# worker and encoders are provided by the SageMaker XGBoost serving container
from sagemaker_containers.beta.framework import encoders, worker


def input_fn(input_data, content_type):
    """Takes request data and de-serializes it into an object for prediction.
        When an InvokeEndpoint operation is made against an Endpoint running a SageMaker model server,
        the model server receives two pieces of information:
            - The request Content-Type, for example "application/json"
            - The request data, which is at most 5 MB (5 * 1024 * 1024 bytes) in size.
    Args:
        input_data (obj): the request data.
        content_type (str): the request Content-Type.
    Returns:
        (obj): data ready for prediction. For XGBoost, this defaults to DMatrix.
    """

    if content_type == "application/json":
        request_json = json.loads(input_data)
        prediction_df = pd.DataFrame.from_dict(request_json)
        return xgb.DMatrix(prediction_df)
    else:
        raise ValueError(f"Unsupported content type: {content_type}")


def predict_fn(input_data, model):
    """A predict_fn for XGBooost Framework. Calls a model on data deserialized in input_fn.
    Args:
        input_data: input data (DMatrix) for prediction deserialized by input_fn
        model: XGBoost model loaded in memory by model_fn
    Returns: a prediction
    """
    output = model.predict(input_data, validate_features=True)
    return output


def output_fn(prediction, accept):
    """Function responsible to serialize the prediction for the response.
    Args:
        prediction (obj): prediction returned by predict_fn .
        accept (str): accept content-type expected by the client.
    Returns: JSON output
    """
    
    if accept == "application/json":
        prediction_labels = np.argmax(prediction, axis=1)
        prediction_scores = np.max(prediction, axis=1)
        output_returns = [
            {
                "payload_index": int(index), 
                "label": int(label), 
                "score": float(score)} for label, score, index in zip(
                prediction_labels, prediction_scores, range(len(prediction_labels))
            )
        ]
        return worker.Response(encoders.encode(output_returns, accept), mimetype=accept)
    
    else:
        raise ValueError(f"Unsupported accept type: {accept}")
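
The serving container also needs a way to load the trained model for predict_fn. The repository handles this in inference.py; a minimal model_fn sketch, assuming the training job pickled the booster as xgboost-model (a common convention in SageMaker XGBoost examples), would be:

import os
import pickle


def model_fn(model_dir):
    """Load the trained XGBoost booster for predict_fn."""
    # Assumes the training job saved a pickled Booster named 'xgboost-model'.
    with open(os.path.join(model_dir, "xgboost-model"), "rb") as f:
        return pickle.load(f)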

Deploy the SageMaker endpoint

Now that you have created the inference script, you can create the SageMaker endpoint:


from sagemaker.model_monitor import DataCaptureConfig

predictor = xgb_estimator.deploy(
    instance_type="ml.m5.large",
    initial_instance_count=1,
    wait=True,
    data_capture_config=DataCaptureConfig(
        enable_capture=True,
        sampling_percentage=100,
        destination_s3_uri=f"s3://{bucket}/{prefix_name}/model-monitor/data-capture"
    ),
    source_dir="./src",
    entry_point="inference.py"
)

Create constraints for model quality monitoring

In model quality monitoring, you need to compare your metric generated from ground truth and data capture with a pre-specified threshold. In this example, we use the accuracy value of the trained model on the test set as a threshold. If the newly computed accuracy metric (generated using ground truth and data capture) is lower than this threshold, a violation report will be generated and the metrics will be published to CloudWatch.

See the following code:

import json

# accuracy_value is the accuracy of the trained model on the test set
constraints_dict = {
    "accuracy": {
        "threshold": accuracy_value
    }
}

# Serialize the constraints to JSON
json_object = json.dumps(constraints_dict, indent=4)

# Write to constraints.json
with open("constraints.json", "w") as outfile:
    outfile.write(json_object)

This constraints.json file is written to Amazon S3 and will be the input for the processing job for the SageMaker Model Monitor job downstream.
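
For example, you can upload the file with the SageMaker Python SDK; the prefix below simply mirrors the baseline-data location that the monitoring schedule reads later.

import sagemaker

# Upload constraints.json to the prefix that the monitoring schedule will read
baseline_s3_uri = sagemaker.Session().upload_data(
    path="constraints.json",
    bucket=bucket,
    key_prefix=f"{prefix_name}/model-monitor/mqm/baseline-data",
)
print(baseline_s3_uri)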

Publish the BYOC image to Amazon ECR

Create a script named model_quality_monitoring.py to perform the following functions (a condensed sketch follows the list):

  • Read environment variables and any arguments passed to the SageMaker Model Monitor job
  • Read SageMaker endpoint data capture and constraint metadata configured with the SageMaker Model Monitor job
  • Read ground truth data from Amazon S3 using the AWS SDK for pandas
  • Create accuracy metrics with data capture and ground truth
  • Create metrics and violation reports given constraint violations
  • Publish metrics to CloudWatch if violations are present

This script serves as the entry point for the SageMaker Model Monitor job. With a custom image, the entry point script needs to be specified in the Docker image, as shown in the following code. This way, when the SageMaker Model Monitor job initiates, the specified script is run. The sm-mm-mqm-byoc:1.0 image URI is passed to the image_uri argument when you define the SageMaker Model Monitor job downstream.

FROM 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3

RUN python3 -m pip install awswrangler

ENV PYTHONUNBUFFERED=TRUE

ADD ./src/model_quality_monitoring.py /

ENTRYPOINT ["python3", "/model_quality_monitoring.py"]

The custom BYOC image is pushed to Amazon ECR using the SageMaker Docker Build CLI:

sm-docker build . --file ./docker/Dockerfile --repository sm-mm-mqm-byoc:1.0

Create a SageMaker Model Monitor schedule

Next, you use the Amazon SageMaker Python SDK to create a model monitoring schedule. You can define the BYOC ECR image created in the previous section as the image_uri parameter.

You can customize the environment variables and arguments passed to the SageMaker Processing job when SageMaker Model Monitor runs the model quality monitoring job. In this example, the ground truth Amazon S3 URI path is passed as an environment variable and is used within the SageMaker Processing job:


from sagemaker.model_monitor import ModelMonitor

sm_mm_mqm = ModelMonitor(
    role=role, 
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sm-mm-mqm-byoc:1.0", 
    instance_count=1, 
    instance_type='ml.m5.xlarge', 
    base_job_name="sm-mm-mqm-byoc",
    sagemaker_session=sess,
    env={
        "ground_truth_s3_uri_path": f"s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name}"
    }
)

Before you create the schedule, specify the endpoint name, the Amazon S3 URI output location you want to send violation reports to, the statistics and constraints metadata files (if applicable), and any custom arguments you want to pass to your entry script within your BYOC SageMaker Processing job. In this example, the argument --create-violation-tests is passed, which creates a mock violation for demonstration purposes. SageMaker Model Monitor accepts the rest of the parameters and translates them into environment variables, which you can use within your custom monitoring job.

from sagemaker.model_monitor import CronExpressionGenerator, MonitoringOutput

sm_mm_mqm.create_monitoring_schedule(
    endpoint_input=predictor.endpoint_name,
    output=MonitoringOutput(
        source="/opt/ml/processing/output",
        destination=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/reports"
    ),
    statistics=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/statistics.json",
    constraints=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/constraints.json",
    monitor_schedule_name="sm-mm-byoc-batch-inf-schedule",
    schedule_cron_expression=CronExpressionGenerator().hourly(),
    arguments=[
        "--create-violation-tests"
    ]
)
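
After the schedule is created, you can check its status and inspect past runs from the same ModelMonitor object:

# Check that the schedule is active and list recent monitoring executions
print(sm_mm_mqm.describe_schedule()["MonitoringScheduleStatus"])

for execution in sm_mm_mqm.list_executions():
    print(execution.describe()["ProcessingJobStatus"])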

Review the entry point script model_quality_monitoring.py to better understand how to use custom arguments and environment variables provided by the SageMaker Model Monitor job.
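
For example, inside the container, the custom flag and the environment variable set on the ModelMonitor object can be read as follows (a sketch; the repository script may differ in detail):

import argparse
import os

# Custom argument passed through create_monitoring_schedule(arguments=[...])
parser = argparse.ArgumentParser()
parser.add_argument("--create-violation-tests", action="store_true")
args, _ = parser.parse_known_args()

# Custom environment variable set through ModelMonitor(env={...})
ground_truth_s3_uri = os.environ["ground_truth_s3_uri_path"]

if args.create_violation_tests:
    print("Forcing a mock violation for demonstration purposes")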

Observe the SageMaker Model Monitor job output

Now that the SageMaker Model Monitor resources are created, you can invoke the SageMaker endpoint.

In this example, the request includes a list of two payloads for which we want to collect predictions:

import boto3

sm_runtime = boto3.client("sagemaker-runtime")

response = sm_runtime.invoke_endpoint(
    EndpointName=predictor.endpoint_name,
    ContentType="application/json",
    Accept="application/json",
    Body=test_records,
    InferenceId="0"
)
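
The test_records variable holds the serialized request body. Given how input_fn deserializes it with pd.DataFrame.from_dict, one way to build a two-record body from a held-out DataFrame (test_df here is a hypothetical frame containing the model's feature columns) is:

import json

# Serialize two feature rows as column -> list-of-values JSON;
# input_fn turns this back into a two-row DataFrame.
test_records = json.dumps(test_df.head(2).to_dict(orient="list"))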

InferenceId is passed as an argument to the invoke_endpoint method. This ID is used downstream when merging the ground truth data to the real-time SageMaker endpoint data capture. In this example, we want to collect ground truth with the following structure.

InferenceId    payload_index    groundTruthLabel
0              0                1
0              1                0

This makes it simpler when merging the ground truth data with real-time data within the SageMaker Model Monitor custom job.
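
A sketch of writing ground truth in that shape with the AWS SDK for pandas follows; the exact file format the repository's monitoring script expects may differ.

import awswrangler as wr
import pandas as pd

ground_truth_df = pd.DataFrame(
    {"InferenceId": ["0", "0"], "payload_index": [0, 1], "groundTruthLabel": [1, 0]}
)

# Write under the prefix configured as ground_truth_s3_uri_path on the ModelMonitor object
wr.s3.to_csv(
    df=ground_truth_df,
    path=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name}/ground_truth.csv",
    index=False,
)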

Because we set the SageMaker Model Monitor job to an hourly cron schedule, we can view the results at the end of the hour. In SageMaker Studio Classic, navigate to the SageMaker endpoint details page and choose the Monitoring job history tab to view the status reports of the SageMaker Model Monitor jobs.


If an issue is found, you can choose the monitoring job name to review the report.

In this example, the custom model monitoring metric created in the BYOC flagged an accuracy score violation of -1 (this was done purposely for demonstration with the argument --create-violation-tests).

This gives you the ability to monitor model quality violations for your custom SageMaker Model Monitor job within the SageMaker Studio console. If you want to invoke CloudWatch alarms based on published CloudWatch metrics, you must create these CloudWatch metrics with your BYOC job. You can review how this is done within the model_quality_monitoring.py script. For automated alerts on model monitoring, we recommend creating an Amazon Simple Notification Service (Amazon SNS) topic to which user groups can subscribe for email alerts on a given CloudWatch metric alarm.
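
Publishing such a metric from the BYOC job amounts to a put_metric_data call; the namespace, dimension, and values below are illustrative placeholders.

import boto3

endpoint_name = "my-endpoint"  # illustrative
accuracy = 0.82                # metric computed by the monitoring job

# Publish the custom accuracy metric so a CloudWatch alarm (and SNS topic) can act on it
cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_data(
    Namespace="CustomModelMonitor",
    MetricData=[
        {
            "MetricName": "accuracy",
            "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
            "Value": accuracy,
        }
    ],
)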

Clean up

To avoid incurring future charges, delete all resources related to the SageMaker Model Monitor schedule by completing the following steps:

  1. Delete data capture and any ground truth data:
    ! aws s3 rm s3://{bucket}/{prefix_name}/model-monitor/data-capture/{predictor.endpoint_name} --recursive
    ! aws s3 rm s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name} --recursive
  2. Delete the monitoring schedule:
    sm_mm_mqm.delete_monitoring_schedule()
  3. Delete the SageMaker model and SageMaker endpoint:
    predictor.delete_model()
    predictor.delete_endpoint()

Conclusion

Custom business or technical requirements for a SageMaker endpoint frequently have an impact on downstream efforts in model monitoring. In this post, we provided a framework that enables you to customize SageMaker Model Monitor jobs (in this case, for monitoring model quality) to handle the use case of passing multiple inference payloads to a SageMaker endpoint.

Explore the provided GitHub repository to implement this customized model monitoring framework with SageMaker Model Monitor. You can use this framework as a starting point to monitor your custom metrics or handle other unique requirements for model quality monitoring in your AI/ML applications.


About the Authors

Joe King is a Sr. Data Scientist at AWS, bringing a breadth of data science, ML engineering, MLOps, and AI/ML architecting to help businesses create scalable solutions on AWS.

Ajay Raghunathan is a Machine Learning Engineer at AWS. His current work focuses on architecting and implementing ML solutions at scale. He is a technology enthusiast and a builder with a core area of interest in AI/ML, data analytics, serverless, and DevOps. Outside of work, he enjoys spending time with family, traveling, and playing football.

Raju Patil is a Sr. Data Scientist with AWS Professional Services. He architects, builds, and deploys AI/ML solutions to help AWS customers across different verticals overcome business challenges in a variety of AI/ML use cases.
