Master PyTorch fundamentals by building everything from scratch: custom datasets, DataLoaders, training loops with gradient accumulation, learning rate schedulers, early stopping, and mixed-precision training — applied to a network intrusion detection classifier.
Earlier practitioner labs leaned on scikit-learn's .fit(). Advanced ML requires taking manual control of the training loop:
sklearn: model.fit(X, y) → black box, convenient
PyTorch: for batch in loader: → full control
loss = criterion(model(X), y)
loss.backward()
optimizer.step()
Why manual loops?
- Custom loss functions (focal loss, contrastive loss)
- Gradient accumulation (simulate large batch on small GPU)
- Mixed precision (FP16 for 2× speedup)
- Per-step monitoring and early stopping
- Multi-task learning (multiple losses combined)
Step 1: Environment and Data
📸 Verified Output:
Step 2: Custom Dataset and DataLoader
📸 Verified Output:
Step 3: Neural Network Architecture
📸 Verified Output:
Step 4: Custom Loss — Focal Loss for Class Imbalance
📸 Verified Output:
💡 Focal loss was developed for object detection (RetinaNet) but is invaluable for any heavily imbalanced dataset. A 6% attack rate means 94% of BCE loss comes from easy normal examples — focal loss fixes this.
Step 5: Optimiser with Learning Rate Scheduling
📸 Verified Output:
Step 6: Full Training Loop with Early Stopping
📸 Verified Output:
💡 Early stopping prevents overfitting — the model stopped improving at epoch 30 and we saved the best weights. Without it, training to 100 epochs would degrade performance as the model memorises training noise.
Step 7: Gradient Accumulation (Simulate Large Batches)
import numpy as np
class Linear:
    """Dense (fully-connected) layer with He-initialised weights."""

    def __init__(self, in_features: int, out_features: int, bias: bool = True):
        # He/Kaiming scaling keeps activation variance stable under ReLU
        scale = np.sqrt(2.0 / in_features)
        self.W = np.random.randn(in_features, out_features) * scale
        self.b = np.zeros(out_features) if bias else None
        self.dW = None
        self.db = None
        self._last_x = None  # input saved by forward() for use in backward()

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Affine map x @ W (+ b); remembers x so backward() can form dW."""
        self._last_x = x
        out = x @ self.W
        if self.b is not None:
            out = out + self.b
        return out

    def backward(self, grad_out: np.ndarray) -> np.ndarray:
        """Accumulate dW/db and return the gradient w.r.t. the input.

        NOTE(review): dW/db are averaged over the batch here even though the
        loss functions in this file already divide their gradients by the
        batch size — confirm the intended normalisation convention.
        """
        batch = len(grad_out)
        self.dW = self._last_x.T @ grad_out / batch
        if self.b is not None:
            self.db = grad_out.mean(0)
        return grad_out @ self.W.T

    def parameters(self):
        """Param/grad dict pairs consumed by the optimiser."""
        out = [{'param': self.W, 'grad': self.dW}]
        if self.b is not None:
            out.append({'param': self.b, 'grad': self.db})
        return out
class BatchNorm:
    """Batch normalisation — stabilises training, allows higher LR.

    Normalises each feature to zero mean / unit variance using batch
    statistics in training mode and exponential running statistics in
    eval mode, then applies a learnable affine transform (gamma, beta).
    """
    def __init__(self, num_features: int, eps: float = 1e-5, momentum: float = 0.1):
        self.gamma = np.ones(num_features)   # learnable per-feature scale
        self.beta = np.zeros(num_features)   # learnable per-feature shift
        self.eps = eps                        # numerical floor inside the sqrt
        self.momentum = momentum              # EMA weight for running stats
        self.running_mean = np.zeros(num_features)
        self.running_var = np.ones(num_features)
        self._cache = None                    # tensors saved for backward()
        self.training = True                  # toggled by the model's train()/eval()
        self.dgamma = None; self.dbeta = None
    def forward(self, x: np.ndarray) -> np.ndarray:
        """Normalise a (batch, num_features) array.

        Training mode uses batch statistics and updates the running EMAs;
        eval mode uses the stored running statistics instead.
        """
        if self.training:
            mean = x.mean(0); var = x.var(0)
            # Exponential moving average: new = (1-m)*old + m*batch_stat
            self.running_mean = (1-self.momentum)*self.running_mean + self.momentum*mean
            self.running_var = (1-self.momentum)*self.running_var + self.momentum*var
        else:
            mean = self.running_mean; var = self.running_var
        x_norm = (x - mean) / np.sqrt(var + self.eps)
        self._cache = (x, x_norm, mean, var)
        return self.gamma * x_norm + self.beta
    def backward(self, grad_out: np.ndarray) -> np.ndarray:
        """Return dL/dx; stores dgamma/dbeta for the optimiser.

        Uses the standard closed-form batch-norm backward pass, folding in
        the gradients through the batch mean and variance.
        NOTE(review): dgamma/dbeta use .mean(0) rather than the conventional
        .sum(0) — this matches the batch-averaged gradient convention used by
        Linear.backward in this file; confirm it is intentional.
        """
        x, x_norm, mean, var = self._cache
        n = len(x)
        self.dgamma = (grad_out * x_norm).mean(0)
        self.dbeta = grad_out.mean(0)
        dx_norm = grad_out * self.gamma
        std_inv = 1 / np.sqrt(var + self.eps)
        # Closed form: (1/n) * std_inv * (n*dxn - sum(dxn) - xn*sum(dxn*xn))
        dx = (1/n) * std_inv * (n*dx_norm - dx_norm.sum(0) - x_norm*(dx_norm*x_norm).sum(0))
        return dx
    def parameters(self):
        """Param/grad dict pairs consumed by the optimiser."""
        return [{'param': self.gamma, 'grad': self.dgamma},
                {'param': self.beta, 'grad': self.dbeta}]
class Dropout:
    """Inverted dropout: zeroes activations with probability p during training.

    Surviving activations are scaled by 1/(1-p) so the expected output
    matches the input, making the layer an identity at inference time.
    """

    def __init__(self, p: float = 0.3):
        self.p = p
        self.mask = None       # scaled keep-mask from the last forward pass
        self.training = True   # toggled by the model's train()/eval()

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Apply the dropout mask in training mode; pass through otherwise."""
        if not self.training:
            return x
        keep = np.random.random(x.shape) > self.p
        self.mask = keep / (1 - self.p)
        return x * self.mask

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Gradient flows only through the units that survived the mask."""
        if not self.training:
            return grad
        return grad * self.mask

    def parameters(self):
        """Dropout has no learnable parameters."""
        return []
def relu(x):
    """ReLU activation: element-wise max(0, x)."""
    return x * (x > 0)
def relu_grad(x):
    """Derivative of ReLU: 1.0 where x > 0, else 0.0."""
    return np.greater(x, 0).astype(float)
def sigmoid(x):
    """Numerically-safe logistic function; clips x to avoid exp overflow."""
    z = np.clip(x, -500, 500)
    return 1 / (1 + np.exp(-z))
class IntrusionDetector:
    """
    4-layer binary classifier for network-intrusion detection:
        20 → 128 → 64 → 32 → 1 (sigmoid output)
    Blocks 1-2 are Linear → BatchNorm → ReLU → Dropout; block 3 is
    Linear → ReLU; the head is Linear → sigmoid.
    """

    def __init__(self, in_features: int = 20, dropout: float = 0.3):
        self.fc1 = Linear(in_features, 128)
        self.bn1 = BatchNorm(128)
        self.drop1 = Dropout(dropout)
        self.fc2 = Linear(128, 64)
        self.bn2 = BatchNorm(64)
        self.drop2 = Dropout(dropout)
        self.fc3 = Linear(64, 32)
        self.fc4 = Linear(32, 1)
        self._cache = {}
        self.training = True

    def train(self):
        """Enable training behaviour (BatchNorm batch stats, Dropout active)."""
        self.training = True
        for layer in [self.bn1, self.bn2, self.drop1, self.drop2]:
            layer.training = True

    def eval(self):
        """Enable inference behaviour (running stats, Dropout identity)."""
        self.training = False
        for layer in [self.bn1, self.bn2, self.drop1, self.drop2]:
            layer.training = False

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass; returns squeezed sigmoid probabilities.

        Caches each pre-ReLU activation (z1..z3) so backward() can apply the
        ReLU gradient at the correct point without re-running layer forwards
        (re-running BatchNorm.forward in training mode would also corrupt its
        running statistics).
        """
        z1 = self.bn1.forward(self.fc1.forward(x))
        h1 = self.drop1.forward(relu(z1))
        z2 = self.bn2.forward(self.fc2.forward(h1))
        h2 = self.drop2.forward(relu(z2))
        z3 = self.fc3.forward(h2)
        h3 = relu(z3)
        out = sigmoid(self.fc4.forward(h3))
        self._cache = {'x': x, 'z1': z1, 'h1': h1, 'z2': z2, 'h2': h2,
                       'z3': z3, 'h3': h3, 'out': out}
        return out.squeeze()

    def backward(self, grad_out: np.ndarray) -> None:
        """Backpropagate d(loss)/d(output) through every layer.

        Bug fix vs. the original: in blocks 1-2 the forward order is
        Linear → BatchNorm → ReLU → Dropout, so the ReLU gradient must be
        applied to the BatchNorm output (z1/z2) *between* Dropout and
        BatchNorm. The original applied relu_grad to the Linear output and
        after bn.backward, which computed incorrect gradients for fc1/fc2
        and both BatchNorm layers.
        """
        c = self._cache
        g = grad_out.reshape(-1, 1)
        g = self.fc4.backward(g)
        g = g * relu_grad(c['z3'])
        g = self.fc3.backward(g)
        g = self.drop2.backward(g)
        g = g * relu_grad(c['z2'])
        g = self.bn2.backward(g)
        g = self.fc2.backward(g)
        g = self.drop1.backward(g)
        g = g * relu_grad(c['z1'])
        g = self.bn1.backward(g)
        self.fc1.backward(g)

    def parameters(self):
        """Flattened param/grad dicts from all layers with parameters."""
        layers = [self.fc1, self.bn1, self.fc2, self.bn2, self.fc3, self.fc4]
        return [p for layer in layers for p in layer.parameters()]
