细说schedule
Publish on 2025-01-19

Overview

Go scheduler 在源码中的结构体为 schedt,保存调度器的状态信息、全局的可运行 G 队列等(也就是之前提到过的 global goroutine queue)。源码如下:

// 保存调度器的信息
type schedt struct {
    // accessed atomically. keep at top to ensure alignment on 32-bit systems.
    // 需以原子访问访问。
    // 保持在 struct 顶部,以使其在 32 位系统上可以对齐
    goidgen  uint64
    lastpoll uint64

    lock mutex

    // 由空闲的工作线程组成的链表
    midle        muintptr // idle m's waiting for work
    // 空闲的工作线程数量
    nmidle       int32    // number of idle m's waiting for work
    // 空闲的且被 lock 的 m 计数
    nmidlelocked int32    // number of locked m's waiting for work
    // 已经创建的工作线程数量
    mcount       int32    // number of m's that have been created
    // 表示最多所能创建的工作线程数量
    maxmcount    int32    // maximum number of m's allowed (or die)

    // goroutine 的数量,自动更新
    ngsys uint32 // number of system goroutines; updated atomically

    // 由空闲的 p 结构体对象组成的链表
    pidle      puintptr // idle p's
    // 空闲的 p 结构体对象的数量
    npidle     uint32
    nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

    // Global runnable queue.
    // 全局可运行的 G队列
    runqhead guintptr // 队列头
    runqtail guintptr // 队列尾
    runqsize int32 // 元素数量

    // Global cache of dead G's.
    // dead G 的全局缓存
    // 已退出的 goroutine 对象,缓存下来
    // 避免每次创建 goroutine 时都重新分配内存
    gflock       mutex
    gfreeStack   *g
    gfreeNoStack *g
    // 空闲 g 的数量
    ngfree       int32

    // Central cache of sudog structs.
    // sudog 结构的集中缓存
    sudoglock  mutex
    sudogcache *sudog

    // Central pool of available defer structs of different sizes.
    // 不同大小的可用的 defer struct 的集中缓存池
    deferlock mutex
    deferpool [5]*_defer

    gcwaiting  uint32 // gc is waiting to run
    stopwait   int32
    stopnote   note
    sysmonwait uint32
    sysmonnote note

    // safepointFn should be called on each P at the next GC
    // safepoint if p.runSafePointFn is set.
    safePointFn   func(*p)
    safePointWait int32
    safePointNote note

    profilehz int32 // cpu profiling rate

    // 上次修改 gomaxprocs 的纳秒时间
    procresizetime int64 // nanotime() of last change to gomaxprocs
    totaltime      int64 // ∫gomaxprocs dt up to procresizetime
}

在程序运行过程中,schedt 对象只有一份实体,它维护了调度器的所有信息。

在 proc.go 和 runtime2.go 文件中,有一些很重要全局的变量:

// 所有 g 的长度
allglen     uintptr

// 保存所有的 g
allgs    []*g

// 保存所有的 m
allm        *m

// 保存所有的 p,_MaxGomaxprocs = 1024
allp        [_MaxGomaxprocs + 1]*p

// p 的最大值,默认等于 ncpu
gomaxprocs  int32

// 程序启动时,会调用 osinit 函数获得此值
ncpu        int32

// 调度器结构体对象,记录了调度器的工作状态
sched       schedt

// 代表进程的主线程
m0           m

// m0 的 g0,即 m0.g0 = &g0,即全局的 g0
g0           g

在程序初始化时,这些全局变量都会被初始化为零值:指针被初始化为 nil 指针,切片被初始化为 nil 切片,int 被初始化为 0,结构体的所有成员变量按其类型被初始化为对应的零值。

因此程序刚启动时 allgs,allm 和allp 都不包含任何 g,m 和 p。

不仅是 Go 程序,系统加载可执行文件大概都会经过这几个阶段:

  1. 从磁盘上读取可执行文件,加载到内存
  2. 创建进程和主线程
  3. 为主线程分配栈空间
  4. 把由用户在命令行输入的参数拷贝到主线程的栈
  5. 把主线程放入操作系统的运行队列等待被调度

浅尝辄止

调整SP(stack pointer)到一个地址是 16 的倍数的位置

SUBQ	$(4*8+7), SP		// 2args 2auto
// 调整栈顶寄存器使其按 16 个字节对齐
ANDQ	$~15, SP

Untitled.png

Untitled.png

上面两张图中,左侧用箭头标注了 16 字节对齐的位置。第一步表示向下移动 39 B,第二步表示与 ~15 相与。

