TL;DR

父协程创建带超时信号的支持取消的 Context 传递给它的子协程们,子协程们(子协程可能是一个永不停止的后台任务)监听取消信号的通道(for { select {}}),若通道可读,表示取消信号已发出,子协程们按照各自的逻辑结束自己的工作并返回,释放资源,避免泄露。

父协程可以提前主动发出取消信号,若设置了超时时间,超时后也会发出取消信号。

子协程超时,通过调用共同祖先的 ContextCancelFunc 去通知兄弟子协程,这样可以实现一个子协程失败,另一个子协程若未开始执行就放弃执行的效果。

若可以预估一个操作的耗时,可以获取 Context 的截止时间,如果该操作无法在截止时间之前完成,则可以干脆不开始执行。实际上预估很困难。

已经开始执行的子任务,收到取消信号也无法终止,只能等其执行完毕,可以用一个新协程去 drain 子任务的传递返回结果的通道,避免空等。

func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    c := make(chan error, 1)
    req = req.WithContext(ctx)
    go func() { c <- f(http.DefaultClient.Do(req)) }()
    select {
    case <-ctx.Done():
        <-c // Wait for f to return. // maybe drain channel c in another goroutine
        return ctx.Err()
    case err := <-c:
        return err
    }
}

Context

Go 服务端会用独立的协程去处理每个客户端请求,请求处理器通常会开启额外的协程去操作数据库、执行 RPC 调用等操作,这样的一组相关协程通常都需要访问本次请求的关联参数,而当请求被取消或者超时后,处理该请求的一系列协程应该尽快退出(通常用 select 处理不同信号),让系统可以回收资源。

context 包可以跨过 API 界限,方便地在和某次请求相关的协程之间传递请求的关联参数、取消信号和截止信号。

服务端接收请求的处理器会创建一个 Context,处理器调用的其它服务端方法接收该 Context 作为参数,整条调用链上的函数必须传递 Context 或是基于收到的 Context 衍生出 Context 的副本

Context 类型:

// https://pkg.go.dev/context
type Context interface {
    Done() <-chan struct{}
    Err() error
    Deadline() (deadline time.Time, ok bool)
    Value(key interface{}) interface{}
}
  • Done:返回一个只读通道,该通道在 Context 取消或超时后关闭,起到取消信号的作用,接收到取消信号的方法应停止工作并返回。

  • ErrContext 被取消、Done 返回的通道关闭后,Err 提示 Context 被取消的原因。

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
    }
    
  • Deadline:返回 Context 何时会被取消。

    • 可以利用此截止时间设置 I/O 操作的超时时间。
    • 其它函数可以根据此截止时间决定是否开始工作,剩余时间太短可能就没必要开始了。
  • Value:返回某个 key 的值,没有则返回 nil。

通常,请求的处理器返回时和该请求相关的 Context 就可以被取消。

Context 的方法支持并发调用,可以将一个 Context 传入多个协程,取消该 Context 后可以通知到所有相关协程。

Context 结构体没有包含 Cancel 方法的原因和 Done 返回的通道是只读的原因相同:发送取消信号的函数和接收取消信号的函数通常不是同一个,当一个父操作为它的子操作开启新的协程时,子操作不应该有取消父操作的能力