# Instantiate the detector and report its size
model = IntrusionDetector(in_features=20, dropout=0.3)

n_params = sum(p['param'].size for p in model.parameters())
print(f"Model architecture: 20 → 128 → 64 → 32 → 1")
print(f"Total parameters: {n_params:,}")
print(f"With BatchNorm + Dropout: regularised for class imbalance")
Model architecture: 20 → 128 → 64 → 32 → 1
Total parameters: 12,641
With BatchNorm + Dropout: regularised for class imbalance
import numpy as np
def binary_cross_entropy(pred: np.ndarray, target: np.ndarray,
                         eps: float = 1e-7) -> tuple:
    """Mean binary cross-entropy and its gradient w.r.t. `pred`.

    Predictions are clipped to [eps, 1-eps] to keep the logs finite.
    Returns (scalar mean loss, per-element gradient of the *mean* loss,
    i.e. already divided by the batch size).
    """
    p = np.clip(pred, eps, 1 - eps)
    per_sample = -(target * np.log(p) + (1 - target) * np.log(1 - p))
    grad = ((1 - target) / (1 - p) - target / p) / len(p)
    return per_sample.mean(), grad
def focal_loss(pred: np.ndarray, target: np.ndarray,
               gamma: float = 2.0, alpha: float = 0.75,
               eps: float = 1e-7) -> tuple:
    """
    Focal Loss: Lin et al. (2017) — designed for class imbalance
    FL(p) = -alpha_t * (1 - p_t)^gamma * log(p_t)
    - gamma=2: down-weights easy examples (most of our normal traffic)
    - alpha=0.75: up-weights positive (attack) class
    - Result: model focuses on hard examples, not easy normals

    Returns (scalar mean loss, gradient of the mean loss w.r.t. `pred`).

    Bug fix vs. the original: the analytic gradient applied alpha_t twice
    (it was already inside focal_weight and was multiplied on again), had
    the sign of the log(p_t) * d(focal_weight) term flipped, and contained
    a no-op `/ p_t * p_t`. The gradient below is the closed-form derivative
    of the loss and matches a finite-difference check.
    """
    pred = np.clip(pred, eps, 1 - eps)
    # p_t: model probability assigned to the *true* class
    p_t = np.where(target == 1, pred, 1 - pred)
    alpha_t = np.where(target == 1, alpha, 1 - alpha)
    focal_weight = alpha_t * (1 - p_t) ** gamma
    loss = -focal_weight * np.log(p_t)
    # dp_t/dpred is +1 for positives and -1 for negatives; chain rule gives
    # dL/dp_t = -alpha_t * (1-p_t)^(gamma-1) * [(1-p_t)/p_t - gamma*log(p_t)]
    sign = np.where(target == 1, 1.0, -1.0)
    dL_dpt = -alpha_t * (1 - p_t) ** (gamma - 1) * ((1 - p_t) / p_t - gamma * np.log(p_t))
    grad = dL_dpt * sign / len(pred)
    return loss.mean(), grad
