Lab 07: Distributed Training Patterns
Objective
Background
Single GPU: batch → forward → backward → update (1 GPU, 1× throughput)
Data parallel: each GPU gets a shard → gradients synced (N× throughput)
Model parallel: model split across GPUs (for 70B+ models)
Pipeline: layer groups on different GPUs (for very deep networks)
PyTorch DDP (DistributedDataParallel) is the standard approach for data parallelism.

Step 1: Data Parallelism Simulation
docker run -it --rm zchencow/innozverse-ai:latest bash

import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import time, warnings; warnings.filterwarnings('ignore')
# Reproducible synthetic dataset: 20k samples, 20 features (12 informative),
# heavily imbalanced (~6% positives).
np.random.seed(42)
X, y = make_classification(n_samples=20000, n_features=20, n_informative=12,
                           weights=[0.94, 0.06], random_state=42)
# Stratified split preserves the class imbalance in both partitions.
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
# Fit the scaler on training data only, then apply to test — avoids test-set leakage.
scaler = StandardScaler()
X_tr_s = scaler.fit_transform(X_tr); X_te_s = scaler.transform(X_te)
class DataParallelWorker:
    """One simulated GPU in a data-parallel training setup.

    Each worker owns a disjoint, contiguous slice of the training data and
    computes gradients only on that slice; gradient synchronisation is
    handled separately (see AllReduceAggregator).
    """

    def __init__(self, worker_id: int, n_workers: int):
        self.worker_id = worker_id
        self.n_workers = n_workers
        # Per-worker seed: deterministic, but distinct across workers.
        np.random.seed(42 + worker_id)

    def get_shard(self, X: np.ndarray, y: np.ndarray) -> tuple:
        """Return this worker's non-overlapping slice of (X, y).

        The data is cut into n_workers contiguous chunks of equal size;
        the last worker additionally absorbs any remainder rows.
        """
        total = len(X)
        chunk = total // self.n_workers
        lo = self.worker_id * chunk
        if self.worker_id == self.n_workers - 1:
            hi = total  # last worker takes the leftover rows
        else:
            hi = lo + chunk
        return X[lo:hi], y[lo:hi]

    def compute_local_gradients(self, X_shard: np.ndarray, y_shard: np.ndarray,
                                weights: np.ndarray) -> np.ndarray:
        """Logistic-regression gradient on the local shard only.

        grad = X^T (sigmoid(Xw) - y) / n_shard
        """
        z = np.clip(X_shard @ weights, -500, 500)  # clip prevents exp overflow
        preds = 1 / (1 + np.exp(-z))
        residual = preds - y_shard
        return X_shard.T @ residual / len(X_shard)
class AllReduceAggregator:
    """
    All-reduce: synchronise gradients across all workers.
    Algorithms:
      - Ring all-reduce (used by NCCL, PyTorch DDP): O(N) communication
      - Tree all-reduce: O(log N) latency
      - Parameter server: centralised, simpler but bottleneck
    Here: ring all-reduce simulation (the maths, not the communication).
    """

    def ring_allreduce(self, gradients_list: list) -> np.ndarray:
        """Return the element-wise mean gradient across all workers.

        A real ring all-reduce runs N-1 scatter-reduce rounds plus N-1
        all-gather rounds, after which every worker holds this same average;
        we compute the identical result directly.
        """
        return np.mean(gradients_list, axis=0)

    def allreduce_with_compression(self, gradients_list: list,
                                   top_k_ratio: float = 0.1) -> tuple:
        """All-reduce with top-K gradient sparsification.

        Only the K largest-magnitude elements of the averaged gradient are
        kept (the rest are zeroed), modelling schemes such as TopK
        sparsification and 1-bit Adam that cut communication bandwidth
        by 10-100x.

        Returns:
            (sparse_grad, transmitted_pct): the sparsified average gradient
            and the percentage of elements actually transmitted (0-100).
        """
        avg_grad = np.mean(gradients_list, axis=0)
        # Keep at least one element even for very small top_k_ratio values.
        k = max(1, int(len(avg_grad) * top_k_ratio))
        topk_idx = np.argsort(np.abs(avg_grad))[::-1][:k]
        sparse_grad = np.zeros_like(avg_grad)
        sparse_grad[topk_idx] = avg_grad[topk_idx]
        # Percentage of elements sent — NOT a compression factor.
        return sparse_grad, float((k / len(avg_grad)) * 100)
# Simulate 4 data-parallel workers sharing one AllReduceAggregator.
n_workers = 4
workers = [DataParallelWorker(i, n_workers) for i in range(n_workers)]
aggregator = AllReduceAggregator()
# Global model weights — in DDP these are replicated on every worker;
# small random init, seeded for reproducibility.
np.random.seed(42)
weights = np.random.randn(X_tr_s.shape[1]) * 0.01
print("Data Parallel Training Simulation:")
print(f" Workers: {n_workers}")
print(f" Total data: {len(X_tr_s)} samples ({len(X_tr_s)//n_workers} per worker)\n")
for step in range(3):
    # 1) Local compute: each worker derives a gradient from its own shard.
    local_grads = []
    for worker in workers:
        shard_X, shard_y = worker.get_shard(X_tr_s, y_tr)
        local_grads.append(worker.compute_local_gradients(shard_X, shard_y, weights))
    # 2) All-reduce: every worker would now hold this same averaged gradient.
    avg_grad = aggregator.ring_allreduce(local_grads)
    # 3) Identical SGD update everywhere keeps the replicas in sync.
    weights -= 0.01 * avg_grad
    # Diagnostic: spread of the per-shard mean gradients across workers.
    grad_variance = np.var([g.mean() for g in local_grads])
    print(f" Step {step+1}: grad_mean={avg_grad.mean():.6f} "
          f"worker_variance={grad_variance:.6f} "
          f"weight_norm={np.linalg.norm(weights):.4f}")
# Gradient compression demo: transmit only the top-10% largest-magnitude
# elements of the averaged gradient from the last step.
sparse_grad, ratio = aggregator.allreduce_with_compression(local_grads, top_k_ratio=0.1)
print(f"\nGradient compression (top-10%): {ratio:.0f}% transmitted, "
      f"{100-ratio:.0f}% bandwidth saved")
print(f"Sparse gradient norm: {np.linalg.norm(sparse_grad):.4f} vs dense: {np.linalg.norm(avg_grad):.4f}")

Step 2: Gradient Accumulation for Effective Large Batches
Step 3: Parameter Server Architecture
Step 4: Mixed Precision Training (FP16/BF16)
Step 5–8: Capstone — Distributed Training Benchmark
Summary
Strategy
When to Use
Real Framework
Further Reading
Previous: Lab 06: MLflow — Experiment Tracking & Model Registry
Next: Lab 08: ML Monitoring & Drift Detection
Last updated
