得益于Go语言的精简语法和对并发控制的良好支持,我们可以迅速总结出高效而优雅的常用并发代码范式。

for-select模式

我们可以循环监听多个channel的消息实现多路复用,我们也可以使用for...range遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func normal_task(idx int) {
fmt.Printf("normal_task(%d)\n", idx)
}

// 死循环一直监听
func select_pattern(ctx context.Context) {
cnt := 0
for {
select {
case <-ctx.Done():
fmt.Printf("done\n")
return
default:
cnt++
cnt %= 100
normal_task(cnt)
time.Sleep(1 * time.Second)
}
}
}

// 范围for遍历
func select_range_pattern(ctx context.Context, arr []int) {
for _, v := range arr {
select {
case <-ctx.Done():
fmt.Printf("done\n")
return
default:
normal_task(v)
time.Sleep(1 * time.Second)
}
}
fmt.Printf("select_range_pattern finished\n")
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
select_pattern(ctx)
}()
time.Sleep(10 * time.Second)
cancel()
wg.Wait()

fmt.Println("-------stage2------------")
wg = sync.WaitGroup{}
wg.Add(1)
ctx, cancel = context.WithCancel(context.Background())
go func() {
defer wg.Done()
select_range_pattern(ctx, []int{1, 2, 3, 4, 5})
}()
time.Sleep(10 * time.Second)
cancel()
wg.Wait()
}

select-timeout模式

我们可以利用time.Afterselect的监听设置超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func time_pattern(ctx context.Context, ch chan int) {
select {
case <-ctx.Done():
fmt.Printf("canceled\n")
return
case a := <-ch:
fmt.Printf("recv msg: %d\n", a)
return
case <-time.After(2 * time.Second):
fmt.Printf("timeout 2s\n")
return
}
}

func main() {
wg = sync.WaitGroup{}
wg.Add(1)
ctx, cancel = context.WithCancel(context.Background())
ch := make(chan int)
go func() {
fmt.Println("发起了一个请求:")
time.Sleep(5 * time.Second)
ch <- 666
}()

go func() {
defer wg.Done()
time_pattern(ctx, ch)
}()
wg.Wait()
}

pipeline模式

流水线模式或者说管线模式,让多个协程通过管道串联起来分步骤依次处理任务

下面是一种实现方式