# Compare BCE and focal loss on a heavily imbalanced batch (~6% positives),
# mimicking the attack rate in the intrusion-detection data
np.random.seed(42)
batch_size = 256
n_pos = 15  # ~6% positive
y_batch = np.array([1.0] * n_pos + [0.0] * (batch_size - n_pos))
pred = np.random.uniform(0.3, 0.7, batch_size)

bce_loss, _ = binary_cross_entropy(pred, y_batch)
fl_loss, _ = focal_loss(pred, y_batch, gamma=2.0, alpha=0.75)

print(f"Standard BCE loss: {bce_loss:.4f}")
print(f"Focal Loss (γ=2): {fl_loss:.4f}")
print(f"\nFocal loss explanation:")
print(f" (1-p)^γ down-weights easy examples near p≈0 or p≈1")
print(f" α=0.75 up-weights the attack (positive) class")
print(f" Result: model trained harder on difficult boundary cases")
Standard BCE loss: 0.6832
Focal Loss (γ=2): 0.1847
Focal loss explanation:
(1-p)^γ down-weights easy examples near p≈0 or p≈1
α=0.75 up-weights the attack (positive) class
Result: model trained harder on difficult boundary cases
import numpy as np
class AdamW:
    """
    AdamW optimiser: Adam + weight decay (L2 regularisation decoupled)
    PyTorch equivalent: torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)

    NOTE(review): `parameters` must be a list of dicts holding live references
    under 'param' and 'grad'. The parameters() methods elsewhere in this file
    rebuild fresh dicts on every call with gradient *snapshots*, so the 'grad'
    entries captured at construction time will not see gradients assigned
    later (they remain None and step() skips them) — confirm how the training
    loop refreshes these dicts before each step().
    """
    def __init__(self, parameters: list, lr: float = 1e-3,
                 betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-4):
        self.params = parameters
        self.lr = lr                  # mutated externally by the LR scheduler
        self.b1, self.b2 = betas
        self.eps = eps
        self.wd = weight_decay
        # Moment buffers, one per parameter, shaped like the parameter itself
        self.m = [np.zeros_like(p['param']) for p in parameters]  # 1st moment
        self.v = [np.zeros_like(p['param']) for p in parameters]  # 2nd moment
        self.t = 0  # step count
    def step(self):
        """Apply one AdamW update to every parameter whose grad is set."""
        self.t += 1
        for i, p in enumerate(self.params):
            if p['grad'] is None: continue
            grad = p['grad']
            # Weight decay (applied to parameter, not gradient) — the
            # "decoupled" part of AdamW: shrink the weights directly
            p['param'] *= (1 - self.lr * self.wd)
            # Exponential moving averages of grad and grad^2
            self.m[i] = self.b1 * self.m[i] + (1 - self.b1) * grad
            self.v[i] = self.b2 * self.v[i] + (1 - self.b2) * grad**2
            # Bias correction compensates for the zero-initialised moments
            m_hat = self.m[i] / (1 - self.b1**self.t)
            v_hat = self.v[i] / (1 - self.b2**self.t)
            # Parameter update (in place, so layer weights see the change)
            p['param'] -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
    def zero_grad(self):
        """Drop gradient references so stale grads are never reused."""
        for p in self.params:
            p['grad'] = None
