Chapter 14: Kubeflow Pipelines
"Kubeflow is the ML toolkit for Kubernetes — it makes deploying ML workflows on K8s simple, portable, and scalable."
14.1 What is Kubeflow?
Kubeflow is an open-source ML platform designed to run on Kubernetes. It provides tools for building, training, and deploying ML models in a portable, scalable manner.
Kubeflow Components
```
┌──────────────────────────────────────────────┐
│              KUBEFLOW ECOSYSTEM              │
│                                              │
│  ┌─────────────────┐   ┌──────────────────┐  │
│  │  KF Pipelines   │   │   Katib (HPO)    │  │
│  │ (orchestration) │   │ (AutoML/Sweeps)  │  │
│  └─────────────────┘   └──────────────────┘  │
│  ┌─────────────────┐   ┌──────────────────┐  │
│  │  KF Notebooks   │   │      KServe      │  │
│  │  (JupyterHub)   │   │ (model serving)  │  │
│  └─────────────────┘   └──────────────────┘  │
│  ┌─────────────────┐   ┌──────────────────┐  │
│  │    Training     │   │  Model Registry  │  │
│  │    Operator     │   │                  │  │
│  └─────────────────┘   └──────────────────┘  │
└──────────────────────────────────────────────┘
```
14.2 Kubeflow Pipelines (KFP) Concepts
| Concept | Description |
|---|---|
| Component | A self-contained, containerized step in a pipeline |
| Pipeline | A graph of components with defined inputs/outputs |
| Run | A single execution of a pipeline |
| Experiment | A logical group of runs for comparison |
| Artifact | Outputs produced by components (datasets, models) |
| Metadata | Auto-logged info about runs, inputs, outputs |
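Conceptually, a pipeline is just a directed acyclic graph: components are the nodes, artifact inputs/outputs are the edges, and a run executes nodes in topological order. A dependency-free sketch of that ordering idea (step names are hypothetical and this is illustrative, not KFP internals):

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each step lists the steps whose outputs it consumes.
deps = {
    "preprocess": set(),
    "train": {"preprocess"},
    "evaluate_and_deploy": {"train"},
}

# An orchestrator can only run a step once its upstream artifacts exist,
# which for this chain yields exactly one valid order.
order = list(TopologicalSorter(deps).static_order())
print(order)  # → ['preprocess', 'train', 'evaluate_and_deploy']
```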
14.3 Building a KFP Pipeline
```python
# pip install kfp
import kfp
from kfp import compiler, dsl
from kfp.dsl import component, pipeline, Dataset, Model, Output, Input, Metrics


# ── Define Components ──────────────────────────────────────────────
@component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "scikit-learn==1.3.0"],
)
def preprocess_data(
    raw_data_path: str,
    output_dataset: Output[Dataset],
):
    """Preprocess raw data and output a cleaned dataset."""
    import pandas as pd

    df = pd.read_csv(raw_data_path)
    df = df.dropna()
    df = df.drop_duplicates()
    df.to_csv(output_dataset.path, index=False)
    print(f"Preprocessed {len(df)} rows → {output_dataset.path}")


@component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas==2.0.0", "scikit-learn==1.3.0", "joblib"],
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 200,
    learning_rate: float = 0.05,
):
    """Train a GBM classifier and log metrics."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score

    df = pd.read_csv(dataset.path)
    X = df.drop("churned", axis=1)
    y = df["churned"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    clf = GradientBoostingClassifier(
        n_estimators=n_estimators,
        learning_rate=learning_rate,
    )
    clf.fit(X_train, y_train)

    # Save model
    joblib.dump(clf, model.path)

    # Log metrics — KFP records these in the artifact's metadata,
    # where downstream components can read them
    y_pred = clf.predict(X_test)
    metrics.log_metric("accuracy", accuracy_score(y_test, y_pred))
    metrics.log_metric("f1_score", f1_score(y_test, y_pred))


@component(
    base_image="python:3.10-slim",
    packages_to_install=["joblib", "google-cloud-aiplatform"],
)
def evaluate_and_deploy(
    model: Input[Model],
    metrics: Input[Metrics],
    accuracy_threshold: float = 0.85,
    project: str = "my-project",
    region: str = "us-central1",
):
    """Evaluate the model and deploy it if above the accuracy threshold."""
    # Metrics logged with log_metric() arrive in the artifact's metadata dict
    accuracy = metrics.metadata.get("accuracy", 0)
    print(f"Model accuracy: {accuracy}")
    if accuracy < accuracy_threshold:
        raise ValueError(
            f"Accuracy {accuracy:.3f} below threshold {accuracy_threshold}"
        )

    # Deploy to a Vertex AI Endpoint
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)
    endpoint = aiplatform.Endpoint.create(display_name="churn-endpoint")
    model_resource = aiplatform.Model.upload(
        display_name="churn-model",
        # Note: the prebuilt sklearn serving container expects the artifact
        # directory to contain a file named model.joblib
        artifact_uri=model.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
    )
    model_resource.deploy(endpoint=endpoint, machine_type="n1-standard-2")
    print(f"✅ Model deployed to endpoint: {endpoint.name}")


# ── Define Pipeline ────────────────────────────────────────────────
@pipeline(
    name="churn-prediction-pipeline",
    description="End-to-end churn model training and deployment",
)
def churn_pipeline(
    raw_data_path: str = "gs://my-bucket/data/raw.csv",
    n_estimators: int = 200,
    learning_rate: float = 0.05,
    accuracy_threshold: float = 0.85,
):
    # Step 1: Preprocess
    preprocess_task = preprocess_data(raw_data_path=raw_data_path)

    # Step 2: Train (depends on preprocess via the dataset artifact)
    train_task = train_model(
        dataset=preprocess_task.outputs["output_dataset"],
        n_estimators=n_estimators,
        learning_rate=learning_rate,
    )

    # Step 3: Evaluate and deploy (depends on train)
    evaluate_and_deploy(
        model=train_task.outputs["model"],
        metrics=train_task.outputs["metrics"],
        accuracy_threshold=accuracy_threshold,
    )


# ── Compile and Submit ─────────────────────────────────────────────
# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=churn_pipeline,
    package_path="churn_pipeline.yaml",
)

# Submit to a Kubeflow cluster (local) or Vertex AI Pipelines (GCP)
client = kfp.Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_func(
    churn_pipeline,
    arguments={
        "raw_data_path": "gs://my-bucket/data/raw.csv",
        "n_estimators": 300,
        "learning_rate": 0.03,
    },
    experiment_name="churn-experiments",
)
```
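Before compiling, it can be worth smoke-testing the component logic outside KFP, since the bodies are ordinary pandas and scikit-learn. A local sketch with synthetic data (column names and values are made up to mirror the churn schema):

```python
# Local smoke test of the pipeline's core logic — no KFP required.
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Synthetic "raw" data with one missing value and one duplicate row
df = pd.DataFrame({
    "age":     [35, 42, 42, 29, None, 51, 38, 45, 23, 60, 33, 47],
    "income":  [65000, 72000, 72000, 48000, 55000, 90000,
                61000, 80000, 40000, 95000, 58000, 83000],
    "tenure":  [12, 24, 24, 6, 18, 36, 10, 30, 3, 48, 9, 27],
    "churned": [1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0],
})

# Mirror preprocess_data: drop NaNs and duplicates
clean = df.dropna().drop_duplicates()

# Mirror train_model: fit a GBM and score a held-out split
X, y = clean.drop("churned", axis=1), clean["churned"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
clf = GradientBoostingClassifier(n_estimators=50, learning_rate=0.05)
clf.fit(X_train, y_train)
acc = accuracy_score(y_test, clf.predict(X_test))
print(f"rows after cleaning: {len(clean)}, accuracy: {acc:.2f}")
```

Catching schema or dtype problems here is much cheaper than waiting for a containerized pipeline step to fail.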
14.4 Running on Vertex AI Pipelines
The same KFP pipeline runs on Vertex AI Pipelines with minimal change:
```python
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

# Submit to Vertex AI Pipelines (uses the compiled YAML)
job = aiplatform.PipelineJob(
    display_name="churn-pipeline-run",
    template_path="churn_pipeline.yaml",
    pipeline_root="gs://my-bucket/pipeline-root/",
    parameter_values={
        "raw_data_path": "gs://my-bucket/data/churn_v3.csv",
        "n_estimators": 300,
        "learning_rate": 0.05,
        "accuracy_threshold": 0.85,
    },
)
job.run(sync=True)
```
14.5 Katib — Automated HPO on Kubernetes
Katib is Kubeflow's component for hyperparameter optimization.
```yaml
# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: churn-hpo
  namespace: kubeflow
spec:
  objective:
    type: maximize
    goal: 0.90
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
  maxTrialCount: 30
  parallelTrialCount: 5
  maxFailedTrialCount: 3
  parameters:
    - name: learning_rate
      parameterType: double
      feasibleSpace:
        min: "0.001"
        max: "0.3"
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: "50"
        max: "500"
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: "3"
        max: "10"
  trialTemplate:
    primaryContainerName: training-container
    # Each search parameter must be wired into the trial via a trialParameter
    trialParameters:
      - name: learningRate
        description: Learning rate
        reference: learning_rate
      - name: nEstimators
        description: Number of boosting stages
        reference: n_estimators
      - name: maxDepth
        description: Maximum tree depth
        reference: max_depth
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            restartPolicy: Never
            containers:
              - name: training-container
                image: gcr.io/my-project/churn-trainer:latest
                command:
                  - python3
                  - train.py
                  - "--learning_rate=${trialParameters.learningRate}"
                  - "--n_estimators=${trialParameters.nEstimators}"
                  - "--max_depth=${trialParameters.maxDepth}"
```
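Katib discovers the objective metric by scraping the trial's output: with the default StdOut metrics collector, `train.py` should print metrics as `name=value` lines. A minimal sketch of the reporting side (the regex below is illustrative, not Katib's actual parser):

```python
import re

def report_metric(name: str, value: float) -> str:
    """Print a metric the way Katib's StdOut collector expects: name=value."""
    line = f"{name}={value}"
    print(line)
    return line

# At the end of train.py, after evaluation:
line = report_metric("accuracy", 0.91)

# Illustrative check that the line follows the name=value convention
m = re.fullmatch(r"(?P<name>[\w.-]+)=(?P<value>[-+0-9.eE]+)", line)
assert m and float(m.group("value")) == 0.91
```

The metric name printed must match `objectiveMetricName` in the Experiment spec, or Katib will mark trials as failed for lack of observations.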
14.6 KServe — Model Serving on Kubernetes
KServe (formerly KFServing) is a Kubernetes-native model serving solution.
```yaml
# kserve-inferenceservice.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: churn-model
  namespace: production
spec:
  predictor:
    # Canary rollout: with canaryTrafficPercent set, the latest revision
    # (v3 here) receives 20% of traffic while the previously rolled-out
    # revision (v2) keeps the remaining 80%
    canaryTrafficPercent: 20
    minReplicas: 2
    maxReplicas: 10
    sklearn:
      storageUri: "gs://my-bucket/models/churn-model-v3/"
      resources:
        requests:
          cpu: "500m"
          memory: "512Mi"
        limits:
          cpu: "1"
          memory: "1Gi"
```
```bash
# Make a prediction (status.url is the full service URL)
SERVICE_URL=$(kubectl get inferenceservice churn-model -n production -o jsonpath='{.status.url}')
curl -X POST "$SERVICE_URL/v1/models/churn-model:predict" \
  -H "Content-Type: application/json" \
  -d '{"instances": [[35, 65000, 12, 1]]}'
```
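The same call can be made from Python with only the standard library. `build_predict_request` is a hypothetical helper, and the host below is a placeholder for the InferenceService's `status.url`:

```python
import json
from urllib import request

def build_predict_request(service_url: str, model: str, instances) -> request.Request:
    """Build a V1-protocol predict request for a KServe predictor."""
    url = f"{service_url}/v1/models/{model}:predict"
    body = json.dumps({"instances": instances}).encode()
    return request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )

req = build_predict_request(
    "http://churn-model.production.example.com",  # placeholder status.url
    "churn-model",
    [[35, 65000, 12, 1]],
)
print(req.full_url)

# Actually sending it requires network access to the cluster:
# predictions = json.load(request.urlopen(req))["predictions"]
```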