Chapter 06: Data Quality & Validation

"Garbage in, garbage out. MLOps makes data quality a first-class concern, not an afterthought."


6.1 Why Data Quality Matters

Data quality issues are among the most common causes of model failures in production: a model can only be as good as the data it is trained on.

Cost of Data Quality Issues:
  - Poor data quality costs organizations $12.9M/year on average (Gartner estimate)
  - Predictive system downtime: ~$125,000/hour
  - AI incidents increased 56% in 2024 — many data-related

Common Data Issues:
  ❌ Missing values (nulls, NaN)
  ❌ Duplicate records
  ❌ Schema changes (column renamed/removed upstream)
  ❌ Distribution shifts (new values in categorical columns)
  ❌ Data type mismatches (int stored as string)
  ❌ Out-of-range values (age = -5, temperature = 9999)
  ❌ Label leakage (future data in training)
  ❌ Class imbalance
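
Several of the issues listed above can be detected with a few lines of pandas before any validation framework is involved. The following is a minimal sketch on a made-up four-row sample. The column names follow the churn example used in this chapter; the valid age range and the set of known plans are assumptions for illustration.

```python
import pandas as pd

# Made-up sample; the second and third rows are exact duplicates.
df = pd.DataFrame({
    "customer_id": ["CUST-000001", "CUST-000002", "CUST-000002", "CUST-000004"],
    "age": [34, -5, -5, 51],
    "income": ["52000.0", None, None, "61000.5"],  # numbers stored as strings
    "plan": ["basic", "premium", "premium", "enterprise"],
})

# Missing values: fraction of nulls per column.
null_rate = df.isna().mean()

# Duplicate records: fraction of rows that repeat an earlier row exactly.
duplicate_rate = df.duplicated().mean()

# Type mismatch: a column that should be numeric but was not parsed as such.
income_is_numeric = pd.api.types.is_numeric_dtype(df["income"])

# Out-of-range values: ages outside the assumed valid range [18, 120].
bad_ages = df.loc[~df["age"].between(18, 120), "age"].tolist()

# New categorical values: anything outside the assumed set of known plans.
known_plans = {"basic", "standard", "premium"}
unseen_plans = sorted(set(df["plan"].dropna()) - known_plans)

print(null_rate["income"], duplicate_rate, income_is_numeric, bad_ages, unseen_plans)
```

Ad hoc checks like these are useful for exploration, but they do not produce reports or reusable rule sets, which is the gap the validation tools in the following sections fill.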

6.2 Data Validation with Great Expectations

Great Expectations (GE) is one of the most widely used open-source data validation frameworks. You define "expectations" about your data, and GE validates each new batch of data against them.

Key Concepts

Expectation Suite:  A collection of rules for your data
Checkpoint:         Runs the suite against a new batch of data
Data Docs:          Auto-generated HTML reports showing pass/fail

Setup and Usage

# Install (this example uses the 0.17/0.18 "fluent" API; the 1.x releases changed these calls)
# pip install "great-expectations>=0.17,<1.0"

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Connect to your data
datasource = context.sources.add_pandas_filesystem(
    name="churn_data",
    base_directory="data/raw/"
)
asset = datasource.add_csv_asset(name="churn", batching_regex=r".*\.csv")

# Create the expectation suite (add_or_update keeps the script re-runnable)
suite = context.add_or_update_expectation_suite("churn_expectations")

validator = context.get_validator(
    batch_request=asset.build_batch_request(),
    expectation_suite_name="churn_expectations"
)

# ── Define Expectations ──────────────────────────────────────────

# Schema expectations
validator.expect_table_columns_to_match_ordered_list([
    "customer_id", "age", "income", "tenure", "plan", "churned"
])
validator.expect_column_to_exist("customer_id")

# Null checks
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("churned")

# Type checks
validator.expect_column_values_to_be_of_type("age", "int64")
validator.expect_column_values_to_be_of_type("income", "float64")

# Range checks
validator.expect_column_values_to_be_between("age", min_value=18, max_value=120)
validator.expect_column_values_to_be_between("income", min_value=0)

# Categorical checks
validator.expect_column_values_to_be_in_set("plan", ["basic", "standard", "premium"])
validator.expect_column_values_to_be_in_set("churned", [0, 1])

# Uniqueness
validator.expect_column_values_to_be_unique("customer_id")

# Distribution checks
validator.expect_column_mean_to_be_between("age", min_value=25, max_value=55)
validator.expect_column_proportion_of_unique_values_to_be_between(
    "plan", min_value=0.001, max_value=0.5
)

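# Save the suite (by default, expectations that failed on this batch are dropped on save)
validator.save_expectation_suite(discard_failed_expectations=False)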

# ── Run Checkpoint ───────────────────────────────────────────────
checkpoint = context.add_or_update_checkpoint(
    name="churn_checkpoint",
    validations=[{"batch_request": asset.build_batch_request(),
                  "expectation_suite_name": "churn_expectations"}]
)

result = checkpoint.run()

if not result.success:
    print("❌ DATA VALIDATION FAILED!")
    # Print failures
    for vr in result.list_validation_results():
        for er in vr.results:
            if not er.success:
                print(f"  FAILED: {er.expectation_config.expectation_type} "
                      f"on column '{er.expectation_config.kwargs.get('column')}'")
    raise ValueError("Data quality gate failed — aborting pipeline")
else:
    print("✅ All data quality checks passed!")

6.3 TensorFlow Data Validation (TFDV)

TensorFlow Data Validation (TFDV) is Google's library for data validation, tightly integrated with TFX and Vertex AI Pipelines.

import tensorflow_data_validation as tfdv
import pandas as pd

# Compute statistics from training data
train_stats = tfdv.generate_statistics_from_csv("data/train.csv")

# Infer schema from training data
schema = tfdv.infer_schema(statistics=train_stats)

# Display schema (run in notebook)
tfdv.display_schema(schema)

# Validate new (serving) data against training schema
serving_stats = tfdv.generate_statistics_from_csv("data/serving.csv")
anomalies = tfdv.validate_statistics(
    statistics=serving_stats,
    schema=schema
)

# Display anomalies
tfdv.display_anomalies(anomalies)

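# Example anomalies (illustrative; a range anomaly like the 'age' row requires a numeric
# domain to be set on the schema, which infer_schema does not add on its own):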
# ┌─────────────────┬──────────────────────────────────────────────────┐
# │ Feature name    │ Anomaly short description                        │
# ├─────────────────┼──────────────────────────────────────────────────┤
# │ 'income'        │ Column dropped                                   │
# │ 'plan'          │ Unexpected string values: ['enterprise']         │
# │ 'age'           │ Out-of-range values: -5                          │
# └─────────────────┴──────────────────────────────────────────────────┘

# Save schema to file for use in pipeline
tfdv.write_schema_text(schema, "schema/schema.pbtxt")

6.4 Pandera: Lightweight DataFrame Validation

Pandera is a lighter-weight option for validating pandas DataFrames with a Pythonic API.

import pandas as pd
import pandera as pa
from pandera import Column, DataFrameSchema, Check

# Define schema
schema = DataFrameSchema({
    "customer_id": Column(str, Check.str_matches(r'^CUST-\d{6}$')),
    "age": Column(int, [
        Check.greater_than_or_equal_to(18),
        Check.less_than_or_equal_to(120)
    ]),
    "income": Column(float, Check.greater_than_or_equal_to(0)),
    "plan": Column(str, Check.isin(["basic", "standard", "premium"])),
    "churned": Column(int, Check.isin([0, 1])),
})

# Load the data to validate (path reused from the TFDV example; adjust to your layout)
df = pd.read_csv("data/train.csv")

# Validate dataframe (raises on the first failing check)
try:
    validated_df = schema.validate(df)
    print("✅ Data validation passed")
except pa.errors.SchemaError as e:
    print(f"❌ Validation failed: {e}")
    raise

6.5 Data Quality in CI/CD Pipeline

CI/CD Data Quality Gate:

  New Data Arrives (GCS)
        │
        ▼
  ┌─────────────────────────────────────────┐
  │     DATA VALIDATION STAGE               │
  │                                         │
  │  1. Schema check (columns match?)       │
  │  2. Null rate check (< 5% per column?)  │
  │  3. Range check (values in bounds?)     │
  │  4. Distribution check (not drifted?)   │
  │  5. Class balance check                 │
  └────────────────┬────────────────────────┘
                   │
        ┌──────────┴──────────┐
        ▼                     ▼
   All Pass ✅           Any Fail ❌
        │                     │
   Proceed to               Alert team
   Training                 Log failure
                            Stop pipeline

# GitHub Actions step (a Jenkinsfile would use an equivalent sh step):
- name: Validate Data
  run: |
    python src/validate_data.py
    # Script exits with code 1 if validation fails
    # CI/CD treats non-zero exit = failure → pipeline stops
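
The contents of src/validate_data.py are not shown in this chapter. As a rough sketch of what such a gate script could look like, the following standard-library version implements the first three checks from the diagram (schema, null rate, range) and turns the result into an exit code. The column list, the 5% null threshold, and the age bounds are taken from the examples above; the function names are hypothetical.

```python
import csv
import io
import sys

# Assumed for illustration: columns and bounds mirror the churn examples above.
EXPECTED_COLUMNS = ["customer_id", "age", "income", "tenure", "plan", "churned"]
MAX_NULL_RATE = 0.05  # the "< 5% per column" rule from the diagram
AGE_BOUNDS = (18, 120)


def validate(csv_file):
    """Return a list of failure messages; an empty list means every check passed."""
    reader = csv.DictReader(csv_file)

    # 1. Schema check: stop early, because later checks assume these columns.
    if reader.fieldnames != EXPECTED_COLUMNS:
        return [f"schema mismatch: got {reader.fieldnames}"]

    rows = list(reader)
    if not rows:
        return ["file contains no data rows"]

    failures = []

    # 2. Null rate check: empty or absent fields count as missing.
    for column in EXPECTED_COLUMNS:
        null_rate = sum(1 for row in rows if row[column] in ("", None)) / len(rows)
        if null_rate > MAX_NULL_RATE:
            failures.append(f"null rate {null_rate:.1%} in '{column}'")

    # 3. Range check on age; missing values were already handled above.
    low, high = AGE_BOUNDS
    bad_ages = []
    for row in rows:
        value = row["age"]
        if value in ("", None):
            continue
        try:
            in_range = low <= int(value) <= high
        except ValueError:
            in_range = False  # non-integer text is treated as out of range
        if not in_range:
            bad_ages.append(value)
    if bad_ages:
        failures.append(f"{len(bad_ages)} age value(s) outside [{low}, {high}]")

    return failures


def exit_code_for(csv_file):
    """Print each failure to stderr and map the result to a process exit code."""
    failures = validate(csv_file)
    for failure in failures:
        print(f"FAILED: {failure}", file=sys.stderr)
    return 1 if failures else 0


# Demo on an in-memory file; the second row has an out-of-range age.
sample = io.StringIO(
    "customer_id,age,income,tenure,plan,churned\n"
    "CUST-000001,34,52000.0,12,basic,0\n"
    "CUST-000002,-5,61000.5,3,premium,1\n"
)
exit_code = exit_code_for(sample)
print("exit code:", exit_code)
```

In an actual script, the demo at the bottom would be replaced by opening the real data file and calling sys.exit() with the returned code, which is what makes the CI step fail. The same structure works if the checks are delegated to Great Expectations or Pandera instead of hand-written loops.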

6.6 Data Quality Metrics to Track

Metric              Formula                          Alert When
Null rate           null_count / total_rows          > 5% per column
Duplicate rate      duplicate_count / total_rows     > 1%
Schema match        Columns match expected schema    Any mismatch
Value range         Min/Max within bounds            Out of range
Distribution shift  KS test, PSI score               PSI > 0.2
Class imbalance     minority_class / total           < 1%
Freshness           Age of data                      > 24 hours old
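
The PSI (Population Stability Index) in the table compares how a feature's values are spread across bins in a reference sample versus a new sample: PSI = Σ (aᵢ − eᵢ) · ln(aᵢ / eᵢ), where eᵢ and aᵢ are the fractions of reference and new values that fall in bin i. The sketch below is one simple way to compute it with the standard library. Equal-width bins taken from the reference range and the small floor for empty bins are implementation choices made here, not something the table prescribes.

```python
import math
from bisect import bisect_right


def psi(expected, actual, n_bins=10, eps=1e-6):
    """Population Stability Index between a reference sample and a new sample."""
    lo, hi = min(expected), max(expected)
    if lo == hi:
        raise ValueError("reference sample has no spread; cannot build bins")

    # Interior edges of equal-width bins over the reference range. Values outside
    # that range fall into the first or last bin.
    width = (hi - lo) / n_bins
    edges = [lo + width * i for i in range(1, n_bins)]

    def bin_fractions(values):
        counts = [0] * n_bins
        for v in values:
            counts[bisect_right(edges, v)] += 1
        # Floor each fraction at eps so an empty bin does not produce log(0).
        return [max(c / len(values), eps) for c in counts]

    e = bin_fractions(expected)
    a = bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


# Made-up data: ages spread evenly over 20..59, then the same ages shifted by 15 years.
reference_ages = [20 + i % 40 for i in range(1000)]
shifted_ages = [age + 15 for age in reference_ages]

psi_same = psi(reference_ages, reference_ages)
psi_shifted = psi(reference_ages, shifted_ages)
print(f"identical: {psi_same:.3f}, shifted: {psi_shifted:.3f}")
print("alert" if psi_shifted > 0.2 else "ok")
```

The result depends on the number of bins and on how empty bins are handled, so the 0.2 alert threshold is best treated as a starting point to tune per feature.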

6.7 Tools Summary

Tool                Best For                    Integration
Great Expectations  Complex suites, reporting   Python, Airflow, Spark
TFDV                TFX/Vertex AI pipelines     TFX, Kubeflow
Pandera             Quick DataFrame checks      pytest, Python
Deequ (AWS)         Spark/large scale           AWS, PySpark
Soda                Business-friendly           Airflow, dbt
whylogs             Profiling + monitoring      Any Python

Next → Chapter 07: MLflow