存在两种情况,这也是第一步将 SP 下移的时候,多移了 7 个 Byte 的原因。第一张图里,与 ~15 相与的时候,SP 值减少了 1,第二张图则减少了 9。最后都是移位到了 16 字节对齐的位置。

两张图的共同点是 SP 与 argc 中间多出了 16 个字节的空位。

至于为什么进行 16 个字节对齐,就比较好理解了:因为 CPU 有一组 SSE 指令,这些指令中出现的内存地址必须是 16 的倍数。

初始化g0

g0 栈的作用就是为运行 runtime 代码提供一个“环境”。

这里直接叠甲过,看初始化g0完以后是啥样子:

Untitled.png

stack.hi和stack.lo分别指向栈的高位和低位。

主线程绑定m0

因为 m0 是全局变量,而 m0 又要绑定到工作线程才能执行。我们又知道,runtime 会启动多个工作线程,每个线程都会绑定一个 m0。而且,代码里还得保持一致,都是用 m0 来表示。这就要用到线程本地存储的知识了,也就是常说的 TLS(Thread Local Storage)。简单来说,TLS 就是线程本地的私有的全局变量。

如果需要在一个线程内部的各个函数调用都能访问、但其它线程不能访问的变量(被称为 static memory local to a thread,线程局部静态变量),就需要新的机制来实现。这就是 TLS。

设置 tls 的函数 runtime·settls(SB) 位于源码 src/runtime/sys_linux_amd64.s 处,主要内容就是通过一个系统调用将 fs 段基址设置成 m.tls[1] 的地址,而 fs 段基址又可以通过 CPU 里的寄存器 fs 来获取。

而每个线程都有自己的一组 CPU 寄存器值,操作系统在把线程调离 CPU 时会帮我们把所有寄存器中的值保存在内存中,调度线程来运行时又会从内存中把这些寄存器的值恢复到 CPU。

这样,工作线程代码就可以通过 fs 寄存器来找到 m.tls。

问题:为什么一定要将g0存储到tls中,明明还有一个

最终,

tls[0] = g0
m0.g0 = &g0
g0.m = &m0

就把 m0,g0,m.tls[0] 串联起来了。通过 m.tls[0] 可以找到 g0,通过 g0 可以找到 m0(通过 g 结构体的 m 字段)。并且,通过 m 的字段 g0,m0 也可以找到 g0。于是,主线程和 m0,g0 就关联起来了。

从这里还可以看到,保存在主线程本地存储中的值是 g0 的地址,也就是说工作线程的私有全局变量其实是一个指向 g 的指针而不是指向 m 的指针。

目前这个指针指向g0,表示代码正运行在 g0 栈。

于是,前面的图又增加了新的玩伴 m0:

Untitled.png

schedinit

首先回顾go程序初始化的过程:

  • call osinit。初始化系统核心数。
  • call schedinit。初始化调度器。
  • make & queue new G。创建新的 goroutine。
  • call runtime·mstart。调用 mstart,启动调度。
  • The new G calls runtime·main。在新的 goroutine 上运行 runtime.main 函数。

初始化 allm

schedt是一个全局变量,所有的线程都可以操作它(要加锁),同时,会在schedinit中完成对m0的初始化。

// 初始化 m
func mcommoninit(mp *m) {
    // 初始化过程中_g_ = g0
    _g_ := getg()

    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        callers(1, mp.createstack[:])
    }

    // random 初始化
    mp.fastrand = 0x49f6428a + uint32(mp.id) + uint32(cputicks())
    if mp.fastrand == 0 {
        mp.fastrand = 0x49f6428a
    }

    lock(&sched.lock)
    // 设置 m 的 id
    mp.id = sched.mcount
    sched.mcount++
    // 检查已创建系统线程是否超过了数量限制(10000)
    checkmcount()

    // ………………省略了初始化 gsignal

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    mp.alllink = allm

    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // ………………
}

因为 sched 是一个全局变量,多个线程同时操作 sched 会有并发问题,因此先要加锁,操作结束之后再解锁。

mp.id = sched.mcount
sched.mcount++
checkmcount()

可以看到,m0 的 id 是 0,并且之后创建的 m 的 id 是递增的。checkmcount() 函数检查已创建系统线程是否超过了数量限制(10000)。

mp.alllink = allm

将 m 挂到全局变量 allm 上,allm 是一个指向 m 的的指针。

atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))

这一行将 allm 变成 m 的地址,这样变成了一个循环链表。之后再新建 m 的时候,新 m 的 alllink 就会指向本次的 m,最后 allm 又会指向新创建的 m。

