Lab 06: MLflow — Experiment Tracking & Model Registry
Objective
Background
Without MLflow → With MLflow:
- "Which params gave 0.97 AUC?" → mlflow.search_runs(filter="metrics.auc > 0.97")
- "What data was this trained on?" → mlflow.log_param("dataset_version", "v2.3")
- "Is prod model still v1.2?" → mlflow.MlflowClient().get_model_version(...)
- "Why did accuracy drop?" → Compare run 847 vs run 901 in the UI

Step 1: MLflow Tracking — Log Experiments
docker run -it --rm zchencow/innozverse-ai:latest bash

import numpy as np, json, hashlib, time
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import warnings; warnings.filterwarnings('ignore')
# Reproducible synthetic dataset: 5 000 samples, 20 features, ~6% positives
# (imbalanced, like a realistic malware-detection problem).
np.random.seed(42)
X, y = make_classification(
    n_samples=5000,
    n_features=20,
    n_informative=12,
    weights=[0.94, 0.06],
    random_state=42,
)

# Stratified 80/20 split keeps the class ratio identical in train and test.
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42)

# Fit the scaler on training data only, then apply the same transform to test.
scaler = StandardScaler()
X_tr_s = scaler.fit_transform(X_tr)
X_te_s = scaler.transform(X_te)
class MLflowTracker:
    """
    Minimal in-memory, MLflow-compatible experiment tracker (teaching stand-in).

    Mirrors the shape of the real API:
        import mlflow; mlflow.start_run(); mlflow.log_param(...)

    Runs are kept in ``self.runs`` keyed by an 8-character run id; the run
    currently receiving logs is ``self.active_run``. Nothing is persisted —
    ``tracking_uri`` is recorded for realism only.
    """

    def __init__(self, experiment_name: str, tracking_uri: str = "sqlite:///mlruns.db"):
        self.experiment_name = experiment_name
        self.tracking_uri = tracking_uri
        self.runs: dict = {}          # run_id -> run record
        self.active_run = None        # run dict currently receiving logs

    def start_run(self, run_name: str = None, tags: dict = None) -> str:
        """Open a new run, make it the active run, and return its id."""
        # md5 of name + wall-clock time gives a short, effectively unique id.
        run_id = hashlib.md5(f"{run_name}{time.time()}".encode()).hexdigest()[:8]
        self.active_run = {
            'run_id': run_id,
            'run_name': run_name or f"run_{run_id}",
            'experiment': self.experiment_name,
            'status': 'RUNNING',
            'start_time': time.time(),
            'params': {},
            'metrics': {},     # metric name -> list of {'value', 'step'} entries
            'artifacts': [],
            'tags': tags or {},
        }
        self.runs[run_id] = self.active_run
        return run_id

    def log_param(self, key: str, value):
        """Record a single hyperparameter on the active run."""
        self.active_run['params'][key] = value

    def log_params(self, params: dict):
        """Record several hyperparameters at once."""
        self.active_run['params'].update(params)

    def log_metric(self, key: str, value: float, step: int = None):
        """Append one metric observation (optionally tagged with a step)."""
        self.active_run['metrics'].setdefault(key, []).append(
            {'value': value, 'step': step})

    def log_artifact(self, name: str, content: str):
        """Record an artifact by name; only its size is kept (teaching stub)."""
        self.active_run['artifacts'].append({'name': name, 'size': len(content)})

    def end_run(self, status: str = "FINISHED"):
        """Close the active run and stamp its wall-clock duration."""
        self.active_run['status'] = status
        self.active_run['end_time'] = time.time()
        self.active_run['duration_s'] = round(
            self.active_run['end_time'] - self.active_run['start_time'], 2)

    def get_best_run(self, metric: str, mode: str = 'max') -> dict:
        """
        Return the FINISHED run with the best last-logged value of *metric*.

        ``mode='max'`` picks the highest value, ``mode='min'`` the lowest.
        Runs that never logged the metric are never selected.

        Bug fix: the previous version used ``float('-inf')`` as the
        missing-metric sentinel for BOTH modes, so with ``mode='min'`` a run
        that lacked the metric would always "win". The sentinel must be the
        worst possible value for the chosen mode.
        """
        finished = {rid: r for rid, r in self.runs.items() if r['status'] == 'FINISHED'}
        missing = float('-inf') if mode == 'max' else float('inf')

        def last_metric(run):
            vals = run['metrics'].get(metric, [])
            return vals[-1]['value'] if vals else missing

        if mode == 'max':
            return max(finished.values(), key=last_metric)
        return min(finished.values(), key=last_metric)

    def compare_runs(self, metric: str) -> list:
        """
        Tabulate every run's last value of *metric*, best (highest) first.

        Runs that never logged the metric get ``None`` and sort last.
        (Previously the sort key was ``x[metric] or 0``, which conflated a
        missing metric with 0 and misordered runs with negative values.)
        """
        rows = []
        for rid, run in self.runs.items():
            vals = run['metrics'].get(metric, [])
            last = vals[-1]['value'] if vals else None
            rows.append({'run_id': rid[:6], 'name': run['run_name'], metric: last,
                         'params': run['params']})
        return sorted(
            rows,
            key=lambda x: float('-inf') if x[metric] is None else x[metric],
            reverse=True)
