Lab 06: MLflow — Experiment Tracking & Model Registry

Objective

Build a production ML experiment tracking system: log parameters, metrics, and artifacts; version models in a registry; compare experiments; promote models through staging → production; and implement model lineage tracking — using MLflow patterns.

Time: 50 minutes | Level: Advanced | Docker Image: zchencow/innozverse-ai:latest


Background

Without MLflow:                    With MLflow:
  "Which params gave 0.97 AUC?"      mlflow.search_runs(filter_string="metrics.auc > 0.97")
  "What data was this trained on?"   mlflow.log_param("dataset_version", "v2.3")
  "Is prod model still v1.2?"        mlflow.MlflowClient().get_model_version(...)
  "Why did accuracy drop?"           Compare run 847 vs run 901 in UI

MLflow solves the reproducibility crisis: most ML teams can't reproduce their own best results 3 months later.


Step 1: MLflow Tracking — Log Experiments

docker run -it --rm zchencow/innozverse-ai:latest bash
import numpy as np, json, hashlib, time
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import warnings; warnings.filterwarnings('ignore')

np.random.seed(42)

# Dataset: 5,000 samples, 20 features (12 informative), heavily imbalanced
# (~6% positive class) to mimic a realistic malware-detection distribution.
X, y = make_classification(n_samples=5000, n_features=20, n_informative=12,
                             weights=[0.94, 0.06], random_state=42)
# Stratified split preserves the 94/6 class ratio in both train and test folds.
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
# Fit the scaler on the training split only, then apply to test — avoids leakage.
scaler = StandardScaler()
X_tr_s = scaler.fit_transform(X_tr); X_te_s = scaler.transform(X_te)

class MLflowTracker:
    """
    MLflow-compatible experiment tracker (in-memory).

    Mirrors the core of the real MLflow API — mlflow.start_run(),
    mlflow.log_param(), mlflow.log_metric(), mlflow.log_artifact() — with a
    plain dict store, so the lab runs without an MLflow server.
    """

    def __init__(self, experiment_name: str, tracking_uri: str = "sqlite:///mlruns.db"):
        # tracking_uri is kept for API parity with mlflow.set_tracking_uri();
        # this in-memory tracker never actually opens the database.
        self.experiment_name = experiment_name
        self.tracking_uri    = tracking_uri
        self.runs: dict      = {}    # run_id -> run record
        self.active_run      = None  # run record currently receiving logs, or None

    def start_run(self, run_name: str = None, tags: dict = None) -> str:
        """Open a new run, make it the active one, and return its run id."""
        run_id = hashlib.md5(f"{run_name}{time.time()}".encode()).hexdigest()[:8]
        self.active_run = {
            'run_id':    run_id,
            'run_name':  run_name or f"run_{run_id}",
            'experiment':self.experiment_name,
            'status':    'RUNNING',
            'start_time':time.time(),
            'params':    {},
            'metrics':   {},
            'artifacts': [],
            'tags':      tags or {},
        }
        self.runs[run_id] = self.active_run
        return run_id

    def _require_active_run(self) -> dict:
        """Return the active run, or raise a clear error instead of the
        opaque TypeError the original code produced when no run was open."""
        if self.active_run is None:
            raise RuntimeError("No active run — call start_run() first.")
        return self.active_run

    def log_param(self, key: str, value):
        """Log a single hyperparameter on the active run."""
        self._require_active_run()['params'][key] = value

    def log_params(self, params: dict):
        """Log several hyperparameters at once on the active run."""
        self._require_active_run()['params'].update(params)

    def log_metric(self, key: str, value: float, step: int = None):
        """Append a metric observation (optionally tagged with a step) so
        repeated calls build a training curve rather than overwrite."""
        run = self._require_active_run()
        run['metrics'].setdefault(key, []).append({'value': value, 'step': step})

    def log_artifact(self, name: str, content: str):
        """Record an artifact's name and size (content itself is not stored)."""
        self._require_active_run()['artifacts'].append({'name': name, 'size': len(content)})

    def end_run(self, status: str = "FINISHED"):
        """Close the active run, stamping end time and wall-clock duration."""
        run = self._require_active_run()
        run['status']     = status
        run['end_time']   = time.time()
        run['duration_s'] = round(run['end_time'] - run['start_time'], 2)

    def get_best_run(self, metric: str, mode: str = 'max') -> dict:
        """Return the FINISHED run with the best last value of *metric*.

        Raises ValueError if there are no finished runs. Runs that never
        logged the metric are always ranked worst — the original gave them
        -inf unconditionally, which wrongly made them *win* in 'min' mode.
        """
        finished = [r for r in self.runs.values() if r['status'] == 'FINISHED']
        if not finished:
            raise ValueError("get_best_run: no FINISHED runs to compare.")
        missing = float('-inf') if mode == 'max' else float('inf')
        def last_metric(run):
            vals = run['metrics'].get(metric, [])
            return vals[-1]['value'] if vals else missing
        pick = max if mode == 'max' else min
        return pick(finished, key=last_metric)

    def compare_runs(self, metric: str) -> list:
        """List all runs (any status) sorted by last value of *metric*, best first.

        Runs missing the metric sort last; the original's `x[metric] or 0`
        key ranked a missing metric (None) above any negative value.
        """
        rows = []
        for rid, run in self.runs.items():
            vals = run['metrics'].get(metric, [])
            rows.append({'run_id': rid[:6], 'name': run['run_name'],
                         metric: vals[-1]['value'] if vals else None,
                         'params': run['params']})
        return sorted(rows,
                      key=lambda r: r[metric] if r[metric] is not None else float('-inf'),
                      reverse=True)

