Lab 17: Anomaly Detection for Security Logs
Objective
Background
Security applications:
- Failed login spikes (brute force detection)
- Unusual data transfer volumes (exfiltration)
- Rare process executions (malware)
- Geographic login anomalies (credential stuffing)
- Off-hours access patterns (insider threat)

Step 1: Statistical Baseline Detector
docker run -it --rm zchencow/innozverse-ai:latest bash
import numpy as np
from sklearn.preprocessing import StandardScaler
import warnings; warnings.filterwarnings('ignore')
np.random.seed(42)
def simulate_login_logs(n_days: int = 30, events_per_day: int = 200,
                        anomaly_start_day: int = 20,
                        n_anomalies: int = 100) -> dict:
    """Simulate enterprise login-log features with an injected brute-force burst.

    Events are laid out chronologically: event ``i`` belongs to day
    ``i // events_per_day``. Normal traffic is drawn first, then the first
    ``n_anomalies`` events whose day index is ``>= anomaly_start_day`` are
    overwritten with brute-force characteristics (many attempts, off-hours
    timestamps, source IP ids outside the known pool).

    Note that with the defaults only the first 100 events of day 20 are
    anomalous, not every event on the following days.

    Randomness comes from the global ``np.random`` state, so seed it
    (``np.random.seed``) before calling for reproducible output.

    Args:
        n_days: Number of simulated days.
        events_per_day: Number of login events per day.
        anomaly_start_day: Zero-based day index at which the injected
            anomalies begin. If it is ``>= n_days`` nothing is injected.
        n_anomalies: Maximum number of events to turn into anomalies.

    Returns:
        A dict with:
            ``'X'``: array of shape ``(n_days * events_per_day, 5)`` whose
                columns follow ``'features'``.
            ``'labels'``: boolean array, ``True`` for injected anomalies.
            ``'features'``: list of the five column names.

    Raises:
        ValueError: If ``anomaly_start_day`` or ``n_anomalies`` is negative.
    """
    if anomaly_start_day < 0 or n_anomalies < 0:
        raise ValueError("anomaly_start_day and n_anomalies must be non-negative")

    n = n_days * events_per_day

    # Normal behaviour. The draw order matters: it fixes which values each
    # feature receives for a given seed.
    hours = np.random.choice(range(8, 19), n)              # business hours 08-18
    attempts = np.random.poisson(1.2, n).clip(1, 5)        # login attempts
    duration = np.random.exponential(30, n).clip(1, 300)   # session seconds
    data_mb = np.random.exponential(50, n).clip(0, 500)    # data transferred
    from_ip = np.random.randint(0, 50, n)                  # IP pool (50 known)

    # Inject the brute-force burst: the first `n_anomalies` events from
    # `anomaly_start_day` onwards.
    anomaly_mask = np.zeros(n, dtype=bool)
    day_of_event = np.arange(n) // events_per_day
    bf_idx = np.where(day_of_event >= anomaly_start_day)[0][:n_anomalies]
    attempts[bf_idx] = np.random.randint(8, 50, len(bf_idx))     # many attempts
    hours[bf_idx] = np.random.choice([2, 3, 4], len(bf_idx))     # off-hours
    from_ip[bf_idx] = np.random.randint(200, 250, len(bf_idx))   # unknown IPs
    anomaly_mask[bf_idx] = True

    X = np.column_stack([hours, attempts, duration, data_mb, from_ip])
    return {
        'X': X,
        'labels': anomaly_mask,
        'features': ['hour', 'attempts', 'duration_s', 'data_mb', 'src_ip_id'],
    }
# Build the simulated dataset and pull out the feature matrix and the
# ground-truth anomaly labels.
logs = simulate_login_logs()
X = logs['X']
y_true = logs['labels']

# Scale the features; note the scaler is fitted on the full dataset.
scaler = StandardScaler()
X_s = scaler.fit_transform(X)
class StatisticalAnomalyDetector:
    """Z-score based anomaly detector: simple but effective for univariate features.

    ``fit`` records the per-feature mean and standard deviation of a baseline
    sample. An event's anomaly score is its largest absolute z-score across
    all features, and it is flagged when that score reaches ``threshold``.
    """

    def __init__(self, threshold: float = 3.0):
        """Create an unfitted detector.

        Args:
            threshold: Minimum score (in baseline standard deviations) at
                which an event is flagged as anomalous.
        """
        self.threshold = threshold
        self.means = None  # per-feature baseline mean, set by fit()
        self.stds = None   # per-feature baseline std, set by fit()

    def fit(self, X: np.ndarray) -> "StatisticalAnomalyDetector":
        """Learn per-feature mean and standard deviation from ``X``.

        Args:
            X: Array-like of shape ``(n_samples, n_features)``.

        Returns:
            The detector itself, so calls can be chained.
        """
        X = np.asarray(X)
        self.means = X.mean(0)
        self.stds = X.std(0)
        return self

    def score(self, X: np.ndarray) -> np.ndarray:
        """Return the maximum absolute z-score across features for each row.

        Args:
            X: Array-like of shape ``(n_samples, n_features)``.

        Returns:
            1-D array with one score per row of ``X``.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.means is None or self.stds is None:
            raise RuntimeError(
                "StatisticalAnomalyDetector is not fitted; call fit() first"
            )
        X = np.asarray(X)
        # The small epsilon keeps constant (zero-variance) features from
        # causing a division by zero.
        z = np.abs((X - self.means) / (self.stds + 1e-8))
        return z.max(1)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return 1 for rows whose score is >= ``threshold``, else 0.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        return (self.score(X) >= self.threshold).astype(int)
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

# Fit the baseline on the first 4000 scaled events. With the default
# simulate_login_logs() settings (200 events/day) that is days 0-19, i.e.
# everything before the injected anomalies. Then evaluate on all events.
stat_det = StatisticalAnomalyDetector(threshold=3.5)
stat_det.fit(X_s[:4000])
preds = stat_det.predict(X_s)    # hard 0/1 decisions at the threshold
scores = stat_det.score(X_s)     # continuous scores, used for ranking (AUC)
auc = roc_auc_score(y_true, scores)

# Emit the report in one call; the output is line-for-line the same as
# printing each entry separately.
print("\n".join([
    f"Statistical Detector (Z-score threshold={stat_det.threshold}):",
    f" AUC: {auc:.4f}",
    f" Precision: {precision_score(y_true, preds):.4f}",
    f" Recall: {recall_score(y_true, preds):.4f}",
    f" F1: {f1_score(y_true, preds):.4f}",
]))
print(f" Flagged: {preds.sum()} events ({preds.mean():.1%})")

Step 2: Isolation Forest
Step 3: Local Outlier Factor
Step 4: Autoencoder Reconstruction Detector
Step 5–8: Capstone — Ensemble SIEM Anomaly Engine
Summary
Method
Labels
Strength
Weakness
Further Reading
Last updated
