import numpy as np
class DifferentialPrivacy:
    """
    Gaussian-mechanism differential privacy for gradient protection.

    (ε, δ)-DP guarantee: an observer of the released gradient cannot tell
    whether any individual sample took part in training.

    Two-step mechanism:
      1. clip the gradient's L2 norm to ``max_grad_norm`` (sensitivity Δ),
      2. add Gaussian noise with σ = Δ · sqrt(2 ln(1.25/δ)) / ε.

    Lower ε means stronger privacy at the cost of utility.
    """

    def __init__(self, epsilon: float = 1.0, delta: float = 1e-5,
                 max_grad_norm: float = 1.0):
        self.epsilon = epsilon
        self.delta = delta
        self.max_grad_norm = max_grad_norm
        # Gaussian-mechanism calibration: σ = Δ · sqrt(2 ln(1.25/δ)) / ε
        self.noise_scale = max_grad_norm * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
        self.privacy_spent = 0.0  # running total of (approximate) budget consumed

    def clip_gradient(self, gradient: np.ndarray) -> np.ndarray:
        """Rescale *gradient* so its L2 norm is at most ``max_grad_norm``."""
        grad_norm = np.linalg.norm(gradient)
        if grad_norm <= self.max_grad_norm:
            return gradient
        return gradient * (self.max_grad_norm / grad_norm)

    def add_noise(self, gradient: np.ndarray, n_samples: int) -> np.ndarray:
        """Return *gradient* plus Gaussian noise scaled for an n-sample average."""
        # Averaging over n samples divides the per-sample sensitivity by n,
        # so the noise standard deviation shrinks accordingly.
        noise = np.random.normal(0, self.noise_scale / n_samples, gradient.shape)
        return gradient + noise

    def privatise_gradient(self, gradient: np.ndarray, n_samples: int) -> np.ndarray:
        """One DP-SGD step: clip the gradient, then add calibrated noise."""
        noisy = self.add_noise(self.clip_gradient(gradient), n_samples)
        # Simplified Rényi-DP style accounting: charge ε/10 per release.
        self.privacy_spent += self.epsilon / 10
        return noisy

    def privacy_report(self) -> dict:
        """Summarise the configured privacy parameters as a dict."""
        if self.epsilon < 1:
            level = "Strong"
        elif self.epsilon < 5:
            level = "Moderate"
        else:
            level = "Weak"
        return {
            'epsilon': self.epsilon,
            'delta': self.delta,
            'noise_scale': round(self.noise_scale, 4),
            'privacy_level': level,
        }
class DPFederatedClient(FederatedClient):
    """Federated client whose local updates are protected by differential privacy."""

    def __init__(self, client_id: int, X: np.ndarray, y: np.ndarray,
                 epsilon: float = 1.0):
        super().__init__(client_id, X, y)
        self.dp = DifferentialPrivacy(epsilon=epsilon)

    def local_train(self, global_weights: np.ndarray, n_local_epochs: int = 5,
                    lr: float = 0.01) -> tuple:
        """
        Run DP-SGD locally and return ``(weight_delta, n_samples)``.

        Each epoch computes the logistic-regression gradient on this client's
        data, privatises it (clip + noise), and takes one gradient step.
        """
        weights = global_weights.copy()
        n = len(self.y)
        for _ in range(n_local_epochs):
            # Numerically safe sigmoid of the linear scores.
            scores = np.clip(self.X @ weights, -500, 500)
            predictions = 1 / (1 + np.exp(-scores))
            raw_grad = self.X.T @ (predictions - self.y) / n
            # Clip + noise before the gradient leaves the raw data.
            private_grad = self.dp.privatise_gradient(raw_grad, n)
            weights = weights - lr * private_grad
        return weights - global_weights, n
# Compare: FedAvg vs DP-FedAvg at different ε
print("Privacy-Utility Tradeoff:\n")
print(f"{'Config':<25} {'Final AUC':>12} {'Noise Scale':>13}")
print("-" * 52)
configs = [("FedAvg (no DP)", None), ("DP-FedAvg ε=10", 10),
           ("DP-FedAvg ε=1", 1), ("DP-FedAvg ε=0.1", 0.1)]
for label, epsilon in configs:
    # Rebuild the client pool: DP clients when ε is set, plain clients otherwise.
    test_clients = []
    for i, c in enumerate(clients):
        if epsilon:
            test_clients.append(DPFederatedClient(i, c.X, c.y, epsilon=epsilon))
        else:
            test_clients.append(FederatedClient(i, c.X, c.y))
    test_server = FederatedServer(20)
    # Ten federated rounds of local training followed by FedAvg aggregation.
    for _ in range(10):
        round_updates = [tc.local_train(test_server.global_weights)
                         for tc in test_clients]
        test_server.fedavg(round_updates)
    auc = test_server.evaluate_global(test_clients)
    noise_s = f"{DifferentialPrivacy(epsilon).noise_scale:.4f}" if epsilon else "0"
    print(f"{label:<25} {auc:>12.4f} {noise_s:>13}")