Write concurrent Go programs using goroutines and channels: fan-out/fan-in, worker pools, select statements, and channel directions.
Time
35 minutes
Prerequisites
Lab 01–06
Tools
Docker image: zchencow/innozverse-go:latest
Lab Instructions
Step 1: Goroutines
dockerrun--rmzchencow/innozverse-go:latestgorun-<<'EOF'package mainimport ( "fmt" "sync" "time")func sayHello(id int, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("Goroutine %d: Hello!\n", id)}func countDown(from int) { for i := from; i >= 0; i-- { fmt.Printf("T-%d\n", i) time.Sleep(10 * time.Millisecond) }}func main() { // Launch goroutines and wait for all to finish var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) go sayHello(i, &wg) } wg.Wait() fmt.Println("All goroutines done") // Goroutines are lightweight — spawn thousands easily fmt.Println("\nCounting down...") var wg2 sync.WaitGroup wg2.Add(1) go func() { defer wg2.Done() countDown(3) }() wg2.Wait() fmt.Println("Launch!")}EOF
💡 Goroutines cost ~2KB of stack (vs ~1MB for OS threads) and the runtime multiplexes them across CPU cores. You can have millions of goroutines simultaneously. sync.WaitGroup is the standard way to wait for a group to finish — Add(1) before launching, Done() in the goroutine (always via defer), Wait() to block.
📸 Verified Output:
Step 2: Channels
💡 Close a channel when the sender is done — receivers can use range ch to consume all values and stop automatically when the channel closes. Only the sender should close a channel, never the receiver. Sending to a closed channel panics. Receiving from a closed channel returns the zero value immediately.
📸 Verified Output:
Step 3: Select Statement
💡 select is the heart of Go concurrency. It waits for whichever channel is ready first. time.After(d) returns a channel that receives after duration d — perfect for timeouts. The default case makes select non-blocking. select {} (empty select) blocks forever — useful as a sleep(forever).
📸 Verified Output:
Step 4: Fan-out / Fan-in
📸 Verified Output:
Steps 5–8: Worker Pool, Done Channel, Mutex vs Channel, Capstone
docker run --rm zchencow/innozverse-go:latest go run - << 'EOF'
package main
import (
"fmt"
"math/rand"
)
func producer(ch chan<- int, count int) {
for i := 0; i < count; i++ {
ch <- rand.Intn(100)
}
close(ch) // signal no more values
}
func squarer(in <-chan int, out chan<- int) {
for n := range in { // range on channel receives until closed
out <- n * n
}
close(out)
}
func main() {
// Unbuffered channel — send/receive synchronize
nums := make(chan int)
squares := make(chan int)
go producer(nums, 5)
go squarer(nums, squares)
for sq := range squares {
fmt.Printf("square: %d\n", sq)
}
// Buffered channel — send doesn't block until buffer full
buf := make(chan string, 3)
buf <- "first"
buf <- "second"
buf <- "third"
// buf <- "fourth" would block (buffer full)
fmt.Println(<-buf)
fmt.Println(<-buf)
fmt.Println(<-buf)
// Channel directions in function signatures
// chan<- T: send-only <-chan T: receive-only
ping := make(chan string, 1)
pong := make(chan string, 1)
go func(in <-chan string, out chan<- string) {
msg := <-in
out <- msg + " PONG"
}(ping, pong)
ping <- "PING"
fmt.Println(<-pong)
}
EOF
square: 441
square: 6724
square: 1225
square: 961
square: 784
first
second
third
PING PONG
docker run --rm zchencow/innozverse-go:latest go run - << 'EOF'
package main
import (
"fmt"
"time"
)
func ticker(interval time.Duration, stop <-chan struct{}) <-chan time.Time {
out := make(chan time.Time)
go func() {
defer close(out)
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case tick := <-t.C:
out <- tick
case <-stop:
return
}
}
}()
return out
}
func main() {
// select: wait on multiple channels simultaneously
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "one"
ch2 <- "two"
// Non-deterministic which case fires first
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Println("from ch1:", msg)
case msg := <-ch2:
fmt.Println("from ch2:", msg)
}
}
// Timeout pattern
slow := make(chan string)
go func() {
time.Sleep(5 * time.Millisecond)
slow <- "response"
}()
select {
case msg := <-slow:
fmt.Println("Got:", msg)
case <-time.After(100 * time.Millisecond):
fmt.Println("Timeout!")
}
// Default case — non-blocking select
ready := make(chan bool, 1)
select {
case <-ready:
fmt.Println("Ready!")
default:
fmt.Println("Not ready (default)")
}
// Done signal via channel
stop := make(chan struct{})
ticks := ticker(10*time.Millisecond, stop)
count := 0
for range ticks {
count++
if count >= 3 { close(stop); break }
}
fmt.Println("Ticks received:", count)
}
EOF
from ch1: one
from ch2: two
Got: response
Not ready (default)
Ticks received: 3
docker run --rm zchencow/innozverse-go:latest go run - << 'EOF'
package main
import (
"fmt"
"sync"
"time"
)
// Fan-out: one channel → multiple workers
func fanOut[T, U any](input <-chan T, workers int, fn func(T) U) <-chan U {
out := make(chan U, workers)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for v := range input { out <- fn(v) }
}()
}
go func() { wg.Wait(); close(out) }()
return out
}
// Fan-in: merge multiple channels into one
func fanIn[T any](channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
ch := ch
go func() {
defer wg.Done()
for v := range ch { out <- v }
}()
}
go func() { wg.Wait(); close(out) }()
return out
}
// Pipeline stages
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums { out <- n }
close(out)
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in { out <- n * n }
close(out)
}()
return out
}
func main() {
// Pipeline
src := generate(2, 3, 4, 5, 6, 7, 8)
squared := sq(src)
squaredAgain := sq(squared)
for n := range squaredAgain {
fmt.Printf("n^4: %d\n", n)
}
// Fan-out: parallel processing
jobs := make(chan int, 10)
for i := 1; i <= 8; i++ { jobs <- i }
close(jobs)
// 3 workers process jobs concurrently
results := fanOut(jobs, 3, func(n int) string {
time.Sleep(time.Duration(n) * time.Millisecond)
return fmt.Sprintf("job%d→%d", n, n*n)
})
var collected []string
for r := range results { collected = append(collected, r) }
fmt.Printf("Processed %d jobs\n", len(collected))
}
EOF