并发与并行:

  • 并发(concurrency)是独立执行的运算的组合,是一种组织软件代码的模式;
  • 并发更多考虑的是代码结构,它的目标是产出简洁的代码(clean code);
  • 并行(parallelism)是指同时执行多项任务;
  • 并行更多的是关于代码的执行方式(多实例)。

Do not communicate by sharing memory; instead, share memory by communicating.

Go 始终推荐以 CSP (Communicating Sequential Process) 模型的风格构建并发程序。

select 语句是一种专属于并发的控制结构(control structure),它是把通道和协程这两个并发元素作为 Go 语言原生特性而不是一个库函数的的原因。用 select 可以轻易地实现控制结构,而用外部库函数则很难。

fan-in/fan-out:

  • fan-in 负责任务汇总,一个协程从多个通道中读取数据并 multiplex 到一个通道,直到所有输入源关闭为止。
  • fan-out 负责任务分发,多个协程去同一个通道读取数据,起到在多个 worker 中分配任务、使 CPU 和 I/O 并行化的作用。
    • 协程去 fan-in 后的通道获取任务。

TL;DR

在循环中开启协程要特别注意是否需要使用闭包。

通知+等待:

  • g1 通知 g2 结束并等待 g2 结束后再结束:g1ch1 写数据通知,等待读 ch2 成功后再退出;g2 读到 ch1 后结束工作、执行 cleanup,最后写 ch2
  • 一个协程等待多个子协程结束:子协程启动时 wg.Add(1)defer wg.Done();主协程 wg.Wait()

generator 模式

generator 是一个返回通道的函数

// https://go.dev/play/p/PcToIfOqBnR
func main() {
	c := boring("boring")
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c)
	}
}
func boring(msg string) <-chan string {
	c := make(chan string)
	go func() { // 要想写入无缓冲通道 c,又要能返回 c,自然要新起一个协程
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

上例中 boring 就是一个 generator,跟 service 很像,可以创建多个实例:

// https://go.dev/play/p/es0HhqgLkDf
func main() {
	joe := boring("Joe")
	ann := boring("Ann")
	for i := 0; i < 5; i++ { // 无缓冲通道的同步特性使 joe 通道和 ann 通道的值交替打印
		fmt.Println(<-joe)
		fmt.Println(<-ann)
	}
}

上例循环中两个打印操作互相阻塞,而向 ann 通道的发送操作可能比向 joe 通道的发送操作的速度要快,此时可以用 fan-in 写法(annjoe 并入一个通道)让 annjoe 不会互相阻塞:

// https://go.dev/play/p/U-dt-EbjVoQ
func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
}
// fan-in 也称为 multiplexer;fanIn 函数也是 generator
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			c <- <-input1
		}
	}()
	go func() {
		for {
			c <- <-input2
		}
	}()
	return c
}

采用 fan-in 的写法后,若想恢复交替打印(joeann 交替打印,但谁先开始打印不确定),可以定义包含通道的消息类型:

// https://go.dev/play/p/W7g4nR7ZfP5
func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 5; i++ {
		msg1 := <-c
		fmt.Println(msg1.str)
		msg2 := <-c
		fmt.Println(msg2.str)
		msg1.wait <- true
		msg2.wait <- true
	}
}
type Message struct {
	str  string
	wait chan bool
}
func fanIn(inputs ...<-chan Message) <-chan Message {
	c := make(chan Message)
	// for _, input := range inputs 这种写法每次迭代 input 的地址是同一个,由于 c 还保持阻塞,迭代结束后 input 更新成 inputs 的最后一个元素,
	// 导致最终 fan-in 得到的只是 inputs 中的最后一个(此处是 "Ann"),"joe" 丢失了。https://go.dev/play/p/v_SFZm4lc_f
	// 可以用闭包改写,或采用以下写法。
	for i := range inputs {
		input := inputs[i]
		go func() {
			for {
				c <- <-input
			}
		}()
	}
	return c
}
func boring(msg string) <-chan Message {
	c := make(chan Message)
	waitForIt := make(chan bool)
	go func() {
		for i := 0; ; i++ {
			c <- Message{fmt.Sprintf("%s %d", msg, i), waitForIt}
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
			<-waitForIt
		}
	}()
	return c
}

select + fan-in

select 改写 fanIn

// https://go.dev/play/p/0Wk8MOz5e3O
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <-s
			case s := <-input2:
				c <-s
			}
		}
	}()
	return c
}

实现超时:

// https://go.dev/play/p/2Ue61Jl-VMo
func main() {
	c := boring("Joe") // boring 超过 1s 没有往通道发送数据,main 就会返回
	for {
		select {
		case s := <-c:
			fmt.Println(s)
		case <-time.After(1 * time.Second): // 若此处时间比 boring 中协程的睡眠时间长,此 case 就没机会被选中
			fmt.Println("bye")
			return
		}
	}
}

主动通知 boring 停止发送数据:

// https://go.dev/play/p/XHhufDGU0Di
func main() {
	rand.Seed(time.Now().UnixNano())
	quit := make(chan bool)
	c := boring("Joe", quit)
	for i := rand.Intn(10); i >= 0; i-- {
		fmt.Println(<-c)
	}
	quit <- true
}
func boring(msg string, quit <-chan bool) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			select {
			case c <- fmt.Sprintf("%s %d", msg, i):
				// do nothing
			case <-quit:
				return
			}
		}
	}()
	return c
}

boring 接收到停止发送数据的信号后,要求 main 协程等自己完成清理工作后再退出:

// https://go.dev/play/p/zc6bBX6LhUa
func main() {
	rand.Seed(time.Now().UnixNano())
	quit := make(chan string)
	c := boring("Joe", quit)
	for i := rand.Intn(10); i >= 0; i-- {
		fmt.Println(<-c)
	}
	quit <- "bye"
	fmt.Printf("main exited after boring said %q\n", <-quit)
}
func boring(msg string, quit chan string) <-chan string {
	c := make(chan string)
	go func() {
		for i := 0; ; i++ {
			select {
			case c <- fmt.Sprintf("%s %d", msg, i):
				// do nothing
			case <-quit:
				// do some clean up work like deleting tmp files
				quit <- "I'm done"
				return
			}
		}
	}()
	return c
}

流水线模式

流水线的一种定义:

  • 是一种并发程序;
  • 是一系列由通道连接的阶段(stage)。
    • 每个阶段里一组协程运行同一个函数;
    • 某个阶段通过接收通道从流水线上游接收数据,处理数据,再通过发送通道将数据发送给流水线下游。

实际中应用流水线模式时,一个阶段可能不需要接收所有的输入即可进入下一个阶段,这种情况下,该阶段应及时停止等待生产者的其它数据,生产者也应停止生产数据。

需要有一种机制让处于流水线下游的阶段把停止接收输入的消息通知给上游的生产者。

流水线模式中,负责消费的阶段中用 for range 消费完负责生产的阶段生产的所有数据;生产者通过 close 通道来通知多个消费者生产结束。

  • close 的通道通过闭包传入内部协程并作为返回值,生产者可以调用 close 发出结束信号,消费者的 for range 语句收到后结束循环。

数的平方

// https://go.dev/play/p/-jfIlNfjtJe
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n*n
        }
        close(out)
    }()
    return out
}
func main() {
    c := gen(2, 3)
    out := sq(c)
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
    for n := range sq(sq(gen(2, 3, 4))) {
        fmt.Println(n) // 16 81 256
    }
}

fan-out/fan-in 写法:

// https://go.dev/play/p/6GDrZNulXwB
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c) // fan-in
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
func main() {
    in := gen(2, 3, 4)
    // fan-out (generator)
    c1 := sq(in)
    c2 := sq(in)
    c3 := sq(in)
    // fan-in
    for n := range merge(c1, c2, c3) {
        fmt.Println(n) // 4, 9, 16(顺序不确定)
    }
}

下游通知上游示例

需要预知要发送的信号个数的方案:

// https://go.dev/play/p/YlLETPikmeS
func main() {
    in := gen(2, 3)
    c1 := sq(in)
    c2 := sq(in)
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out)
    // go output(c) 协程 2 个
    done <- struct{}{}
    done <- struct{}{}
}
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
            wg.Done()
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c) // fan-in
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

通过关闭通道来广播:

// https://go.dev/play/p/yqIaugS5j9F
func main() {
    done := make(chan struct{})
    defer close(done)
    in := gen(done, 2, 3)
    c1 := sq(done, in)
    c2 := sq(done, in)
    out := merge(done, c1, c2)
    fmt.Println(<-out)
}
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ...
}

交替打印(循环流水线)

worker 协程之间协调执行的顺序和打印的内容。

// https://go.dev/play/p/D5Eb3pDH6js
func worker(workerID int, chMyTurn, chNext chan int, chDone chan struct{}, limit int, wg *sync.WaitGroup) {
	defer func() {
		fmt.Printf("worker %d exited\n", workerID)
		wg.Done()
	}()
	for {
		select {
		case v := <-chMyTurn:
			fmt.Printf("worker %d prints %d\n", workerID, v)
			if v == limit {
				close(chDone)
				fmt.Printf("\nworker %d announced done\n", workerID)
				return
			}
			if chMyTurn != chNext {
				chNext <- v + 1 // 实际使用中可以根据流水线中不同步骤的预计工作量分别为之设置不同并发数
			} else {
                // 只有一个协程,必须新开协程以避免死锁
				go func() {
					chNext <- v + 1
				}()
			}
		case <-chDone:
			return
		}
	}
}

