Lab 07: Kafka-Go
Overview
Step 1: kafka-go Concepts
Kafka Architecture:
Producer ──► Topic ──► Partition[0] ──► Consumer Group A
                   └── Partition[1] ──► Consumer Group A
                   └── Partition[2] ──► Consumer Group A
                                    └── Consumer Group B (independent)
Message key → consistent partition routing (same key → same partition)
Consumer group → each partition assigned to exactly one consumer in the group

Step 2: Writer (Producer)
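Before the writer itself, the two rules from Step 1 (same key → same partition, and one owner per partition within a group) can be sketched without a broker. This is a minimal illustration using only the standard library, not kafka-go's or the broker's actual algorithm: the names `partitionFor` and `assign` are hypothetical, FNV-1a is assumed as the hash, and round-robin is assumed as the assignment strategy.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of n partitions (n must be > 0).
// FNV-1a is an assumption for illustration; any stable hash gives the
// same guarantee that equal keys land on the same partition.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

// assign spreads partitions over the consumers of ONE group, round-robin,
// so every partition has exactly one owner within that group.
// consumers must be non-empty.
func assign(partitions int, consumers []string) map[int]string {
	owners := make(map[int]string, partitions)
	for p := 0; p < partitions; p++ {
		owners[p] = consumers[p%len(consumers)]
	}
	return owners
}

func main() {
	const partitions = 3

	// The repeated key "user-1" is routed to the same partition both times.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key=%s -> partition %d\n", key, partitionFor(key, partitions))
	}

	// Each group gets its own complete assignment of all partitions.
	fmt.Println("group A:", assign(partitions, []string{"a1", "a2"}))
	fmt.Println("group B:", assign(partitions, []string{"b1"}))
}
```

Because the partition depends only on the key and the partition count, ordering per key holds only while the partition count stays fixed; changing it remaps keys.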
// producer.go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func newWriter(brokers []string, topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: topic,
		// Balancer: routes messages to partitions
		//   kafka.LeastBytes → partition that has received the fewest bytes
		//   kafka.RoundRobin → even distribution
		//   kafka.Hash       → key-based (consistent routing!)
		Balancer: &kafka.Hash{},
		// Batching: flush after 100 messages or 10 ms, whichever comes first
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
		// Required acks before considering a write successful
		RequiredAcks: kafka.RequireAll, // -1: all in-sync replicas
		// Async: true makes writes fire-and-forget (lower latency, no error feedback);
		// false blocks WriteMessages until the write succeeds or fails
		Async: false,
		// Compression
		Compression: kafka.Snappy,
	}
}

func produce(w *kafka.Writer, userID string, eventType string, payload []byte) error {
	msg := kafka.Message{
		Key:   []byte(userID), // same userID → same partition → ordered
		Value: payload,
		Headers: []kafka.Header{
			{Key: "event-type", Value: []byte(eventType)},
			{Key: "timestamp", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
		},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return w.WriteMessages(ctx, msg)
}

func produceBatch(w *kafka.Writer, msgs []kafka.Message) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return w.WriteMessages(ctx, msgs...)
}

Step 3: Reader (Consumer)
Step 4: Consumer Group Rebalancing
Step 5: Exactly-Once Semantics (Concept)
Step 6: Topic Management
Step 7: API Demo (Without Live Kafka)
Step 8: Capstone — Producer + Consumer Pattern
Summary
Concept
API
Notes
