docker run --rm zchencow/innozverse-python:latest python3 - << 'PYEOF'
import numpy as np
import json, hashlib
from collections import Counter
np.random.seed(42)
print("=" * 60)
print(" InnoZverse ML Pipeline — Surface Sales Predictor")
print("=" * 60)
# ══════════════════════════════════════════════════════════════
# STAGE 1: Data Generation & Ingestion
# ══════════════════════════════════════════════════════════════
print("\n[Stage 1] Data Ingestion")
n = 120  # total samples (24 are held out for test in Stage 4)
# Simulate 120 Surface product configurations with noisy sales
price = np.random.uniform(399, 3499, n)
ram = np.random.choice([4, 8, 16, 32], n)
storage = np.random.choice([64, 128, 256, 512, 1000], n)
display = np.random.choice([10.5, 12.3, 13.0, 13.5, 28.0], n)
quarter = np.random.randint(1, 5, n) # fiscal quarter
rating = np.random.uniform(3.5, 5.0, n)
# Target: units sold (nonlinear function of features + noise)
def true_sales(price, ram, storage, display, quarter, rating):
base = 2000
price_effect = -0.5 * price
ram_effect = 20 * ram
holiday_boost = 300 * (quarter == 4)
display_pen = -50 * (display > 20) # Studio sells fewer units
rating_boost = 100 * (rating - 4.0)
return (base + price_effect + ram_effect + holiday_boost + display_pen + rating_boost
+ np.random.normal(0, 80))
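# Note (added): storage is accepted but never used in the ground truth above —
# effectively an uninformative (noise) feature the models must learn to ignore.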
units_sold = np.array([
true_sales(price[i], ram[i], storage[i], display[i], quarter[i], rating[i])
for i in range(n)
])
units_sold = np.clip(units_sold, 50, 2000)
# Introduce some data quality issues: 3 missing prices, stored as NaN so the
# imputer downstream actually sees them in X_raw
missing_mask = np.random.choice(n, 3, replace=False)
price_with_na = price.copy()
price_with_na[missing_mask] = np.nan
X_raw = np.column_stack([price_with_na, ram, storage, display, quarter.astype(float), rating])
y = units_sold
print(f" Loaded {n} samples, {X_raw.shape[1]} features")
print(f" Target range: [{y.min():.0f}, {y.max():.0f}] units/period")
print(f" Missing values injected: {int(np.isnan(price_with_na).sum())}")
# ══════════════════════════════════════════════════════════════
# STAGE 2: Preprocessing Pipeline
# ══════════════════════════════════════════════════════════════
print("\n[Stage 2] Preprocessing Pipeline")
class Pipeline:
"""Chains transformers + a final estimator."""
def __init__(self, steps):
self.steps = steps
def fit(self, X, y=None):
X_t = X.copy()
for name, transformer in self.steps[:-1]:
X_t = transformer.fit_transform(X_t, y)
name, estimator = self.steps[-1]
estimator.fit(X_t, y)
self.fitted_X = X_t
return self
def predict(self, X):
X_t = X.copy()
for name, transformer in self.steps[:-1]:
X_t = transformer.transform(X_t)
return self.steps[-1][1].predict(X_t)
def transform_only(self, X):
X_t = X.copy()
for name, transformer in self.steps[:-1]:
X_t = transformer.transform(X_t)
return X_t
class MedianImputer:
def fit_transform(self, X, y=None):
self.medians = np.nanmedian(X, axis=0)
return self.transform(X)
def transform(self, X):
X = X.copy()
for j in range(X.shape[1]):
mask = np.isnan(X[:,j])
X[mask, j] = self.medians[j]
return X
class StandardScaler:
def fit_transform(self, X, y=None):
self.mu = X.mean(0)
self.std = X.std(0) + 1e-8
return self.transform(X)
def transform(self, X):
return (X - self.mu) / self.std
class PolyFeatures:
def __init__(self, degree=2, cols=None):
self.degree = degree
self.cols = cols # which columns to interact
def fit_transform(self, X, y=None):
self.n_orig = X.shape[1]
cols = self.cols or list(range(min(3, self.n_orig)))
self.pairs = [(i,j) for i in cols for j in cols if i<=j]
return self.transform(X)
def transform(self, X):
extras = [X[:,i]*X[:,j] for (i,j) in self.pairs]
return np.column_stack([X] + extras) if extras else X
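# Illustrative (added): PolyFeatures is defined for extension but unused below;
# with its default first-three-columns setting it expands 6 features to
# 6 + 6 pairwise products = 12 columns.
_poly_demo = PolyFeatures().fit_transform(np.ones((2, 6)))
print(f"   PolyFeatures demo: 6 -> {_poly_demo.shape[1]} columns")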
# ══════════════════════════════════════════════════════════════
# STAGE 3: Model Implementations
# ══════════════════════════════════════════════════════════════
print("\n[Stage 3] Model Definitions")
class LinearRegressor:
def fit(self, X, y):
X_b = np.column_stack([np.ones(len(X)), X])
self.theta = np.linalg.lstsq(X_b, y, rcond=None)[0]
return self
def predict(self, X):
return np.column_stack([np.ones(len(X)), X]) @ self.theta
class KNNRegressor:
def __init__(self, k=5):
self.k = k
def fit(self, X, y):
self.X_tr, self.y_tr = X, y; return self
def predict(self, X):
preds = []
for x in X:
dists = np.sqrt(((self.X_tr - x)**2).sum(1))
            nn = np.argsort(dists)[:self.k]        # k nearest neighbours, computed once
            w = 1 / (dists[nn] + 1e-10)            # inverse-distance weights
            preds.append(np.average(self.y_tr[nn], weights=w))
return np.array(preds)
class RandomForestRegressor:
    """Bagged regression stumps (depth-1 decision trees) over random feature subsets."""
    def __init__(self, n_trees=20, max_features=3):
        self.n_trees, self.max_features = n_trees, max_features
        self.trees = []
    def _build_stump(self, X, y):
        # Fallback: if no valid split is found, predict the mean everywhere
        best = {"gain": -np.inf, "feat": 0, "thresh": np.inf, "lv": y.mean(), "rv": y.mean()}
        for feat in np.random.choice(X.shape[1], self.max_features, replace=False):
            for thresh in np.percentile(X[:,feat], [25,50,75]):
                l, r = y[X[:,feat]<=thresh], y[X[:,feat]>thresh]
                if len(l)==0 or len(r)==0: continue
                gain = y.var() - (len(l)*l.var()+len(r)*r.var())/len(y)
                if gain > best["gain"]:
                    best = {"gain":gain,"feat":feat,"thresh":thresh,"lv":l.mean(),"rv":r.mean()}
        return best
    def fit(self, X, y):
        n = len(X)
        self.trees = []  # reset so repeated fit() calls don't accumulate trees
        for _ in range(self.n_trees):
            idx = np.random.choice(n, n, replace=True)  # bootstrap sample
            self.trees.append(self._build_stump(X[idx], y[idx]))
        return self
def predict(self, X):
preds = np.zeros(len(X))
for tree in self.trees:
mask = X[:, tree["feat"]] <= tree["thresh"]
preds[mask] += tree["lv"]
preds[~mask] += tree["rv"]
return preds / len(self.trees)
print(f" Models: LinearRegressor, KNNRegressor, RandomForestRegressor")
# ══════════════════════════════════════════════════════════════
# STAGE 4: Train / Test Split + CV
# ══════════════════════════════════════════════════════════════
print("\n[Stage 4] Splitting & Cross-Validation")
def rmse(pred, true): return np.sqrt(np.mean((pred - true)**2))
def r2(pred, true):
    # R² must normalise by the variance of the *true* values, not the predictions
    return 1 - np.sum((true - pred)**2) / (np.sum((true - true.mean())**2) + 1e-10)
idx = np.random.permutation(n)
n_test = 24
X_train_raw, y_train = X_raw[idx[n_test:]], y[idx[n_test:]]
X_test_raw, y_test = X_raw[idx[:n_test]], y[idx[:n_test]]
def cv_score(model_class, model_kwargs, X, y_cv, k=5):
n_cv = len(X)
fold_size = n_cv // k
rmses = []
for fold in range(k):
val_idx = list(range(fold*fold_size, (fold+1)*fold_size))
train_idx = [i for i in range(n_cv) if i not in val_idx]
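        # Preprocessing is re-fit inside every fold so validation rows never
        # leak into the imputation medians or scaling statistics.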
imp = MedianImputer()
sc = StandardScaler()
X_tr = sc.fit_transform(imp.fit_transform(X[train_idx]))
X_va = sc.transform(imp.transform(X[val_idx]))
m = model_class(**model_kwargs).fit(X_tr, y_cv[train_idx])
rmses.append(rmse(m.predict(X_va), y_cv[val_idx]))
return np.mean(rmses), np.std(rmses)
models_cv = [
("Linear", LinearRegressor, {}),
("KNN k=5", KNNRegressor, {"k":5}),
("KNN k=10", KNNRegressor, {"k":10}),
("RF n=20", RandomForestRegressor, {"n_trees":20}),
("RF n=50", RandomForestRegressor, {"n_trees":50}),
]
print(f" {'Model':<15} {'CV-RMSE':>10} {'±':>6}")
cv_results = {}
for name, cls, kwargs in models_cv:
mean, std = cv_score(cls, kwargs, X_train_raw, y_train)
cv_results[name] = mean
print(f" {name:<15} {mean:>10.2f} ±{std:>5.2f}")
best_model_name = min(cv_results, key=cv_results.get)
print(f"\n ✓ Best model: {best_model_name} (CV-RMSE={cv_results[best_model_name]:.2f})")
# ══════════════════════════════════════════════════════════════
# STAGE 5: Final Training & Test Evaluation
# ══════════════════════════════════════════════════════════════
print("\n[Stage 5] Final Evaluation on Held-Out Test Set")
imputer = MedianImputer()
scaler = StandardScaler()
X_tr_proc = scaler.fit_transform(imputer.fit_transform(X_train_raw))
X_te_proc = scaler.transform(imputer.transform(X_test_raw))
final_results = {}
for name, cls, kwargs in models_cv:
m = cls(**kwargs).fit(X_tr_proc, y_train)
preds = m.predict(X_te_proc)
final_results[name] = {"rmse": rmse(preds, y_test), "r2": r2(preds, y_test), "model": m}
print(f" {'Model':<15} {'Test RMSE':>10} {'R²':>8}")
for name, res in final_results.items():
marker = " ←best" if res["rmse"] == min(r["rmse"] for r in final_results.values()) else ""
print(f" {name:<15} {res['rmse']:>10.2f} {res['r2']:>8.4f}{marker}")
# ══════════════════════════════════════════════════════════════
# STAGE 6: Ensemble (Voting)
# ══════════════════════════════════════════════════════════════
print("\n[Stage 6] Ensemble — Weighted Voting")
weights = {name: 1/res["rmse"] for name, res in final_results.items()}
total_w = sum(weights.values())
ensemble_pred = sum(
(weights[name]/total_w) * res["model"].predict(X_te_proc)
for name, res in final_results.items()
)
ens_rmse = rmse(ensemble_pred, y_test)
ens_r2 = r2(ensemble_pred, y_test)
print(f" Ensemble RMSE: {ens_rmse:.2f} R²={ens_r2:.4f}")
best_single = min(final_results.values(), key=lambda x: x["rmse"])["rmse"]
print(f" Improvement over best single: {(best_single-ens_rmse)/best_single*100:+.1f}%")
# ══════════════════════════════════════════════════════════════
# STAGE 7: Inference on New Products
# ══════════════════════════════════════════════════════════════
print("\n[Stage 7] Inference — New Surface Products")
new_products = np.array([
[864, 16, 256, 12.3, 1, 4.3], # Surface Pro 12 standard
[1299, 32, 512, 13.5, 4, 4.6], # Surface Pro 12 premium, Q4
[399, 4, 64, 10.5, 2, 4.0], # Surface Go budget
[3499, 32,1000, 28.0, 3, 4.8], # Surface Studio
])
prod_names = ["Pro 12 Std","Pro 12 Prem Q4","Go Budget","Studio"]
X_new = scaler.transform(imputer.transform(new_products))
rf50 = final_results["RF n=50"]["model"]
knn10 = final_results["KNN k=10"]["model"]
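# Note (added): "confidence" below is a crude agreement heuristic — flag LOW
# when the two models disagree by more than 15% of their averaged prediction.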
print(f" {'Product':<17} {'RF-50':>8} {'KNN-10':>8} {'Ensemble':>10} {'Confidence'}")
for i, name in enumerate(prod_names):
x_i = X_new[i:i+1]
p_rf = rf50.predict(x_i)[0]
p_knn = knn10.predict(x_i)[0]
p_ens = (p_rf + p_knn) / 2
conf = "HIGH" if abs(p_rf-p_knn)/p_ens < 0.15 else "LOW"
print(f" {name:<17} {p_rf:>8.0f} {p_knn:>8.0f} {p_ens:>10.0f} {conf}")
# ══════════════════════════════════════════════════════════════
# STAGE 8: Model Serialisation
# ══════════════════════════════════════════════════════════════
print("\n[Stage 8] Model Serialisation (JSON)")
def serialise_rf(model):
return {"type":"RandomForest","n_trees":model.n_trees,
"trees":[{"feat":int(t["feat"]),"thresh":float(t["thresh"]),
"lv":float(t["lv"]),"rv":float(t["rv"])} for t in model.trees]}
def deserialise_rf(data):
m = RandomForestRegressor(n_trees=data["n_trees"])
m.trees = data["trees"]
return m
payload = json.dumps({
"model": serialise_rf(rf50),
"scaler": {"mu": scaler.mu.tolist(), "std": scaler.std.tolist()},
"imputer": {"medians": imputer.medians.tolist()},
"version": "1.0.0",
})
checksum = hashlib.md5(payload.encode()).hexdigest()[:8]
print(f" Serialised model: {len(payload)} bytes")
print(f" MD5 checksum: {checksum}")
# Round-trip test: rebuild model, imputer, and scaler from the JSON payload alone
model_data = json.loads(payload)
rf_loaded = deserialise_rf(model_data["model"])
med_l = np.array(model_data["imputer"]["medians"])
mu_l = np.array(model_data["scaler"]["mu"])
std_l = np.array(model_data["scaler"]["std"])
X_te_imp = X_test_raw.copy()
for j in range(X_te_imp.shape[1]):
    X_te_imp[np.isnan(X_te_imp[:, j]), j] = med_l[j]  # impute with stored medians
pred_orig = rf50.predict(X_te_proc)
pred_loaded = rf_loaded.predict((X_te_imp - mu_l) / std_l)  # scale with stored stats
diff = np.abs(pred_orig - pred_loaded).max()
print(f" Round-trip prediction error: {diff:.10f} ({'✓ PASS' if diff < 0.01 else '✗ FAIL'})")
print("\n" + "=" * 60)
print(" Pipeline Complete!")
print(f" Best model: RF n=50 | Test RMSE: {final_results['RF n=50']['rmse']:.1f}")
print(f" Ensemble: | Test RMSE: {ens_rmse:.1f}")
print("=" * 60)
PYEOF