func main() {
	const numberOfWorkers = 4 // numberOfWorkers >= 1

	sigs := make([]chan int, numberOfWorkers)
	for i := range sigs {
		sigs[i] = make(chan int)
	}

	var wg sync.WaitGroup

	start, limit := 1, 123 // limit >= start
	chDone := make(chan struct{})
	for i := 0; i < numberOfWorkers; i++ {
		// 通道之间形成环状通信链路:ch1 -> ch2 -> ... -> chN -> ch1
		go worker(i, sigs[i], sigs[(i+1)%numberOfWorkers], chDone, limit, &wg)
		wg.Add(1)
	}
	sigs[0] <- start // main 协程启动子协程之间的协作

	wg.Wait() // 等待所有子协程退出

	fmt.Println("main goroutine exited")
}

超时和继续

超时示例:

timeout := make(chan bool, 1)
go func() {
    time.Sleep(1 * time.Second) // 实际项目要用 time.After() 方法
    timeout <- true
}()

select 实现超时:

select {
case <-ch:
    // a read from ch has occurred
case <-timeout:
    // the read from ch has timed out
}

以下示例同时让多个协程去读数据库,只取第一个返回的作为结果。

func Query(conns []Conn, query string) Result {
    ch := make(chan Result)
    for _, conn := range conns {
        go func(c Conn) {
            select {
                case ch <- c.DoQuery(query):
            default:
            }
        }(conn)
    }
    return <-ch
}

代码解析:

  • for 循环中每个协程都会调用到 c.DoQuery()

  • 只有第一个返回的协程能写 ch 成功,其它协程的 c.DoQuery 执行完毕发现不能写 ch 通道(阻塞了),就走 default case 并返回,释放协程资源;

  • for 中的协程要能成功写入 ch 的前置条件是 <-ch 操作提前就绪(先有接收操作才不会阻塞),若 <-ch 操作在 c.DoQuery 返回前还未准备就绪,即 c.DoQuery 虽然已经返回结果但只能走 default case,导致执行结果丢失。解决办法是将 ch 改成缓冲通道,这样就不会因为没有提前准备好的接收操作而阻塞。

    // https://go.dev/play/p/8rXmmRwlORI
    // 模拟 <-ch 未准备就绪造成死锁
    func main() {
        ch := make(chan string)
        // ch := make(chan string, 10) // 用缓冲通道就可以修复 https://go.dev/play/p/0Axcd-4_2fm
        for i := 0; i < 10; i++ {
            go func(i int) {
                select {
                case ch <- DoQuery(i):
                    log.Printf("goroutine No.%d goes to DoQuery.\n", i)
                default:
                    log.Printf("goroutine No.%d goes to default case.\n", i)
                }
            }(i)
        }
        time.Sleep(1 * time.Second)
        log.Println(<-ch)
    }
    func DoQuery(i int) string {
        return fmt.Sprintf("goroutine %d returns hhh.", i)
    }
    

实用程序

计算目录中所有文件的哈希

串行处理:https://go.dev/play/p/HTAe-yLSL52

两阶段并行流水线:https://go.dev/play/p/sNL3oRo-3th

三阶段限制并发流水线:https://go.dev/play/p/gYTKhbIUxnW

Google search demo :fan-in,超时机制,返回最快返回结果的副本的结果。

实现 select case 优先级

// https://go.dev/play/p/f3odVslznKF
// https://github.com/kubernetes/kubernetes/blob/7509c4eb478a3ab94ff26be2b4068da53212d538/pkg/controller/nodelifecycle/scheduler/taint_manager.go#L244
func main() {
	c1 := make(chan int) // 优先级高
	c2 := make(chan int) // 优先级低
	go func() {
		for {
			select {
			case <-c1:
				doPrioritizedStuff()
			case <-c2:
			priority:
				for {
					// 内层 select 放 for 循环里可以应对 <-c1 多次发生的情况
					select {
					case <-c1:
						doPrioritizedStuff()
					default:
						fmt.Println("iterate within select")
						break priority
					}
				}
				doNormalStuff()
			}
		}
	}()
	time.Sleep(1 * time.Second)
}
// a bad attempt: https://go.dev/play/p/FhIm7hT7r41

References