import numpy as np
from sklearn.ensemble import (RandomForestClassifier, GradientBoostingClassifier,
                              VotingClassifier)
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.metrics import roc_auc_score, classification_report
import warnings

warnings.filterwarnings('ignore')
np.random.seed(42)

# Generate a synthetic "enterprise SIEM" dataset: 50,000 events, ~6% attacks
# (class weights 0.94 / 0.06), 25 features of which 15 are informative.
X, y = make_classification(n_samples=50000, n_features=25, n_informative=15,
                           weights=[0.94, 0.06], random_state=42)

# NOTE(review): the scaler is fit on the FULL dataset before the split, which
# leaks test-set statistics into training. For an honest evaluation, fit the
# scaler on X_tr only. Kept as-is here to preserve the script's behavior.
scaler = StandardScaler()
X_s = scaler.fit_transform(X)

# Stratified 80/20 split preserves the 6% attack rate in both partitions.
X_tr, X_te, y_tr, y_te = train_test_split(X_s, y, test_size=0.2,
                                          stratify=y, random_state=42)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
class EnsembleDetector:
    """
    Production ML ensemble for SIEM threat detection.

    Models chosen for diversity (different inductive biases):
      - GBM: strong sequential learner, good on tabular
      - RF:  parallel trees, robust to noise
      - MLP: captures non-linear interactions
      - LR:  linear baseline, calibrated probabilities
    Fusion: soft voting (average of per-model attack probabilities).
    Post-processing: Platt scaling (logistic regression over the averaged
    score) for calibration.
    """

    def __init__(self):
        # BUGFIX: GradientBoostingClassifier does not accept a ``class_weight``
        # keyword — the original ``class_weight=None`` raised TypeError at
        # instantiation. Imbalance for GBM can instead be addressed via
        # ``sample_weight`` in fit() if needed.
        self.models = {
            'gbm': GradientBoostingClassifier(n_estimators=200, max_depth=5,
                                              learning_rate=0.05, subsample=0.8,
                                              random_state=42),
            'rf': RandomForestClassifier(n_estimators=200, max_depth=15,
                                         class_weight='balanced', random_state=42),
            'mlp': MLPClassifier(hidden_layer_sizes=(128, 64, 32), max_iter=300,
                                 random_state=42),
            'lr': LogisticRegression(C=1.0, max_iter=1000,
                                     class_weight='balanced', random_state=42),
        }
        # Platt-style calibrator: maps the averaged raw score (one feature)
        # to a calibrated probability.
        self.calibrator = LogisticRegression(max_iter=1000)
        self.is_fitted = False

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit every base model on (X, y), then fit the score calibrator.

        Prints per-model in-sample AUC as a training diagnostic.

        NOTE(review): the calibrator is trained on in-sample ensemble scores,
        which tends to produce over-confident calibration; a held-out
        calibration split (or CalibratedClassifierCV) would be more robust.
        """
        print("Training ensemble models:")
        for name, model in self.models.items():
            model.fit(X, y)
            auc = roc_auc_score(y, model.predict_proba(X)[:, 1])
            print(f" {name:<6}: train AUC={auc:.4f}")
        # Calibrate the fused (averaged) score against the labels.
        raw_probs = self._raw_predict(X)
        self.calibrator.fit(raw_probs.reshape(-1, 1), y)
        self.is_fitted = True

    def _raw_predict(self, X: np.ndarray) -> np.ndarray:
        """Soft voting: average each model's P(attack) column, shape (n,)."""
        probs = np.array([m.predict_proba(X)[:, 1] for m in self.models.values()])
        return probs.mean(0)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return calibrated class probabilities, shape (n_samples, 2).

        Raises:
            RuntimeError: if called before fit() — gives a clear error
                instead of sklearn's NotFittedError from deep inside.
        """
        if not self.is_fitted:
            raise RuntimeError("EnsembleDetector must be fitted before predict_proba")
        raw = self._raw_predict(X)
        return self.calibrator.predict_proba(raw.reshape(-1, 1))

    def explain(self, X: np.ndarray, feature_names: list, top_k: int = 5) -> list:
        """Top-k (name, importance) pairs from the RF's impurity-based
        ``feature_importances_``.

        DOCFIX: the original docstring called this a "permutation-based
        proxy" — ``feature_importances_`` is impurity-based (MDI), not
        permutation-based. ``X`` is accepted for interface compatibility
        but is not used.
        """
        importances = self.models['rf'].feature_importances_
        top_idx = np.argsort(importances)[::-1][:top_k]
        return [(feature_names[i], round(float(importances[i]), 4)) for i in top_idx]
# Build and train the ensemble on the training split.
feature_labels = [f"feature_{i:02d}" for i in range(25)]
siem_model = EnsembleDetector()
siem_model.fit(X_tr, y_tr)

# Held-out evaluation: ranking quality (AUC), thresholded per-class report,
# and the most influential features according to the forest.
attack_scores = siem_model.predict_proba(X_te)[:, 1]
test_auc = roc_auc_score(y_te, attack_scores)
hard_labels = np.where(attack_scores >= 0.5, 1, 0)
print(f"\nEnsemble Test AUC: {test_auc:.4f}")
print(classification_report(y_te, hard_labels, target_names=['Benign','Attack'], digits=4))
print("Top features:", siem_model.explain(X_te, feature_labels))