Where MLOps Meets Data Engineering: Building ML-Ready Pipelines
The boundary between data engineering and machine learning is more porous than most organisations' org charts suggest. Data scientists complain that the data they receive is not clean enough to train reliable models. Data engineers complain that ML teams make undocumented changes to feature definitions that break production pipelines. Platform teams get caught in the middle trying to serve both constituencies with infrastructure designed for neither.
MLOps, the practice of applying DevOps and data engineering rigour to machine learning model development, deployment, and monitoring, requires that data engineers and ML engineers share ownership of the pipeline from raw data through to model serving. This post describes how to build that shared foundation on AWS using SageMaker Feature Store, Glue, and the operational patterns that make ML-ready data pipelines reliable at scale.
The Feature Pipeline: Where Data Engineering Ends and ML Begins
The most important interface between data engineering and ML is the feature pipeline. A feature is an input variable used by a model at training and inference time. The feature pipeline transforms raw data from operational systems into feature values, validates them, and stores them in a location where both the training process and the inference service can access them consistently.
The canonical failure mode is training-serving skew: the features used to train a model are computed differently from the features used to serve predictions in production. A model trained on daily average revenue per customer will make poor predictions if production inference uses hourly average revenue because that was more convenient to compute. The skew is often subtle (a different join key, a different time window, a different null-handling rule) and produces model degradation that is hard to diagnose.
The solution is to define each feature computation once and have both training and inference consume its output. Amazon SageMaker Feature Store is designed around this principle: you compute a feature value once, store it in the Feature Store, and both training jobs and inference endpoints read from the same store.
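To make the principle concrete, here is a minimal, hypothetical sketch: one pure Python function that computes the customer features used throughout this post, which a batch job could call when building training data and a serving path could call for a single customer. `Order` and `compute_customer_features` are illustrative names, not part of any AWS library; the status filter and the 30/90-day windows mirror the Glue job later in the post.

```python
from dataclasses import dataclass
from datetime import date
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class Order:
    customer_id: str
    created_at: date
    order_total: float
    order_status: str


# Order statuses that count towards revenue (same filter as the Glue job).
COUNTED_STATUSES = {"confirmed", "shipped", "delivered"}


def compute_customer_features(orders: Iterable[Order], as_of: date) -> Dict[str, object]:
    """Compute one customer's features as of a given date.

    Training and serving both call this function, so window lengths,
    status filtering and null handling are defined in exactly one place.
    """
    # Ignore non-revenue statuses and anything dated after the as-of date.
    counted = [o for o in orders if o.order_status in COUNTED_STATUSES and o.created_at <= as_of]
    ages = [(as_of - o.created_at).days for o in counted]
    last_30 = [o.order_total for o, age in zip(counted, ages) if age <= 30]
    last_90 = [o.order_total for o, age in zip(counted, ages) if age <= 90]
    avg_90: Optional[float] = sum(last_90) / len(last_90) if last_90 else None
    return {
        "revenue_30d": float(sum(last_30)),
        "order_count_30d": len(last_30),
        "avg_order_value_90d": avg_90,  # None when there were no orders in 90 days
        "days_since_last_order": min(ages) if ages else None,
    }
```

Because the filter, the window boundaries (inclusive at 30 and 90 days) and the null handling live in one function, a training job and an inference service that both call it cannot silently disagree on them.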
SageMaker Feature Store Architecture
SageMaker Feature Store has two components with distinct purposes:
Online Store: A low-latency, AWS-managed key-value store that serves the most recent feature values for a given entity. Inference endpoints query the Online Store for real-time predictions, with read latency typically in the single-digit milliseconds.
Offline Store: An S3-backed columnar store (Parquet format, catalogued in Glue Data Catalog) that retains the full history of feature values with timestamps. Training jobs read from the Offline Store to construct training datasets at the correct point in time, preventing data leakage.
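On the inference side, an Online Store lookup is a single `GetRecord` call on the `sagemaker-featurestore-runtime` client. The sketch below assumes `client` is `boto3.client("sagemaker-featurestore-runtime", region_name="ca-central-1")`; the helper names are hypothetical, and the client is passed in rather than created inside so the parsing logic can be tested without AWS credentials.

```python
from typing import Dict, List, Optional

# Features defined in the customer-revenue-features group used in this post.
FEATURE_NAMES = ["revenue_30d", "order_count_30d", "avg_order_value_90d", "days_since_last_order"]


def record_to_dict(record: List[Dict[str, str]]) -> Dict[str, str]:
    """Flatten [{"FeatureName": ..., "ValueAsString": ...}, ...] into {name: value}."""
    return {item["FeatureName"]: item["ValueAsString"] for item in record}


def get_online_features(client, feature_group_name: str, customer_id: str) -> Optional[Dict[str, str]]:
    """Fetch the latest feature values for one customer from the Online Store.

    Values come back as strings; cast them before passing them to a model.
    """
    response = client.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString=customer_id,
        FeatureNames=FEATURE_NAMES,
    )
    record = response.get("Record")
    # Treat a missing or empty Record as "no features stored for this customer".
    return record_to_dict(record) if record else None
```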
```python
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import (
    FeatureDefinition,
    FeatureTypeEnum,
)

# Pin the SageMaker session to the target region
region = "ca-central-1"
sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=region))

# Define the customer features group
feature_group = FeatureGroup(
    name="customer-revenue-features",
    sagemaker_session=sagemaker_session,
)

feature_group.feature_definitions = [
    FeatureDefinition(feature_name="customer_id", feature_type=FeatureTypeEnum.STRING),
    FeatureDefinition(feature_name="event_time", feature_type=FeatureTypeEnum.STRING),  # ISO 8601, e.g. 2024-03-10T02:00:00Z
    FeatureDefinition(feature_name="revenue_30d", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="order_count_30d", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="avg_order_value_90d", feature_type=FeatureTypeEnum.FRACTIONAL),
    FeatureDefinition(feature_name="days_since_last_order", feature_type=FeatureTypeEnum.INTEGRAL),
    FeatureDefinition(feature_name="product_category_primary", feature_type=FeatureTypeEnum.STRING),
]

# Create the feature group with both online and offline stores.
# create() returns immediately; the group is usable once its status is "Created".
feature_group.create(
    s3_uri="s3://ml-feature-store-prod/customer-revenue-features/",
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn="arn:aws:iam::123456789012:role/SageMakerFeatureStoreRole",  # placeholder account ID
    enable_online_store=True,
    tags=[{"Key": "domain", "Value": "customer"}, {"Key": "environment", "Value": "production"}],
)
```
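`create()` returns before the feature group is ready, so ingestion code that runs straight afterwards can fail. A minimal polling sketch, assuming you pass any zero-argument callable that returns a `DescribeFeatureGroup`-style response, such as `feature_group.describe` or `lambda: boto3.client("sagemaker").describe_feature_group(FeatureGroupName="customer-revenue-features")`. `wait_for_feature_group` is a hypothetical helper, not an SDK method.

```python
import time
from typing import Callable, Dict

# Statuses from which a newly created group will not reach "Created".
_FAILURE_STATUSES = {"CreateFailed", "Deleting", "DeleteFailed"}


def wait_for_feature_group(
    describe: Callable[[], Dict],
    poll_seconds: float = 10.0,
    timeout_seconds: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until FeatureGroupStatus is "Created"; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["FeatureGroupStatus"]
        if status == "Created":
            return status
        if status in _FAILURE_STATUSES:
            raise RuntimeError(f"feature group entered status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"feature group still {status} after {timeout_seconds}s")
        sleep(poll_seconds)
```

Injecting `sleep` keeps the helper testable without real delays; in production the default `time.sleep` is used.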
Building the Feature Engineering Pipeline with AWS Glue
The feature engineering pipeline is data engineering work: reading from curated data lake tables, computing feature values, validating them, and writing them to Feature Store. This pipeline runs on a schedule (typically daily for batch features, or continuously for near-real-time features using Glue Streaming) and is version-controlled, tested, and deployed via CI/CD: the same DataOps practices applied to any production pipeline.
```python
# Glue feature engineering job: customer revenue features
import sys
from datetime import datetime, timezone

import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F

args = getResolvedOptions(sys.argv, ["JOB_NAME", "FEATURE_GROUP_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

REGION = "ca-central-1"
FEATURE_GROUP_NAME = args["FEATURE_GROUP_NAME"]

# Read from the curated data lake
orders = glueContext.create_dynamic_frame.from_catalog(
    database="analytics_db",
    table_name="orders_curated",
).toDF()

# One event_time for the whole run, so every record written by this run shares it
event_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
days_ago = F.datediff(F.current_date(), F.col("created_at"))

customer_features = (
    orders
    .filter(F.col("order_status").isin(["confirmed", "shipped", "delivered"]))
    .groupBy("customer_id")
    .agg(
        F.sum(F.when(days_ago <= 30, F.col("order_total")).otherwise(0)).alias("revenue_30d"),
        F.count(F.when(days_ago <= 30, 1)).alias("order_count_30d"),
        F.avg(F.when(days_ago <= 90, F.col("order_total"))).alias("avg_order_value_90d"),
        F.min(days_ago).alias("days_since_last_order"),
    )
    .withColumn("event_time", F.lit(event_time))
    .cache()  # reused by the validation counts and the write below
)

# Validate features before ingestion. Raise explicitly: assert statements are skipped under python -O.
if customer_features.filter(F.col("revenue_30d") < 0).count() > 0:
    raise ValueError("Negative revenue detected; feature pipeline halted")
if customer_features.filter(F.col("customer_id").isNull()).count() > 0:
    raise ValueError("Null customer_id detected; feature pipeline halted")


def _as_string(value, default):
    # Substitute the default only for nulls; `value or default` would also replace a legitimate 0.
    return str(default if value is None else value)


def put_partition(rows):
    # Create the boto3 client on the executor rather than shipping one from the driver.
    client = boto3.client("sagemaker-featurestore-runtime", region_name=REGION)
    for row in rows:
        client.put_record(
            FeatureGroupName=FEATURE_GROUP_NAME,
            Record=[
                {"FeatureName": "customer_id", "ValueAsString": str(row["customer_id"])},
                {"FeatureName": "event_time", "ValueAsString": row["event_time"]},
                {"FeatureName": "revenue_30d", "ValueAsString": _as_string(row["revenue_30d"], 0)},
                {"FeatureName": "order_count_30d", "ValueAsString": _as_string(row["order_count_30d"], 0)},
                {"FeatureName": "avg_order_value_90d", "ValueAsString": _as_string(row["avg_order_value_90d"], 0)},
                {"FeatureName": "days_since_last_order", "ValueAsString": _as_string(row["days_since_last_order"], 999)},
            ],
        )


# Ingest into SageMaker Feature Store from the executors instead of collect()-ing every row
# to the driver. PutRecord writes one record per call; for large volumes, the SageMaker
# Feature Store Spark connector is an alternative.
customer_features.foreachPartition(put_partition)

job.commit()
```
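The Spark checks above halt on the first failing rule. A hypothetical pure-Python validator such as the one below reports every violation at once and is easy to unit-test in CI against a few sample rows; it also adds a duplicate-key check, on the assumption that the job should emit exactly one record per customer per run. The rule set is illustrative, not exhaustive.

```python
from typing import Iterable, List, Mapping

# Numeric features that should never be negative (a negative age suggests a future-dated order).
NON_NEGATIVE = ("revenue_30d", "order_count_30d", "avg_order_value_90d", "days_since_last_order")


def validate_feature_rows(rows: Iterable[Mapping]) -> List[str]:
    """Return all rule violations; an empty list means the batch passes."""
    violations: List[str] = []
    seen = set()
    for i, row in enumerate(rows):
        customer_id = row.get("customer_id")
        if customer_id is None:
            violations.append(f"row {i}: null customer_id")
        elif customer_id in seen:
            violations.append(f"row {i}: duplicate customer_id {customer_id}")
        else:
            seen.add(customer_id)
        for name in NON_NEGATIVE:
            value = row.get(name)
            if value is not None and value < 0:
                violations.append(f"row {i}: negative {name} ({value})")
    return violations
```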
Point-in-Time Correct Training Datasets
One of the most subtle and consequential problems in ML data engineering is constructing training datasets without data leakage. If you are training a churn prediction model, the features used to predict whether a customer churned in March must be computed from data available before that customer churned, not from data that includes events that happened after the churn event.
SageMaker Feature Store's Offline Store makes point-in-time correct dataset construction possible because it retains every version of every record with its timestamps. Using Athena against the Offline Store:
```sql
-- Point-in-time correct feature retrieval:
-- for each customer and label date in the training label set,
-- get feature values as they were just before the label date
WITH label_events AS (
    -- Training labels: customers who churned (or did not) in Q1 2024.
    -- churn_date must be populated on every row; for non-churners use the
    -- observation date, otherwise those rows drop out of the join.
    SELECT customer_id, churn_date, churn_flag
    FROM training_labels.customer_churn_q1_2024
),
feature_snapshots AS (
    SELECT
        f.customer_id,
        l.churn_date,
        f.revenue_30d,
        f.order_count_30d,
        f.avg_order_value_90d,
        f.days_since_last_order,
        ROW_NUMBER() OVER (
            PARTITION BY f.customer_id, l.churn_date
            ORDER BY from_iso8601_timestamp(f.event_time) DESC, f.write_time DESC
        ) AS rn
    -- Placeholder table name: the Glue table Feature Store creates is not named after the
    -- feature group; look it up via DescribeFeatureGroup (OfflineStoreConfig.DataCatalogConfig).
    FROM sagemaker_featurestore.customer_revenue_features_offline f
    JOIN label_events l ON f.customer_id = l.customer_id
    -- Only use feature values whose event_time is BEFORE the label event
    WHERE from_iso8601_timestamp(f.event_time) < CAST(l.churn_date AS timestamp)
      AND f.is_deleted = FALSE
)
SELECT
    l.customer_id,
    l.churn_flag,
    s.revenue_30d,
    s.order_count_30d,
    s.avg_order_value_90d,
    s.days_since_last_order
FROM label_events l
JOIN feature_snapshots s
  ON l.customer_id = s.customer_id
 AND l.churn_date = s.churn_date
 AND s.rn = 1;
```
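This query runs in Athena against the Glue Data Catalog table that Feature Store maintains over the Offline Store's S3 data. Alongside your own features, Feature Store adds write_time, api_invocation_time and is_deleted columns to every offline record. The temporal filter uses event_time, the timestamp your pipeline assigned to each feature value, rather than write_time: write_time records when a row landed in S3, so historically backfilled features would carry a write_time later than every label date and be excluded. Joining on both customer_id and churn_date keeps each label row paired with its own snapshot. Because every feature value predates its label, the dataset is free of look-ahead leakage on the feature side.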
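The same "latest snapshot strictly before the label" rule can be expressed in a few lines of Python, which is handy for unit-testing label sets or reasoning about small samples. This is a hypothetical in-memory sketch of the join logic, assuming feature history rows are dicts with parsed `datetime` event times; it is not how Feature Store or Athena execute the query.

```python
from datetime import datetime
from typing import Dict, List, Sequence, Tuple

Label = Tuple[str, datetime, int]  # (customer_id, label_time, churn_flag)


def point_in_time_join(labels: Sequence[Label], history: Sequence[Dict]) -> List[Dict]:
    """Attach to each label the latest feature snapshot with event_time < label_time.

    Labels without an earlier snapshot are dropped, like an inner join.
    """
    by_customer: Dict[str, List[Dict]] = {}
    for snapshot in history:
        by_customer.setdefault(snapshot["customer_id"], []).append(snapshot)

    rows = []
    for customer_id, label_time, churn_flag in labels:
        earlier = [s for s in by_customer.get(customer_id, []) if s["event_time"] < label_time]
        if not earlier:
            continue
        latest = max(earlier, key=lambda s: s["event_time"])
        features = {k: v for k, v in latest.items() if k not in ("customer_id", "event_time")}
        rows.append({"customer_id": customer_id, "label_time": label_time,
                     "churn_flag": churn_flag, **features})
    return rows
```

A customer with two labels at different dates gets a different snapshot for each, and a snapshot written after a label date is never used for that label.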
Model Monitoring and Feature Drift Detection
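MLOps does not end at model deployment. Production models degrade when the statistical distribution of incoming features changes (feature drift) or when the relationship between features and labels changes (concept drift). On AWS, SageMaker Model Monitor automates drift detection against baselines computed from the training data: data quality monitoring compares live feature statistics with the baseline to catch feature drift, while model quality monitoring tracks prediction metrics against ground-truth labels you supply, which is where the effects of concept drift show up.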
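To illustrate what "distribution diverges from the baseline" means numerically, here is a sketch of the Population Stability Index (PSI), a widely used drift score. This is not Model Monitor's internal method; it is a standalone example, and the bin edges and the commonly cited rule of thumb (below 0.1 stable, above 0.25 significant shift) are assumptions you would tune per feature.

```python
import bisect
import math
from typing import List, Sequence


def _bin_shares(values: Sequence[float], edges: Sequence[float], eps: float) -> List[float]:
    """Share of values in each bin defined by sorted edges (len(edges) + 1 bins)."""
    if not values:
        raise ValueError("cannot compute PSI on an empty sample")
    counts = [0] * (len(edges) + 1)
    for v in values:
        # bisect_right puts v in bin k where edges[k-1] <= v < edges[k]
        counts[bisect.bisect_right(edges, v)] += 1
    # eps keeps empty bins from producing log(0) or division by zero
    return [max(c / len(values), eps) for c in counts]


def population_stability_index(baseline: Sequence[float], current: Sequence[float],
                               edges: Sequence[float], eps: float = 1e-6) -> float:
    """PSI = sum over bins of (actual - expected) * ln(actual / expected)."""
    expected = _bin_shares(baseline, edges, eps)
    actual = _bin_shares(current, edges, eps)
    return sum((a - e) * math.log(a / e) for e, a in zip(expected, actual))
```

For example, if half of a feature's training values sat below a bin edge but only a quarter of live values do, PSI is about 0.27, large enough to warrant a look at the feature pipeline.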
Data engineers own the feature pipeline; ML engineers own the model. Model Monitor sits at the boundary: it monitors the features flowing into the inference endpoint and alerts when their distribution diverges from the training baseline. When an alert fires, the investigation starts with the feature pipeline (is the computation logic correct? has an upstream data source changed?) before moving to the model itself.
This shared ownership model is more effective than treating model monitoring as purely an ML concern. Feature drift is frequently caused by upstream data changes that data engineers are better positioned to diagnose and fix. The DataOps Practices framework applied to feature pipelines (automated tests, freshness monitoring, CloudWatch alarms on feature ingestion volume) provides early warning before drift degrades model performance.
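Freshness and volume checks can be very simple. The hypothetical helpers below check whether the newest event_time (in the format the Glue job writes) is too old, and whether today's ingested record count strays too far from a trailing average; their results could be published as CloudWatch custom metrics with `PutMetricData` and alarmed on. The 26-hour freshness window for a daily job and the 30% tolerance are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean
from typing import Sequence


def is_stale(last_event_time: str, now: datetime, max_age: timedelta) -> bool:
    """True if the newest event_time (e.g. "2024-03-10T02:00:00Z") is older than max_age."""
    last = datetime.strptime(last_event_time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return now - last > max_age


def volume_anomaly(trailing_counts: Sequence[int], today: int, max_relative_change: float = 0.3) -> bool:
    """True if today's record count differs from the trailing mean by more than the allowed fraction."""
    if not trailing_counts:
        raise ValueError("need at least one historical count")
    baseline = mean(trailing_counts)
    if baseline == 0:
        return today != 0
    return abs(today - baseline) / baseline > max_relative_change
```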
Redshift ML: Bringing ML to the Warehouse
For organisations whose data already lives in Amazon Redshift and whose ML use cases are tabular prediction problems, Amazon Redshift ML offers a compelling alternative to building a separate SageMaker training pipeline. Redshift ML uses SageMaker Autopilot under the hood, training a model directly on Redshift data and making it callable as a SQL function.
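```sql
-- Train a churn prediction model directly in Redshift
CREATE MODEL customer_churn_model
FROM (
    SELECT
        revenue_30d, order_count_30d, avg_order_value_90d,
        days_since_last_order, churn_flag
    FROM analytics.customer_churn_training_set
)
TARGET churn_flag
FUNCTION predict_churn
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftMLRole'  -- placeholder account ID
AUTO ON
SETTINGS (S3_BUCKET 'ml-artifacts-prod', MAX_RUNTIME 7200);  -- MAX_RUNTIME in seconds

-- Training runs asynchronously; SHOW MODEL customer_churn_model; reports READY when it finishes.

-- Use the trained model in a SQL query. predict_churn returns the predicted churn_flag class,
-- not a probability. Scoring in a CTE keeps the filter portable: standard SQL does not allow
-- filtering on a SELECT alias in the same query's WHERE clause.
WITH scored AS (
    SELECT
        customer_id,
        predict_churn(
            revenue_30d, order_count_30d, avg_order_value_90d, days_since_last_order
        ) AS predicted_churn
    FROM analytics.customer_features_current
)
SELECT customer_id, predicted_churn
FROM scored
WHERE predicted_churn = 1;  -- assumes churn_flag is a 0/1 integer label
```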
Redshift ML is the right choice when: your data is already in Redshift, your ML team is comfortable with SQL, and your use cases do not require custom model architectures. It dramatically lowers the barrier to ML for data teams that have not invested in a dedicated ML platform.
Organisational Alignment: Shared Ownership of the Feature Layer
The technical infrastructure is only part of the problem. The organisational alignment between data engineering and ML engineering is equally important. Effective teams establish:
Feature ownership: Each feature group has a named owner team responsible for the pipeline, quality, and documentation. ML teams are consumers of features, not owners of the pipelines that produce them.
Feature registration process: Before an ML team can use a new feature in a production model, the feature must be registered in a feature catalogue (SageMaker Feature Store provides this), have documented semantics and computation logic, and have a validated feature pipeline in production.
Change communication: When a feature pipeline changes (computation logic updated, upstream data source modified), the owning team notifies the owners of every ML model that consumes the feature and coordinates retraining if the change affects model performance.
This shared ownership model is a direct application of Data Contracts to the ML domain: the feature pipeline is a data producer, the ML model is a data consumer, and the feature definition is the contract between them.
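Part of that contract can be checked mechanically before a pipeline change ships. A minimal, hypothetical sketch compares a registered feature schema with a proposed one, both expressed as feature name to Feature Store type ("Integral", "Fractional", "String"), and reports removals and type changes; additions are treated as non-breaking. A schema check like this cannot catch semantic changes, such as shrinking a 30-day window to 7 days without renaming the feature, which is why the change-communication step above still matters.

```python
from typing import Dict, List


def breaking_changes(registered: Dict[str, str], proposed: Dict[str, str]) -> List[str]:
    """List schema changes that would break existing consumers of a feature group.

    Both arguments map feature name -> type. New features are additive and not reported.
    """
    problems: List[str] = []
    for name, feature_type in registered.items():
        if name not in proposed:
            problems.append(f"removed: {name}")
        elif proposed[name] != feature_type:
            problems.append(f"type changed: {name} {feature_type} -> {proposed[name]}")
    return problems
```

A CI job could run this against the registered definition and fail the build when the list is non-empty, forcing the conversation with consuming teams before deployment.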
Conclusion
The intersection of MLOps and data engineering is where model reliability is actually determined. Training sophisticated models on unreliable features produces sophisticated but unreliable predictions. Investing in the feature pipeline (its correctness, its monitoring, its point-in-time integrity, and its shared ownership between data and ML teams) is the highest-leverage investment most ML teams can make for model quality.
On AWS, SageMaker Feature Store, Glue, Redshift ML, and Model Monitor provide the building blocks for this infrastructure. The engineering patterns (CI/CD for feature pipelines, automated quality gates, drift monitoring) are the same patterns that make data pipelines reliable, applied to the ML domain.
If you are designing the data infrastructure layer for an ML platform on AWS or need help bridging your data engineering and ML engineering practices, contact the Infra IT Consulting team. We work with organisations in Canada, the UK, and Africa to build the data foundation that makes machine learning reliable in production.