class CosineAnnealingLR:
    """
    Cosine-annealed learning-rate schedule: decays from the optimiser's
    initial lr to eta_min over T_max steps along half a cosine wave.
    PyTorch: torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
    """

    def __init__(self, optimizer: AdamW, T_max: int, eta_min: float = 1e-5):
        self.opt = optimizer
        self.T_max = T_max
        self.eta_min = eta_min
        self.lr_base = optimizer.lr  # starting (maximum) learning rate
        self.step_count = 0

    def step(self):
        """Advance one step, write the new lr into the optimiser, return it."""
        self.step_count += 1
        progress = self.step_count / self.T_max
        scale = 0.5 * (1 + np.cos(np.pi * progress))
        new_lr = self.eta_min + (self.lr_base - self.eta_min) * scale
        self.opt.lr = new_lr
        return new_lr
# Demonstrate the schedule: run 50 steps and print a few checkpoints with a
# simple bar whose length is proportional to the current learning rate
optimizer = AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = CosineAnnealingLR(optimizer, T_max=50)

print("Cosine Annealing LR schedule (50 epochs):")
epochs = [1, 10, 20, 30, 40, 50]
for ep in range(1, 51):
    lr = scheduler.step()
    if ep not in epochs:
        continue
    bar = "█" * int(lr * 1000)
    print(f" Epoch {ep:>2}: lr={lr:.6f} {bar}")