Copy docker run --rm zchencow/innozverse-go:latest go run - << 'EOF'
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Step 3: Context propagation through goroutines
func crawler(ctx context.Context, url string, depth int, results chan<- string) {
select {
case <-ctx.Done():
return
default:
}
results <- fmt.Sprintf("fetched: %s (depth=%d)", url, depth)
if depth <= 0 { return }
subURLs := []string{url + "/page1", url + "/page2"}
var wg sync.WaitGroup
for _, sub := range subURLs {
wg.Add(1)
go func(u string) {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
crawler(ctx, u, depth-1, results)
}(sub)
}
wg.Wait()
}
// Step 4: Timeout wrapper for any function
func withTimeout[T any](ctx context.Context, timeout time.Duration, fn func(context.Context) (T, error)) (T, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
type result struct {
val T
err error
}
ch := make(chan result, 1)
go func() {
v, err := fn(ctx)
ch <- result{v, err}
}()
select {
case r := <-ch:
return r.val, r.err
case <-ctx.Done():
var zero T
return zero, ctx.Err()
}
}
// Step 5: Cancellable pipeline
func source(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func doubler(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * 2:
case <-ctx.Done():
return
}
}
}()
return out
}
// Step 6: Graceful shutdown
type Server struct {
name string
running bool
}
func (s *Server) Start(ctx context.Context) {
s.running = true
fmt.Printf("[%s] started\n", s.name)
<-ctx.Done()
fmt.Printf("[%s] shutting down: %v\n", s.name, ctx.Err())
time.Sleep(5 * time.Millisecond) // simulate cleanup
s.running = false
fmt.Printf("[%s] stopped\n", s.name)
}
// Step 7: Context in worker pool
func poolWorker(ctx context.Context, id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok { return }
select {
case results <- job * job:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
// Step 8: Capstone — cancellable request processor
type Request struct {
ID int
Payload string
}
type Response struct {
ReqID int
Result string
Error error
}
func processWithContext(ctx context.Context, req Request) Response {
// Simulate variable processing time
delay := time.Duration(len(req.Payload)) * time.Millisecond
select {
case <-time.After(delay):
return Response{req.ID, "processed: " + req.Payload, nil}
case <-ctx.Done():
return Response{req.ID, "", ctx.Err()}
}
}
func main() {
bg := context.Background()
// Web crawler with cancellation
fmt.Println("=== Crawler ===")
ctx1, cancel1 := context.WithTimeout(bg, 200*time.Millisecond)
defer cancel1()
results := make(chan string, 20)
go func() {
crawler(ctx1, "https://example.com", 2, results)
close(results)
}()
count := 0
for r := range results { fmt.Println(" ", r); count++ }
fmt.Printf("Crawled %d pages\n", count)
// Timeout wrapper
fmt.Println("\n=== Timeout Wrapper ===")
val, err := withTimeout(bg, 100*time.Millisecond, func(ctx context.Context) (string, error) {
time.Sleep(10 * time.Millisecond)
return "data fetched", nil
})
fmt.Printf("result=%q err=%v\n", val, err)
_, err = withTimeout(bg, 5*time.Millisecond, func(ctx context.Context) (string, error) {
time.Sleep(100 * time.Millisecond)
return "too slow", nil
})
fmt.Printf("timeout result: err=%v\n", err)
// Cancellable pipeline
fmt.Println("\n=== Cancellable Pipeline ===")
ctx2, cancel2 := context.WithCancel(bg)
nums := source(ctx2, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
doubled := doubler(ctx2, nums)
count2 := 0
for n := range doubled {
fmt.Printf("%d ", n)
count2++
if count2 == 5 { cancel2(); break }
}
fmt.Printf("\nProcessed %d values before cancel\n", count2)
// Graceful shutdown
fmt.Println("\n=== Graceful Shutdown ===")
ctx3, cancel3 := context.WithCancel(bg)
servers := []*Server{{"api", false}, {"worker", false}, {"metrics", false}}
var wg sync.WaitGroup
for _, s := range servers {
wg.Add(1)
s := s
go func() { defer wg.Done(); s.Start(ctx3) }()
}
time.Sleep(20 * time.Millisecond)
fmt.Println("Initiating shutdown...")
cancel3()
wg.Wait()
fmt.Println("All servers stopped")
// Cancellable request processor
fmt.Println("\n=== Request Processor ===")
requests := []Request{
{1, "hi"}, // fast
{2, "hello world"}, // medium
{3, "x"}, // fast
}
ctx4, cancel4 := context.WithTimeout(bg, 50*time.Millisecond)
defer cancel4()
for _, req := range requests {
resp := processWithContext(ctx4, req)
if resp.Error != nil {
fmt.Printf(" req#%d: ERROR %v\n", resp.ReqID, resp.Error)
} else {
fmt.Printf(" req#%d: %s\n", resp.ReqID, resp.Result)
}
}
}
EOF