# NOTE(review): line 1 originally began with a stray "Copy " token (a paste
# artifact from a web UI "Copy" button) that made the whole file a
# SyntaxError; removed. Imports regrouped stdlib / third-party per PEP 8.
import warnings

import numpy as np
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler

# Silence sklearn/numpy chatter so the demo output stays readable.
warnings.filterwarnings('ignore')

# Fix the global RNG so weight init, sampling, and shuffling are reproducible.
np.random.seed(42)
class VAE:
    """
    Variational Autoencoder for anomaly detection.

    Architecture: x -> Encoder(mu, log_var) -> z ~ N(mu, sigma) -> Decoder -> x_recon

    Trained on "normal" data only; samples the model reconstructs poorly
    receive a high anomaly score.
    """

    def __init__(self, input_dim: int, latent_dim: int = 8, hidden: int = 32):
        """He-style random init for the ReLU layers; all biases start at zero."""
        k = np.sqrt(2 / input_dim)
        self.W_enc1 = np.random.randn(input_dim, hidden) * k
        self.b_enc1 = np.zeros(hidden)
        self.W_mu = np.random.randn(hidden, latent_dim) * np.sqrt(2 / hidden)
        self.b_mu = np.zeros(latent_dim)
        self.W_lv = np.random.randn(hidden, latent_dim) * np.sqrt(2 / hidden)
        self.b_lv = np.zeros(latent_dim)
        self.W_dec1 = np.random.randn(latent_dim, hidden) * np.sqrt(2 / latent_dim)
        self.b_dec1 = np.zeros(hidden)
        self.W_dec2 = np.random.randn(hidden, input_dim) * np.sqrt(2 / hidden)
        self.b_dec2 = np.zeros(input_dim)
        self.lr = 0.001  # SGD step size used by train_step

    def encode(self, x: np.ndarray) -> tuple:
        """Map x -> (mu, log_var) of the approximate posterior q(z|x)."""
        h = np.maximum(0, x @ self.W_enc1 + self.b_enc1)  # ReLU hidden layer
        mu = h @ self.W_mu + self.b_mu
        lv = h @ self.W_lv + self.b_lv
        return mu, lv

    def reparameterise(self, mu: np.ndarray, log_var: np.ndarray) -> np.ndarray:
        """Reparameterisation trick: z = mu + eps * sigma where eps ~ N(0, 1)."""
        std = np.exp(0.5 * log_var)
        eps = np.random.randn(*mu.shape)
        return mu + eps * std

    def decode(self, z: np.ndarray) -> np.ndarray:
        """Map a latent sample z back to input space (linear output layer)."""
        h = np.maximum(0, z @ self.W_dec1 + self.b_dec1)
        return h @ self.W_dec2 + self.b_dec2

    def elbo_loss(self, x: np.ndarray, x_recon: np.ndarray,
                  mu: np.ndarray, log_var: np.ndarray) -> float:
        """
        Negative ELBO (the quantity we minimise): MSE reconstruction term
        plus beta-weighted KL(q(z|x) || N(0, I)).
        (The original docstring called this "ELBO"; it is the negative ELBO.)
        """
        recon = np.mean((x - x_recon) ** 2)
        kl = -0.5 * np.mean(1 + log_var - mu**2 - np.exp(log_var))
        return recon + 0.01 * kl  # beta-VAE style: down-weight the KL term

    def anomaly_score(self, x: np.ndarray, n_samples: int = 10) -> np.ndarray:
        """
        Anomaly score = per-row reconstruction MSE, averaged over n_samples
        stochastic encode/decode passes. High score -> anomalous (doesn't fit
        the learned normal distribution). Returns shape (len(x),).
        """
        scores = []
        for _ in range(n_samples):
            mu, lv = self.encode(x)
            z = self.reparameterise(mu, lv)
            x_recon = self.decode(z)
            scores.append(np.mean((x - x_recon) ** 2, axis=1))
        return np.mean(scores, axis=0)

    def train_step(self, x: np.ndarray) -> float:
        """
        One SGD step on mini-batch x; returns the batch loss.

        Deliberately simplified training: only the decoder output layer
        (W_dec2, b_dec2) is updated.
        """
        # Forward pass; keep the decoder hidden activation for the backward
        # pass instead of recomputing it (the original recomputed it inline).
        mu, lv = self.encode(x)
        z = self.reparameterise(mu, lv)
        h = np.maximum(0, z @ self.W_dec1 + self.b_dec1)
        x_recon = h @ self.W_dec2 + self.b_dec2
        loss = self.elbo_loss(x, x_recon, mu, lv)
        # BUG FIX: the original stepped along np.outer(h.mean(0), err.mean(0))
        # -- the outer product of batch means, which is not the gradient of
        # the batch MSE -- and it never updated b_dec2. The output-layer MSE
        # gradient (constant factor absorbed into lr) is h^T err / B.
        recon_err = x_recon - x
        batch = len(x)
        self.W_dec2 -= self.lr * (h.T @ recon_err) / batch
        self.b_dec2 -= self.lr * recon_err.mean(0)
        return loss
# ---------------------------------------------------------------------------
# Generate synthetic "network traffic": a normal class and a smaller attack
# class whose first five features are shifted so it is detectably anomalous.
# ---------------------------------------------------------------------------
X_normal, _ = make_classification(n_samples=3000, n_features=20, n_informative=12,
                                  weights=[1.0, 0], random_state=42)
X_attack, _ = make_classification(n_samples=200, n_features=20, n_informative=12,
                                  weights=[0, 1.0], random_state=99)
X_attack[:, :5] += 3.0  # attacks have shifted features

# Fit the scaler on normal traffic only; attack data must not leak into the
# scaling statistics.
scaler = StandardScaler()
X_norm_s = scaler.fit_transform(X_normal)
X_att_s = scaler.transform(X_attack)

# Train the VAE on normal traffic only.
vae = VAE(input_dim=20, latent_dim=8, hidden=32)
print("Training VAE on normal traffic (5 epochs):")
for epoch in range(5):
    idx = np.random.permutation(len(X_norm_s))
    # BUG FIX: the original ranged over (0, len - 64, 64), which silently
    # dropped the final mini-batch every epoch (samples 2944-2999 were never
    # trained on). Cover the whole permutation; the last batch may be short.
    losses = [vae.train_step(X_norm_s[idx[i:i + 64]])
              for i in range(0, len(X_norm_s), 64)]
    print(f"  Epoch {epoch+1}: loss={np.mean(losses):.4f}")
# ---------------------------------------------------------------------------
# Evaluate anomaly detection: score a mix of held-out normal traffic and
# attacks, then threshold at the 95th percentile of the normal scores.
# ---------------------------------------------------------------------------
from sklearn.metrics import (roc_auc_score, classification_report,
                             precision_score, recall_score, f1_score)

X_test = np.vstack([X_norm_s[:500], X_att_s[:100]])
y_test = np.array([0] * 500 + [1] * 100)

scores = vae.anomaly_score(X_test, n_samples=5)
auc = roc_auc_score(y_test, scores)

# Anything scoring above the 95th percentile of the normal rows is flagged.
threshold = np.percentile(scores[:500], 95)
preds = (scores >= threshold).astype(int)

print(f"\nVAE Anomaly Detection:")
print(f" ROC-AUC: {auc:.4f}")
print(f" Threshold (95th %ile normal): {threshold:.4f}")
print(f" Precision: {precision_score(y_test, preds):.4f}")
print(f" Recall: {recall_score(y_test, preds):.4f}")
print(f" F1: {f1_score(y_test, preds):.4f}")