import warnings
import zlib

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

warnings.filterwarnings('ignore')
class ThreatIntelKnowledgeGraph:
    """
    Knowledge graph linking threat actors, TTPs, IOCs, and campaigns.

    Nodes: threat_actors, ttps, iocs, campaigns
    Edges: uses, attributed_to, shares_infrastructure

    Attributes:
        entities: entity name -> {'type': str, 'features': dict}.
        relations: list of (src, relation, dst) triples in insertion order.
        embeddings: entity name -> embedding vector, filled in by
            compute_embeddings().
    """

    # Each known entity type marks one position in the embedding, spaced
    # dim // 4 apart; unknown types share slot 0 with 'threat_actor'.
    _TYPE_SLOTS = {'threat_actor': 0, 'ttp': 1, 'ioc': 2, 'campaign': 3}

    def __init__(self):
        self.entities = {}    # name -> {type, features}
        self.relations = []   # (src, relation, dst)
        self.embeddings = {}  # node -> embedding vector

    def add_entity(self, name: str, entity_type: str, features: dict):
        """Register an entity, replacing any existing entity with that name."""
        self.entities[name] = {'type': entity_type, 'features': features}

    def add_relation(self, src: str, relation: str, dst: str):
        """Append a directed (src, relation, dst) edge; duplicates are kept."""
        self.relations.append((src, relation, dst))

    def compute_embeddings(self, dim: int = 32):
        """Build a unit-length embedding of size ``dim`` for every entity.

        Each vector is the sum of a one-hot entity-type marker, the entity's
        feature values (first ``dim // 2`` of them, scaled by 0.3) and
        Gaussian noise (scaled by 0.1) that stands in for a learned
        component. Relations are not consulted.

        The noise comes from a private generator seeded with 42, so results
        are reproducible and the caller's global NumPy random state is left
        untouched.
        """
        # RandomState(42) yields the same stream as np.random.seed(42)
        # followed by np.random.randn, without reseeding the global RNG.
        rng = np.random.RandomState(42)
        quarter = dim // 4
        half = dim // 2

        for name, info in self.entities.items():
            # Base embedding from entity type
            base = np.zeros(dim)
            base[self._TYPE_SLOTS.get(info['type'], 0) * quarter] = 1.0

            # Feature values occupy the first half of the vector,
            # truncated or zero-padded to exactly `half` entries.
            feat_vec = np.array(list(info['features'].values()))[:half]
            if len(feat_vec) < half:
                feat_vec = np.pad(feat_vec, (0, half - len(feat_vec)))
            base[:len(feat_vec)] += feat_vec * 0.3

            # Random component (simulates learned embedding)
            vec = base + rng.randn(dim) * 0.1

            # Normalise; the epsilon guards against a zero vector.
            self.embeddings[name] = vec / (np.linalg.norm(vec) + 1e-8)

    def find_similar_actors(self, actor: str, top_k: int = 3) -> list:
        """Return the ``top_k`` threat actors most similar to ``actor``.

        Similarity is the cosine between embedding vectors. The result is a
        list of ``(name, similarity)`` tuples sorted from most to least
        similar; ``actor`` itself is excluded.

        Raises:
            KeyError: if ``actor`` has no embedding (unknown entity, or
                compute_embeddings() has not been called since it was added).
        """
        if actor not in self.embeddings:
            raise KeyError(
                f"no embedding for {actor!r}; add the entity and call "
                "compute_embeddings() first"
            )

        query = self.embeddings[actor]
        names = [n for n in self.embeddings
                 if self.entities[n]['type'] == 'threat_actor' and n != actor]
        if not names:
            return []

        # One matrix-vector product instead of a library call per candidate.
        matrix = np.stack([self.embeddings[n] for n in names])
        denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
        denom[denom == 0] = 1.0  # zero vectors get similarity 0, not NaN
        sims = (matrix @ query) / denom

        ranked = sorted(zip(names, sims.tolist()),
                        key=lambda pair: pair[1], reverse=True)
        return ranked[:top_k]

    def attribute_campaign(self, campaign_iocs: list) -> dict:
        """Given IOCs from a campaign, attribute to most likely threat actor.

        An actor's score is the fraction of the distinct campaign IOCs that
        the actor is linked to through a 'uses' relation whose target is an
        entity of type 'ioc'. Scores lie in [0, 1]; an empty campaign scores
        0.0 for every actor.

        Returns:
            dict of actor name -> score, ordered from highest to lowest
            score (ties keep entity insertion order).
        """
        # Deduplicate so repeated IOCs do not deflate the score.
        observed = set(campaign_iocs)

        # Index each actor's IOCs in a single pass over the relations.
        actor_iocs = {name: set() for name, info in self.entities.items()
                      if info['type'] == 'threat_actor'}
        for src, rel, dst in self.relations:
            if (rel == 'uses' and src in actor_iocs
                    and self.entities.get(dst, {}).get('type') == 'ioc'):
                actor_iocs[src].add(dst)

        total = len(observed)
        scores = {
            name: (len(observed & iocs) / total if total else 0.0)
            for name, iocs in actor_iocs.items()
        }
        return dict(sorted(scores.items(), key=lambda x: x[1], reverse=True))
