docker run --rm zchencow/innozverse-python:latest python3 - << 'PYEOF'
import numpy as np
np.random.seed(42)
# ── Test problem: noisy linear regression (convex least squares) ──────────────
# f(w) = mean((X @ w - y)²), minimised over d = 5 weights
def loss_fn(w, X, y):
    y_pred = X @ w
    return np.mean((y_pred - y) ** 2)
def gradient(w, X, y, batch_idx=None):
    if batch_idx is not None:
        X, y = X[batch_idx], y[batch_idx]
    y_pred = X @ w
    return 2 * X.T @ (y_pred - y) / len(y)
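# The gradient follows from ∇_w mean((X @ w - y)²) = 2 Xᵀ(X @ w - y) / n,
# evaluated on the full dataset or only on the rows selected by batch_idx.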
# Generate regression data
n, d = 200, 5
X_data = np.random.randn(n, d)
w_true = np.array([1.5, -2.0, 0.8, 1.2, -0.5])
y_data = X_data @ w_true + np.random.normal(0, 0.3, n)
# ── Step 1: Optimiser implementations ─────────────────────────────────────────
print("=== Step 1: Optimiser Classes ===")
class SGD:
    def __init__(self, lr=0.01):
        self.lr = lr
    def step(self, w, grad, t=None):
        return w - self.lr * grad
    def name(self): return f"SGD(lr={self.lr})"
class SGDMomentum:
    def __init__(self, lr=0.01, momentum=0.9):
        self.lr, self.momentum = lr, momentum
        self.v = None
    def step(self, w, grad, t=None):
        if self.v is None: self.v = np.zeros_like(w)
        self.v = self.momentum * self.v - self.lr * grad  # velocity update
        return w + self.v
    def name(self): return f"SGD+Momentum(lr={self.lr},β={self.momentum})"
class RMSProp:
    def __init__(self, lr=0.01, beta=0.9, eps=1e-8):
        self.lr, self.beta, self.eps = lr, beta, eps
        self.v = None  # running avg of squared gradients
    def step(self, w, grad, t=None):
        if self.v is None: self.v = np.zeros_like(w)
        self.v = self.beta * self.v + (1-self.beta) * grad**2
        return w - self.lr * grad / (np.sqrt(self.v) + self.eps)
    def name(self): return f"RMSProp(lr={self.lr})"
class Adam:
    def __init__(self, lr=0.01, beta1=0.9, beta2=0.999, eps=1e-8):
        self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
        self.m = None  # 1st moment (mean of gradients)
        self.v = None  # 2nd moment (mean of squared gradients)
    def step(self, w, grad, t):
        if self.m is None:
            self.m = np.zeros_like(w)
            self.v = np.zeros_like(w)
        # Update biased estimates
        self.m = self.beta1 * self.m + (1-self.beta1) * grad
        self.v = self.beta2 * self.v + (1-self.beta2) * grad**2
        # Bias correction (critical in early iterations when m,v ≈ 0)
        m_hat = self.m / (1 - self.beta1**t)
        v_hat = self.v / (1 - self.beta2**t)
        return w - self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
    def name(self): return f"Adam(lr={self.lr})"
print(" Implemented: SGD, SGD+Momentum, RMSProp, Adam")
# ── Step 2: Optimiser comparison ──────────────────────────────────────────────
print("\n=== Step 2: Convergence Comparison (Full Batch) ===")
def run_optimiser(opt, X, y, epochs=200, batch_size=None):
    w = np.zeros(X.shape[1])
    history = []
    for t in range(1, epochs+1):
        if batch_size:
            idx = np.random.choice(len(X), batch_size, replace=False)
            grad = gradient(w, X, y, idx)
        else:
            grad = gradient(w, X, y)
        w = opt.step(w, grad, t=t)
        if t % 20 == 0:
            history.append((t, loss_fn(w, X, y)))
    return w, history
optimisers = [
    SGD(lr=0.05),
    SGDMomentum(lr=0.05, momentum=0.9),
    RMSProp(lr=0.05),
    Adam(lr=0.05),
]
print(f" {'Optimiser':<35} {'Loss@20':>8} {'Loss@100':>9} {'Loss@200':>9}")
results = {}
for opt in optimisers:
    # Each optimiser instance is fresh, so its internal state starts from scratch
    w, hist = run_optimiser(opt, X_data, y_data, epochs=200)
    l20 = [l for e, l in hist if e == 20][0]
    l100 = [l for e, l in hist if e == 100][0]
    l200 = hist[-1][1]
    results[opt.name()] = (w, hist, l200)
    print(f" {opt.name():<35} {l20:>8.6f} {l100:>9.6f} {l200:>9.6f}")
# ── Step 3: Mini-batch SGD ────────────────────────────────────────────────────
print("\n=== Step 3: Batch Size Effect on Adam ===")
print(f" {'Batch Size':<12} {'Loss@200':>10} {'Variance':>10}")
for bs in [1, 8, 32, 64, 200]:
    losses = []
    for _ in range(3):  # 3 runs to measure variance
        opt = Adam(lr=0.01)
        w, hist = run_optimiser(opt, X_data, y_data, epochs=200, batch_size=bs if bs < 200 else None)
        losses.append(hist[-1][1])
    print(f" {bs:<12} {np.mean(losses):>10.6f} {np.std(losses):>10.6f}")
# ── Step 4: Learning rate sensitivity ─────────────────────────────────────────
print("\n=== Step 4: Learning Rate Sensitivity ===")
print(f" Adam - effect of learning rate:")
print(f" {'LR':<8} {'Loss@200':>10} {'Converged?':>12}")
for lr in [0.001, 0.01, 0.05, 0.1, 0.5, 1.0]:
    opt = Adam(lr=lr)
    w, hist = run_optimiser(opt, X_data, y_data, epochs=200)
    final_loss = hist[-1][1]
    converged = "✓" if final_loss < 0.5 else "✗ diverged/stuck"
    print(f" {lr:<8} {final_loss:>10.6f} {converged:>12}")
# ── Step 5: Weight recovery test ──────────────────────────────────────────────
print("\n=== Step 5: Weight Recovery (Adam) ===")
opt = Adam(lr=0.05)
w_found, _ = run_optimiser(opt, X_data, y_data, epochs=500)
print(f" {'Feature':<10} {'True':>8} {'Recovered':>10} {'Error':>8}")
for i, (wt, wr) in enumerate(zip(w_true, w_found)):
print(f" w{i:<9} {wt:>8.4f} {wr:>10.4f} {abs(wt-wr):>8.4f}")
final_loss = loss_fn(w_found, X_data, y_data)
print(f"\n Final loss: {final_loss:.8f}")
print(f" True loss achievable: ~{np.var(np.random.normal(0,0.3,n)):.4f} (noise variance)")
PYEOF