tracker = MLflowTracker(experiment_name="malware_classifier_v2")

# Candidate models to compare. Each entry is (run name, estimator instance,
# exact hyperparameters to log) so the logged params match the model built.
experiments = [
    (
        "LogisticRegression",
        LogisticRegression(C=1.0, max_iter=1000),
        {"model_type": "logistic_regression", "C": 1.0, "max_iter": 1000},
    ),
    (
        "RandomForest_100",
        RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42),
        {"model_type": "random_forest", "n_estimators": 100, "max_depth": 10},
    ),
    (
        "RandomForest_200",
        RandomForestClassifier(n_estimators=200, max_depth=15, random_state=42),
        {"model_type": "random_forest", "n_estimators": 200, "max_depth": 15},
    ),
    (
        "GradientBoosting",
        GradientBoostingClassifier(n_estimators=200, max_depth=4,
                                   learning_rate=0.05, random_state=42),
        {"model_type": "gradient_boosting", "n_estimators": 200, "lr": 0.05, "max_depth": 4},
    ),
]
print("Running experiments...\n")
for name, clf, params in experiments:
    # One tracked run per candidate model.
    run_id = tracker.start_run(run_name=name, tags={"dataset": "malware_pe_v3", "env": "dev"})
    tracker.log_params(params)
    # Always log data provenance alongside hyperparameters so any run can be
    # traced back to the exact dataset/preprocessing that produced it.
    tracker.log_params({"dataset_version": "v3.1", "scaler": "StandardScaler",
                        "train_size": len(X_tr), "test_size": len(X_te)})

    # Train and time the fit.
    t0 = time.time()
    clf.fit(X_tr_s, y_tr)
    train_time = time.time() - t0

    # Held-out test metrics at the default 0.5 decision threshold.
    prob = clf.predict_proba(X_te_s)[:, 1]
    pred = (prob >= 0.5).astype(int)
    auc = roc_auc_score(y_te, prob)
    f1 = f1_score(y_te, pred)
    pre = precision_score(y_te, pred)
    rec = recall_score(y_te, pred)

    # Log per-fold CV AUC as a stepped metric (simulates a training curve).
    # Bug fix: the previous version called cross_val_score — which itself runs
    # a full 5-fold CV (5 model fits) — once PER FOLD just to index a single
    # element, i.e. 25 fits instead of 5. Run the CV once and iterate.
    cv_scores = cross_val_score(clf, X_tr_s, y_tr, cv=5, scoring='roc_auc')
    for cv_fold, cv_auc in enumerate(cv_scores, start=1):
        tracker.log_metric("cv_auc", float(cv_auc), step=cv_fold)

    tracker.log_metric("test_auc", auc)
    tracker.log_metric("test_f1", f1)
    tracker.log_metric("precision", pre)
    tracker.log_metric("recall", rec)
    tracker.log_metric("train_time", train_time)

    # Artifacts: serialized-model placeholder plus the exact config used.
    tracker.log_artifact("model.pkl", f"<serialised {name} model>")
    tracker.log_artifact("config.json", json.dumps(params))
    tracker.end_run()
    print(f" {name:<25} AUC={auc:.4f} F1={f1:.4f} time={train_time:.2f}s")
print(f"\nTotal runs logged: {len(tracker.runs)}")

Step 2: Experiment Comparison and Hyperparameter Search
Step 3: Model Registry and Lifecycle Management
Step 4: Model Lineage and Reproducibility
Step 5–8: Capstone — Full MLOps Pipeline
Summary
MLflow Component
What It Solves
Real MLflow API
Further Reading
Last updated
