Build a functional AI agent that uses the ReAct (Reasoning + Acting) pattern to autonomously research security topics, query tools, and synthesise findings. Understand how modern AI agents like Claude Code and AutoGPT work under the hood.
docker run -it --rm zchencow/innozverse-ai:latest bash
import numpy as np, re, json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
import warnings; warnings.filterwarnings('ignore')
print("Ready")
Ready
from dataclasses import dataclass
from typing import Any, Callable, Dict, List
@dataclass
class Tool:
name: str
description: str
func: Callable
parameters: Dict[str, str]
# ── Security Research Tools ──────────────────────────────────────────
# Mock CVE database
CVE_DB = {
"CVE-2021-44228": {"product": "Apache Log4j", "severity": "CRITICAL",
"cvss": 10.0, "description": "Remote code execution via JNDI lookup",
"patched_in": "2.15.0"},
"CVE-2021-45046": {"product": "Apache Log4j", "severity": "CRITICAL",
"cvss": 9.0, "description": "Incomplete fix for CVE-2021-44228",
"patched_in": "2.16.0"},
"CVE-2023-44487": {"product": "HTTP/2 Protocol", "severity": "HIGH",
"cvss": 7.5, "description": "HTTP/2 Rapid Reset denial of service",
"patched_in": "varies by implementation"},
"CVE-2024-21413": {"product": "Microsoft Outlook", "severity": "CRITICAL",
"cvss": 9.8, "description": "Remote code execution via malformed email",
"patched_in": "February 2024 Patch Tuesday"},
"CVE-2023-23397": {"product": "Microsoft Outlook", "severity": "CRITICAL",
"cvss": 9.8, "description": "NTLM hash theft via meeting invitation",
"patched_in": "March 2023 Patch Tuesday"},
}
THREAT_ACTOR_DB = {
"APT28": {"aka": "Fancy Bear", "origin": "Russia", "targets": ["government", "military", "aerospace"]},
"APT29": {"aka": "Cozy Bear", "origin": "Russia", "targets": ["government", "think tanks", "healthcare"]},
"Lazarus":{"aka": "Hidden Cobra","origin": "DPRK", "targets": ["financial", "crypto", "defence"]},
"APT41": {"aka": "Double Dragon","origin": "China", "targets": ["healthcare", "telecom", "technology"]},
}
PATCH_STATUS_DB = {
"Apache Log4j": {"current_version": "2.23.1", "cve_count": 8, "last_updated": "2024-12"},
"Microsoft Outlook": {"current_version": "16.0.17", "cve_count": 23, "last_updated": "2024-02"},
"OpenSSL": {"current_version": "3.3.0", "cve_count": 12, "last_updated": "2024-05"},
}
# Tool functions
def search_cve(product: str = "", severity: str = "") -> list:
"""Search CVE database by product name or severity"""
results = []
for cve_id, data in CVE_DB.items():
if (not product or product.lower() in data['product'].lower()) and \
(not severity or severity.upper() == data['severity']):
results.append({"cve_id": cve_id, **data})
return results if results else [{"message": f"No CVEs found for '{product}'"}]
def get_threat_actor(actor: str) -> dict:
"""Get information about a known threat actor"""
for name, data in THREAT_ACTOR_DB.items():
if name.lower() in actor.lower() or actor.lower() in name.lower():
return {"name": name, **data}
return {"error": f"Threat actor '{actor}' not found in database"}
def check_patch_status(product: str) -> dict:
"""Check current patch status for a product"""
for name, data in PATCH_STATUS_DB.items():
if name.lower() in product.lower():
return {"product": name, **data}
return {"error": f"Product '{product}' not found"}
def calculate_risk_score(cvss: float, exploited_in_wild: bool, asset_criticality: int) -> dict:
"""Calculate composite risk score (CVSS + context)"""
base = cvss
if exploited_in_wild: base = min(10.0, base * 1.2)
if asset_criticality >= 4: base = min(10.0, base * 1.1)
risk_level = "CRITICAL" if base >= 9 else "HIGH" if base >= 7 else "MEDIUM" if base >= 4 else "LOW"
return {"composite_score": round(base, 1), "risk_level": risk_level,
"recommendation": f"Patch within {'24h' if risk_level=='CRITICAL' else '7 days' if risk_level=='HIGH' else '30 days'}"}
def summarise_findings(findings: list) -> str:
"""Summarise a list of research findings into an executive summary"""
if not findings:
return "No findings to summarise."
summary_parts = [f"Security Research Summary ({len(findings)} findings):"]
for i, finding in enumerate(findings, 1):
summary_parts.append(f"{i}. {str(finding)[:100]}")
return "\n".join(summary_parts)
# Register tools
TOOLS = {
"search_cve": Tool(
name="search_cve",
description="Search CVE database by product name or severity level",
func=search_cve,
parameters={"product": "str (optional)", "severity": "CRITICAL|HIGH|MEDIUM|LOW (optional)"}
),
"get_threat_actor": Tool(
name="get_threat_actor",
description="Get information about a known threat actor (APT group)",
func=get_threat_actor,
parameters={"actor": "str - threat actor name or alias"}
),
"check_patch_status": Tool(
name="check_patch_status",
description="Check current patch/version status for a product",
func=check_patch_status,
parameters={"product": "str - software product name"}
),
"calculate_risk_score": Tool(
name="calculate_risk_score",
description="Calculate composite risk score combining CVSS with exploitation context",
func=calculate_risk_score,
parameters={"cvss": "float", "exploited_in_wild": "bool", "asset_criticality": "int 1-5"}
),
"summarise_findings": Tool(
name="summarise_findings",
description="Summarise a list of research findings into an executive summary",
func=summarise_findings,
parameters={"findings": "list of findings to summarise"}
),
}
print(f"Registered {len(TOOLS)} tools:")
for name, tool in TOOLS.items():
print(f" • {name}: {tool.description}")
Registered 5 tools:
• search_cve: Search CVE database by product name or severity level
• get_threat_actor: Get information about a known threat actor (APT group)
• check_patch_status: Check current patch/version status for a product
• calculate_risk_score: Calculate composite risk score combining CVSS with exploitation context
• summarise_findings: Summarise a list of research findings into an executive summary
import json, re
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class AgentStep:
thought: str
action: Optional[str]
action_input: Optional[dict]
observation: Optional[str]
is_final: bool = False
final_answer: Optional[str] = None
class ReActAgent:
"""
ReAct (Reasoning + Acting) agent implementation.
In production, 'thought' and 'action' selection are done by an LLM.
Here we simulate the reasoning with a rule-based planner.
"""
def __init__(self, tools: dict, max_steps: int = 8, verbose: bool = True):
self.tools = tools
self.max_steps = max_steps
self.verbose = verbose
self.steps = []
self.memory = {} # agent working memory
def _call_tool(self, tool_name: str, tool_input: dict) -> str:
"""Execute a tool and return observation"""
if tool_name not in self.tools:
return f"Error: tool '{tool_name}' not found. Available: {list(self.tools.keys())}"
try:
result = self.tools[tool_name].func(**tool_input)
return json.dumps(result, default=str)[:500] # truncate long outputs
except Exception as e:
return f"Tool error: {str(e)}"
def _plan_next_step(self, goal: str, step_n: int) -> AgentStep:
"""
Simulate LLM reasoning. In production: call LLM with goal + history.
Returns the next planned step.
"""
# Simple rule-based planner for demonstration
# (Real agent: LLM generates thought + action choice)
history_summary = str(self.memory)
if step_n == 0:
# First step: search for CVEs related to the goal
product = re.search(r'(log4j|outlook|openssl|apache|microsoft \w+)', goal, re.I)
prod_name = product.group(0) if product else goal.split()[0]
return AgentStep(
thought=f"I need to research '{goal}'. Let me start by searching for related CVEs.",
action="search_cve",
action_input={"product": prod_name},
observation=None
)
elif step_n == 1 and 'cves' in self.memory:
# Second step: check patch status
cves = self.memory.get('cves', [])
if cves and isinstance(cves, list) and len(cves) > 0:
first_cve = cves[0]
product = first_cve.get('product', 'unknown') if isinstance(first_cve, dict) else 'unknown'
return AgentStep(
thought=f"Found {len(cves)} CVEs. Now checking patch status for {product}.",
action="check_patch_status",
action_input={"product": product},
observation=None
)
elif step_n == 2 and 'cves' in self.memory:
# Third step: calculate risk for worst CVE
cves = self.memory.get('cves', [])
if cves and isinstance(cves[0], dict) and 'cvss' in cves[0]:
worst = max(cves, key=lambda c: c.get('cvss', 0) if isinstance(c, dict) else 0)
return AgentStep(
thought=f"Calculating risk score for {worst.get('cve_id', 'top CVE')} (CVSS {worst.get('cvss', '?')}).",
action="calculate_risk_score",
action_input={"cvss": worst.get('cvss', 7.0), "exploited_in_wild": True, "asset_criticality": 4},
observation=None
)
elif step_n == 3:
# Check for threat actors
actor_match = re.search(r'(APT\d+|lazarus|fancy bear|cozy bear)', goal, re.I)
if actor_match:
return AgentStep(
thought="The query mentions a specific threat actor. Let me look them up.",
action="get_threat_actor",
action_input={"actor": actor_match.group(0)},
observation=None
)
# Final step: summarise
findings = [str(v) for v in self.memory.values()]
return AgentStep(
thought="I have gathered enough information. Time to summarise my findings.",
action="summarise_findings",
action_input={"findings": findings},
observation=None,
is_final=True
)
def run(self, goal: str) -> str:
self.steps = []
self.memory = {}
if self.verbose:
print(f"🎯 Goal: {goal}")
print("─" * 60)
for step_n in range(self.max_steps):
step = self._plan_next_step(goal, step_n)
if self.verbose:
print(f"\nStep {step_n + 1}:")
print(f" 💭 Thought: {step.thought}")
if step.action:
observation = self._call_tool(step.action, step.action_input)
step.observation = observation
if self.verbose:
print(f" 🔧 Action: {step.action}({step.action_input})")
print(f" 👁 Observation: {observation[:150]}{'...' if len(observation) > 150 else ''}")
# Store result in memory
try:
parsed = json.loads(observation)
if isinstance(parsed, list):
self.memory['cves'] = parsed
elif isinstance(parsed, dict):
key = step.action.replace('_', '_')
self.memory[key] = parsed
except:
self.memory[f'step_{step_n}'] = observation
self.steps.append(step)
if step.is_final:
final = step.observation or "Research complete."
if self.verbose:
print(f"\n✅ Final Answer: {final[:300]}")
return final
return "Max steps reached. " + str(self.memory)
# Run the agent
agent = ReActAgent(TOOLS, max_steps=6, verbose=True)
result = agent.run("Research Apache Log4j security vulnerabilities and assess risk")
🎯 Goal: Research Apache Log4j security vulnerabilities and assess risk
────────────────────────────────────────────────────────────
Step 1:
💭 Thought: I need to research 'Research Apache Log4j security...'. Let me start by searching for related CVEs.
🔧 Action: search_cve({'product': 'log4j'})
👁 Observation: [{"cve_id": "CVE-2021-44228", "product": "Apache Log4j", "severity": "CRITICAL", "cvss": 10.0...
Step 2:
💭 Thought: Found 2 CVEs. Now checking patch status for Apache Log4j.
🔧 Action: check_patch_status({'product': 'Apache Log4j'})
👁 Observation: {"product": "Apache Log4j", "current_version": "2.23.1", "cve_count": 8, "last_updated": "20...
Step 3:
💭 Thought: Calculating risk score for CVE-2021-44228 (CVSS 10.0).
🔧 Action: calculate_risk_score({'cvss': 10.0, 'exploited_in_wild': True, 'asset_criticality': 4})
👁 Observation: {"composite_score": 10.0, "risk_level": "CRITICAL", "recommendation": "Patch within 24h"}
Step 4:
💭 Thought: I have gathered enough information. Time to summarise my findings.
🔧 Action: summarise_findings(...)
👁 Observation: Security Research Summary (3 findings): 1. [{"cve_id"...
✅ Final Answer: Security Research Summary (3 findings)...
from collections import deque
class AgentMemory:
"""Short-term and long-term memory for agents"""
def __init__(self, short_term_size: int = 5):
self.short_term = deque(maxlen=short_term_size) # recent steps
self.long_term = {} # key facts extracted from research
self.entities = {} # known entities (CVEs, products, actors)
def add_step(self, step: dict):
self.short_term.append(step)
def remember(self, key: str, value):
self.long_term[key] = value
def recall(self, key: str):
return self.long_term.get(key)
def add_entity(self, entity_type: str, name: str, data: dict):
if entity_type not in self.entities:
self.entities[entity_type] = {}
self.entities[entity_type][name] = data
def get_context_string(self) -> str:
"""Format memory as context for LLM"""
parts = []
if self.long_term:
parts.append(f"Known facts: {json.dumps(self.long_term, default=str)[:200]}")
if self.entities:
for etype, entities in self.entities.items():
parts.append(f"{etype}: {list(entities.keys())}")
return " | ".join(parts) if parts else "No prior context"
# Demo memory-enhanced research
memory = AgentMemory()
# Session 1: Research Log4j
cves = search_cve(product="log4j")
for cve in cves:
memory.add_entity("CVE", cve['cve_id'], cve)
memory.remember(cve['cve_id'], f"CVSS {cve['cvss']} - {cve['description'][:50]}")
# Session 2: Research continues (agent recalls prior findings)
print("Agent memory after Log4j research:")
print(f" Known CVEs: {list(memory.entities.get('CVE', {}).keys())}")
print(f" Context: {memory.get_context_string()[:150]}")
print(f" Recall CVE-2021-44228: {memory.recall('CVE-2021-44228')}")
Agent memory after Log4j research:
Known CVEs: ['CVE-2021-44228', 'CVE-2021-45046']
Context: Known facts: {"CVE-2021-44228": "CVSS 10.0 - Remote code execution via JNDI lookup", ...
Recall CVE-2021-44228: CVSS 10.0 - Remote code execution via JNDI lookup
class SecurityResearchPipeline:
"""Structured multi-step research with explicit planning"""
def __init__(self):
self.memory = AgentMemory()
def research(self, topic: str) -> dict:
print(f"\n{'='*55}")
print(f"Research Topic: {topic}")
print('='*55)
report = {'topic': topic, 'steps': [], 'findings': {}}
# Step 1: CVE search
print("\n[Step 1] Searching CVE database...")
cves = search_cve(product=topic)
for c in cves:
if isinstance(c, dict) and 'cve_id' in c:
self.memory.add_entity('CVE', c['cve_id'], c)
report['findings']['cves'] = cves
report['steps'].append('cve_search')
valid_cves = [c for c in cves if isinstance(c, dict) and 'cvss' in c]
print(f" Found {len(valid_cves)} CVEs")
# Step 2: Patch status
print("[Step 2] Checking patch status...")
patch = check_patch_status(topic)
report['findings']['patch_status'] = patch
report['steps'].append('patch_check')
if 'error' not in patch:
print(f" Current version: {patch.get('current_version', 'unknown')}")
# Step 3: Risk scoring for worst CVE
if valid_cves:
worst = max(valid_cves, key=lambda c: c.get('cvss', 0))
print(f"[Step 3] Risk scoring worst CVE: {worst['cve_id']} (CVSS {worst['cvss']})...")
risk = calculate_risk_score(worst['cvss'], exploited_in_wild=True, asset_criticality=4)
report['findings']['risk_assessment'] = risk
report['steps'].append('risk_scoring')
print(f" Risk level: {risk['risk_level']} Score: {risk['composite_score']}")
# Step 4: Generate summary
print("[Step 4] Generating executive summary...")
summary_items = []
if valid_cves:
summary_items.append(f"{len(valid_cves)} CVEs found, worst CVSS {max(c.get('cvss',0) for c in valid_cves)}")
if 'current_version' in patch:
summary_items.append(f"Latest patch: v{patch['current_version']}")
if 'risk_level' in report['findings'].get('risk_assessment', {}):
rl = report['findings']['risk_assessment']['risk_level']
rec = report['findings']['risk_assessment']['recommendation']
summary_items.append(f"Risk: {rl} — {rec}")
report['summary'] = summarise_findings(summary_items)
report['steps'].append('summarise')
print(f"\nResearch complete in {len(report['steps'])} steps")
return report
pipeline = SecurityResearchPipeline()
for product in ["Apache Log4j", "Microsoft Outlook"]:
report = pipeline.research(product)
print(f"\nExecutive Summary:")
print(report['summary'])
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class ToolSelector:
"""Selects the best tool based on query semantics"""
def __init__(self, tools: dict):
self.tools = tools
# Build embeddings of tool descriptions
tool_texts = [f"{t.name} {t.description}" for t in tools.values()]
self.tool_names = list(tools.keys())
self.vec = TfidfVectorizer(stop_words='english')
self.tool_embeddings = self.vec.fit_transform(tool_texts)
def select(self, query: str, top_k: int = 2) -> list:
"""Select top_k most relevant tools for the query"""
q_vec = self.vec.transform([query])
sims = cosine_similarity(q_vec, self.tool_embeddings)[0]
top_idx = np.argsort(sims)[::-1][:top_k]
return [(self.tool_names[i], float(sims[i])) for i in top_idx if sims[i] > 0]
selector = ToolSelector(TOOLS)
test_queries = [
"what CVEs affect Microsoft products",
"who is responsible for this attack",
"is there a patch available",
"how dangerous is this vulnerability",
"create a report of my findings",
]
print("Tool selection based on query:")
print(f"{'Query':<45} {'Top Tool':>20} {'Score':>8}")
print("-" * 78)
for q in test_queries:
selected = selector.select(q, top_k=1)
if selected:
tool_name, score = selected[0]
print(f"{q:<45} {tool_name:>20} {score:>8.3f}")
Tool selection based on query:
Query Top Tool Score
------------------------------------------------------------------------------
what CVEs affect Microsoft products search_cve 0.412
who is responsible for this attack get_threat_actor 0.387
is there a patch available check_patch_status 0.356
how dangerous is this vulnerability calculate_risk_score 0.298
create a report of my findings summarise_findings 0.341
import time, threading
class ParallelAgent:
"""Agent that can run independent tools in parallel"""
def __init__(self, tools: dict):
self.tools = tools
self.results = {}
self.lock = threading.Lock()
def _run_tool(self, name: str, func, kwargs: dict):
result = func(**kwargs)
with self.lock:
self.results[name] = result
def parallel_research(self, tasks: list) -> dict:
"""Run multiple tool calls in parallel when they are independent"""
self.results = {}
threads = []
for task_name, tool_name, kwargs in tasks:
tool = self.tools[tool_name]
t = threading.Thread(
target=self._run_tool,
args=(task_name, tool.func, kwargs)
)
threads.append(t)
start = time.time()
for t in threads: t.start()
for t in threads: t.join()
elapsed = time.time() - start
print(f"Parallel research completed in {elapsed:.2f}s ({len(tasks)} tasks)")
return self.results
# Research multiple products simultaneously
parallel_agent = ParallelAgent(TOOLS)
tasks = [
("log4j_cves", "search_cve", {"product": "log4j"}),
("outlook_cves", "search_cve", {"product": "outlook"}),
("apt28_info", "get_threat_actor", {"actor": "APT28"}),
("log4j_patch", "check_patch_status", {"product": "Apache Log4j"}),
]
results = parallel_agent.parallel_research(tasks)
for name, result in results.items():
print(f"\n {name}:")
if isinstance(result, list):
print(f" {len(result)} items — first: {str(result[0])[:80]}")
else:
print(f" {str(result)[:100]}")