tracker = MLflowTracker(experiment_name="malware_classifier_v2")

# Candidate models to compare: (run name, estimator, hyperparameters to log).
experiments = [
    ("LogisticRegression",   LogisticRegression(C=1.0, max_iter=1000),
     {"model_type": "logistic_regression", "C": 1.0, "max_iter": 1000}),
    ("RandomForest_100",     RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
     {"model_type": "random_forest", "n_estimators": 100, "max_depth": 10}),
    ("RandomForest_200",     RandomForestClassifier(n_estimators=200, max_depth=15, random_state=42),
     {"model_type": "random_forest", "n_estimators": 200, "max_depth": 15}),
    ("GradientBoosting",     GradientBoostingClassifier(n_estimators=200, max_depth=4,
                                                          learning_rate=0.05, random_state=42),
     {"model_type": "gradient_boosting", "n_estimators": 200, "lr": 0.05, "max_depth": 4}),
]

print("Running experiments...\n")
for name, clf, params in experiments:
    run_id = tracker.start_run(run_name=name, tags={"dataset": "malware_pe_v3", "env": "dev"})
    # Log both model hyperparameters and dataset provenance for reproducibility.
    tracker.log_params(params)
    tracker.log_params({"dataset_version": "v3.1", "scaler": "StandardScaler",
                          "train_size": len(X_tr), "test_size": len(X_te)})
    # Train on the scaled training split, timing the fit.
    t0 = time.time()
    clf.fit(X_tr_s, y_tr)
    train_time = time.time() - t0
    # Held-out test metrics at the default 0.5 threshold.
    prob = clf.predict_proba(X_te_s)[:, 1]
    pred = (prob >= 0.5).astype(int)
    auc = roc_auc_score(y_te, prob)
    f1  = f1_score(y_te, pred)
    pre = precision_score(y_te, pred)
    rec = recall_score(y_te, pred)
    # Log per-fold CV AUC as steps (simulates a training curve in the UI).
    # FIX: the original called cross_val_score (5 fits) once per fold —
    # 25 fits per model for 5 numbers. Compute the 5 fold scores once.
    cv_scores = cross_val_score(clf, X_tr_s, y_tr, cv=5, scoring='roc_auc')
    for fold, cv_auc in enumerate(cv_scores, start=1):
        tracker.log_metric("cv_auc", float(cv_auc), step=fold)
    tracker.log_metric("test_auc",   auc)
    tracker.log_metric("test_f1",    f1)
    tracker.log_metric("precision",  pre)
    tracker.log_metric("recall",     rec)
    tracker.log_metric("train_time", train_time)
    # Log model artifact placeholders (real MLflow: mlflow.sklearn.log_model).
    tracker.log_artifact("model.pkl",  f"<serialised {name} model>")
    tracker.log_artifact("config.json", json.dumps(params))
    tracker.end_run()
    print(f"  {name:<25} AUC={auc:.4f}  F1={f1:.4f}  time={train_time:.2f}s")

print(f"\nTotal runs logged: {len(tracker.runs)}")

📸 Verified Output:


Step 2: Compare Runs and Select the Best Model

📸 Verified Output:


Step 3: Model Registry and Lifecycle Management

📸 Verified Output:


Step 4: Model Lineage and Reproducibility

📸 Verified Output:


Step 5–8: Capstone — Full MLOps Pipeline

📸 Verified Output:


Summary

| MLflow Component | What It Solves                    | Real MLflow API              |
|------------------|-----------------------------------|------------------------------|
| Experiments      | "Which params gave best results?" | mlflow.start_run()           |
| Metrics logging  | Training curves, per-fold CV      | mlflow.log_metric(step=)     |
| Artifact logging | Save models, configs, plots       | mlflow.log_artifact()        |
| Model Registry   | Version and stage management      | mlflow.register_model()      |
| Lineage          | Reproducibility audit trail       | mlflow.set_tag() + data hash |

Further Reading

Last updated