Lab 07: Kafka-Go

Time: 45 minutes | Level: Advanced | Docker: docker run -it --rm golang:1.22-alpine sh

Overview

Use segmentio/kafka-go to build producers and consumers: topic management, consumer groups, partition routing via message keys, context-based cancellation, and the concepts behind exactly-once semantics.


Step 1: kafka-go Concepts

Kafka Architecture:
  Producer ──► Topic ──► Partition[0] ──► Consumer Group A
                     └── Partition[1] ──► Consumer Group A
                     └── Partition[2] ──► Consumer Group A
                                      └── Consumer Group B (independent)

Message key → consistent partition routing (same key → same partition)
Consumer group → each partition assigned to exactly one consumer in the group
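The key → partition rule can be sketched as hash-mod routing. This is an illustrative stand-in using stdlib FNV-1a, not kafka-go's exact hash, but the property is the same: a given key always lands on the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches key-based routing: hash the key, mod by partition count.
// The same key always maps to the same partition, preserving per-key ordering.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// "user-1" appears twice and routes to the same partition both times.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 3))
	}
}
```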

Step 2: Writer (Producer)

// producer.go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func newWriter(brokers []string, topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: topic,

		// Balancer: routes messages to partitions
		// kafka.LeastBytes    → minimize partition size
		// kafka.RoundRobin    → even distribution
		// kafka.Hash          → key-based (consistent routing!)
		Balancer: &kafka.Hash{},

		// Batching
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,

		// Required acks before considering a write successful
		RequiredAcks: kafka.RequireAll, // -1: all in-sync replicas

		// Async writes (fire-and-forget), lower latency but no error feedback
		Async: false,

		// Compression
		Compression: kafka.Snappy,
	}
}

func produce(w *kafka.Writer, userID string, eventType string, payload []byte) error {
	msg := kafka.Message{
		Key:   []byte(userID),   // same userID → same partition → ordered
		Value: payload,
		Headers: []kafka.Header{
			{Key: "event-type", Value: []byte(eventType)},
			{Key: "timestamp", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
		},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return w.WriteMessages(ctx, msg)
}

func produceBatch(w *kafka.Writer, msgs []kafka.Message) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return w.WriteMessages(ctx, msgs...)
}
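Wiring the producer together might look like this (a sketch that assumes a broker at localhost:9092 and an illustrative topic name; it reuses newWriter and produce from above):

```go
// main.go — using the producer above (assumes a broker at localhost:9092)
func main() {
	w := newWriter([]string{"localhost:9092"}, "user-events")
	defer w.Close() // flush any pending batches before exit

	payload := []byte(`{"action":"login"}`)
	if err := produce(w, "user-42", "login", payload); err != nil {
		log.Fatalf("write failed: %v", err)
	}
	fmt.Println("message written")
}
```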

Step 3: Reader (Consumer)
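A minimal sketch of the matching consumer: kafka.NewReader with a GroupID joins a consumer group, so partition assignment and rebalancing are handled automatically (broker address, topic, and group names here are illustrative):

```go
// consumer.go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func newReader(brokers []string, topic, groupID string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  groupID, // setting a GroupID enables automatic partition assignment
		MinBytes: 1,       // return as soon as any data is available
		MaxBytes: 10e6,    // 10 MB max per fetch
	})
}

func consume(ctx context.Context, r *kafka.Reader) error {
	for {
		// ReadMessage blocks until a message arrives or ctx is cancelled.
		// With a GroupID set, it commits the offset automatically.
		m, err := r.ReadMessage(ctx)
		if err != nil {
			return err // returns ctx.Err() on cancellation → graceful shutdown
		}
		log.Printf("partition=%d offset=%d key=%s value=%s",
			m.Partition, m.Offset, m.Key, m.Value)
	}
}
```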


Step 4: Consumer Group Rebalancing

💡 Rebalancing: When a consumer joins or leaves a group, Kafka rebalances partition assignments. During rebalance, consumption pauses briefly. Use FetchMessage + CommitMessages for manual control.
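A manual-commit loop following that advice could look like this sketch: FetchMessage returns without committing, and CommitMessages runs only after processing succeeds, giving at-least-once delivery (the process callback is a placeholder):

```go
// manual_commit.go — at-least-once consumption with explicit commits
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func consumeWithCommit(ctx context.Context, r *kafka.Reader, process func(kafka.Message) error) error {
	for {
		m, err := r.FetchMessage(ctx) // fetches without committing the offset
		if err != nil {
			return err
		}
		if err := process(m); err != nil {
			// Offset not committed: after a restart or rebalance, this
			// message will be delivered again.
			log.Printf("processing failed, offset not committed: %v", err)
			continue
		}
		if err := r.CommitMessages(ctx, m); err != nil {
			return err
		}
	}
}
```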


Step 5: Exactly-Once Semantics (Concept)
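At-least-once delivery means duplicates can occur, so "exactly-once" in practice is usually at-least-once delivery plus an idempotent consumer that deduplicates by message ID (or Kafka's transactional API). A stdlib-only sketch of the dedup side — a real system would persist seen IDs, e.g. in the same DB transaction as the side effect:

```go
package main

import "fmt"

// Deduper turns at-least-once delivery into effectively-once processing by
// remembering which message IDs have already been handled.
type Deduper struct {
	seen map[string]bool
}

func NewDeduper() *Deduper { return &Deduper{seen: make(map[string]bool)} }

// Process runs handle once per unique id; it returns false for duplicates.
func (d *Deduper) Process(id string, handle func()) bool {
	if d.seen[id] {
		return false // duplicate delivery: skip the side effect
	}
	handle()
	d.seen[id] = true
	return true
}

func main() {
	d := NewDeduper()
	count := 0
	// "msg-1" is redelivered, but its side effect runs only once.
	for _, id := range []string{"msg-1", "msg-2", "msg-1"} {
		d.Process(id, func() { count++ })
	}
	fmt.Println("processed:", count) // prints "processed: 2"
}
```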


Step 6: Topic Management
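Topics can be created programmatically with kafka.Dial and Conn.CreateTopics. A sketch assuming a single dev broker at localhost:9092 (topic name is illustrative):

```go
// topics.go — creating a topic (assumes a broker at localhost:9092)
package main

import (
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             "user-events",
		NumPartitions:     3, // more partitions → more parallelism within a group
		ReplicationFactor: 1, // fine for a single-broker dev setup
	})
	if err != nil {
		log.Fatal(err)
	}
}
```

On a multi-broker cluster, topic creation should go to the controller; conn.Controller() returns its address so you can dial it directly.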


Step 7: API Demo (Without Live Kafka)
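Balancers can be exercised with no broker running, which makes key → partition routing easy to demo locally. A sketch (the partition IDs 0–2 are made up):

```go
// demo.go — exercising the Hash balancer without a live Kafka cluster
package main

import (
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	b := &kafka.Hash{}
	partitions := []int{0, 1, 2}

	// "user-1" appears twice; Balance picks the same partition both times.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		msg := kafka.Message{Key: []byte(key)}
		p := b.Balance(msg, partitions...)
		fmt.Printf("key=%s -> partition %d\n", key, p)
	}
}
```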



Step 8: Capstone — Producer + Consumer Pattern
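One way to wire producer and consumer together with context-based shutdown (broker address, topic, and group names are illustrative; signal.NotifyContext cancels the context on Ctrl-C):

```go
// capstone.go — producer + consumer with graceful shutdown
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"
)

func main() {
	// ctx is cancelled on SIGINT/SIGTERM so both loops exit cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "user-events",
		Balancer: &kafka.Hash{},
	}
	defer w.Close()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "user-events",
		GroupID: "lab07-group",
	})
	defer r.Close()

	// Producer goroutine: one keyed message for ordered routing.
	go func() {
		err := w.WriteMessages(ctx, kafka.Message{
			Key:   []byte("user-42"),
			Value: []byte(`{"action":"login"}`),
		})
		if err != nil {
			log.Printf("produce: %v", err)
		}
	}()

	// Consumer loop: checks ctx.Err() to distinguish shutdown from failure.
	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("shutting down")
				return
			}
			log.Printf("read: %v", err)
			return
		}
		log.Printf("got key=%s value=%s", m.Key, m.Value)
	}
}
```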


Summary

Concept       | API                                               | Notes
------------- | ------------------------------------------------- | --------------------------------------
Producer      | kafka.Writer{Balancer: &kafka.Hash{}}             | Key → partition routing
Consumer      | kafka.NewReader(kafka.ReaderConfig{GroupID: ...}) | Group auto-rebalance
Manual commit | r.FetchMessage + r.CommitMessages                 | At-least-once
Auto commit   | r.ReadMessage                                     | Simpler; commits offsets automatically
Batch send    | w.WriteMessages(ctx, msgs...)                     | Higher throughput
Cancellation  | context.WithCancel(ctx)                           | Graceful shutdown

Key Takeaways:

  • Same message key → same partition → ordered delivery for that key

  • Consumer groups provide horizontal scaling with each partition owned by one consumer

  • FetchMessage + CommitMessages gives you control over at-least-once delivery

  • Exactly-once requires idempotent consumer (dedup) or transactional API

  • Always handle ctx.Err() in consumer loop for graceful shutdown
