Lab 03: LLM API Integration — Streaming, Tool Use, Structured Output

Objective

Master production LLM API integration: streaming responses, function calling with tool orchestration, structured JSON output with Pydantic validation, retry logic, and cost monitoring — using mock APIs compatible with any provider (OpenAI, Anthropic, Gemini).

Time: 55 minutes | Level: Advanced | Docker Image: zchencow/innozverse-ai:latest


Background

Calling an LLM API in production requires far more than response = client.chat(prompt):

Naive:        response = openai.chat(messages)
Production:   streaming + retry + timeout + cost tracking + 
              structured output + tool use + error handling

Step 1: Mock LLM Client

docker run -it --rm zchencow/innozverse-ai:latest bash

import json, time, random, hashlib
from typing import Generator, Optional, List, Dict, Any
import warnings; warnings.filterwarnings('ignore')

class MockLLMResponse:
    """Simulates LLM API response structure"""
    def __init__(self, content: str, model: str = "claude-sonnet-4-6",
                 input_tokens: int = 100, output_tokens: int = 50):
        self.content       = content
        self.model         = model
        self.input_tokens  = input_tokens
        self.output_tokens = output_tokens
        self.stop_reason   = "end_turn"

class MockLLMClient:
    """
    Mock LLM client — replace with:
        import anthropic; client = anthropic.Anthropic()
        import openai;    client = openai.OpenAI()
    
    Same interface works for any provider with minor adapter changes.
    """
    PRICING = {
        "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},   # per million tokens
        "claude-haiku-3":    {"input": 0.25, "output": 1.25},
        "gpt-4o":            {"input": 5.0, "output": 15.0},
        "gpt-4o-mini":       {"input": 0.15, "output": 0.60},
    }

    def __init__(self, model: str = "claude-sonnet-4-6"):
        self.model      = model
        self.call_count = 0
        self.total_cost = 0.0

    def _estimate_tokens(self, text: str) -> int:
        return max(1, len(text) // 4)

    def _compute_cost(self, input_tokens: int, output_tokens: int) -> float:
        price = self.PRICING.get(self.model, {"input": 3.0, "output": 15.0})
        return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000

    def chat(self, messages: list, max_tokens: int = 1024,
             temperature: float = 0.7) -> MockLLMResponse:
        """Standard (non-streaming) completion"""
        self.call_count += 1
        prompt = " ".join(m.get("content", "") for m in messages)
        input_tokens  = self._estimate_tokens(prompt)
        output_tokens = random.randint(50, min(max_tokens, 300))
        cost = self._compute_cost(input_tokens, output_tokens)
        self.total_cost += cost

        # Generate deterministic mock response
        seed_text = prompt[:50]
        responses = [
            "Based on my analysis of the security event, I recommend immediate investigation of the anomalous traffic pattern. The indicators suggest a potential data exfiltration attempt.",
            "The CVE-2024-1234 vulnerability affects systems running the affected software version. Immediate patching is recommended within 24 hours.",
            "Analysis complete. Threat confidence: HIGH. Recommended action: isolate affected endpoint and collect forensic artifacts.",
            "The network traffic pattern is consistent with normal user behaviour. No immediate action required.",
        ]
        # Python's built-in hash() is salted per process, so it is NOT stable
        # across runs — use hashlib for a genuinely deterministic pick
        idx = int(hashlib.sha256(seed_text.encode()).hexdigest(), 16) % len(responses)
        content = responses[idx]
        return MockLLMResponse(content, self.model, input_tokens, output_tokens)

    def stream(self, messages: list, max_tokens: int = 512) -> Generator:
        """Streaming completion — yields chunks as they arrive"""
        response = self.chat(messages, max_tokens)
        words = response.content.split()
        for i, word in enumerate(words):
            time.sleep(0.001)  # simulate network latency
            yield {'type': 'text', 'text': word + (' ' if i < len(words)-1 else '')}
        yield {'type': 'end', 'input_tokens': response.input_tokens,
               'output_tokens': response.output_tokens}

client = MockLLMClient(model="claude-sonnet-4-6")
print(f"Client ready: model={client.model}")
print(f"Pricing: ${client.PRICING[client.model]['input']:.2f} input / ${client.PRICING[client.model]['output']:.2f} output per 1M tokens")

📸 Verified Output:


Step 2: Streaming Response
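This step consumes chunks in the same protocol that MockLLMClient.stream() above emits. A minimal self-contained sketch of rendering a stream incrementally while assembling the full text (mock_stream and consume are illustrative names, not part of any SDK):

```python
from typing import Generator

def mock_stream(text: str) -> Generator[dict, None, None]:
    # Yield one word per chunk, mimicking the delta events a real API streams
    words = text.split()
    for i, word in enumerate(words):
        yield {"type": "text", "text": word + (" " if i < len(words) - 1 else "")}
    yield {"type": "end", "output_tokens": len(words)}

def consume(stream) -> str:
    """Print each chunk as it arrives and return the assembled response."""
    parts = []
    for chunk in stream:
        if chunk["type"] == "text":
            print(chunk["text"], end="", flush=True)  # render incrementally
            parts.append(chunk["text"])
        elif chunk["type"] == "end":
            print(f"\n[{chunk['output_tokens']} output tokens]")
    return "".join(parts)

full = consume(mock_stream("Threat confidence HIGH isolate endpoint"))
```

With the real clients, swap mock_stream for the provider's streaming call; the consume loop stays the same shape.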

📸 Verified Output:


Step 3: Function Calling / Tool Use
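In real APIs the model returns a structured tool-call request; your code executes it and feeds the result back so the model can produce a final answer. A minimal sketch of that detect → execute → re-prompt loop (lookup_cve, run_tool_call, and the message shapes are illustrative assumptions, not any provider's actual schema):

```python
import json

def lookup_cve(cve_id: str) -> dict:
    # Stand-in for a real CVE database query
    return {"cve": cve_id, "cvss": 9.8, "patched": False}

# Tool registry: name -> handler (a real API also needs a JSON schema per tool)
TOOLS = {"lookup_cve": lookup_cve}

def run_tool_call(tool_call: dict) -> str:
    """Execute a model-requested tool and serialise the result for re-prompting."""
    handler = TOOLS[tool_call["name"]]
    result = handler(**tool_call["input"])
    return json.dumps(result)

# Simulate the model requesting a tool (a real response carries this structure)
call = {"name": "lookup_cve", "input": {"cve_id": "CVE-2024-1234"}}
tool_result = run_tool_call(call)

# Append the tool result and re-prompt for the final answer
messages = [
    {"role": "user", "content": "Is CVE-2024-1234 critical?"},
    {"role": "assistant", "content": None, "tool_call": call},
    {"role": "tool", "content": tool_result},
]
```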

📸 Verified Output:


Step 4: Structured Output with Pydantic
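A sketch of the validate-or-fail pattern: parse the model's JSON into a Pydantic model so malformed output raises instead of propagating silently (ThreatAssessment and its fields are illustrative; constructor unpacking works in both Pydantic v1 and v2):

```python
import json
from typing import List
from pydantic import BaseModel, ValidationError

class ThreatAssessment(BaseModel):
    severity: str
    confidence: float
    indicators: List[str]

def parse_assessment(raw: str) -> ThreatAssessment:
    try:
        return ThreatAssessment(**json.loads(raw))
    except (json.JSONDecodeError, ValidationError):
        # In production: re-prompt the model with the validation error attached
        raise

raw = '{"severity": "HIGH", "confidence": 0.92, "indicators": ["beaconing", "dns tunnelling"]}'
assessment = parse_assessment(raw)
```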

📸 Verified Output:


Step 5: Retry Logic and Error Handling
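Exponential backoff with full jitter can be sketched as below (with_retry and RateLimitError are illustrative stand-ins for a provider's 429/overloaded errors; the tenacity library packages the same behaviour):

```python
import random
import time

class RateLimitError(Exception):
    """Stand-in for a provider's 429 response."""

def with_retry(fn, max_attempts: int = 5, base_delay: float = 0.01):
    """Retry transient failures with exponential backoff plus full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            # full jitter: random delay in [0, base * 2^attempt)
            time.sleep(base_delay * (2 ** attempt) * random.random())

# Flaky function: fails twice, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"

result = with_retry(flaky)
```

Jitter matters: without it, every client that hit the same rate limit retries at the same instant and hits it again.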

📸 Verified Output:


Step 6: Prompt Templates and Context Management
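A sketch of a prompt template plus a token-budget trimmer that drops the oldest messages first (SOC_TEMPLATE and trim_history are illustrative names; the token estimate reuses the len // 4 heuristic from Step 1):

```python
from typing import Dict, List

SOC_TEMPLATE = (
    "You are a SOC analyst assistant.\n"
    "Alert: {alert}\n"
    "Context: {context}\n"
    "Respond with severity and recommended action."
)

def estimate_tokens(text: str) -> int:
    return max(1, len(text) // 4)  # same heuristic as MockLLMClient

def trim_history(messages: List[Dict], budget: int) -> List[Dict]:
    """Keep the most recent messages that fit within the token budget."""
    kept, used = [], 0
    for msg in reversed(messages):  # newest first
        cost = estimate_tokens(msg["content"])
        if used + cost > budget:
            break
        kept.append(msg)
        used += cost
    return list(reversed(kept))  # restore chronological order

prompt = SOC_TEMPLATE.format(alert="Beaconing to 203.0.113.7", context="Host: WS-042")
history = [{"role": "user", "content": "x" * 400},   # ~100 tokens, oldest
           {"role": "user", "content": "y" * 40}]    # ~10 tokens, newest
trimmed = trim_history(history, budget=50)           # only the newest fits
```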

📸 Verified Output:


Step 7: Cost Monitoring and Token Optimisation
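Cost tracking is token counts times the Step 1 price table. A small sketch comparing a Sonnet-class and a Haiku-class model over 1,000 calls (the traffic profile is an illustrative assumption):

```python
PRICING = {  # USD per million tokens, mirroring the Step 1 table
    "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
    "claude-haiku-3":    {"input": 0.25, "output": 1.25},
}

def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    p = PRICING[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# 1,000 calls averaging 800 input / 300 output tokens each
sonnet = 1000 * call_cost("claude-sonnet-4-6", 800, 300)
haiku  = 1000 * call_cost("claude-haiku-3", 800, 300)
savings = 1 - haiku / sonnet
print(f"sonnet ${sonnet:.2f}  haiku ${haiku:.2f}  savings {savings:.0%}")
```

Routing cheap, high-volume tasks (triage, summarisation) to the small model is usually the single biggest cost lever.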

📸 Verified Output:


Step 8: Capstone — Multi-Provider SOC Intelligence API
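Routing plus fallback can be sketched as an ordered list of models tried in turn (route, call_with_fallback, and the lambda providers are illustrative; the LiteLLM library packages the same idea behind one interface):

```python
from typing import Callable, Dict, List

def route(task: str) -> List[str]:
    """Pick a provider order per task: cheap model for triage, strong model otherwise."""
    if task == "triage":
        return ["claude-haiku-3", "gpt-4o-mini"]
    return ["claude-sonnet-4-6", "gpt-4o"]

def call_with_fallback(models: List[str], providers: Dict[str, Callable[[], str]]) -> str:
    """Try each model in order; fall through to the next on any failure."""
    last_err = None
    for model in models:
        try:
            return providers[model]()
        except Exception as e:  # in production: catch provider-specific errors
            last_err = e
    raise last_err

providers = {
    # First provider is down; the fallback answers
    "claude-haiku-3": lambda: (_ for _ in ()).throw(TimeoutError("provider down")),
    "gpt-4o-mini": lambda: "triage: benign",
}
answer = call_with_fallback(route("triage"), providers)
```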

📸 Verified Output:


Summary

Feature             Implementation                          Production Equivalent
Streaming           Yield chunks                            client.messages.stream() (Anthropic) / stream=True (OpenAI)
Tool use            Detect intent → execute → re-prompt     tools= parameter
Structured output   Pydantic validation                     response_format={"type": "json_object"}
Retry logic         Exponential backoff + jitter            tenacity library
Cost monitoring     Token × price                           Langfuse / Helicone
Multi-provider      Routing rules + fallback                LiteLLM library
