Chapter 14: Kubeflow Pipelines

"Kubeflow is the ML toolkit for Kubernetes — it makes deploying ML workflows on K8s simple, portable, and scalable."


14.1 What is Kubeflow?

Kubeflow is an open-source ML platform designed to run on Kubernetes. It provides tools for building, training, and deploying ML models in a portable, scalable manner.

Kubeflow Components

┌──────────────────────────────────────────────────────────────┐
│                    KUBEFLOW ECOSYSTEM                        │
│                                                              │
│  ┌─────────────────┐  ┌──────────────────┐                  │
│  │  KF Pipelines   │  │  Katib (HPO)     │                  │
│  │  (orchestration)│  │  (AutoML/Sweeps) │                  │
│  └─────────────────┘  └──────────────────┘                  │
│  ┌─────────────────┐  ┌──────────────────┐                  │
│  │  KF Notebooks   │  │  KServe          │                  │
│  │  (Jupyter)      │  │  (model serving) │                  │
│  └─────────────────┘  └──────────────────┘                  │
│  ┌─────────────────┐  ┌──────────────────┐                  │
│  │  Training       │  │  Model Registry  │                  │
│  │  Operator       │  │                  │                  │
│  └─────────────────┘  └──────────────────┘                  │
└──────────────────────────────────────────────────────────────┘

14.2 Kubeflow Pipelines (KFP) Concepts

| Concept    | Description                                          |
|------------|------------------------------------------------------|
| Component  | A self-contained, containerized step in a pipeline   |
| Pipeline   | A graph of components with defined inputs/outputs    |
| Run        | A single execution of a pipeline                     |
| Experiment | A logical group of runs for comparison               |
| Artifact   | An output produced by a component (dataset, model)   |
| Metadata   | Auto-logged info about runs, inputs, and outputs     |
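
To make the component/pipeline relationship concrete, here is a toy sketch in plain Python with no KFP involved. It models a pipeline as a dependency graph and derives one valid execution order. The step names match the example in section 14.3; `pipeline_graph` and `execution_order` are hypothetical names used only for illustration, not part of the KFP API.

```python
from graphlib import TopologicalSorter

# Each key is a step; its value is the set of steps whose outputs it consumes.
# Toy model only: KFP derives these edges from how task outputs are wired.
pipeline_graph = {
    "preprocess_data": set(),
    "train_model": {"preprocess_data"},
    "evaluate_and_deploy": {"train_model"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which the steps can run."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(pipeline_graph)
print(order)
```

A step becomes runnable only once every step it depends on has finished, which is the same constraint an orchestrator enforces when it schedules containers.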

14.3 Building a KFP Pipeline

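# pip install "kfp>=2,<3"   (this example uses the KFP v2 SDK)

import kfp
from kfp import compiler, dsl  # explicit import makes kfp.compiler available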
from kfp.dsl import component, pipeline, Dataset, Model, Output, Input, Metrics

# ── Define Components ──────────────────────────────────────────────

@component(
    base_image="python:3.10-slim",
    # gcsfs lets pandas read gs:// paths; scikit-learn is not needed in this step
    packages_to_install=["pandas==2.0.0", "gcsfs"],
)
def preprocess_data(
    raw_data_path: str,
    output_dataset: Output[Dataset],
):
    """Preprocess raw data and output cleaned dataset."""
    import pandas as pd

    df = pd.read_csv(raw_data_path)
    df = df.dropna()
    df = df.drop_duplicates()

    df.to_csv(output_dataset.path, index=False)
    print(f"Preprocessed {len(df)} rows → {output_dataset.path}")


@component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "scikit-learn==1.3.0", "joblib"],
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 200,
    learning_rate: float = 0.05,
):
    """Train GBM classifier and log metrics."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score

    df = pd.read_csv(dataset.path)
    X = df.drop("churned", axis=1)
    y = df["churned"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    clf = GradientBoostingClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate
    )
    clf.fit(X_train, y_train)

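    # Save the model as <model.path>/model.joblib. Vertex AI's prebuilt
    # scikit-learn container expects that file name inside artifact_uri.
    import os
    os.makedirs(model.path, exist_ok=True)
    joblib.dump(clf, os.path.join(model.path, "model.joblib"))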

    # Log metrics
    y_pred = clf.predict(X_test)
    metrics.log_metric("accuracy", accuracy_score(y_test, y_pred))
    metrics.log_metric("f1_score", f1_score(y_test, y_pred))


@component(
    base_image="python:3.10-slim",
    packages_to_install=["joblib", "google-cloud-aiplatform"],
)
def evaluate_and_deploy(
    model: Input[Model],
    metrics: Input[Metrics],
    accuracy_threshold: float = 0.85,
    project: str = "my-project",
    region: str = "us-central1",
):
    """Evaluate model and deploy if above threshold."""
    # Values logged with log_metric() travel in the artifact's metadata,
    # not in a file at metrics.path
    accuracy = metrics.metadata.get("accuracy", 0)
    print(f"Model accuracy: {accuracy}")

    if accuracy < accuracy_threshold:
        raise ValueError(f"Accuracy {accuracy:.3f} below threshold {accuracy_threshold}")

    # Deploy to Vertex AI Endpoint
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    endpoint = aiplatform.Endpoint.create(display_name="churn-endpoint")
    model_resource = aiplatform.Model.upload(
        display_name="churn-model",
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
    )
    model_resource.deploy(endpoint=endpoint, machine_type="n1-standard-2")
    print(f"✅ Model deployed to endpoint: {endpoint.name}")


# ── Define Pipeline ────────────────────────────────────────────────

@pipeline(
    name="churn-prediction-pipeline",
    description="End-to-end churn model training and deployment",
)
def churn_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    n_estimators: int = 200,
    learning_rate: float = 0.05,
    accuracy_threshold: float = 0.85,
):
    # Step 1: Preprocess
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)

    # Step 2: Train (depends on preprocess)
    train_task = train_model(
        dataset=preprocess_task.outputs["output_dataset"],
        n_estimators=n_estimators,
        learning_rate=learning_rate,
    )

    # Step 3: Evaluate and deploy (depends on train)
    evaluate_and_deploy(
        model=train_task.outputs["model"],
        metrics=train_task.outputs["metrics"],
        accuracy_threshold=accuracy_threshold,
    )


# ── Compile and Submit ─────────────────────────────────────────────

# Compile to YAML
kfp.compiler.Compiler().compile(
    pipeline_func=churn_pipeline,
    package_path="churn_pipeline.yaml"
)

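# Submit to a Kubeflow Pipelines deployment (here reachable on localhost:8080,
# e.g. through a port-forward). Vertex AI Pipelines is covered in section 14.4.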
client = kfp.Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_func(
    churn_pipeline,
    arguments={
        "raw_data_path": "gs://my-bucket/data/raw.csv",
        "n_estimators": 300,
        "learning_rate": 0.03,
    },
    experiment_name="churn-experiments",
)
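
The deployment gate inside `evaluate_and_deploy` is easier to unit test when the comparison lives in a plain function. The following is a minimal sketch under that assumption; `passes_gate` is a hypothetical helper, not part of KFP, and the metric values in the usage lines are illustrative only.

```python
def passes_gate(metrics: dict[str, float], threshold: float, key: str = "accuracy") -> bool:
    """Return True when the named metric meets or exceeds the threshold.

    A missing metric counts as 0.0, so a run that failed to log the
    metric can never pass the gate.
    """
    return metrics.get(key, 0.0) >= threshold


# Illustrative values only, not results from a real run
print(passes_gate({"accuracy": 0.91, "f1_score": 0.88}, threshold=0.85))
print(passes_gate({"f1_score": 0.88}, threshold=0.85))
```

Inside the component, the same check could decide whether to raise the `ValueError` or continue with deployment.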

14.4 Running on Vertex AI Pipelines

The same compiled pipeline runs on Vertex AI Pipelines. What changes is the submission code, which uses aiplatform.PipelineJob instead of kfp.Client:

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

# Submit to Vertex AI Pipelines (uses the compiled YAML)
job = aiplatform.PipelineJob(
    display_name="churn-pipeline-run",
    template_path="churn_pipeline.yaml",
    pipeline_root="gs://my-bucket/pipeline-root/",
    parameter_values={
        "raw_data_path": "gs://my-bucket/data/churn_v3.csv",
        "n_estimators": 300,
        "learning_rate": 0.05,
        "accuracy_threshold": 0.85,
    },
)
job.run(sync=True)
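
A misspelled key in `parameter_values` is usually caught only when the job is submitted. One cheap guard is to compare the keys against the pipeline's parameter list before submitting. The sketch below assumes you keep an undecorated stand-in with the same signature as `churn_pipeline`; both `churn_pipeline_params` and `unknown_parameters` are hypothetical names introduced for illustration.

```python
import inspect


def churn_pipeline_params(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    n_estimators: int = 200,
    learning_rate: float = 0.05,
    accuracy_threshold: float = 0.85,
):
    """Undecorated stand-in that only mirrors the pipeline's parameter list."""


def unknown_parameters(func, parameter_values: dict) -> list[str]:
    """Return the keys in parameter_values that func does not accept."""
    accepted = inspect.signature(func).parameters
    return sorted(key for key in parameter_values if key not in accepted)


good = {"raw_data_path": "gs://my-bucket/data/churn_v3.csv", "n_estimators": 300}
typo = {"n_estimator": 300, "learning_rate": 0.05}

print(unknown_parameters(churn_pipeline_params, good))
print(unknown_parameters(churn_pipeline_params, typo))
```

An empty list means every key matches a pipeline parameter; anything else is worth fixing before the job is created.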

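14.5 Katib: Automated HPO on Kubernetes

Katib is Kubeflow's component for hyperparameter optimization. An Experiment declares the objective, the search algorithm, the search space, and a template for the jobs that run each trial: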

# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: churn-hpo
  namespace: kubeflow
spec:
  objective:
    type: maximize
    goal: 0.90
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
  maxTrialCount: 30
  parallelTrialCount: 5
  maxFailedTrialCount: 3

  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.001"
        max: "0.3"
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: "50"
        max: "500"
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: "3"
        max: "10"

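  trialTemplate:
    primaryContainerName: training-container
    # Map each search-space parameter to a placeholder used in trialSpec
    trialParameters:
      - name: learningRate
        description: Learning rate
        reference: learning_rate
      - name: nEstimators
        description: Number of boosting stages
        reference: n_estimators
      - name: maxDepth
        description: Maximum tree depth
        reference: max_depth
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: gcr.io/my-project/churn-trainer:latest
                command:
                  - python3
                  - train.py
                  - "--learning_rate=${trialParameters.learningRate}"
                  - "--n_estimators=${trialParameters.nEstimators}"
                  - "--max_depth=${trialParameters.maxDepth}"
            # A Job's pod template must set restartPolicy to Never or OnFailure
            restartPolicy: Never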
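
The trial container has to accept the hyperparameters as command-line flags and report the objective metric back to Katib. Below is a minimal sketch of the argument handling and metric reporting that a `train.py` might contain. It assumes Katib's default StdOut metrics collector, which picks up `name=value` lines from the container log. `parse_args` and `format_metric` are hypothetical helpers, the flag defaults are placeholders, and the training logic itself is omitted.

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the hyperparameters that Katib substitutes into the command."""
    parser = argparse.ArgumentParser(description="Churn trainer (sketch)")
    # Defaults keep the script usable when only some flags are passed
    parser.add_argument("--learning_rate", type=float, default=0.05)
    parser.add_argument("--n_estimators", type=int, default=200)
    parser.add_argument("--max_depth", type=int, default=3)
    return parser.parse_args(argv)


def format_metric(name: str, value: float) -> str:
    """Format a metric as a name=value line for the log."""
    return f"{name}={value:.4f}"


args = parse_args(["--learning_rate=0.03", "--n_estimators=300"])
print(args.learning_rate, args.n_estimators, args.max_depth)

# Illustrative value only; a real script would print the measured accuracy
print(format_metric("accuracy", 0.9))
```

The metric name printed here must match `objectiveMetricName` in the Experiment, otherwise Katib has nothing to optimize against.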

14.6 KServe: Model Serving on Kubernetes

KServe (formerly KFServing) is a Kubernetes-native model serving solution.

# kserve-inferenceservice.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: churn-model
  namespace: production
spec:
  predictor:
    minReplicas: 2
    maxReplicas: 10
    # Canary rollout (v1beta1): canaryTrafficPercent belongs to the predictor.
    # Roll out the v2 model first without this field, then re-apply the
    # manifest with the v3 storageUri. 20% of traffic goes to the new
    # revision and the rest stays on the previously rolled-out one.
    canaryTrafficPercent: 20
    sklearn:
      storageUri: "gs://my-bucket/models/churn-model-v3/"
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
        limits:
          cpu: "1"
          memory: "1Gi"

# Make a prediction (.status.url is the full URL, including the scheme)
SERVICE_URL=$(kubectl get inferenceservice churn-model -n production -o jsonpath='{.status.url}')
curl -X POST "${SERVICE_URL}/v1/models/churn-model:predict" \
  -H "Content-Type: application/json" \
  -d '{"instances": [[35, 65000, 12, 1]]}'
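
The same prediction call can be made from Python. The sketch below builds the request with the standard library only and does not send it. The hostname is a placeholder standing in for the value of `.status.url`, and `build_predict_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_predict_request(base_url: str, model_name: str, instances: list) -> urllib.request.Request:
    """Build a POST request for the v1 predict route used in the curl call."""
    url = f"{base_url.rstrip('/')}/v1/models/{model_name}:predict"
    body = json.dumps({"instances": instances}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; in practice use the value of .status.url
req = build_predict_request(
    "http://churn-model.production.example.com",
    "churn-model",
    [[35, 65000, 12, 1]],
)
print(req.full_url)

# To send it against a live service:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp))
```

Keeping request construction separate from sending makes the payload easy to inspect in tests without a running cluster.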
