Chapter 06: Data Quality & Validation🔗
"Garbage in, garbage out. MLOps makes data quality a first-class concern, not an afterthought."
6.1 Why Data Quality Matters🔗
Data quality issues are a leading cause of model failures in production. ML models are only as good as the data they are trained on.
Cost of Data Quality Issues:
- Poor data quality costs organizations an average of $12.9M per year
- Predictive system downtime: ~$125,000/hour
- AI incidents increased 56% in 2024 — many data-related
Common Data Issues:
❌ Missing values (nulls, NaN)
❌ Duplicate records
❌ Schema changes (column renamed/removed upstream)
❌ Distribution shifts (new values in categorical columns)
❌ Data type mismatches (int stored as string)
❌ Out-of-range values (age = -5, temperature = 9999)
❌ Label leakage (future data in training)
❌ Class imbalance
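Several of these issues can be caught with a few lines of pandas before any dedicated validation tooling is introduced. A minimal sketch (the column names and sample data are illustrative):

```python
import pandas as pd

def quick_audit(df: pd.DataFrame) -> dict:
    """Return a dictionary of basic data-quality signals."""
    return {
        "null_rate": df.isna().mean().to_dict(),    # per-column null fraction
        "duplicate_rate": df.duplicated().mean(),   # fraction of duplicate rows
        "dtypes": df.dtypes.astype(str).to_dict(),  # catches int-stored-as-string
    }

df = pd.DataFrame({
    "age": [25, -5, None, 40],                      # out-of-range and missing values
    "plan": ["basic", "basic", "basic", "premium"],
})
report = quick_audit(df)
print(report["null_rate"]["age"])  # 0.25
```

An audit like this is a useful first pass, but it only reports numbers; the frameworks below let you declare thresholds and fail a pipeline automatically.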
6.2 Data Validation with Great Expectations🔗
Great Expectations (GE) is one of the most widely used open-source data validation frameworks. You define "expectations" about your data, and GE validates every dataset against them.
Key Concepts🔗
Expectation Suite: A collection of rules for your data
Checkpoint: Runs the suite against a new batch of data
Data Docs: Auto-generated HTML reports showing pass/fail
Setup and Usage🔗
# Install
# pip install great-expectations
import great_expectations as gx
# Initialize context
context = gx.get_context()
# Connect to your data
datasource = context.sources.add_pandas_filesystem(
name="churn_data",
base_directory="data/raw/"
)
asset = datasource.add_csv_asset(name="churn", batching_regex=r".*\.csv")
# Create expectation suite
suite = context.add_expectation_suite("churn_expectations")
validator = context.get_validator(
batch_request=asset.build_batch_request(),
expectation_suite_name="churn_expectations"
)
# ── Define Expectations ──────────────────────────────────────────
# Schema expectations
validator.expect_table_columns_to_match_ordered_list([
"customer_id", "age", "income", "tenure", "plan", "churned"
])
validator.expect_column_to_exist("customer_id")
# Null checks
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("churned")
# Type checks
validator.expect_column_values_to_be_of_type("age", "int64")
validator.expect_column_values_to_be_of_type("income", "float64")
# Range checks
validator.expect_column_values_to_be_between("age", min_value=18, max_value=120)
validator.expect_column_values_to_be_between("income", min_value=0)
# Categorical checks
validator.expect_column_values_to_be_in_set("plan", ["basic", "standard", "premium"])
validator.expect_column_values_to_be_in_set("churned", [0, 1])
# Uniqueness
validator.expect_column_values_to_be_unique("customer_id")
# Distribution checks
validator.expect_column_mean_to_be_between("age", min_value=25, max_value=55)
validator.expect_column_proportion_of_unique_values_to_be_between(
"plan", min_value=0.001, max_value=0.5
)
# Save the suite
validator.save_expectation_suite()
# ── Run Checkpoint ───────────────────────────────────────────────
checkpoint = context.add_or_update_checkpoint(
name="churn_checkpoint",
validations=[{"batch_request": asset.build_batch_request(),
"expectation_suite_name": "churn_expectations"}]
)
result = checkpoint.run()
if not result.success:
print("❌ DATA VALIDATION FAILED!")
# Print failures
for vr in result.list_validation_results():
for er in vr.results:
if not er.success:
print(f" FAILED: {er.expectation_config.expectation_type} "
f"on column '{er.expectation_config.kwargs.get('column')}'")
raise ValueError("Data quality gate failed — aborting pipeline")
else:
print("✅ All data quality checks passed!")
6.3 TensorFlow Data Validation (TFDV)🔗
TensorFlow Data Validation (TFDV) is Google's open-source data validation library, tightly integrated with TFX and Vertex AI Pipelines.
import tensorflow_data_validation as tfdv
import pandas as pd
# Compute statistics from training data
train_stats = tfdv.generate_statistics_from_csv("data/train.csv")
# Infer schema from training data
schema = tfdv.infer_schema(statistics=train_stats)
# Display schema (run in notebook)
tfdv.display_schema(schema)
# Validate new (serving) data against training schema
serving_stats = tfdv.generate_statistics_from_csv("data/serving.csv")
anomalies = tfdv.validate_statistics(
statistics=serving_stats,
schema=schema
)
# Display anomalies
tfdv.display_anomalies(anomalies)
# Example anomalies detected:
# ┌─────────────────┬──────────────────────────────────────────────────┐
# │ Feature name │ Anomaly short description │
# ├─────────────────┼──────────────────────────────────────────────────┤
# │ 'income' │ Column dropped │
# │ 'plan' │ Unexpected string values: ['enterprise'] │
# │ 'age' │ Out-of-range values: -5 │
# └─────────────────┴──────────────────────────────────────────────────┘
# Save schema to file for use in pipeline
tfdv.write_schema_text(schema, "schema/schema.pbtxt")
6.4 Pandera — Lightweight DataFrame Validation🔗
Pandera is a lighter-weight option for validating pandas DataFrames with a Pythonic API.
import pandera as pa
from pandera import Column, DataFrameSchema, Check
# Define schema
schema = DataFrameSchema({
"customer_id": Column(str, Check.str_matches(r'^CUST-\d{6}$')),
"age": Column(int, [
Check.greater_than_or_equal_to(18),
Check.less_than_or_equal_to(120)
]),
"income": Column(float, Check.greater_than_or_equal_to(0)),
"plan": Column(str, Check.isin(["basic", "standard", "premium"])),
"churned": Column(int, Check.isin([0, 1])),
})
# Validate dataframe
try:
validated_df = schema.validate(df)
print("✅ Data validation passed")
except pa.errors.SchemaError as e:
print(f"❌ Validation failed: {e}")
raise
6.5 Data Quality in CI/CD Pipeline🔗
CI/CD Data Quality Gate:
New Data Arrives (GCS)
│
▼
┌─────────────────────────────────────────┐
│ DATA VALIDATION STAGE │
│ │
│ 1. Schema check (columns match?) │
│ 2. Null rate check (< 5% per column?) │
│ 3. Range check (values in bounds?) │
│ 4. Distribution check (not drifted?) │
│ 5. Class balance check │
└────────────────┬────────────────────────┘
│
┌──────────┴──────────┐
▼ ▼
All Pass ✅ Any Fail ❌
│ │
Proceed to Alert team
Training Log failure
Stop pipeline
# In Jenkinsfile or GitHub Actions:
- name: Validate Data
run: |
python src/validate_data.py
# Script exits with code 1 if validation fails
# CI/CD treats non-zero exit = failure → pipeline stops
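A minimal `src/validate_data.py` along these lines could look like the sketch below (the file path, required columns, and 5% threshold are illustrative; the threshold matches step 2 of the gate above). All the CI runner needs is the non-zero exit code on failure:

```python
import sys
import pandas as pd

MAX_NULL_RATE = 0.05  # 5% per column

def validate(df: pd.DataFrame, required_columns: list[str]) -> list[str]:
    """Return a list of human-readable failures (empty list == pass)."""
    failures = []
    missing = set(required_columns) - set(df.columns)
    if missing:
        failures.append(f"missing columns: {sorted(missing)}")
    for col in df.columns:
        rate = df[col].isna().mean()
        if rate > MAX_NULL_RATE:
            failures.append(f"null rate {rate:.1%} in '{col}' exceeds {MAX_NULL_RATE:.0%}")
    return failures

if __name__ == "__main__" and len(sys.argv) > 1:
    problems = validate(pd.read_csv(sys.argv[1]),
                        ["customer_id", "age", "churned"])
    for p in problems:
        print(f"FAILED: {p}")
    sys.exit(1 if problems else 0)  # non-zero exit = CI stage fails
```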
6.6 Data Quality Metrics to Track🔗
| Metric | Formula | Alert When |
|---|---|---|
| Null rate | null_count / total_rows | > 5% per column |
| Duplicate rate | duplicate_count / total_rows | > 1% |
| Schema match | Columns match expected schema | Any mismatch |
| Value range | Min/Max within bounds | Out of range |
| Distribution shift | KS test, PSI score | PSI > 0.2 |
| Class imbalance | minority_class / total | < 1% |
| Freshness | Age of data | > 24 hours old |
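The PSI (Population Stability Index) score used for the distribution-shift alert can be computed in a few lines. A sketch in pure Python (the bin proportions come from comparing training and serving data binned on the same edges; 0.2 is the conventional "significant shift" threshold):

```python
import math

def psi(expected: list[float], actual: list[float], eps: float = 1e-6) -> float:
    """Population Stability Index between two binned distributions.

    `expected` and `actual` are per-bin proportions that each sum to 1.
    `eps` guards against log(0) for empty bins.
    """
    total = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, eps), max(a, eps)
        total += (a - e) * math.log(a / e)
    return total

# Identical distributions -> PSI of 0
print(psi([0.25, 0.25, 0.25, 0.25], [0.25, 0.25, 0.25, 0.25]))  # 0.0
# A heavily shifted distribution -> PSI well above the 0.2 alert threshold
print(psi([0.25, 0.25, 0.25, 0.25], [0.05, 0.05, 0.10, 0.80]))
```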
6.7 Tools Summary🔗
| Tool | Best For | Integration |
|---|---|---|
| Great Expectations | Complex suites, reporting | Python, Airflow, Spark |
| TFDV | TFX/Vertex AI pipelines | TFX, Kubeflow |
| Pandera | Quick DataFrame checks | pytest, Python |
| Deequ (AWS) | Spark/large scale | AWS, PySpark |
| Soda | Business-friendly | Airflow, dbt |
| Whylogs | Profiling + monitoring | Any Python |
Next → Chapter 07: MLflow