Copy docker run --rm zchencow/innozverse-python:latest python3 -c "
import asyncio, time, random
# Step 3: Producer-Consumer with asyncio.Queue
async def producer(q: asyncio.Queue, items: list, name: str = 'producer'):
for item in items:
await asyncio.sleep(0.005)
await q.put(item)
await q.put(None) # sentinel
async def consumer(q: asyncio.Queue, results: list, worker_id: int):
while True:
item = await q.get()
if item is None:
await q.put(None) # forward sentinel to next worker
q.task_done()
break
await asyncio.sleep(0.01) # simulate processing
results.append({'worker': worker_id, 'item': item, 'result': item ** 2})
q.task_done()
async def run_pipeline():
q = asyncio.Queue(maxsize=10)
results = []
items = list(range(1, 21)) # 20 items
prod = asyncio.create_task(producer(q, items))
workers = [asyncio.create_task(consumer(q, results, i)) for i in range(4)]
await asyncio.gather(prod, *workers)
return sorted(results, key=lambda r: r['item'])
results = asyncio.run(run_pipeline())
print('=== Producer-Consumer ===')
print(f'Processed {len(results)} items by {len(set(r[\"worker\"] for r in results))} workers')
by_worker = {}
for r in results:
by_worker.setdefault(r['worker'], 0)
by_worker[r['worker']] += 1
print(f'Distribution: {dict(sorted(by_worker.items()))}')
print(f'Sample results: {[(r[\"item\"],r[\"result\"]) for r in results[:4]]}')
# Step 4: Retry with exponential backoff
async def flaky_service(attempt: int) -> str:
if attempt < 3: raise ConnectionError(f'Service unavailable (attempt {attempt})')
return f'Success on attempt {attempt}'
async def with_retry(coro_factory, max_retries: int = 5, base_delay: float = 0.01):
last_exc = None
for attempt in range(max_retries):
try:
return await asyncio.wait_for(coro_factory(attempt), timeout=2.0)
except (ConnectionError, asyncio.TimeoutError) as e:
last_exc = e
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
print(f' Retry {attempt+1}/{max_retries} after {delay:.3f}s: {e}')
await asyncio.sleep(delay)
raise RuntimeError(f'Failed after {max_retries} retries') from last_exc
async def run_retry():
print()
print('=== Retry with exponential backoff ===')
result = await with_retry(flaky_service)
print(f'Final: {result}')
asyncio.run(run_retry())
# Step 5: asyncio.timeout (Python 3.11+)
async def run_timeout():
print()
print('=== asyncio.timeout ===')
# Successful within timeout
async with asyncio.timeout(1.0):
await asyncio.sleep(0.01)
print('Task completed within 1.0s')
# Timeout exceeded
try:
async with asyncio.timeout(0.05):
await asyncio.sleep(10.0)
except TimeoutError:
print('Timed out after 0.05s (TimeoutError raised)')
asyncio.run(run_timeout())
# Step 6: Async context manager
class AsyncDBConnection:
def __init__(self, url: str):
self.url = url; self.connected = False; self.queries = 0
async def __aenter__(self):
await asyncio.sleep(0.01) # simulate connect
self.connected = True
print(f' Connected to {self.url}')
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.005) # simulate close
self.connected = False
print(f' Disconnected ({self.queries} queries executed)')
return False
async def execute(self, sql: str) -> list:
if not self.connected: raise RuntimeError('Not connected')
self.queries += 1
await asyncio.sleep(0.005)
return [{'result': f'{sql[:20]}...', 'row': i} for i in range(3)]
async def run_async_cm():
print()
print('=== Async context manager ===')
async with AsyncDBConnection('sqlite+async:///store.db') as conn:
rows = await conn.execute('SELECT * FROM products')
print(f' Query returned {len(rows)} rows')
rows2 = await conn.execute('SELECT COUNT(*) FROM orders')
print(f' Second query: {len(rows2)} rows')
asyncio.run(run_async_cm())
# Step 7: Async generator
async def stream_products(count: int, batch_size: int = 5):
'''Simulate a streaming API that yields products in batches.'''
for start in range(0, count, batch_size):
await asyncio.sleep(0.01)
batch = [
{'id': i, 'name': f'Product-{i}', 'price': i * 9.99}
for i in range(start, min(start + batch_size, count))
]
for item in batch:
yield item
async def run_async_gen():
print()
print('=== Async generator ===')
count = total = 0
async for product in stream_products(18):
count += 1
total += product['price']
print(f'Streamed {count} products, total value \${total:.2f}')
asyncio.run(run_async_gen())
# Step 8: Capstone — concurrent data pipeline
async def fetch_product(pid: int, sem: asyncio.Semaphore) -> dict:
async with sem:
await asyncio.sleep(0.01)
return {'id': pid, 'name': f'Product-{pid}', 'price': pid * 9.99, 'stock': pid % 50}
async def fetch_inventory(pid: int) -> int:
await asyncio.sleep(0.005)
return (pid * 7) % 100
async def enrich_product(p: dict) -> dict:
inventory = await fetch_inventory(p['id'])
return {**p, 'live_stock': inventory, 'value': p['price'] * inventory}
async def capstone_pipeline():
print()
print('=== Capstone: Async Data Pipeline ===')
t0 = time.perf_counter()
sem = asyncio.Semaphore(10) # max 10 concurrent fetches
product_ids = range(1, 31)
# Stage 1: fetch products concurrently
products = await asyncio.gather(*[fetch_product(i, sem) for i in product_ids])
# Stage 2: enrich concurrently with TaskGroup
enriched = []
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(enrich_product(p)) for p in products]
enriched = [t.result() for t in tasks]
# Stage 3: aggregate
total_value = sum(p['value'] for p in enriched)
in_stock = sum(1 for p in enriched if p['live_stock'] > 0)
top5 = sorted(enriched, key=lambda p: p['value'], reverse=True)[:5]
elapsed = time.perf_counter() - t0
print(f'Pipeline complete: {len(enriched)} products in {elapsed:.3f}s')
print(f'Total value: \${total_value:,.2f}')
print(f'In stock: {in_stock}/{len(enriched)}')
print('Top 5 by value:')
for p in top5:
print(f' {p[\"name\"]:15s} \${p[\"value\"]:8.2f} (stock={p[\"live_stock\"]})')
asyncio.run(capstone_pipeline())
"