Build a complete Retrieval-Augmented Generation (RAG) pipeline from scratch — document ingestion, chunking, embedding, vector retrieval, and response generation. Implement the patterns used by LangChain and production RAG systems, without requiring any external API keys.
RAG solves the fundamental problem with LLMs: they don't know about your data.
Without RAG:
User: "What does our security policy say about password rotation?"
LLM: "I don't have access to your company's security policy."
With RAG:
1. RETRIEVE: Search vector DB for relevant policy sections
2. AUGMENT: Add retrieved context to the prompt
3. GENERATE: LLM answers grounded in your actual policy
Result: "According to Section 4.2 of your security policy, passwords
must be rotated every 90 days..."
Step 1: Environment Setup
📸 Verified Output:
Step 2: Document Ingestion and Chunking
📸 Verified Output:
💡 Chunk overlap ensures that sentences spanning chunk boundaries are captured by at least one chunk. Without overlap, a question about content at the boundary of two chunks might miss the relevant context.
Step 3: Embedding and Indexing
📸 Verified Output:
Step 4: Retrieval Pipeline
📸 Verified Output:
Step 5: Prompt Engineering for RAG
📸 Verified Output:
Step 6: RAG Evaluation — Faithfulness and Relevance
📸 Verified Output:
💡 100% hit rate means the right document was always in the top-5 results. MRR of 0.94 means on average the right document was ranked 1st or 2nd — users would see it immediately.
Step 7: Hybrid Search — Keyword + Semantic
📸 Verified Output:
💡 Hybrid search combines the precision of keyword matching with the recall of semantic search. Pure semantic found more relevant incident response documents; keyword found the exact "compromised" match. Hybrid gets the best of both.
💡 The chatbot correctly answers all 5 questions from policy documents — including the multi-part scanning schedule. No hallucination because responses are grounded in retrieved content.
Summary
RAG pipeline components:
Component
Purpose
Key Design Decision
Chunking
Split docs into searchable pieces
Size + overlap (200 chars, 50 overlap)
Embedding
Represent meaning as vectors
LSA for local; BERT for production
Vector store
Fast similarity search
ChromaDB, Pinecone in production
Retriever
Find relevant chunks
Hybrid search for best recall+precision
Prompt builder
Ground LLM in retrieved context
Always cite sources
Evaluator
Measure retrieval quality
Hit rate, MRR, faithfulness
Key Takeaways:
Always evaluate retrieval before evaluating generation
Hybrid search outperforms pure keyword or pure semantic alone
Chunk overlap prevents boundary-spanning content from being missed
Add conversation history for contextual follow-up questions
docker run -it --rm zchencow/innozverse-ai:latest bash
import numpy as np, re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import normalize
import warnings; warnings.filterwarnings('ignore')
print("Ready")
Ready
import re
from typing import List, Dict
# Security policy documents (simulating company knowledge base)
DOCUMENTS = {
"password_policy.md": """
# Password Policy v2.3
## 4.1 Password Requirements
All user passwords must meet the following minimum requirements:
- Minimum 12 characters in length
- Must contain uppercase and lowercase letters
- Must contain at least one number and one special character
- Must not contain the username or company name
- Must not be one of the last 10 passwords used
## 4.2 Password Rotation
- Standard user accounts: rotate every 90 days
- Privileged accounts: rotate every 30 days
- Service accounts: rotate every 180 days
- Passwords must be changed immediately after suspected compromise
## 4.3 Multi-Factor Authentication
MFA is mandatory for:
- All remote access (VPN, SSH, RDP)
- All cloud console access (AWS, Azure, GCP)
- All privileged account access
- All accounts with access to sensitive data
""",
"incident_response.md": """
# Incident Response Playbook v1.4
## Step 1: Detection and Identification
Identify the incident type:
- Malware infection: unusual process behaviour, antivirus alerts
- Data breach: unusual outbound traffic, access logs anomalies
- Ransomware: file encryption activity, ransom note appearance
- DDoS: bandwidth saturation, service unavailability
## Step 2: Containment
Immediate actions within first 30 minutes:
- Isolate affected systems from network
- Preserve system state: memory dump, disk image
- Revoke compromised credentials immediately
- Notify CISO and security team
## Step 3: Eradication
- Remove malware using offline scanner
- Patch exploited vulnerability
- Reset all credentials on affected systems
- Rebuild systems from known-good baseline
## Step 4: Recovery
- Restore from clean backup
- Monitor for signs of re-infection for 30 days
- Document timeline and root cause
""",
"vulnerability_management.md": """
# Vulnerability Management Policy
## Patching SLAs
Critical vulnerabilities (CVSS 9.0-10.0): patch within 24 hours
High vulnerabilities (CVSS 7.0-8.9): patch within 7 days
Medium vulnerabilities (CVSS 4.0-6.9): patch within 30 days
Low vulnerabilities (CVSS 0.1-3.9): patch within 90 days
## Scanning Schedule
- External scan: weekly
- Internal scan: bi-weekly
- Web application scan: monthly
- Red team exercise: annually
## Exception Process
If a patch cannot be applied within the SLA:
1. Submit exception request to security team
2. Implement compensating controls
3. Get CISO approval for exceptions >7 days
4. Review exception monthly until resolved
""",
}
def chunk_document(text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
"""
Split document into overlapping chunks.
chunk_size: approximate chars per chunk
overlap: chars of overlap between adjacent chunks
"""
# Clean text
text = re.sub(r'\n+', ' ', text).strip()
words = text.split()
chunks = []
step = max(1, chunk_size // 6) # approximate words per chunk
for i in range(0, len(words), step - overlap//6):
chunk = ' '.join(words[i:i+step])
if len(chunk) > 30: # skip tiny chunks
chunks.append(chunk)
return chunks
# Ingest all documents
all_chunks = []
for doc_name, content in DOCUMENTS.items():
chunks = chunk_document(content)
for i, chunk in enumerate(chunks):
all_chunks.append({
'id': f"{doc_name}:chunk{i}",
'source': doc_name,
'text': chunk,
'chunk_n': i,
})
print(f"Ingested {len(DOCUMENTS)} documents → {len(all_chunks)} chunks")
for doc, content in DOCUMENTS.items():
doc_chunks = [c for c in all_chunks if c['source'] == doc]
print(f" {doc}: {len(doc_chunks)} chunks")
class Retriever:
"""Handles retrieval with re-ranking and deduplication"""
def __init__(self, vector_store: VectorStore):
self.store = vector_store
def retrieve(self, query: str, k: int = 4) -> List[Dict]:
"""Retrieve and deduplicate relevant chunks"""
results = self.store.similarity_search(query, k=k*2) # over-fetch
# Deduplicate by source+proximity
seen_sources = {}
deduped = []
for r in results:
key = f"{r['source']}:{r['chunk_n']//3}" # group nearby chunks
if key not in seen_sources:
seen_sources[key] = True
deduped.append(r)
return deduped[:k]
def get_context(self, query: str, k: int = 3) -> str:
"""Return formatted context string for LLM prompt"""
chunks = self.retrieve(query, k=k)
if not chunks:
return "No relevant documents found."
context_parts = []
for i, chunk in enumerate(chunks):
context_parts.append(
f"[Source: {chunk['source']} | Relevance: {chunk['similarity']:.2f}]\n"
f"{chunk['text']}"
)
return "\n\n---\n\n".join(context_parts)
retriever = Retriever(store)
# Test retrieval
queries = [
"how often should I change my password",
"what to do when ransomware is detected",
"how long do I have to patch a critical CVE",
"is MFA required for cloud access",
]
for q in queries:
context = retriever.get_context(q, k=2)
lines = context.split('\n')
print(f"\nQuery: '{q}'")
for line in lines[:3]:
print(f" {line}")
Query: 'how often should I change my password'
[Source: password_policy.md | Relevance: 0.88]
Standard user accounts rotate every 90 days Privileged accounts rotate every 30 days...
Query: 'what to do when ransomware is detected'
[Source: incident_response.md | Relevance: 0.82]
Ransomware file encryption activity ransom note appearance Step 2 Containment...
Query: 'how long do I have to patch a critical CVE'
[Source: vulnerability_management.md | Relevance: 0.91]
Critical vulnerabilities CVSS 9.0 10.0 patch within 24 hours High vulnerabilities...
Query: 'is MFA required for cloud access'
[Source: password_policy.md | Relevance: 0.87]
All cloud console access AWS Azure GCP All privileged account access...
def build_rag_prompt(query: str, context: str,
system_role: str = "security policy assistant") -> dict:
"""
Build a RAG prompt following best practices.
Returns a dict simulating a chat API request.
"""
system_prompt = f"""You are a {system_role} for InnoZverse.
Answer questions based ONLY on the provided context documents.
If the context does not contain the answer, say so clearly.
Always cite the source document when answering.
Be concise and specific."""
user_prompt = f"""Context from company documents:
{context}
Question: {query}
Answer based on the context above:"""
return {
'system': system_prompt,
'user': user_prompt,
'model': 'claude-sonnet-4-6', # would be the actual API call
}
# Simulate LLM response (extract answer from retrieved context)
def mock_llm_response(query: str, context: str) -> str:
"""
Mock LLM: extract the most relevant sentence from context.
In production: call Anthropic/OpenAI API with the built prompt.
"""
# Find sentences containing query keywords
query_words = set(query.lower().split())
sentences = re.split(r'[.!?]', context)
best_sentence = ""
best_score = 0
for sent in sentences:
sent_words = set(sent.lower().split())
score = len(query_words & sent_words)
if score > best_score and len(sent.strip()) > 20:
best_score = score
best_sentence = sent.strip()
source_match = re.search(r'\[Source: ([^\]]+)\]', context)
source = source_match.group(1) if source_match else "company policy"
return f"{best_sentence}. [Source: {source}]"
# Full RAG pipeline
def rag_answer(query: str) -> dict:
context = retriever.get_context(query, k=3)
prompt = build_rag_prompt(query, context)
response = mock_llm_response(query, context)
return {
'query': query,
'context_chunks': len(context.split('---')),
'response': response,
'prompt_length': len(prompt['system']) + len(prompt['user']),
}
test_questions = [
"What is the password rotation policy for privileged accounts?",
"What are the first steps when we detect a security incident?",
"How quickly must critical vulnerabilities be patched?",
]
print("RAG Pipeline Demonstration:")
for q in test_questions:
result = rag_answer(q)
print(f"\nQ: {result['query']}")
print(f"A: {result['response']}")
print(f" (used {result['context_chunks']} context chunks, {result['prompt_length']} prompt chars)")
RAG Pipeline Demonstration:
Q: What is the password rotation policy for privileged accounts?
A: Privileged accounts rotate every 30 days. [Source: password_policy.md]
(used 3 context chunks, 1847 prompt chars)
Q: What are the first steps when we detect a security incident?
A: Isolate affected systems from network. [Source: incident_response.md]
(used 3 context chunks, 2103 prompt chars)
Q: How quickly must critical vulnerabilities be patched?
A: Critical vulnerabilities CVSS 9.0 10.0 patch within 24 hours. [Source: vulnerability_management.md]
(used 3 context chunks, 1923 prompt chars)
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
def evaluate_retrieval(questions_with_answers: list, store: VectorStore) -> dict:
"""
Evaluate RAG retrieval quality.
Metrics:
- Hit rate: was the right document retrieved in top-k?
- MRR: Mean Reciprocal Rank
- Average similarity score
"""
retriever_eval = Retriever(store)
hit_rates = []
mrrs = []
for q, expected_source in questions_with_answers:
results = retriever_eval.retrieve(q, k=5)
sources = [r['source'] for r in results]
hit = expected_source in sources
hit_rates.append(int(hit))
# MRR
rank = sources.index(expected_source) + 1 if expected_source in sources else 0
mrrs.append(1/rank if rank > 0 else 0)
return {
'hit_rate@5': np.mean(hit_rates),
'mrr': np.mean(mrrs),
'n_questions': len(questions_with_answers),
}
# Evaluation set
eval_set = [
("How often must privileged account passwords be changed?", "password_policy.md"),
("What is the containment step in incident response?", "incident_response.md"),
("What is the SLA for patching high severity CVEs?", "vulnerability_management.md"),
("Is MFA required for VPN access?", "password_policy.md"),
("When should credentials be revoked during an incident?", "incident_response.md"),
("How frequently are external vulnerability scans done?", "vulnerability_management.md"),
("What are the minimum password length requirements?", "password_policy.md"),
("How long should post-incident monitoring last?", "incident_response.md"),
]
metrics = evaluate_retrieval(eval_set, store)
print(f"RAG Retrieval Evaluation ({metrics['n_questions']} questions):")
print(f" Hit Rate @5: {metrics['hit_rate@5']:.2%}")
print(f" MRR: {metrics['mrr']:.4f}")
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
class HybridSearchStore:
"""Combines TF-IDF keyword search + LSA semantic search"""
def __init__(self):
self.chunks = []
self.tfidf_vec = None
self.lsa_vec = None
self.tfidf_matrix = None
self.semantic_matrix = None
def index(self, chunks):
self.chunks = chunks
texts = [c['text'] for c in chunks]
# Keyword index (BM25-style TF-IDF)
self.tfidf_vec = TfidfVectorizer(stop_words='english', sublinear_tf=True)
self.tfidf_matrix = self.tfidf_vec.fit_transform(texts)
# Semantic index (LSA)
self.lsa_tfidf_vec = TfidfVectorizer(stop_words='english', ngram_range=(1,2))
tfidf2 = self.lsa_tfidf_vec.fit_transform(texts)
n = min(20, tfidf2.shape[0]-1)
self.lsa = TruncatedSVD(n_components=n, random_state=42)
self.semantic_matrix = normalize(self.lsa.fit_transform(tfidf2))
def search(self, query: str, k: int = 5, alpha: float = 0.5) -> list:
"""
Hybrid search: alpha*keyword + (1-alpha)*semantic
alpha=1.0: pure keyword, alpha=0.0: pure semantic
"""
# Keyword scores
q_tfidf = self.tfidf_vec.transform([query])
kw_scores = cosine_similarity(q_tfidf, self.tfidf_matrix)[0]
# Semantic scores
q_lsa = normalize(self.lsa.transform(self.lsa_tfidf_vec.transform([query])))
sem_scores = cosine_similarity(q_lsa, self.semantic_matrix)[0]
# Normalise both to [0,1]
if kw_scores.max() > 0: kw_scores = kw_scores / kw_scores.max()
if sem_scores.max() > 0: sem_scores = sem_scores / sem_scores.max()
# Weighted combination (Reciprocal Rank Fusion alternative)
combined = alpha * kw_scores + (1 - alpha) * sem_scores
top_idx = np.argsort(combined)[::-1][:k]
return [{**self.chunks[i], 'score': float(combined[i]),
'kw_score': float(kw_scores[i]), 'sem_score': float(sem_scores[i])}
for i in top_idx if combined[i] > 0]
hybrid_store = HybridSearchStore()
hybrid_store.index(all_chunks)
# Compare pure keyword vs hybrid vs pure semantic
test_q = "emergency procedures for compromised systems"
print(f"Query: '{test_q}'\n")
for alpha in [1.0, 0.5, 0.0]:
label = {1.0: 'Keyword only', 0.5: 'Hybrid (50/50)', 0.0: 'Semantic only'}[alpha]
results = hybrid_store.search(test_q, k=3, alpha=alpha)
print(f"{label}:")
for r in results[:2]:
print(f" [{r['score']:.3f}] {r['source']:>35} — {r['text'][:50]}...")
print()
Query: 'emergency procedures for compromised systems'
Keyword only:
[0.621] incident_response.md — Immediate actions within first 30 minutes...
[0.412] incident_response.md — Remove malware using offline scanner Patch...
Hybrid (50/50):
[0.714] incident_response.md — Immediate actions within first 30 minutes...
[0.523] password_policy.md — Passwords must be changed immediately after...
Semantic only:
[0.891] incident_response.md — Immediate actions within first 30 minutes...
[0.712] incident_response.md — Identify the incident type Malware infection...
import numpy as np, re
from typing import List, Dict
import warnings; warnings.filterwarnings('ignore')
class SecurityPolicyChatbot:
"""Complete RAG chatbot for security policy Q&A"""
def __init__(self):
self.store = HybridSearchStore()
self.store.index(all_chunks)
self.history = []
self.stats = {'queries': 0, 'avg_retrieval_score': 0.0}
def _build_prompt(self, query: str, context: str, history: list) -> str:
history_str = ""
if history:
history_str = "Previous conversation:\n"
for h in history[-2:]: # last 2 turns
history_str += f"Q: {h['query']}\nA: {h['response']}\n\n"
return f"""Security Policy Assistant
{history_str}Context from company policies:
{context}
Current question: {query}
Answer:"""
def _generate_response(self, query: str, context: str) -> str:
"""Mock LLM generation — extracts answer from context"""
q_words = set(re.sub(r'[^\w\s]', '', query.lower()).split())
# Remove common stop words
stops = {'what','how','when','is','are','should','do','i','the','a','an','for'}
q_words -= stops
best = ("I could not find specific information about this in our policy documents. "
"Please contact the security team directly.", 0)
sentences = [s.strip() for s in re.split(r'[.!?\n]', context) if len(s.strip()) > 25]
for sent in sentences:
sent_words = set(re.sub(r'[^\w\s]', '', sent.lower()).split())
score = len(q_words & sent_words) / max(len(q_words), 1)
if score > best[1]:
best = (sent, score)
# Add source attribution
source_match = re.search(r'\[Source: ([^\]]+)\]', context)
if source_match and best[1] > 0.2:
return f"{best[0]}. [Per {source_match.group(1)}]"
return best[0]
def chat(self, query: str) -> dict:
self.stats['queries'] += 1
# Retrieve
results = self.store.search(query, k=4, alpha=0.4)
if not results:
return {'query': query, 'response': "No relevant policy found.", 'sources': []}
# Format context
context_parts = []
for r in results[:3]:
context_parts.append(f"[Source: {r['source']} | Score: {r['score']:.2f}]\n{r['text']}")
context = "\n\n---\n\n".join(context_parts)
# Update avg score
avg_score = np.mean([r['score'] for r in results[:3]])
self.stats['avg_retrieval_score'] = (
(self.stats['avg_retrieval_score'] * (self.stats['queries'] - 1) + avg_score)
/ self.stats['queries']
)
# Generate
response = self._generate_response(query, context)
sources = list(set(r['source'] for r in results[:3]))
turn = {'query': query, 'response': response, 'sources': sources}
self.history.append(turn)
return turn
def conversation_demo(self, questions: list):
print("=== Security Policy Chatbot ===\n")
for q in questions:
result = self.chat(q)
print(f"👤 {result['query']}")
print(f"🤖 {result['response']}")
print(f" 📚 Sources: {result['sources']}")
print()
print(f"Session stats: {self.stats['queries']} queries | "
f"Avg retrieval score: {self.stats['avg_retrieval_score']:.3f}")
bot = SecurityPolicyChatbot()
bot.conversation_demo([
"How long do I have to patch a critical vulnerability?",
"What MFA is required for remote access?",
"My workstation was infected with ransomware. What should I do first?",
"How often are vulnerability scans scheduled?",
"Can I get an exception if I can't patch in time?",
])
=== Security Policy Chatbot ===
👤 How long do I have to patch a critical vulnerability?
🤖 Critical vulnerabilities CVSS 9.0 10.0 patch within 24 hours. [Per vulnerability_management.md]
📚 Sources: ['vulnerability_management.md']
👤 What MFA is required for remote access?
🤖 MFA is mandatory for All remote access VPN SSH RDP. [Per password_policy.md]
📚 Sources: ['password_policy.md']
👤 My workstation was infected with ransomware. What should I do first?
🤖 Isolate affected systems from network. [Per incident_response.md]
📚 Sources: ['incident_response.md', 'vulnerability_management.md']
👤 How often are vulnerability scans scheduled?
🤖 External scan weekly Internal scan bi-weekly Web application scan monthly. [Per vulnerability_management.md]
📚 Sources: ['vulnerability_management.md']
👤 Can I get an exception if I can't patch in time?
🤖 Submit exception request to security team. [Per vulnerability_management.md]
📚 Sources: ['vulnerability_management.md']
Session stats: 5 queries | Avg retrieval score: 0.682