# Chapter 16: Prefect & ZenML

## Prefect
Prefect is a modern Python-native workflow orchestration tool with excellent developer experience.
```python
# pip install prefect
from prefect import flow, task


@task(retries=3, retry_delay_seconds=60)
def ingest_data(source: str) -> str:
    import pandas as pd

    df = pd.read_csv(source)
    df.to_csv("/tmp/raw.csv", index=False)
    return "/tmp/raw.csv"


@task
def train_model(data_path: str) -> float:
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import accuracy_score

    df = pd.read_csv(data_path)
    X, y = df.drop("label", axis=1), df["label"]
    model = GradientBoostingClassifier()
    model.fit(X, y)
    # Training-set accuracy: fine for a demo, but use a held-out
    # split to measure real performance.
    return accuracy_score(y, model.predict(X))


@flow(name="ML Training Pipeline", log_prints=True)
def ml_pipeline(source: str = "gs://bucket/data.csv"):
    data_path = ingest_data(source)
    accuracy = train_model(data_path)
    print(f"Accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    # Run locally
    ml_pipeline()

    # Serve as a scheduled deployment (every Monday at 02:00).
    # Deployment.build_from_flow() is the older Prefect 2.x route;
    # in current releases, flow.serve() is the simplest path.
    ml_pipeline.serve(name="weekly-training", cron="0 2 * * 1")
```
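The `retries=3, retry_delay_seconds=60` on `ingest_data` means Prefect will re-invoke the task up to three more times, pausing a minute between attempts, before failing the flow run. A minimal plain-Python sketch of that behavior (a hypothetical helper, not Prefect's actual implementation):

```python
import time


def run_with_retries(fn, *args, retries=3, retry_delay_seconds=60):
    """Mirror @task(retries=..., retry_delay_seconds=...): call fn, and
    on failure wait and try again, up to `retries` extra attempts."""
    for attempt in range(retries + 1):
        try:
            return fn(*args)
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            time.sleep(retry_delay_seconds)
```

Flaky I/O such as the `pd.read_csv(source)` call above is exactly where this pays off: a transient network error costs a minute, not the whole run.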
## ZenML
ZenML is an MLOps framework that abstracts the stack — you define pipelines once and run them on any backend (local, GCP, AWS, Kubeflow).
```python
# pip install zenml
import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.ensemble import GradientBoostingClassifier

from zenml import pipeline, step


@step
def ingest() -> pd.DataFrame:
    return pd.read_csv("data/train.csv")


@step
def train(df: pd.DataFrame) -> ClassifierMixin:
    model = GradientBoostingClassifier()
    model.fit(df.drop("label", axis=1), df["label"])
    return model


@pipeline
def training_pipeline():
    df = ingest()
    train(df)


# Run on the active ZenML stack (local, GKE, Vertex AI, etc.)
training_pipeline()
```
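Between steps, ZenML does not pass live Python objects: each step's output is materialized to the stack's artifact store, and downstream steps load it back by reference. A toy stand-in using pickle on the local filesystem (names are illustrative, not ZenML's API):

```python
import os
import pickle


class LocalArtifactStore:
    """Toy stand-in for an artifact store: step outputs are written to
    disk, and downstream steps receive a path rather than the object."""

    def __init__(self, root: str):
        self.root = root

    def save(self, name: str, obj) -> str:
        path = os.path.join(self.root, f"{name}.pkl")
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        return path  # downstream steps get this reference

    def load(self, path: str):
        with open(path, "rb") as f:
            return pickle.load(f)
```

Because steps only exchange references, the same pipeline runs unchanged whether the store is a local folder or a GCS bucket.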
ZenML Stacks connect your pipeline to infrastructure:
```shell
zenml stack register gcp-stack \
    -a gcs-artifact-store \
    -o vertex-orchestrator \
    -e vertex-experiment-tracker \
    --set
```