Lab 11: Distributed Task Queue
Overview
Prerequisites
# Start Redis in background
docker run -d --name redis-lab -p 6379:6379 redis:7
# Install redis-py
pip install redis

Step 1: Task Definition and Serialization
import json
import pickle
import uuid
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Optional
from enum import Enum
class TaskStatus(str, Enum):
    """Lifecycle states of a Task.

    Subclassing ``str`` means each member compares equal to (and JSON-serializes
    as) its plain string value, so ``json.dumps`` of a task dict works directly.
    """

    PENDING = "pending"  # enqueued, not yet picked up by a worker
    RUNNING = "running"  # currently being executed
    DONE = "done"        # completed successfully
    FAILED = "failed"    # errored; presumably eligible for retry — confirm against worker logic
    DEAD = "dead"        # NOTE(review): likely means retries exhausted / dead-letter queue — confirm
@dataclass
class Task:
    """A single unit of work flowing through the queue.

    Serialized to/from JSON via :func:`serialize_task` / :func:`deserialize_task`,
    so every field must be JSON-representable.
    """

    # Unique identifier, auto-generated per task.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Registered name of the callable to run (resolution happens elsewhere).
    name: str = ""
    # Positional arguments for the task callable.
    args: list = field(default_factory=list)
    # Keyword arguments for the task callable.
    kwargs: dict = field(default_factory=dict)
    # Annotated as str (not TaskStatus) because a JSON round-trip restores the
    # plain string value; TaskStatus members are valid here since they subclass str.
    status: str = TaskStatus.PENDING
    # Retry bookkeeping: attempts made so far vs. the allowed maximum.
    retries: int = 0
    max_retries: int = 3
    # Unix timestamps (seconds since epoch, from time.time()).
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    done_at: Optional[float] = None
    # Populated after execution: the return value, or the error message on failure.
    result: Any = None
    error: Optional[str] = None
    priority: int = 0  # higher = more urgent
def serialize_task(task: Task) -> bytes:
    """Encode *task* as UTF-8 JSON bytes suitable for pushing onto Redis."""
    payload = asdict(task)
    return json.dumps(payload).encode()
def deserialize_task(data: bytes) -> Task:
    """Rebuild a Task from the JSON bytes produced by serialize_task."""
    fields_dict = json.loads(data.decode())
    return Task(**fields_dict)
# Smoke-test the round trip: build a task, serialize it, then restore it and
# check that the payload fields survived.
task = Task(name="compute_sum", args=[1, 2, 3], kwargs={"start": 10})
serialized = serialize_task(task)
restored = deserialize_task(serialized)

size = len(serialized)
print(f"Task ID: {task.id}")
print(f"Serialized size: {size} bytes")
print(f"Round-trip: args={restored.args}, kwargs={restored.kwargs}")
print(f"Status: {restored.status}")

Step 2: Redis-Backed Queue
Step 3: Worker Implementation
Step 4: Producer — Enqueue Tasks
Step 5: Exponential Backoff
Step 6: Dead Letter Queue Management
Step 7: Task Result TTL and Polling
Step 8: Capstone — Full Queue Demo
Summary
Concept
Implementation
Use Case
Last updated
