Lab 11: Distributed Task Queue

Time: 60 minutes | Level: Architect | Docker: docker run -it --rm python:3.11-slim bash

Overview

Build a production-grade distributed task queue backed by Redis. The lab covers task serialization, worker process pools, retry with exponential backoff, dead letter queues, and result TTL management.

Prerequisites

# Start Redis in background
docker run -d --name redis-lab -p 6379:6379 redis:7

# Install redis-py
pip install redis

Step 1: Task Definition and Serialization

import json
import uuid
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Optional
from enum import Enum

class TaskStatus(str, Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    DONE      = "done"
    FAILED    = "failed"
    DEAD      = "dead"

@dataclass
class Task:
    id:         str = field(default_factory=lambda: str(uuid.uuid4()))
    name:       str = ""
    args:       list = field(default_factory=list)
    kwargs:     dict = field(default_factory=dict)
    status:     str = TaskStatus.PENDING.value  # plain str: the same type a deserialized Task holds
    retries:    int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None
    done_at:    Optional[float] = None
    result:     Any = None
    error:      Optional[str] = None
    priority:   int = 0  # higher = more urgent

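# JSON rather than pickle: payloads stay human-readable and language-neutral, and
# loading them cannot execute code (unpickling untrusted bytes can).
# args, kwargs and result must therefore be JSON-serializable.
def serialize_task(task: Task) -> bytes:
    return json.dumps(asdict(task)).encode()

def deserialize_task(data: bytes) -> Task:
    # Every field round-trips; status comes back as a plain str.
    return Task(**json.loads(data.decode()))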

# Test serialization
task = Task(name="compute_sum", args=[1, 2, 3], kwargs={"start": 10})
serialized = serialize_task(task)
restored = deserialize_task(serialized)

print(f"Task ID: {task.id}")
print(f"Serialized size: {len(serialized)} bytes")
print(f"Round-trip: args={restored.args}, kwargs={restored.kwargs}")
print(f"Status: {restored.status}")

Step 2: Redis-Backed Queue
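A minimal sketch of a Redis-backed priority queue, following the summary table: a sorted set fed by ZADD and drained by BZPOPMIN, plus a hash that records which tasks are running. The key names (tasks:queue, tasks:running), the TaskQueue class, and the InMemoryClient stand-in (which mimics only the redis-py calls used here, so the sketch runs without a server) are assumptions, not the lab's own code.

```python
import json
from typing import Any, Optional


class InMemoryClient:
    """Hypothetical stand-in for the redis-py calls used below, so the sketch runs without a server."""

    def __init__(self) -> None:
        self.zsets: dict[str, dict[str, float]] = {}
        self.hashes: dict[str, dict[str, str]] = {}

    def zadd(self, name: str, mapping: dict[str, float]) -> int:
        zset = self.zsets.setdefault(name, {})
        added = sum(1 for member in mapping if member not in zset)
        zset.update(mapping)
        return added

    def bzpopmin(self, keys: str, timeout: float = 0) -> Optional[tuple[str, str, float]]:
        # Unlike Redis, this never blocks: an empty or missing set returns None right away.
        zset = self.zsets.get(keys)
        if not zset:
            return None
        # Lowest score wins; equal scores fall back to lexicographic member order, as in Redis.
        member = min(zset, key=lambda m: (zset[m], m))
        return keys, member, zset.pop(member)

    def hset(self, name: str, key: str, value: str) -> int:
        self.hashes.setdefault(name, {})[key] = value
        return 1

    def hdel(self, name: str, *keys: str) -> int:
        fields = self.hashes.get(name, {})
        return sum(1 for k in keys if fields.pop(k, None) is not None)


class TaskQueue:
    """Priority queue on a sorted set, with in-flight tasks tracked in a hash."""

    def __init__(self, client: Any, name: str = "tasks") -> None:
        self.client = client  # e.g. redis.Redis() or InMemoryClient()
        self.queue_key = f"{name}:queue"
        self.running_key = f"{name}:running"

    def enqueue(self, task: dict) -> None:
        # BZPOPMIN pops the LOWEST score, so store -priority: higher priority pops first.
        self.client.zadd(self.queue_key, {json.dumps(task): -task.get("priority", 0)})

    def dequeue(self, timeout: float = 1.0) -> Optional[dict]:
        popped = self.client.bzpopmin(self.queue_key, timeout=timeout)
        if popped is None:
            return None  # nothing arrived within `timeout` seconds
        _key, member, _score = popped
        task = json.loads(member)
        # Record the task as running so a monitor can spot it if this worker dies mid-task.
        self.client.hset(self.running_key, task["id"], member)
        return task

    def finish(self, task_id: str) -> None:
        self.client.hdel(self.running_key, task_id)
```

The pop and the HSET are two separate commands, so a crash between them can still lose a task; a Lua script running ZPOPMIN and HSET together would make the hand-off atomic, at the cost of the blocking wait. Equal-priority tasks are not FIFO here: Redis orders same-score members lexicographically, in this case by their JSON text.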

Step 3: Worker Implementation
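A minimal worker sketch, assuming tasks arrive as dicts carrying the Step 1 field names and that task functions are looked up by name in a registry. register_task, execute, worker_loop, and the max_idle_polls stop condition are hypothetical; dequeue is any callable that waits up to timeout seconds (for Redis, a wrapper around BZPOPMIN) and returns a task dict or None.

```python
import time
from typing import Any, Callable, Optional

# Hypothetical registry: task name -> Python function the worker should run.
TASK_REGISTRY: dict[str, Callable[..., Any]] = {}


def register_task(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator: make `fn` callable by workers under its own name."""
    TASK_REGISTRY[fn.__name__] = fn
    return fn


@register_task
def compute_sum(*numbers: int, start: int = 0) -> int:
    return start + sum(numbers)


def execute(task: dict) -> dict:
    """Run one task and return a copy with status, result/error and timestamps filled in."""
    task = {**task, "status": "running", "started_at": time.time()}
    try:
        fn = TASK_REGISTRY[task["name"]]
        task["result"] = fn(*task.get("args", []), **task.get("kwargs", {}))
        task["status"] = "done"
    except Exception as exc:  # any failure is recorded; retry vs. dead letter is decided later
        task["error"] = f"{type(exc).__name__}: {exc}"
        task["status"] = "failed"
    task["done_at"] = time.time()
    return task


def worker_loop(
    dequeue: Callable[[float], Optional[dict]],
    on_finished: Callable[[dict], None],
    poll_timeout: float = 1.0,
    max_idle_polls: Optional[int] = None,
) -> int:
    """Blocking-dequeue loop; returns how many tasks it processed before stopping."""
    processed = idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        task = dequeue(poll_timeout)  # waits server-side instead of busy-polling
        if task is None:
            idle += 1
            continue
        idle = 0
        on_finished(execute(task))
        processed += 1
    return processed
```

To get the worker pool the overview mentions, several of these loops could run in separate processes (for example with multiprocessing.Process); BZPOPMIN hands each popped task to exactly one waiting client, so the workers do not duplicate work.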

Step 4: Producer — Enqueue Tasks
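A producer sketch, assuming the sorted-set layout where the JSON payload is the member and the negated priority is the score. make_task, batch_mapping, and the tasks:queue key are hypothetical names; enqueue_batch follows the summary table's batching idea by sending the whole batch as one multi-member ZADD, so a batch costs one network round trip instead of one per task.

```python
import json
import time
import uuid
from typing import Any, Iterable


def make_task(name: str, *args: Any, priority: int = 0, max_retries: int = 3, **kwargs: Any) -> dict:
    """Build a task payload with a subset of the Step 1 fields (hypothetical helper)."""
    return {
        "id": str(uuid.uuid4()), "name": name, "args": list(args), "kwargs": kwargs,
        "status": "pending", "retries": 0, "max_retries": max_retries,
        "created_at": time.time(), "priority": priority,
    }


def batch_mapping(tasks: Iterable[dict]) -> dict[str, float]:
    # Member = JSON payload, score = -priority, so BZPOPMIN returns urgent tasks first.
    return {json.dumps(t): -t["priority"] for t in tasks}


def enqueue_batch(client: Any, tasks: list[dict], queue_key: str = "tasks:queue") -> int:
    """Add all tasks with ONE multi-member ZADD: one round trip instead of len(tasks)."""
    if not tasks:
        return 0  # redis-py rejects a ZADD with an empty mapping
    return client.zadd(queue_key, batch_mapping(tasks))
```

Fields left out of make_task (started_at, done_at, result, error) take their dataclass defaults if a payload is loaded with Task(**payload). Because Redis executes commands one at a time, splitting a very large batch into several ZADD calls keeps any single command from occupying the server for long.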

Step 5: Exponential Backoff
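The summary table gives the delay rule as 2 ** attempt. A sketch of that rule with an assumed base delay, an upper cap, and optional full jitter (a common refinement, not confirmed as part of this lab). backoff_delay and call_with_retries are hypothetical names, and sleep is injectable so the logic can be tested without actually waiting.

```python
import random
import time
from typing import Any, Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, jitter: bool = False) -> float:
    """Seconds to wait before retry `attempt` (0-based): base * 2 ** attempt, capped at `cap`."""
    delay = min(cap, base * 2 ** attempt)
    if jitter:
        # Full jitter: a uniform draw in [0, delay] spreads out workers that failed together.
        delay = random.uniform(0, delay)
    return delay


def call_with_retries(
    fn: Callable[[], Any],
    max_retries: int = 3,
    base: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call `fn`; after each failure wait backoff_delay(attempt) and try again."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # retry budget spent: the caller can dead-letter the task
            sleep(backoff_delay(attempt, base=base))
```

With the defaults, the waits are 1, 2, 4, 8 ... seconds, never exceeding 60. Sleeping inside the worker ties that worker up for the whole delay; a non-blocking alternative is to re-enqueue the failed task with retries incremented and let it become eligible only after the delay has passed.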

Step 6: Dead Letter Queue Management
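A dead letter sketch built on a Redis list, as the summary table describes: tasks that have exhausted max_retries are pushed with status "dead" and their last error, can be inspected in place, and can be replayed onto the priority queue. DLQ_KEY, the helper names, and the replay policy (reset retries to 0) are assumptions; client is any redis-py style client.

```python
import json
import time
from typing import Any

DLQ_KEY = "tasks:dead"  # hypothetical key name for the dead letter list


def should_dead_letter(task: dict) -> bool:
    """True once the task has used up its retry budget."""
    return task.get("retries", 0) >= task.get("max_retries", 3)


def mark_dead(task: dict, error: str) -> dict:
    return {**task, "status": "dead", "error": error, "done_at": time.time()}


def reset_for_retry(task: dict) -> dict:
    """Fresh retry budget for a manual replay; the earlier error and timestamps are cleared."""
    return {**task, "status": "pending", "retries": 0, "error": None,
            "started_at": None, "done_at": None}


def dead_letter(client: Any, task: dict, error: str) -> None:
    # RPUSH appends, so reading from index 0 returns the oldest failures first.
    client.rpush(DLQ_KEY, json.dumps(mark_dead(task, error)))


def inspect_dead(client: Any, limit: int = 10) -> list[dict]:
    """Peek at up to `limit` dead tasks without removing them."""
    return [json.loads(raw) for raw in client.lrange(DLQ_KEY, 0, limit - 1)]


def replay_dead(client: Any, queue_key: str = "tasks:queue") -> int:
    """Move every dead task back onto the priority queue; returns how many were moved."""
    moved = 0
    while (raw := client.lpop(DLQ_KEY)) is not None:
        task = reset_for_retry(json.loads(raw))
        client.zadd(queue_key, {json.dumps(task): -task.get("priority", 0)})
        moved += 1
    return moved
```

LPOP followed by ZADD is not atomic, so a crash between the two calls drops that task; wrapping the move in a Lua script would close the gap.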

Step 7: Task Result TTL and Polling
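A sketch of result storage with a TTL (SETEX, per the summary table) and client-side polling. RESULT_TTL, the result:<id> key pattern, and the helper names are assumptions. Storing the value as {"result": ...} keeps a task that legitimately returned None distinguishable from a key that has not been written yet.

```python
import json
import time
from typing import Any, Callable, Optional

RESULT_TTL = 3600  # seconds; hypothetical default lifetime for stored results


def result_key(task_id: str) -> str:
    return f"result:{task_id}"


def store_result(client: Any, task_id: str, result: Any, ttl: int = RESULT_TTL) -> None:
    # SETEX sets the value and its expiry in one command; Redis deletes the key after `ttl` seconds.
    client.setex(result_key(task_id), ttl, json.dumps({"result": result}))


def wait_for_result(
    get: Callable[[str], Optional[bytes]],
    task_id: str,
    timeout: float = 10.0,
    interval: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Any:
    """Poll `get` (e.g. redis_client.get) until the result appears or `timeout` elapses."""
    deadline = clock() + timeout
    while True:
        raw = get(result_key(task_id))
        if raw is not None:
            return json.loads(raw)["result"]
        if clock() >= deadline:
            raise TimeoutError(f"no result for task {task_id} within {timeout}s")
        sleep(interval)
```

With redis-py the call would look like wait_for_result(client.get, task_id). A caller that polls only after the TTL has passed gets a TimeoutError, because the key is already gone; the TTL therefore has to be comfortably longer than the slowest expected consumer.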

Step 8: Capstone — Full Queue Demo
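An end-to-end sketch that runs without Redis, assuming an in-process stand-in: a heap plays the sorted set, a dict with expiry times plays the SETEX keys, and a list plays the dead letter queue. InProcessQueue, HANDLERS, run_worker, and the scaled-down backoff base are all hypothetical; replacing the stand-in with redis-py calls is what would make it distributed.

```python
import heapq
import itertools
import json
import time
import uuid
from typing import Any, Callable, Optional


class InProcessQueue:
    """Hypothetical Redis-free stand-in: heap ~ sorted set, dict ~ SETEX keys, list ~ DLQ."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._seq = itertools.count()  # tie-breaker: FIFO within one priority
        self._results: dict[str, tuple[float, Any]] = {}  # task id -> (expires_at, result)
        self.dead: list[dict] = []

    def enqueue(self, name: str, *args: Any, priority: int = 0, max_retries: int = 2, **kwargs: Any) -> str:
        task = {"id": str(uuid.uuid4()), "name": name, "args": list(args), "kwargs": kwargs,
                "retries": 0, "max_retries": max_retries, "priority": priority}
        self.requeue(task)
        return task["id"]

    def requeue(self, task: dict) -> None:
        # Negated priority: heapq pops the smallest tuple, so urgent tasks come out first.
        heapq.heappush(self._heap, (-task["priority"], next(self._seq), json.dumps(task)))

    def dequeue(self) -> Optional[dict]:
        return json.loads(heapq.heappop(self._heap)[2]) if self._heap else None

    def set_result(self, task_id: str, result: Any, ttl: float = 60.0) -> None:
        self._results[task_id] = (time.monotonic() + ttl, result)

    def get_result(self, task_id: str) -> Any:
        expires_at, result = self._results.get(task_id, (0.0, None))
        if expires_at <= time.monotonic():
            self._results.pop(task_id, None)  # expired entries disappear, like SETEX keys
            return None
        return result


# Hypothetical task functions for the demo.
HANDLERS: dict[str, Callable[..., Any]] = {
    "compute_sum": lambda *nums, start=0: start + sum(nums),
    "always_fails": lambda: 1 / 0,
}


def run_worker(q: InProcessQueue, base_delay: float = 0.01) -> dict[str, int]:
    """Drain the queue: store results, retry with 2 ** retries backoff, dead-letter the rest."""
    stats = {"done": 0, "retried": 0, "dead": 0}
    while (task := q.dequeue()) is not None:
        try:
            result = HANDLERS[task["name"]](*task["args"], **task["kwargs"])
        except Exception as exc:
            if task["retries"] < task["max_retries"]:
                time.sleep(base_delay * 2 ** task["retries"])  # exponential backoff
                task["retries"] += 1
                q.requeue(task)
                stats["retried"] += 1
            else:
                q.dead.append({**task, "status": "dead", "error": repr(exc)})
                stats["dead"] += 1
        else:
            q.set_result(task["id"], result)
            stats["done"] += 1
    return stats


queue = InProcessQueue()
ids = [queue.enqueue("compute_sum", i, i + 1, priority=i % 3) for i in range(5)]
queue.enqueue("always_fails")
stats = run_worker(queue)
print("stats:", stats)
print("first result:", queue.get_result(ids[0]))
print("dead letters:", [t["error"] for t in queue.dead])
```

Unlike a Redis sorted set, the heap breaks priority ties with an insertion counter, so equal-priority tasks here run in FIFO order.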

📸 Verified Output (without Redis):

Summary

| Concept | Implementation | Use Case |
|---|---|---|
| Task serialization | json + dataclass | Portable task format |
| Priority queue | Redis sorted set (ZADD/BZPOPMIN) | Priority-based scheduling |
| Running tracking | Redis hash | At-most-once delivery |
| Result TTL | Redis SETEX | Ephemeral result storage |
| Exponential backoff | 2 ** attempt delay | Fault-tolerant retries |
| Dead letter queue | Redis list | Failed task forensics |
| Worker loop | Blocking dequeue | CPU-efficient polling |
| Batch enqueue | enqueue_batch | Throughput optimization |
