Lab 07: Distributed Training Patterns

Objective

Master distributed ML training: data parallelism, gradient aggregation, all-reduce algorithms, parameter servers, and pipeline parallelism — with verified numpy implementations of the core synchronisation patterns used by PyTorch DDP and DeepSpeed.

Time: 50 minutes | Level: Advanced | Docker Image: zchencow/innozverse-ai:latest


Background

Single GPU:     batch → forward → backward → update       (1 GPU, 1× throughput)
Data parallel:  each GPU gets a shard → gradients synced  (N× throughput)
Model parallel: model split across GPUs                   (for 70B+ models)
Pipeline:       layer groups on different GPUs            (for very deep networks)

PyTorch DDP (DistributedDataParallel) is the standard approach for data parallelism.

Step 1: Data Parallelism Simulation

docker run -it --rm zchencow/innozverse-ai:latest bash

Inside the container, start python3 and run the following:
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import time, warnings; warnings.filterwarnings('ignore')

np.random.seed(42)

X, y = make_classification(n_samples=20000, n_features=20, n_informative=12,
                             weights=[0.94, 0.06], random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
scaler = StandardScaler()
X_tr_s = scaler.fit_transform(X_tr); X_te_s = scaler.transform(X_te)

class DataParallelWorker:
    """Simulates one GPU worker in data-parallel training"""

    def __init__(self, worker_id: int, n_workers: int):
        self.worker_id = worker_id
        self.n_workers = n_workers
        np.random.seed(42 + worker_id)

    def get_shard(self, X: np.ndarray, y: np.ndarray) -> tuple:
        """Each worker gets a non-overlapping shard of data"""
        n     = len(X)
        shard = n // self.n_workers
        start = self.worker_id * shard
        end   = start + shard if self.worker_id < self.n_workers - 1 else n
        return X[start:end], y[start:end]

    def compute_local_gradients(self, X_shard: np.ndarray, y_shard: np.ndarray,
                                  weights: np.ndarray) -> np.ndarray:
        """Compute gradient on local shard"""
        # Logistic regression gradient: X.T @ (sigmoid(Xw) - y) / n
        logits = X_shard @ weights
        probs  = 1 / (1 + np.exp(-np.clip(logits, -500, 500)))
        errors = probs - y_shard
        return X_shard.T @ errors / len(X_shard)


class AllReduceAggregator:
    """
    All-reduce: synchronise gradients across all workers.

    Algorithms:
    - Ring all-reduce (used by NCCL, PyTorch DDP): bandwidth-optimal, O(N) latency
    - Tree all-reduce: O(log N) latency
    - Parameter server: centralised; simpler, but the server becomes a bottleneck

    Here: the result of ring all-reduce is simulated by direct averaging
    """

    def ring_allreduce(self, gradients_list: list) -> np.ndarray:
        """
        Ring all-reduce: N-1 scatter-reduce rounds + N-1 all-gather rounds
        Each worker ends up with the global average gradient
        """
        n_workers = len(gradients_list)
        # Simulate: just average all gradients (result is identical to ring all-reduce)
        avg_grad = np.mean(gradients_list, axis=0)
        # In real ring all-reduce: each worker gets avg via ring communication pattern
        return avg_grad

    def allreduce_with_compression(self, gradients_list: list,
                                    top_k_ratio: float = 0.1) -> tuple:
        """
        Gradient compression: transmit only the top-K% largest-magnitude gradients.
        Reduces communication bandwidth by 10-100×.
        Used by: 1-bit Adam, deep gradient compression, top-K sparsification.
        """
        # Simplification: real systems sparsify each worker's gradient *before*
        # communication; here we average first, then sparsify the result.
        avg_grad = np.mean(gradients_list, axis=0)
        # Sparsify: keep only the top-K elements by magnitude, zero the rest
        k = max(1, int(len(avg_grad) * top_k_ratio))
        topk_idx = np.argsort(np.abs(avg_grad))[::-1][:k]
        sparse_grad = np.zeros_like(avg_grad)
        sparse_grad[topk_idx] = avg_grad[topk_idx]
        return sparse_grad, float(k / len(avg_grad) * 100)  # % of elements transmitted

# Simulate 4 workers
n_workers = 4
workers   = [DataParallelWorker(i, n_workers) for i in range(n_workers)]
aggregator = AllReduceAggregator()

# Global model weights
np.random.seed(42)
weights = np.random.randn(X_tr_s.shape[1]) * 0.01

print("Data Parallel Training Simulation:")
print(f"  Workers:    {n_workers}")
print(f"  Total data: {len(X_tr_s)} samples ({len(X_tr_s)//n_workers} per worker)\n")

for step in range(3):
    # Each worker computes gradient on its shard
    local_grads = []
    for w in workers:
        X_shard, y_shard = w.get_shard(X_tr_s, y_tr)
        grad = w.compute_local_gradients(X_shard, y_shard, weights)
        local_grads.append(grad)

    # All-reduce: synchronise gradients
    avg_grad = aggregator.ring_allreduce(local_grads)

    # Update weights (SGD step)
    weights -= 0.01 * avg_grad

    # Measure gradient agreement across workers
    grad_variance = np.var([g.mean() for g in local_grads])
    print(f"  Step {step+1}: grad_mean={avg_grad.mean():.6f}  "
          f"worker_variance={grad_variance:.6f}  "
          f"weight_norm={np.linalg.norm(weights):.4f}")

# Gradient compression
sparse_grad, ratio = aggregator.allreduce_with_compression(local_grads, top_k_ratio=0.1)
print(f"\nGradient compression (top-10%): {ratio:.0f}% transmitted, "
      f"{100-ratio:.0f}% bandwidth saved")
print(f"Sparse gradient norm: {np.linalg.norm(sparse_grad):.4f} vs dense: {np.linalg.norm(avg_grad):.4f}")
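The ring_allreduce above returns the correct result but skips the actual communication pattern. The two-phase algorithm (scatter-reduce, then all-gather) can be sketched explicitly — a minimal simulation, with worker count and gradient size chosen for illustration:

```python
import numpy as np

def ring_allreduce_explicit(grads: list) -> list:
    """Explicit ring all-reduce over N workers, each gradient split into N chunks.
    Scatter-reduce: after N-1 rounds, worker i holds the full sum of chunk (i+1) % N.
    All-gather: after N-1 more rounds, every worker holds every summed chunk.
    Returns each worker's copy of the averaged gradient."""
    n = len(grads)
    chunks = [np.array_split(g.astype(float), n) for g in grads]  # chunks[worker][chunk]
    # Phase 1: scatter-reduce — in round s, worker i sends chunk (i - s) % n
    # to its right neighbour, which adds it to its own copy of that chunk
    for s in range(n - 1):
        for i in range(n):
            c = (i - s) % n
            chunks[(i + 1) % n][c] = chunks[(i + 1) % n][c] + chunks[i][c]
    # Phase 2: all-gather — in round s, worker i forwards the fully reduced
    # chunk (i + 1 - s) % n to its right neighbour
    for s in range(n - 1):
        for i in range(n):
            c = (i + 1 - s) % n
            chunks[(i + 1) % n][c] = chunks[i][c]
    return [np.concatenate(ch) / n for ch in chunks]

grads = [np.random.default_rng(i).normal(size=8) for i in range(4)]
results = ring_allreduce_explicit(grads)
print("max deviation from direct mean:",
      float(np.max(np.abs(results[0] - np.mean(grads, axis=0)))))
```

Because each round moves only 1/N of the gradient, total traffic per worker is 2(N−1)/N × the gradient size — nearly constant in N, which is why ring all-reduce scales to large clusters.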



Step 2: Gradient Accumulation for Effective Large Batches
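The code for this step is not preserved in this copy; the pattern itself is small enough to sketch in numpy, reusing the logistic-regression gradient from Step 1 (learning rate and batch sizes are illustrative):

```python
import numpy as np

def sgd_with_accumulation(X, y, weights, lr=0.1, micro_bs=64, accum_steps=4):
    """Accumulate gradients over accum_steps micro-batches, then apply ONE update:
    effective batch = micro_bs * accum_steps, memory cost = one micro-batch."""
    accum = np.zeros_like(weights)
    for step in range(accum_steps):
        start = step * micro_bs
        Xb, yb = X[start:start + micro_bs], y[start:start + micro_bs]
        probs = 1 / (1 + np.exp(-(Xb @ weights)))
        accum += (Xb.T @ (probs - yb) / len(Xb)) / accum_steps  # scale like loss / accum_steps
    return weights - lr * accum  # single optimizer step

rng = np.random.default_rng(0)
X = rng.normal(size=(256, 5))
y = (rng.random(256) < 0.5).astype(float)
w0 = np.zeros(5)

# With equal-sized micro-batches, the accumulated update matches one full-batch step:
w_accum = sgd_with_accumulation(X, y, w0)
probs = 1 / (1 + np.exp(-(X @ w0)))
w_full = w0 - 0.1 * X.T @ (probs - y) / 256
print("max difference vs full-batch step:", float(np.max(np.abs(w_accum - w_full))))
```

This is the same trick as calling loss.backward() on each micro-batch (with loss divided by accum_steps) and optimizer.step() only every accum_steps iterations in PyTorch.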



Step 3: Parameter Server Architecture
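The parameter-server code for this step is likewise missing; a minimal synchronous sketch follows (class and method names are illustrative, not a real framework API — real parameter servers also support asynchronous pushes):

```python
import numpy as np

class ParameterServer:
    """Central server: holds the global weights and applies aggregated updates."""
    def __init__(self, dim, lr=0.05):
        self.weights = np.zeros(dim)
        self.lr = lr

    def pull(self):            # workers fetch the current weights
        return self.weights.copy()

    def push(self, grads):     # workers push gradients; server averages and updates
        self.weights -= self.lr * np.mean(grads, axis=0)

def worker_gradient(X, y, w):
    probs = 1 / (1 + np.exp(-(X @ w)))
    return X.T @ (probs - y) / len(X)

rng = np.random.default_rng(0)
X = rng.normal(size=(400, 6))
true_w = rng.normal(size=6)
y = (X @ true_w > 0).astype(float)   # linearly separable toy labels

ps = ParameterServer(dim=6)
shards = np.array_split(np.arange(400), 4)           # 4 workers, one shard each
for step in range(50):
    w = ps.pull()                                    # synchronous round: all pull...
    grads = [worker_gradient(X[s], y[s], w) for s in shards]
    ps.push(grads)                                   # ...then the server aggregates

acc = float(np.mean(((X @ ps.weights) > 0) == y))
print(f"accuracy after 50 synchronous rounds: {acc:.2f}")
```

Note the structural contrast with all-reduce: every worker talks to one server, so server bandwidth grows linearly with the number of workers.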



Step 4: Mixed Precision Training (FP16/BF16)
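This step's code is not reproduced here, but the core issue — FP16 underflow and loss scaling — can be demonstrated directly in numpy (the scale value 1024 is illustrative; real scalers such as PyTorch's GradScaler adjust it dynamically and skip steps on overflow):

```python
import numpy as np

# FP16's smallest subnormal is ~6e-8; tiny gradients flush to zero without scaling.
grad_fp32 = np.array([1e-4, 3e-6, 5e-8, 2e-9], dtype=np.float32)

naive_fp16 = grad_fp32.astype(np.float16)
print("naive fp16 cast:", naive_fp16)          # the 2e-9 entry flushes to exactly 0

scale = 1024.0                                 # loss scaling: multiply before the cast...
scaled_fp16 = (grad_fp32 * scale).astype(np.float16)
recovered = scaled_fp16.astype(np.float32) / scale   # ...then divide back in fp32
print("with loss scaling:", recovered)         # all four magnitudes survive
```

BF16 has the same exponent range as FP32, so it largely avoids this underflow problem at the cost of fewer mantissa bits — which is why modern GPUs (A100, H100) favour it.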



Step 5–8: Capstone — Distributed Training Benchmark
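The benchmark itself is not reproduced in this copy, but the communication costs it compares can be estimated with a back-of-envelope model (parameter count is illustrative):

```python
# Bytes moved per synchronisation for P fp32 parameters across N workers:
#   ring all-reduce:  each worker sends 2 * (N - 1) / N * P * 4 bytes (~constant in N)
#   parameter server: each worker sends P * 4, but the server RECEIVES N * P * 4

P = 125_000_000  # e.g. a ~125M-parameter model (illustrative)
for N in (2, 8, 64):
    ring_per_worker = 2 * (N - 1) / N * P * 4
    ps_server_in = N * P * 4
    print(f"N={N:3d}: ring per-worker {ring_per_worker/1e9:.2f} GB, "
          f"PS server inbound {ps_server_in/1e9:.2f} GB")
```

The per-worker ring cost plateaus below 2 × the gradient size, while the parameter server's inbound traffic grows linearly with N — the bottleneck the capstone benchmark is meant to expose.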



Summary

Strategy              | When to Use                      | Real Framework
----------------------|----------------------------------|-------------------------------------------
Data parallelism      | Most tasks, multiple GPUs        | torch.nn.parallel.DistributedDataParallel
Gradient accumulation | Memory-constrained, large batch  | optimizer.step() every N micro-batches
All-reduce            | Multi-node synchronisation       | NCCL, torch.distributed.all_reduce()
Mixed precision       | Modern GPUs (A100, H100)         | torch.cuda.amp.autocast()
Parameter server      | Simple async setups              | TensorFlow ParameterServerStrategy, BytePS
