Go 并发模式之 Context
Contents
TL;DR
父协程创建带超时信号的支持取消的 Context
传递给它的子协程们,子协程们(子协程可能是一个永不停止的后台任务)监听取消信号的通道(for { select {}}
),若通道可读,表示取消信号已发出,子协程们按照各自的逻辑结束自己的工作并返回,释放资源,避免泄露。
父协程可以提前主动发出取消信号,若设置了超时时间,超时后也会发出取消信号。
子协程超时,通过调用共同祖先的 Context
的 CancelFunc
去通知兄弟子协程,这样可以实现一个子协程失败,另一个子协程若未开始执行就放弃执行的效果。
若可以预估一个操作的耗时,可以获取 Context
的截止时间,如果该操作无法在截止时间之前完成,则可以干脆不开始执行。实际上预估很困难。
已经开始执行的子任务,收到取消信号也无法终止,只能等其执行完毕,可以用一个新协程去 drain 子任务的传递返回结果的通道,避免空等。
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
c := make(chan error, 1)
req = req.WithContext(ctx)
go func() { c <- f(http.DefaultClient.Do(req)) }()
select {
case <-ctx.Done():
<-c // Wait for f to return. // maybe drain channel c in another goroutine
return ctx.Err()
case err := <-c:
return err
}
}
Context
Go 服务端会用独立的协程去处理每个客户端请求,请求处理器通常会开启额外的协程去操作数据库、执行 RPC 调用等操作,这样的一组相关协程通常都需要访问本次请求的关联参数,而当请求被取消或者超时后,处理该请求的一系列协程应该尽快退出(通常用 select
处理不同信号),让系统可以回收资源。
context
包可以跨过 API 界限,方便地在和某次请求相关的协程之间传递请求的关联参数、取消信号和截止信号。
服务端接收请求的处理器会创建一个 Context
,处理器调用的其它服务端方法接收该 Context
作为参数,整条调用链上的函数必须传递 Context
或是基于收到的 Context
衍生出 Context
的副本。
Context
类型:
// https://pkg.go.dev/context
type Context interface {
Done() <-chan struct{}
Err() error
Deadline() (deadline time.Time, ok bool)
Value(key interface{}) interface{}
}
-
Done
:返回一个只读通道,该通道在Context
取消或超时后关闭,起到取消信号的作用,接收到取消信号的方法应停止工作并返回。 -
Err
:Context
被取消、Done
返回的通道关闭后,Err
提示Context
被取消的原因。for { select { case <-ctx.Done(): return ctx.Err() default: } }
-
Deadline
:返回Context
何时会被取消。- 可以利用此截止时间设置 I/O 操作的超时时间。
- 其它函数可以根据此截止时间决定是否开始工作,剩余时间太短可能就没必要开始了。
-
Value
:返回某个key
的值,没有则返回 nil。
通常,请求的处理器返回时和该请求相关的 Context
就可以被取消。
Context
的方法支持并发调用,可以将一个 Context
传入多个协程,取消该 Context
后可以通知到所有相关协程。
Context
结构体没有包含 Cancel
方法的原因和 Done
返回的通道是只读的原因相同:发送取消信号的函数和接收取消信号的函数通常不是同一个,当一个父操作为它的子操作开启新的协程时,子操作不应该有取消父操作的能力。
我们可以从已有的 Context
中衍生出新的 Context
,这些 Context
构成一个树形结构,当一个 Context
被取消时,由它衍生出来的所有 Context
也会被取消。
Background
返回一个非 nil 的空Context
,它是所有Context
树的根,永远不会被取消、没有超时、没有值,通常在main
、init
和测试中使用,作为接收到的请求的顶层Context
。TODO
返回一个非 nil 的空Context
,适用于不确定使用哪个Context
或Context
还不可用(其它函数还没有扩展,尚不能接收Context
参数)时的情况。WithCancel
、WithDeadline
和WithTimeout
可以从Context
中衍生出新的Context
并返回一个CancelFunc
。- 衍生出来的
Context
可以先于父Context
被取消。 CancelFunc
会取消子孙Context
、移除父Context
对子Context
的引用、停止相关的计时器,即会释放和Context
相关的资源,在当前Context
下的操作执行完毕后应尽早调用CancelFunc
。若未能调用CancelFunc
会导致子孙Context
内存泄露,直到父Context
取消或计时器到时间后才会被回收(go vet
工具可以检查这一问题)。CancelFunc
作用是命令一个操作停止工作,它不会等待该操作执行完毕。CancelFunc
并发安全,第一次调用后后续再去调用没有任何效果。- 在使用多副本时,
WithCancel
可以取消冗余的请求。 WithTimeout
可以用来为发往后端服务的请求设置超时时间,超过超时时间后,Context
自动被取消。
- 衍生出来的
WithValue
可以为Context
绑定值。- 用于携带和请求相关的数据;
key
要支持相等比较,且为了避免和其它使用Context
的包发生冲突,不要使用语言内置的类型,而是为key
定义新的类型;val
要是并发安全的。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
type CancelFunc func()
func WithValue(parent Context, key, val interface{}) Context
使用 Context
的规则:
-
不将
Context
保存在结构体中,而是显式地将它传递给每一个需要的函数,Context
要是函数的第一个参数,通常命名为ctx
;-
Context
不保存在结构体中而是直接传递,使用起来清晰,而且可以方便地为每个函数调用设置独立的超时、取消等信号。若Context
保存在结构体中,会让Context
的作用范围不清晰,而且如果不能有效地设置独立的超时、取消信号,则可能会让任务堆积。这一规则的例外情况是保留接口的向后兼容性的场景,但即使是这种场景,首选的方案也是复制出一个功能相同但是额外接收Context
参数的新方法。type Worker struct { ctx context.Context } func New(ctx context.Context) *Worker { return &Worker{ctx: ctx} } func (w *Worker) Fetch() (*Work, error) { _ = w.ctx // A shared w.ctx is used for cancellation, deadlines, and metadata. } func (w *Worker) Process(work *Work) error { _ = w.ctx // A shared w.ctx is used for cancellation, deadlines, and metadata. } // 迫使 Fetch 和 Process 共享一个 Context,而它们的调用者可能有设置独立 Context 的需求
-
-
不要传递 nil
Context
,不确定使用哪个Context
时传context.TODO
; -
Value
方法只用来存储需要在进程和接口间流转的请求数据,不要用来传递函数的可选参数。
Google 的实践是收到的请求和对外发送的请求的调用链路上所有函数的第一个参数都要是 Context
,这样做的好处是可以方便地控制超时、取消以及数据在程序中的流转。
Context
是为请求中的数据和取消操作定义的一个通用接口,可以方便开发者之间共享代码。
实现原理
cancelCtx
、timerCtx
、valueCtx
都继承了 Context
;3 种 Context
实现可以互为父节点,组合成不同的应用形式。
cancelCtx
// https://github.com/golang/go/blob/bc51e930274a5d5835ac8797978afc0864c9e30c/src/context/context.go#L344
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err // 设置 err 来说明关闭原因
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c) // c.Context 是父 Context
}
}
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c) // 将自身添加到父节点
return &c, func() { c.cancel(true, Canceled) }
}
timerCtx
timerCtx
在 cancelCtx
基础上增加了 deadline
,用于标识自动调用 cancel
的时间,timer
就是一个触发自动 cancel
的定时器。
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() { // 自动触发 cancel
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
valueCtx
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key) // 父节点的 value
}
References