我们可以从已有的 Context 中衍生出新的 Context,这些 Context 构成一个树形结构,当一个 Context 被取消时,由它衍生出来的所有 Context 也会被取消。

  • Background 返回一个非 nil 的空 Context,它是所有 Context 树的根,永远不会被取消、没有超时、没有值,通常在 maininit 和测试中使用,作为接收到的请求的顶层 Context
  • TODO 返回一个非 nil 的空 Context,适用于不确定使用哪个 ContextContext 还不可用(其它函数还没有扩展,尚不能接收 Context 参数)时的情况。
  • WithCancelWithDeadlineWithTimeout 可以从 Context 中衍生出新的 Context 并返回一个 CancelFunc
    • 衍生出来的 Context 可以先于父 Context 被取消。
    • CancelFunc 会取消子孙 Context、移除父 Context 对子 Context 的引用、停止相关的计时器,即会释放和 Context 相关的资源,在当前 Context 下的操作执行完毕后应尽早调用 CancelFunc。若未能调用 CancelFunc 会导致子孙 Context 内存泄露,直到父 Context 取消或计时器到时间后才会被回收(go vet 工具可以检查这一问题)。
    • CancelFunc 作用是命令一个操作停止工作,它不会等待该操作执行完毕。
    • CancelFunc 并发安全,第一次调用后后续再去调用没有任何效果。
    • 在使用多副本时,WithCancel 可以取消冗余的请求。
    • WithTimeout 可以用来为发往后端服务的请求设置超时时间,超过超时时间后,Context 自动被取消。
  • WithValue 可以为 Context 绑定值。
    • 用于携带和请求相关的数据;
    • key 要支持相等比较,且为了避免和其它使用 Context 的包发生冲突,不要使用语言内置的类型,而是为 key 定义新的类型;
    • val 要是并发安全的。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
type CancelFunc func()
func WithValue(parent Context, key, val interface{}) Context

使用 Context 的规则:

  1. 不将 Context 保存在结构体中,而是显式地将它传递给每一个需要的函数,Context 要是函数的第一个参数,通常命名为 ctx

    • Context 不保存在结构体中而是直接传递,使用起来清晰,而且可以方便地为每个函数调用设置独立的超时、取消等信号。若 Context 保存在结构体中,会让 Context 的作用范围不清晰,而且如果不能有效地设置独立的超时、取消信号,则可能会让任务堆积。这一规则的例外情况是保留接口的向后兼容性的场景,但即使是这种场景,首选的方案也是复制出一个功能相同但是额外接收 Context 参数的新方法。

      type Worker struct {
          ctx context.Context
      }
      func New(ctx context.Context) *Worker {
          return &Worker{ctx: ctx}
      }
      func (w *Worker) Fetch() (*Work, error) {
          _ = w.ctx // A shared w.ctx is used for cancellation, deadlines, and metadata.
      }
      func (w *Worker) Process(work *Work) error {
          _ = w.ctx // A shared w.ctx is used for cancellation, deadlines, and metadata.
      }
      // 迫使 Fetch 和 Process 共享一个 Context,而它们的调用者可能有设置独立 Context 的需求
      
  2. 不要传递 nil Context,不确定使用哪个 Context 时传 context.TODO

  3. Value 方法只用来存储需要在进程和接口间流转的请求数据,不要用来传递函数的可选参数。

Google 的实践是收到的请求和对外发送的请求的调用链路上所有函数的第一个参数都要是 Context,这样做的好处是可以方便地控制超时、取消以及数据在程序中的流转。

Context 是为请求中的数据和取消操作定义的一个通用接口,可以方便开发者之间共享代码。

实现原理

cancelCtxtimerCtxvalueCtx 都继承了 Context;3 种 Context 实现可以互为父节点,组合成不同的应用形式。

cancelCtx

// https://github.com/golang/go/blob/bc51e930274a5d5835ac8797978afc0864c9e30c/src/context/context.go#L344
type cancelCtx struct {
	Context
	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}
func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err // 设置 err 来说明关闭原因
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()
	if removeFromParent {
		removeChild(c.Context, c) // c.Context 是父 Context
	}
}
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c) // 将自身添加到父节点
	return &c, func() { c.cancel(true, Canceled) }
}

timerCtx

timerCtxcancelCtx 基础上增加了 deadline,用于标识自动调用 cancel 的时间,timer 就是一个触发自动 cancel 的定时器。

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.
	deadline time.Time
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() { // 自动触发 cancel
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

valueCtx

type valueCtx struct {
	Context
	key, val interface{}
}
func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key) // 父节点的 value
}

References