Untitled.png

上图中,1 将 m0 挂在 allm 上。之后,若新创建 m,则 m1 会和 m0 相连。

初始化 allp

跳过一些其他的初始化代码,继续往后看:

procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n
}
if procs > _MaxGomaxprocs {
    procs = _MaxGomaxprocs
}

这里就是设置 procs,它决定创建 P 的数量。ncpu 这里已经被赋上了系统的核心数,因此代码里不设置 GOMAXPROCS 也是没问题的。这里还限制了 procs 的最大值,为 1024。

来看最后一个核心的函数:

// src/runtime/proc.go

func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || old > _MaxGomaxprocs || nprocs <= 0 || nprocs > _MaxGomaxprocs {
        throw("procresize: invalid arg")
    }

    // ……………………

    // update statistics
    // 更新数据
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // 初始化所有的 P
    for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            // 申请新对象
            pp = new(p)
            pp.id = i
            // pp 的初始状态为 stop
            pp.status = _Pgcstop
            pp.sudogcache = pp.sudogbuf[:0]
            for i := range pp.deferpool {
                pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            // 将 pp 存放到 allp 处
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }

        // ……………………

    }

    // 释放多余的 P。由于减少了旧的 procs 的数量,因此需要释放
    // ……………………

    // 获取当前正在运行的 g 指针,初始化时 _g_ = g0
    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        // 继续使用当前 P
        _g_.m.p.ptr().status = _Prunning
    } else {
        // 初始化时执行这个分支

        // ……………………

        _g_.m.p = 0
        _g_.m.mcache = nil
        // 取出第 0 号 p
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 将 p0 和 m0 关联起来
        acquirep(p)
        if trace.enabled {
            traceGoStart()
        }
    }
    var runnablePs *p
    // 下面这个 for 循环把所有空闲的 p 放入空闲链表
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        // allp[0] 跟 m0 关联了,不会进行之后的“放入空闲链表”
        if _g_.m.p.ptr() == p {
            continue
        }

        // 状态转为 idle
        p.status = _Pidle
        // p 的 LRQ 里没有 G
        if runqempty(p) {
            // 放入全局空闲链表
            pidleput(p)
        } else {
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    // 返回有本地任务的 P 链表
    return runnablePs
}

代码比较长,这个函数不仅是初始化的时候会执行到,在中途改变 procs 的值的时候,仍然会调用它。所有存在很多一般不用关心的代码,因为一般不会在中途重新设置 procs 的值。我把初始化无关的代码删掉了,这样会更清晰一些。

函数先是从堆上创建了 nproc 个 P,并且把 P 的状态设置为 _Pgcstop,现在全局变量 allp 里就维护了所有的 P。

接着,调用函数 acquirep 将 p0 和 m0 关联起来。我们来详细看一下:

func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    acquirep1(_p_)

    // have p; write barriers now allowed
    _g_ := getg()
    _g_.m.mcache = _p_.mcache

    // ……………………
}

先调用 acquirep1 函数真正地进行关联,之后,将 p0 的 mcache 资源赋给 m0。再来看 acquirep1:

func acquirep1(_p_ *p) {
    _g_ := getg()

    // ……………………

    _g_.m.p.set(_p_)
    _p_.m.set(_g_.m)
    _p_.status = _Prunning
}

可以看到就是一些字段相互设置,执行完成后:

g0.m.p = p0
p0.m = m0

并且,p0 的状态变成了 _Prunning。

接下来是一个循环,它将除了 p0 的所有非空闲的 P,放入 P 链表 runnablePs,并返回给 procresize 函数的调用者,并由调用者来“调度”这些 P。

总结

  1. 使用 make([]p, nprocs) 初始化全局变量 allp,即 allp = make([]p, nprocs)
  2. 循环创建并初始化 nprocs 个 p 结构体对象并依次保存在 allp 切片之中
  3. 把 m0 和 allp[0] 绑定在一起,即 m0.p = allp[0],allp[0].m = m0
  4. 把除了 allp[0] 之外的所有 p 放入到全局变量 sched 的 pidle 空闲队列之中

说明一下,最后一步,代码里是将所有空闲的 P 放入到调度器的全局空闲队列;对于非空闲的 P(本地队列里有 G 待执行),则是生成一个 P 链表,返回给 procresize 函数的调用者。

最后我们将 allp 和 allm 都添加到图上:

Untitled.png

© 2024 humbornjo :: based on 
nobloger  ::  rss