Monitoring Data Pipelines with Amazon CloudWatch: A How-To Guide
A data pipeline that runs silently and incorrectly is worse than one that fails loudly. Production data engineering requires monitoring that catches failures, slowdowns, and data quality issues before stakeholders notice, and ideally before they have any downstream impact. Amazon CloudWatch provides the building blocks: metrics, logs, alarms, dashboards, and Logs Insights queries. This tutorial shows you how to wire them together for the AWS data services you actually use: Glue, Step Functions, and Lambda.
Setting Up SNS for Alert Delivery
Before creating alarms, set up an SNS topic that routes alerts to your team's notification channel. All CloudWatch alarms in this tutorial will publish to this topic.
# Create the SNS topic
SNS_TOPIC_ARN=$(aws sns create-topic \
--name data-pipeline-alerts \
--region ca-central-1 \
--query TopicArn \
--output text)
echo "SNS Topic ARN: $SNS_TOPIC_ARN"
# Subscribe your team's email
aws sns subscribe \
--topic-arn "$SNS_TOPIC_ARN" \
--protocol email \
--notification-endpoint "data-team@yourcompany.com"
# Optional: subscribe a Slack webhook via Lambda
# (deploy a Lambda that transforms SNS -> Slack webhook, then subscribe it)
aws sns subscribe \
--topic-arn "$SNS_TOPIC_ARN" \
--protocol lambda \
--notification-endpoint "arn:aws:lambda:ca-central-1:123456789012:function:sns-to-slack"
Store the SNS Topic ARN; you will reference it in every alarm below.
Monitoring AWS Glue Jobs
Glue publishes metrics to CloudWatch automatically when you enable metrics on the job. Enable them in the job definition with --enable-metrics in the default arguments.
Creating a Glue Job Failure Alarm
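# Alarm: alert when Spark tasks fail in the ingest-raw Glue job
# (numFailedTasks counts failed tasks, so treat it as a failure signal rather than a job status)
aws cloudwatch put-metric-alarm \
--alarm-name "GlueJob-ingest-raw-FAILED" \
--alarm-description "Alert when the ingest-raw Glue job fails" \
--namespace "Glue" \
--metric-name "glue.driver.aggregate.numFailedTasks" \
--dimensions Name=JobName,Value=prod-ingest-raw Name=JobRunId,Value=ALL Name=Type,Value=count \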
--statistic Sum \
--period 300 \
--evaluation-periods 1 \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--treat-missing-data notBreaching \
--alarm-actions "$SNS_TOPIC_ARN" \
--ok-actions "$SNS_TOPIC_ARN" \
--region ca-central-1
Glue Job Duration Alarm (SLA Breach Detection)
If your pipeline has an SLA (e.g., must complete within 30 minutes), alert when execution time exceeds the threshold:
# Note: to track execution time against the SLA, publish a duration metric
# yourself from the job, or use EventBridge.
# Alternative signal: a CloudWatch Logs metric filter on the Glue log.
# The filter below counts ERROR lines; it does not measure duration.
aws logs put-metric-filter \
--log-group-name "/aws-glue/jobs/output" \
--filter-name "GlueJobErrors" \
--filter-pattern "[timestamp, requestId, level=\"ERROR\", ...]" \
--metric-transformations \
metricName=GlueJobErrors,metricNamespace=DataPipeline,metricValue=1,defaultValue=0 \
--region ca-central-1
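The comments above mention EventBridge as another route. As a rough sketch, assuming Glue emits "Glue Job State Change" events whose detail carries jobName and state fields, an event pattern for unsuccessful runs of one job might be built as follows. The helper name glue_failure_pattern and the choice of states counted as failures are illustrative assumptions, not part of the original setup.

```python
import json


def glue_failure_pattern(job_name: str) -> dict:
    """Build an EventBridge event pattern for unsuccessful runs of one Glue job."""
    return {
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {
            "jobName": [job_name],
            # Which states count as a failure is an assumption; adjust to taste.
            "state": ["FAILED", "TIMEOUT", "STOPPED"],
        },
    }


pattern = glue_failure_pattern("prod-ingest-raw")
print(json.dumps(pattern, indent=2))
```

The printed JSON could then be supplied as the event pattern of an EventBridge rule that targets the SNS topic created earlier.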
Publishing Custom Glue Metrics from PySpark
For metrics that Glue does not natively emit (e.g., records processed, nulls detected), publish them from within your Glue job:
# Inside your Glue PySpark job script
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="ca-central-1")


def publish_metric(metric_name: str, value: float, job_name: str, unit: str = "Count"):
    cloudwatch.put_metric_data(
        Namespace="DataPipeline/Glue",
        MetricData=[
            {
                "MetricName": metric_name,
                "Dimensions": [
                    {"Name": "JobName", "Value": job_name},
                ],
                "Value": value,
                "Unit": unit,
            }
        ],
    )


# After running your transformation
records_in = source_df.count()
records_out = output_df.count()
null_rate = output_df.filter("order_id IS NULL").count() / max(records_out, 1)

publish_metric("RecordsProcessed", records_out, "prod-ingest-raw")
publish_metric("NullRatePct", null_rate * 100, "prod-ingest-raw", "Percent")

# Alert if null rate exceeds 5%
if null_rate > 0.05:
    publish_metric("DataQualityFailure", 1, "prod-ingest-raw")
Then create an alarm on your custom metric:
aws cloudwatch put-metric-alarm \
--alarm-name "GlueJob-DataQualityFailure" \
--alarm-description "Data quality check failed in Glue job" \
--namespace "DataPipeline/Glue" \
--metric-name "DataQualityFailure" \
--dimensions Name=JobName,Value=prod-ingest-raw \
--statistic Sum \
--period 300 \
--evaluation-periods 1 \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--treat-missing-data notBreaching \
--alarm-actions "$SNS_TOPIC_ARN" \
--region ca-central-1
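One way to make the data quality logic easier to unit test is to separate the metric calculation from the CloudWatch call. The sketch below is a variant of the approach above, not the original job code: build_quality_metrics is a hypothetical helper that returns a list shaped like the MetricData argument of put_metric_data, and it always emits DataQualityFailure as 0 or 1 so the alarm sees an explicit data point on healthy runs too.

```python
def build_quality_metrics(records_out: int, null_count: int, job_name: str,
                          max_null_rate: float = 0.05) -> list:
    """Return MetricData-style entries for one job run (no AWS calls made here)."""
    # Guard against division by zero when the output is empty.
    null_rate = null_count / max(records_out, 1)
    dimensions = [{"Name": "JobName", "Value": job_name}]
    return [
        {"MetricName": "RecordsProcessed", "Dimensions": dimensions,
         "Value": records_out, "Unit": "Count"},
        {"MetricName": "NullRatePct", "Dimensions": dimensions,
         "Value": null_rate * 100, "Unit": "Percent"},
        # 1 when the null rate exceeds the threshold, otherwise 0.
        {"MetricName": "DataQualityFailure", "Dimensions": dimensions,
         "Value": 1 if null_rate > max_null_rate else 0, "Unit": "Count"},
    ]


metrics = build_quality_metrics(records_out=1000, null_count=80,
                                job_name="prod-ingest-raw")
for metric in metrics:
    print(metric["MetricName"], metric["Value"])
```

Inside the Glue job, the returned list could be passed in a single put_metric_data call with Namespace="DataPipeline/Glue", which also reduces the number of API requests per run.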
Monitoring AWS Step Functions
Step Functions emits execution-level metrics to the AWS/States namespace automatically. The most useful: ExecutionsFailed, ExecutionsTimedOut, and ExecutionTime.
# Alarm: any Step Functions execution failure
aws cloudwatch put-metric-alarm \
--alarm-name "StepFunctions-DailyPipeline-ExecutionFailed" \
--alarm-description "Daily ETL pipeline Step Functions execution failed" \
--namespace "AWS/States" \
--metric-name "ExecutionsFailed" \
--dimensions Name=StateMachineArn,Value="arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline" \
--statistic Sum \
--period 300 \
--evaluation-periods 1 \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--treat-missing-data notBreaching \
--alarm-actions "$SNS_TOPIC_ARN" \
--region ca-central-1
# Alarm: pipeline taking more than 45 minutes (SLA breach)
aws cloudwatch put-metric-alarm \
--alarm-name "StepFunctions-DailyPipeline-DurationSLA" \
--alarm-description "Daily pipeline exceeding 45-minute SLA" \
--namespace "AWS/States" \
--metric-name "ExecutionTime" \
--dimensions Name=StateMachineArn,Value="arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline" \
--statistic Maximum \
--period 3600 \
--evaluation-periods 1 \
--threshold 2700000 \
--comparison-operator GreaterThanThreshold \
--treat-missing-data notBreaching \
--alarm-actions "$SNS_TOPIC_ARN" \
--region ca-central-1
ExecutionTime is in milliseconds: 2,700,000 ms = 45 minutes.
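Because the threshold is expressed in milliseconds, it is easy to be off by a factor of 1,000 or 60. A small helper (the name sla_threshold_ms is hypothetical) keeps the SLA readable in minutes and derives the alarm threshold from it:

```python
def sla_threshold_ms(minutes: float) -> int:
    """Convert an SLA expressed in minutes to a millisecond alarm threshold."""
    return int(minutes * 60 * 1000)


def breaches_sla(execution_time_ms: float, sla_minutes: float) -> bool:
    """Mirror the alarm's GreaterThanThreshold comparison for a single execution."""
    return execution_time_ms > sla_threshold_ms(sla_minutes)


print(sla_threshold_ms(45))
print(breaches_sla(2_800_000, 45))
```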
Monitoring AWS Lambda ETL Functions
Lambda metrics in the AWS/Lambda namespace include Errors, Duration, Throttles, and ConcurrentExecutions.
# Alarm: Lambda error rate over 5% in a 5-minute window
# Uses a metric math expression
aws cloudwatch put-metric-alarm \
--alarm-name "Lambda-etl-processor-ErrorRate" \
--alarm-description "Lambda ETL error rate exceeds 5%" \
--metrics '[
{
"Id": "errors",
"MetricStat": {
"Metric": {
"Namespace": "AWS/Lambda",
"MetricName": "Errors",
"Dimensions": [{"Name": "FunctionName", "Value": "etl-processor"}]
},
"Period": 300,
"Stat": "Sum"
},
"ReturnData": false
},
{
"Id": "invocations",
"MetricStat": {
"Metric": {
"Namespace": "AWS/Lambda",
"MetricName": "Invocations",
"Dimensions": [{"Name": "FunctionName", "Value": "etl-processor"}]
},
"Period": 300,
"Stat": "Sum"
},
"ReturnData": false
},
{
"Id": "error_rate",
"Expression": "100 * errors / MAX([invocations, 1])",
"Label": "Error Rate (%)",
"ReturnData": true
}
]' \
--threshold 5 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1 \
--treat-missing-data notBreaching \
--alarm-actions "$SNS_TOPIC_ARN" \
--region ca-central-1
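To sanity-check the threshold before deploying the alarm, the arithmetic of the error-rate expression can be reproduced locally. This is only a plain-Python mirror of the formula, assuming errors and invocations are the per-period sums; it does not call CloudWatch, and error_rate_pct is an illustrative name.

```python
def error_rate_pct(errors: float, invocations: float) -> float:
    """Error rate in percent; the max() guard avoids dividing by zero."""
    return 100 * errors / max(invocations, 1)


# (errors, invocations) pairs for a few hypothetical 5-minute windows.
samples = [(0, 0), (5, 100), (3, 50)]
for errors, invocations in samples:
    rate = error_rate_pct(errors, invocations)
    print(errors, invocations, rate, rate > 5)
```

Note that the alarm uses a strict comparison, so a window at exactly 5% does not breach the threshold.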
CloudWatch Logs Insights Queries
Logs Insights lets you run queries against CloudWatch log groups, interactively or programmatically, using a pipe-based query language that will feel familiar if you know SQL. These queries are essential for debugging pipeline failures.
Find all Glue job errors in the last 24 hours:
fields @timestamp, @message
| filter @logStream like /driver/
| filter @message like /ERROR/ or @message like /Exception/
| sort @timestamp desc
| limit 100
Analyze Lambda cold start latency:
filter @type = "REPORT"
| parse @message "Init Duration: * ms" as initDuration
| filter ispresent(initDuration)
| stats
count() as coldStarts,
avg(initDuration) as avgInitMs,
max(initDuration) as maxInitMs,
pct(initDuration, 95) as p95InitMs
| sort coldStarts desc
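Find the slowest Step Functions states in the last 7 days (this assumes your log events expose name and duration fields; standard Step Functions execution logs nest the state name under details.name and do not record a duration, so adapt the field names to your log format):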
fields @timestamp, execution_arn, name, status, duration
| filter type = "TaskStateExited"
| stats
avg(duration) as avgDurationMs,
max(duration) as maxDurationMs,
count() as executions
by name
| sort avgDurationMs desc
| limit 20
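Detect missing pipeline runs (pipeline should run every day). Days with no matching log lines produce no row at all, so look for gaps in the sequence of days rather than filtering for a zero count:
fields @timestamp, @message
| filter @message like /Pipeline execution completed/
| stats count() as runs by bin(1d) as day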
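A stats query grouped by day only returns rows for days that had at least one matching log line, so gaps have to be found by comparing the returned days against the expected calendar. A minimal sketch of that comparison, assuming you have already parsed the returned day values into date objects (missing_run_days is a hypothetical helper):

```python
from datetime import date, timedelta


def missing_run_days(days_with_runs, start: date, end: date) -> list:
    """Return every day in [start, end] that has no recorded pipeline run."""
    seen = set(days_with_runs)
    missing = []
    day = start
    while day <= end:
        if day not in seen:
            missing.append(day)
        day += timedelta(days=1)
    return missing


# Example input: days on which the completion message was logged.
observed = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 4)]
gaps = missing_run_days(observed, date(2024, 1, 1), date(2024, 1, 5))
print([d.isoformat() for d in gaps])
```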
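Run these queries programmatically via the AWS CLI for automated reporting. start-query returns a queryId; pass it to aws logs get-query-results to fetch the rows once the query completes (the date -d syntax below is GNU date):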
aws logs start-query \
--log-group-name "/aws/lambda/etl-processor" \
--start-time $(date -d '24 hours ago' +%s) \
--end-time $(date +%s) \
--query-string 'fields @timestamp, @message | filter @message like /ERROR/ | limit 50' \
--region ca-central-1
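Queries run asynchronously, so automated reporting needs to poll for the result. The sketch below shows one way to do that in Python. It assumes a client object with a get_query_results(queryId=...) method that returns a status and a list of result rows, as the boto3 CloudWatch Logs client does. StubLogsClient is a made-up stand-in so the example runs without AWS credentials, and wait_for_query and rows_as_dicts are illustrative names.

```python
import time

TERMINAL_STATUSES = {"Complete", "Failed", "Cancelled", "Timeout"}


def wait_for_query(logs_client, query_id, poll_seconds=1.0, max_polls=60):
    """Poll get_query_results until the query reaches a terminal status."""
    for _ in range(max_polls):
        response = logs_client.get_query_results(queryId=query_id)
        if response["status"] in TERMINAL_STATUSES:
            return response
        time.sleep(poll_seconds)
    raise TimeoutError(f"Query {query_id} still running after {max_polls} polls")


def rows_as_dicts(response):
    """Flatten each result row from [{field, value}, ...] into a plain dict."""
    return [
        {cell["field"]: cell["value"] for cell in row}
        for row in response.get("results", [])
    ]


class StubLogsClient:
    """Hypothetical stand-in for boto3.client("logs") so the sketch runs offline."""

    def __init__(self):
        self.calls = 0

    def get_query_results(self, queryId):
        self.calls += 1
        if self.calls < 3:
            return {"status": "Running", "results": []}
        return {
            "status": "Complete",
            "results": [[
                {"field": "@timestamp", "value": "2024-01-01 00:00:00.000"},
                {"field": "@message", "value": "ERROR something failed"},
            ]],
        }


stub = StubLogsClient()
response = wait_for_query(stub, "example-query-id", poll_seconds=0)
rows = rows_as_dicts(response)
print(response["status"], rows)
```

In a real script, replace the stub with boto3.client("logs", region_name="ca-central-1") and pass the queryId returned by start_query.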
CloudWatch Dashboard JSON
A dashboard consolidates key metrics in one view. Here is a minimal JSON definition for a data pipeline dashboard:
{
"widgets": [
{
"type": "metric",
"properties": {
"title": "Glue Job Status",
"metrics": [
["Glue", "glue.driver.aggregate.numCompletedTasks", "JobName", "prod-ingest-raw", "JobRunId", "ALL", "Type", "count"],
["Glue", "glue.driver.aggregate.numFailedTasks", "JobName", "prod-ingest-raw", "JobRunId", "ALL", "Type", "count"]
],
"region": "ca-central-1",
"period": 300,
"stat": "Sum",
"view": "timeSeries"
}
},
{
"type": "metric",
"properties": {
"title": "Step Functions Executions",
"metrics": [
["AWS/States", "ExecutionsStarted", "StateMachineArn", "arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline"],
["AWS/States", "ExecutionsSucceeded", "StateMachineArn", "arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline"],
["AWS/States", "ExecutionsFailed", "StateMachineArn", "arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline"]
],
"region": "ca-central-1",
"period": 3600,
"stat": "Sum",
"view": "timeSeries"
}
},
{
"type": "metric",
"properties": {
"title": "Lambda Error Rate",
"metrics": [
[{"expression": "100 * errors / MAX([invocations, 1])", "label": "Error Rate (%)"}],
["AWS/Lambda", "Errors", "FunctionName", "etl-processor", {"id": "errors", "visible": false}],
["AWS/Lambda", "Invocations", "FunctionName", "etl-processor", {"id": "invocations", "visible": false}]
],
"region": "ca-central-1",
"period": 300,
"stat": "Sum",
"view": "timeSeries",
"yAxis": {"left": {"min": 0, "max": 100}}
}
},
{
"type": "alarm",
"properties": {
"title": "Pipeline Alarms",
"alarms": [
"arn:aws:cloudwatch:ca-central-1:123456789012:alarm:GlueJob-ingest-raw-FAILED",
"arn:aws:cloudwatch:ca-central-1:123456789012:alarm:StepFunctions-DailyPipeline-ExecutionFailed",
"arn:aws:cloudwatch:ca-central-1:123456789012:alarm:Lambda-etl-processor-ErrorRate"
]
}
}
]
}
Deploy the dashboard:
aws cloudwatch put-dashboard \
--dashboard-name "DataPipelineMonitoring" \
--dashboard-body file://dashboard.json \
--region ca-central-1
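The dashboard JSON repeats the same state machine ARN several times, which makes hand edits error-prone. One option is to generate the body from a short script. This is a sketch only: states_widget is a hypothetical helper, and it covers just the Step Functions widget from the definition above.

```python
import json

STATE_MACHINE_ARN = (
    "arn:aws:states:ca-central-1:123456789012:stateMachine:daily-etl-pipeline"
)


def states_widget(state_machine_arn: str, region: str = "ca-central-1") -> dict:
    """Build the Step Functions executions widget for one state machine."""
    metric_names = ["ExecutionsStarted", "ExecutionsSucceeded", "ExecutionsFailed"]
    return {
        "type": "metric",
        "properties": {
            "title": "Step Functions Executions",
            "metrics": [
                ["AWS/States", name, "StateMachineArn", state_machine_arn]
                for name in metric_names
            ],
            "region": region,
            "period": 3600,
            "stat": "Sum",
            "view": "timeSeries",
        },
    }


dashboard = {"widgets": [states_widget(STATE_MACHINE_ARN)]}
body = json.dumps(dashboard, indent=2)
print(body)
```

Writing body to dashboard.json gives the put-dashboard command above a file to read, and adding a pipeline then only requires another states_widget call.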
Composing with Other Monitoring Tools
CloudWatch is the foundation, but production data platforms often layer additional tools on top. For pipeline-level SLA monitoring and data quality alerting, CloudWatch integrates with DataOps Practices tooling, including Great Expectations (see Data Quality with Great Expectations on AWS), which can publish custom metrics to CloudWatch when validation suites run. For CI/CD-triggered deployments where you want to verify monitoring coverage before merge, CI/CD for Data Pipelines with GitHub Actions covers the full workflow.
Conclusion
Effective pipeline monitoring is not a single alarm. It is a layered system that catches failures at the infrastructure level (Glue job failed), the SLA level (pipeline took too long), and the data quality level (null rate exceeded threshold). CloudWatch provides all three layers through metric alarms, Logs Insights queries, and custom metric publishing from within your pipeline code.
The configurations in this tutorial are a starting point. Every pipeline has unique SLAs and failure modes, so invest time early in understanding what "normal" looks like for your specific workloads before setting thresholds.
If you are building monitoring and observability for an AWS data platform and want expert guidance on alarm strategy, incident response runbooks, and cost-optimized log retention, contact Infra IT Consulting. We help Canadian data teams build reliable, observable data infrastructure.