Systematically attack ML systems to find vulnerabilities before adversaries do: model inversion, membership inference, data poisoning detection, supply chain threats (pickle injection), and adversarial patch generation. Then build a comprehensive AI security audit framework.
AI systems introduce a new attack surface beyond traditional software:

Training phase:
  Data poisoning: inject malicious samples → backdoor or degrade the model
  Supply chain: malicious pre-trained weights (pickle injection)
Inference phase:
  Model inversion: recover training data from model predictions
  Membership inference: determine whether a specific sample was in the training set
  Adversarial examples: perturb an input → wrong prediction
  Model extraction: clone the model via API queries (sketched after this list)
Deployment phase:
  Prompt injection (see lab 13)
  Evasion: craft inputs that evade detection
  Sponge attacks: maximise compute/latency
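
Model extraction is easy to see in miniature. A self-contained sketch with a synthetic dataset and a toy victim model (illustrative only, not part of this lab's code):

import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.tree import DecisionTreeClassifier

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 8))
victim = GradientBoostingClassifier().fit(X[:500], (X[:500, 0] > 0).astype(int))

# 1. Attacker samples inputs and harvests the prediction API's labels
X_query = rng.normal(size=(2000, 8))
y_stolen = victim.predict(X_query)          # each call = one API query

# 2. Train a local surrogate on the stolen labels
clone = DecisionTreeClassifier(max_depth=5).fit(X_query, y_stolen)

# 3. Measure agreement: high fidelity means the model has been cloned
agreement = (clone.predict(X) == victim.predict(X)).mean()
print(f"Surrogate agreement with victim: {agreement:.1%}")

Note the attacker needed only hard labels; APIs that return full probability vectors make extraction even cheaper, which is why rate limiting and query monitoring are the usual countermeasures.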
Step 1: Membership Inference Attack
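
The core of the attack fits in a few lines. A minimal confidence-thresholding sketch, assuming the overfit_model and the X_tr/X_te splits that this lab defines elsewhere:

import numpy as np
from sklearn.metrics import roc_auc_score

# Attacker intuition: models are more confident on samples they were trained on.
conf_members = overfit_model.predict_proba(X_tr).max(axis=1)     # in training set
conf_nonmembers = overfit_model.predict_proba(X_te).max(axis=1)  # held out

# Treat confidence as a membership score: AUC ≈ 0.5 means nothing leaks,
# AUC near 1.0 means membership is fully exposed.
scores = np.concatenate([conf_members, conf_nonmembers])
labels = np.concatenate([np.ones(len(conf_members)), np.zeros(len(conf_nonmembers))])
print(f"Membership inference AUC: {roc_auc_score(labels, scores):.3f}")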
📸 Verified Output:
💡 Regularisation is a privacy defence! A smaller train/test gap means the model memorises less, making membership inference harder.
import pickle, subprocess

class PickleSecurityAudit:
    """
    Demonstrate and detect malicious pickle deserialisation.

    Pickle is Python's serialisation format — widely used for:
    - Saving sklearn models (joblib/pickle)
    - Sending models via API
    - Caching ML pipelines

    DANGER: pickle.loads() executes arbitrary code!
    A malicious model file can run system commands when loaded.

    This is the ML supply chain attack: the attacker uploads a poisoned model
    to HuggingFace, PyPI, or shared storage. The victim loads it → RCE.
    """
    def create_safe_model(self) -> bytes:
        """Legitimate model serialisation"""
        from sklearn.linear_model import LogisticRegression
        model = LogisticRegression()
        return pickle.dumps(model)

    def create_malicious_pickle(self, command: str = "echo PWNED") -> bytes:
        """
        Craft a malicious pickle that executes a command on load.
        NOTE: this only runs 'echo' — a harmless demo.
        """
        class MaliciousPayload:
            def __reduce__(self):
                # Unpickling calls subprocess.check_output(command.split()):
                # arbitrary command execution at load time.
                return (subprocess.check_output, (command.split(),))
        return pickle.dumps(MaliciousPayload())

    def scan_pickle(self, data: bytes) -> dict:
        """Static analysis of pickle bytecode — detect dangerous opcodes"""
        # Opcodes that can import modules or call objects during unpickling
        DANGEROUS_OPCODES = {
            b'c': 'GLOBAL (import + call)',
            b'R': 'REDUCE (call callable)',
            b'i': 'INST (instantiate)',
            b'o': 'OBJ (build object)',
        }
        # Scan for suspicious module references embedded in the stream
        SUSPICIOUS_MODULES = [b'subprocess', b'os', b'sys', b'eval', b'exec',
                              b'builtins', b'__import__', b'commands']
        findings = []
        for module in SUSPICIOUS_MODULES:
            if module in data:
                findings.append(f"Suspicious module: {module.decode()}")
        # Count GLOBAL opcodes (each one is a potential RCE).
        # Crude heuristic: counts every 0x63 byte, not only true opcode positions.
        n_global = data.count(b'c')
        risk = 'CRITICAL' if findings else 'MEDIUM' if n_global > 5 else 'LOW'
        return {'risk': risk, 'findings': findings, 'n_global_ops': n_global}

audit = PickleSecurityAudit()
safe_data = audit.create_safe_model()
malicious_data = audit.create_malicious_pickle("echo PWNED_BY_PICKLE")

print("Pickle Security Audit:\n")
for name, data in [("Legitimate model", safe_data), ("Malicious payload", malicious_data)]:
    result = audit.scan_pickle(data)
    print(f"  {name}:")
    print(f"    Risk: {result['risk']}")
    print(f"    Findings: {result['findings']}")
    print(f"    GLOBAL ops: {result['n_global_ops']}")
    print()
print("Defence: use safetensors format (not pickle) for ML model distribution!")
print("Never run pickle.loads() on untrusted model files.")
Pickle Security Audit:

  Legitimate model:
    Risk: LOW
    Findings: []
    GLOBAL ops: 2

  Malicious payload:
    Risk: CRITICAL
    Findings: ['Suspicious module: subprocess']
    GLOBAL ops: 3

Defence: use safetensors format (not pickle) for ML model distribution!
Never run pickle.loads() on untrusted model files.
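
The safetensors defence deserves a concrete illustration. A sketch assuming the safetensors package is installed (this code is not part of the lab's output above):

import numpy as np
from safetensors.numpy import save_file, load_file

# Weights are stored as raw tensors plus a small header: no pickled code objects.
weights = {"coef": np.random.randn(8, 2), "intercept": np.zeros(2)}
save_file(weights, "model.safetensors")

# Loading parses tensor metadata and bytes only; there is no code-execution path,
# so a tampered file can at worst corrupt weights, never run commands.
restored = load_file("model.safetensors")
print(restored["coef"].shape)  # (8, 2)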
import numpy as np, time, json
from sklearn.ensemble import GradientBoostingClassifier, IsolationForest
from sklearn.metrics import roc_auc_score
import warnings; warnings.filterwarnings('ignore')

class AISecurityAuditor:
    """
    Automated AI security audit framework.
    Runs a full battery of tests on an ML model and generates a report.
    """
    def __init__(self, model, X_tr, y_tr, X_te, y_te, model_name: str = "Target Model"):
        self.model = model; self.X_tr = X_tr; self.y_tr = y_tr
        self.X_te = X_te; self.y_te = y_te; self.name = model_name
        self.findings = []; self.score = 100

    def _deduct(self, points: int, severity: str, finding: str):
        self.findings.append({'severity': severity, 'finding': finding, 'deduction': points})
        self.score -= points
    def test_overfitting(self):
        tr_auc = roc_auc_score(self.y_tr, self.model.predict_proba(self.X_tr)[:, 1])
        te_auc = roc_auc_score(self.y_te, self.model.predict_proba(self.X_te)[:, 1])
        gap = tr_auc - te_auc
        if gap > 0.08:  # large train/test gap → memorisation → MIA risk
            self._deduct(20, 'HIGH', f"Severe overfitting: train/test AUC gap={gap:.3f} (MIA vulnerable)")
        elif gap > 0.05:
            self._deduct(10, 'MEDIUM', f"Moderate overfitting: gap={gap:.3f}")
        return {'train_auc': round(tr_auc, 4), 'test_auc': round(te_auc, 4), 'gap': round(gap, 4)}
    def test_adversarial_robustness(self, epsilon: float = 0.1):
        """Random-noise perturbation test (a gradient-free stand-in for FGSM)"""
        n_flipped = 0
        probs_clean = self.model.predict_proba(self.X_te[:200])
        for _ in range(5):  # 5 random perturbations
            noise = np.random.normal(0, epsilon, self.X_te[:200].shape)
            probs_perturbed = self.model.predict_proba(self.X_te[:200] + noise)
            n_flipped += (probs_clean.argmax(1) != probs_perturbed.argmax(1)).sum()
        flip_rate = n_flipped / (200 * 5)
        if flip_rate > 0.15:
            self._deduct(15, 'HIGH', f"High adversarial sensitivity: {flip_rate:.1%} predictions flipped at ε={epsilon}")
        elif flip_rate > 0.05:
            self._deduct(8, 'MEDIUM', f"Moderate adversarial sensitivity: {flip_rate:.1%} predictions flipped at ε={epsilon}")
        return {'flip_rate': round(flip_rate, 4)}
    def test_serialisation(self):
        import pickle
        data = pickle.dumps(self.model)
        scan = PickleSecurityAudit().scan_pickle(data)
        if scan['risk'] == 'CRITICAL':
            self._deduct(25, 'CRITICAL', f"Dangerous serialisation: {scan['findings']}")
        return scan

    def test_data_leakage(self):
        """Check if the model leaks training data via confidence"""
        tr_conf = self.model.predict_proba(self.X_tr).max(1).mean()
        te_conf = self.model.predict_proba(self.X_te).max(1).mean()
        if tr_conf - te_conf > 0.1:
            self._deduct(10, 'MEDIUM', f"Confidence gap suggests memorisation: train={tr_conf:.3f} test={te_conf:.3f}")
        return {'train_confidence': round(tr_conf, 4), 'test_confidence': round(te_conf, 4)}
    def run_full_audit(self) -> dict:
        print(f"=== AI Security Audit: {self.name} ===\n")
        results = {
            'overfitting': self.test_overfitting(),
            'adversarial': self.test_adversarial_robustness(),
            'serialisation': self.test_serialisation(),
            'data_leakage': self.test_data_leakage(),
        }
        print(f"Findings ({len(self.findings)} issues):")
        for f in self.findings:
            icon = {'CRITICAL': '🔴', 'HIGH': '🟠', 'MEDIUM': '🟡', 'LOW': '🟢'}.get(f['severity'], '⚪')
            print(f"  {icon} [{f['severity']:<8}] -{f['deduction']:>2}pts {f['finding'][:70]}")
        grade = 'A' if self.score >= 90 else 'B' if self.score >= 80 else 'C' if self.score >= 70 else 'D' if self.score >= 60 else 'F'
        print(f"\nSecurity Score: {self.score}/100 Grade: {grade}")
        return {'score': self.score, 'grade': grade, 'findings': self.findings}
# Audit the overfit model
auditor = AISecurityAuditor(overfit_model, X_tr, y_tr, X_te, y_te, "Intrusion Detector v1")
report = auditor.run_full_audit()
=== AI Security Audit: Intrusion Detector v1 ===

Findings (3 issues):
  🟠 [HIGH    ] -20pts Severe overfitting: train/test AUC gap=0.088 (MIA vulnerable)
  🟡 [MEDIUM  ] - 8pts Moderate adversarial sensitivity: 8.4% predictions flipped at ε=0.1
  🟡 [MEDIUM  ] -10pts Confidence gap suggests memorisation: train=0.991 test=0.879

Security Score: 62/100 Grade: D
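
A Grade D report points straight back to the regularisation tip from Step 1: shrink the train/test gap, then re-audit. A sketch with illustrative hyperparameters (not the lab's actual remediation):

from sklearn.ensemble import GradientBoostingClassifier

# Shallower trees, fewer boosting rounds, and subsampling all reduce the
# train/test gap that drove the MIA and memorisation findings above.
hardened = GradientBoostingClassifier(max_depth=2, n_estimators=50,
                                      learning_rate=0.05, subsample=0.8)
hardened.fit(X_tr, y_tr)
report_v2 = AISecurityAuditor(hardened, X_tr, y_tr, X_te, y_te,
                              "Intrusion Detector v2").run_full_audit()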