# Build threat intelligence knowledge graph
kg = ThreatIntelKnowledgeGraph()

# Threat actors
for actor, feats in [
    ("APT29", {'sophistication': 5, 'persistence': 5, 'stealth': 5, 'targets_govt': 1}),
    ("APT28", {'sophistication': 4, 'persistence': 4, 'stealth': 3, 'targets_govt': 1}),
    ("Lazarus", {'sophistication': 4, 'persistence': 3, 'stealth': 3, 'targets_govt': 0}),
    ("FIN7", {'sophistication': 3, 'persistence': 4, 'stealth': 3, 'targets_govt': 0}),
    ("Carbanak", {'sophistication': 3, 'persistence': 3, 'stealth': 2, 'targets_govt': 0}),
]:
    kg.add_entity(actor, 'threat_actor', feats)

# TTPs (MITRE ATT&CK)
# The synthetic feature is derived with zlib.crc32 rather than hash():
# str hashes are salted per interpreter process, which would make the
# features (and the embeddings built from them) differ between runs.
for ttp in ['T1059', 'T1078', 'T1055', 'T1547', 'T1021', 'T1003']:
    kg.add_entity(ttp, 'ttp', {'phase': zlib.crc32(ttp.encode('utf-8')) % 5})

# IOCs
for ioc in [f"ioc_{i:03d}" for i in range(20)]:
    kg.add_entity(ioc, 'ioc', {'type': zlib.crc32(ioc.encode('utf-8')) % 4})

# Relations
actor_ttp_map = {
    'APT29': ['T1059', 'T1078', 'T1055', 'T1547'],
    'APT28': ['T1059', 'T1021', 'T1003', 'T1078'],
    'Lazarus': ['T1055', 'T1021', 'T1059'],
    'FIN7': ['T1078', 'T1003', 'T1021'],
    'Carbanak': ['T1078', 'T1059', 'T1003'],
}
# IOC ranges overlap on purpose so that several actors share indicators.
actor_ioc_map = {
    'APT29': [f'ioc_{i:03d}' for i in range(0, 8)],
    'APT28': [f'ioc_{i:03d}' for i in range(4, 12)],
    'Lazarus': [f'ioc_{i:03d}' for i in range(8, 15)],
    'FIN7': [f'ioc_{i:03d}' for i in range(12, 18)],
    'Carbanak': [f'ioc_{i:03d}' for i in range(10, 17)],
}
for actor, ttps in actor_ttp_map.items():
    for ttp in ttps:
        kg.add_relation(actor, 'uses', ttp)
for actor, iocs in actor_ioc_map.items():
    for ioc in iocs:
        kg.add_relation(actor, 'uses', ioc)

kg.compute_embeddings(dim=32)

# Attribution test
campaign_iocs = ['ioc_000', 'ioc_002', 'ioc_005', 'ioc_007']  # APT29-like
attribution = kg.attribute_campaign(campaign_iocs)
similar = kg.find_similar_actors('APT29', top_k=3)

print("=== Threat Actor Attribution ===\n")
print(f"Incident IOCs: {campaign_iocs}")
print("\nAttribution scores:")
for actor, score in list(attribution.items())[:4]:
    # round() rather than int(): a score a hair below a whole bar width
    # (e.g. 39.9999999) must not lose a full block to truncation.
    bar = "█" * round(score * 40)
    print(f" {actor:<12}: {score:.3f} {bar}")
print("\nActors most similar to APT29 (embedding similarity):")
for actor, sim in similar:
    print(f" {actor:<12}: {sim:.4f}")