golang调度 调度流程
1. 初始化
调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等
func schedinit() { sched.maxmcount = 10000 // 设置m的最大值为10000 mcommoninit(_g_.m) //初始化当前m // 确认P的个数 // 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改 procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } // 调整P的个数,这里是新分配procs个P // 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了 if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } ... }
procresize方法主要完成以下任务:
- 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
- 遍历p的缓存,将未初始化的p进行初始化
- 对于收缩的情况,将收缩的p进行回收处理
- 分别将空闲的p和有任务的p加入空闲链表和工作链表
下面是procresize()的源码:
//全局数据结构: allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable sched schedt //全局调度器(综述文中有介绍) // 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整 func procresize(nprocs int32) *p { ... old := gomaxprocs // 扩张allp数组 if nprocs > int32(len(allp)) { lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { // 分配nprocs个*p nallp := make([]*p, nprocs) copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // 初始化新的p for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) ... // 将pp保存到allp数组里, allp[i] = pp atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } ... } // 释放无用的p for i := nprocs; i < old; i++ { p := allp[i] // 任务转移 // 本地任务队列转换到全局队列 for p.runqhead != p.runqtail { p.runqtail-- gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() globrunqputhead(gp) } // 优先执行的也转移到全局 if p.runnext != 0 { globrunqputhead(p.runnext.ptr()) p.runnext = 0 } // 后台标记的g也转移 if gp := p.gcBgMarkWorker.ptr(); gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) globrunqput(gp) p.gcBgMarkWorker.set(nil) } // 做一些内存释放等操作 ... } ... //将p放入队列 var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] // 如果是当前的M绑定的P,不放入P空闲链表 // 否则更改P的状态为_Pidle,放入P空闲链表 if _g_.m.p.ptr() == p { continue } p.status = _Pidle if runqempty(p) { pidleput(p)// 将空闲p放入全局空闲链表 } else { // 非空闲的通过绑定m,链起来 p.m.set(mget()) p.link.set(runnablePs) // 最后一个空闲的不加入空闲列表 直接返回去调度使用 runnablePs = p } } }
新建的无任务p都会被放到空闲链表中:
func pidleput(_p_ *p) { if !runqempty(_p_) { throw("pidleput: P has non-empty run queue") } _p_.link = sched.pidle //通过p的link形成链表 sched.pidle.set(_p_) // 将sched.npidle加1 atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic }
默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。
完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作
golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是代价很大,会触发STW
lock(&sched.lock) ret := int(gomaxprocs) unlock(&sched.lock) if n <= 0 || n == ret { return ret } // 有stw和重启世界的过程 stopTheWorld("GOMAXPROCS") // newprocs will be processed by startTheWorld newprocs = int32(n) startTheWorld() return ret
以上便是golang初始化调度器的所有步骤,具体:
- 调用schedinit,初始化maxmcount和gomaxprocs的数量
- sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
- schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
- 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。
2. g的创建
在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。
func newproc(siz int32, fn *funcval) { //用fn + PtrSize 获取第一个参数的地址,也就是argp //这里要了解一下go的堆栈 argp := add(unsafe.Pointer(&fn), sys.PtrSize) //用siz - 8 获取pc地址 (汇编实现) pc := getcallerpc() // 用g0的栈创建G对象 systemstack(func() { newproc1(fn, (*uint8)(argp), siz, pc) }) }
了解一下保存需要执行的业务函数的funcval:
funcval 是一个变长结构,第一个成员是函数指针,往后是fn的参数,个数长度可变,但是起始位置固定(在有参数的的情况下)。将*funcval的地址跳过一个指针长度(fn)便是参数的起始地址了。
// 上面的 add 是跳过这个 fn,到参数的起始位置 type funcval struct { fn uintptr // variable-size, fn-specific data here }
newproc()获取到参数的地址和callerpc,然后调用newproc1().
流程如下图:
代码如下:
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行 func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) { _g_ := getg() if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } _g_.m.locks++ // disable preemption because it can be holding p in a local var siz := narg // 从m中获取p _p_ := _g_.m.p.ptr() // 从gfree list获取g newg := gfget(_p_) // 如果没获取到g,则新建一个 if newg == nil { // 分配栈为 2k 大小的G对象 newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead // 添加到allg数组,防止gc扫描清除掉 allgadd(newg) } // 参数大小+稍微一点空间 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign // 新协程的栈顶计算,将栈顶减去参数占用的空间 sp := newg.stack.hi - totalSize spArg := sp // 如果有参数 if narg > 0 { // copy参数到栈上 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) ... //一些gc相关的工作省略 } // 初始化G的gobuf,保存sp,pc,任务函数等 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp // 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function // sched.g保存当前新的G newg.sched.g = guintptr(unsafe.Pointer(newg)) // 将当前的pc压入栈,保存g的任务函数为pc gostartcallfn(&newg.sched, fn) // gopc保存newproc的pc newg.gopc = callerpc // 任务函数的地址 newg.startpc = fn.fn ... // 更改当前g的状态为_Grunnable casgstatus(newg, _Gdead, _Grunnable) // 生成唯一的goid newg.goid = int64(_p_.goidcache) // 将当前新生成的g,放入队列 runqput(_p_, newg, true) // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { wakep() } }
g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist
//从缓存列表获取一个空闲的g func gfget(_p_ *p) *g { retry: gp := _p_.gfree if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) { //本地空闲队列为空的时候,从全局中获取,需要加锁 lock(&sched.gflock) //一次转移最多32个空闲到本地p for _p_.gfreecnt < 32 { if sched.gfreeStack != nil { gp = sched.gfreeStack //获取g sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g } else if sched.gfreeNoStack != nil { gp = sched.gfreeNoStack sched.gfreeNoStack = gp.schedlink.ptr() } else { break } _p_.gfreecnt++ sched.ngfree-- gp.schedlink.set(_p_.gfree) _p_.gfree = gp } unlock(&sched.gflock) goto retry } // 获取到g if gp != nil { // 调整链表头及个数 _p_.gfree = gp.schedlink.ptr() _p_.gfreecnt-- // 堆栈为空就分配 if gp.stack.lo == 0 { // Stack was deallocated in gfput. Allocate a new one. systemstack(func() { gp.stack = stackalloc(_FixedStack) }) gp.stackguard0 = gp.stack.lo + _StackGuard } else { ... } } return gp }
当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。
func gfput(_p_ *p, gp *g) { // 处理堆栈 stksize := gp.stack.hi - gp.stack.lo // 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了) if stksize != _FixedStack { stackfree(gp.stack) gp.stack.lo = 0 gp.stack.hi = 0 gp.stackguard0 = 0 } // 处理p的复用链表 gp.schedlink.set(_p_.gfree) _p_.gfree = gp _p_.gfreecnt++ // 超过64个,放回部分到全局队列 if _p_.gfreecnt >= 64 { lock(&sched.gflock) for _p_.gfreecnt >= 32 { _p_.gfreecnt-- gp = _p_.gfree _p_.gfree = gp.schedlink.ptr() if gp.stack.lo == 0 { gp.schedlink.set(sched.gfreeNoStack) sched.gfreeNoStack = gp } else { gp.schedlink.set(sched.gfreeStack) sched.gfreeStack = gp } sched.ngfree++ } unlock(&sched.gflock) } }
malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。
//一般传入的堆栈大小默认为2k func malg(stacksize int32) *g { newg := new(g) if stacksize >= 0 { stacksize = round2(_StackSystem + stacksize)// 对齐 systemstack(func() { newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈 }) newg.stackguard0 = newg.stack.lo + _StackGuard // 设置 stackguard newg.stackguard1 = ^uintptr(0) } return newg }
创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g
var ( // 存储所有g的数组 allgs []*g // 保护allgs的互斥锁 allglock mutex allglen uintptr ) func allgadd(gp *g) { lock(&allglock) allgs = append(allgs, gp) allglen = uintptr(len(allgs)) unlock(&allglock) }
当获取到一个可用的g之后:
- 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
- 添加到g到p的本地队列
- p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
- 若存在有空闲的p及未自旋的m,调用wakep()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
// 尝试将G放到P的本地队列 func runqput(_p_ *p, gp *g, next bool) { if next { retryNext: oldnext := _p_.runnext // 将G赋值给_p_.runnext // 最新的G优先级最高,最可能先被执行。 // 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点, // 让别的core执行,这样才能充分利用多核,提高并发能 if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } gp = oldnext.ptr() } retry: h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers t := _p_.runqtail // 如果本地队列还有剩余的位置,将G插入本地队列的尾部 if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption return } // 本地队列已满,放入全局队列 if runqputslow(_p_, gp, h, t) { return } goto retry } // 如果本地满了以后,一次将本地的一半的G转移到全局队列 func runqputslow(_p_ *p, gp *g, h, t uint32) bool { //首先转移一半到全局队列,省略 ... // 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。 lock(&sched.lock) globrunqputbatch(batch[0], batch[n], int32(n+1)) unlock(&sched.lock) return true }
放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务
到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力。
3. 系统线程m
在golang中有三种系统线程:
- 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
- 运行sysmon的线程
- 普通用户线程,用来与p绑定,运行g中的任务的线程,
主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。
在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:
// 尝试获取一个M来运行可运行的G func wakep() { // 如果有其他的M处于自旋状态,那么就不管了,直接返回 // 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。 if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) } // startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回 // 获取到P后,在尝试获取M,如果获取不到就新建一个M func startm(_p_ *p, spinning bool) { lock(&sched.lock) // 如果P为nil,则尝试获取一个空闲P if _p_ == nil { _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) return } } // 获取一个空闲的M mp := mget() unlock(&sched.lock) if mp == nil { var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } // 如果获取不到,则新建一个,新建完成后就立即返回 newm(fn, _p_) return } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning //标记该M是否在自旋 mp.nextp.set(_p_) // 暂存P notewakeup(&mp.park) // 唤醒M }
上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。
下面从新建m开始:
func newm(fn func(), _p_ *p) { // 根据fn和p和绑定一个m对象 mp := allocm(_p_, fn) // 设置当前m的下一个p为_p_ mp.nextp.set(_p_) ... // 真正的分配os thread newm1(mp) } // 分配一个m,且不关联任何一个os thread func allocm(_p_ *p, fn func()) *m { _g_ := getg() _g_.m.locks++ // disable GC because it can be called from sysmon if _g_.m.p == 0 { acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。 } ... mp := new(m) mp.mstartfn = fn mcommoninit(mp) //初始化当前m // 给g0分配一定的堆栈 if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" { mp.g0 = malg(-1) //这些系统必须使用系统的栈 } else { mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k } mp.g0.m = mp //绑定的p和当前m的p一样,解绑 if _p_ == _g_.m.p.ptr() { releasep() } return mp }
m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;
func mcommoninit(mp *m) { _g_ := getg() // g0 stack won't make sense for user (and is not necessary unwindable). if _g_ != _g_.m.g0 { callers(1, mp.createstack[:]) } lock(&sched.lock) if sched.mnext+1 < sched.mnext { throw("runtime: thread ID overflow") } mp.id = sched.mnext sched.mnext++ // m数量检查 checkmcount() ... // signal g创建初始化 mpreinit(mp) if mp.gsignal != nil { mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard } //加入全局m链表 mp.alllink = allm //链表 atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) }
func newm1(mp *m) { // 对cgo的处理 ... execLock.rlock() // Prevent process clone. // 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针 // 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中 newosproc(mp, unsafe.Pointer(mp.g0.stack.hi)) execLock.runlock() }
每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:
//linux // 分配一个系统线程,且完成 g0 和 g0上的栈分配 // 传入 mstart 函数,让线程执行 mstart func newosproc(mp *m, stk unsafe.Pointer) { // Disable signals during clone, so that the new thread starts // with signals disabled. It will enable them in minit. var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) sigprocmask(_SIG_SETMASK, &oset, nil) ... } //windows func newosproc(mp *m, stk unsafe.Pointer) { const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000 // stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go. const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit) thandle := stdcall6(_CreateThread, 0, stackSize, funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)), _STACK_SIZE_PARAM_IS_A_RESERVATION, 0) ... // Close thandle to avoid leaking the thread object if it exits. stdcall1(_CloseHandle, thandle) }
创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。
下面将描述获取一个空闲的m
在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建
func mget() *m { //从idle 的m链表中搞一个 mp := sched.midle.ptr() if mp != nil { sched.midle = mp.schedlink sched.nmidle-- } return mp }
被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。
// 停止M,使其休眠,但不会被系统回收 // 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行 // 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。 func stopm() { _g_ := getg() retry: lock(&sched.lock) mput(_g_.m) unlock(&sched.lock) // 在lock_futex.go 中 // 休眠,等待被唤醒 notesleep(&_g_.m.park) noteclear(&_g_.m.park) ... // 这里是被wakenote唤醒后的操作了 // 绑定p acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 把mp添加到midle列表 func mput(mp *m) { mp.schedlink = sched.midle sched.midle.set(mp) sched.nmidle++ checkdead() }
到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:
- 系统线程调度上下文切换是有消耗的
- m本身是有占资源的(自身的栈内存,寄存器等)
4. 执行goroutine
m 执行 g有两个起点:
- 从m的启动函数(创建m的时候绑定的)mstart()开始,触发m的调度
- 调度过程中调用stopm()睡眠后,通过 notewakeup(&mp.park)恢复m的执行,并从stopm()的位置开始执行,重新调度。
mstart() 流程:
func mstart() { _g_ := getg() osStack := _g_.stack.lo == 0 // 检查栈边界,为0的话是系统栈 if osStack { // 处理系统栈 size := _g_.stack.hi if size == 0 { size = 8192 * sys.StackGuardMultiplier } _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) _g_.stack.lo = _g_.stack.hi - size + 1024 } _g_.stackguard0 = _g_.stack.lo + _StackGuard _g_.stackguard1 = _g_.stackguard0 mstart1(0) //启动m } // dummy一直为0,给getcallersp当参数 func mstart1(dummy int32) { _g_ := getg() if _g_ != _g_.m.g0 { throw("bad runtime·mstart") } // 记录mstart1 函数结束后的地址pc和mstart1 函数参数到当前g的运行现场 save(getcallerpc(), getcallersp(unsafe.Pointer(&dummy))) asminit() // 初始化m minit() // 如果当前g的m是初始m0,执行mstartm0() if _g_.m == &m0 { // 对于初始m,需要一些特殊处理 mstartm0() } // 如果有m的起始任务函数,则执行,比如 sysmon 函数 if fn := _g_.m.mstartfn; fn != nil { fn() } // GC startworld的时候,会检查闲置m是否少于并发标记需求(needaddgcproc) // 新建m,设置 m.helpgc=-1,加入限制队列等待唤醒 if _g_.m.helpgc != 0 { _g_.m.helpgc = 0 stopm() } else if _g_.m != &m0 { // 绑定p acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 进入调度,而且不会在返回 schedule() }
绑定号p之后,m拥有了可分配cache和执行队列,进入核心调度循环,核心调度从schedule函数开始,调度完一次之后会引导重新执行schedule,实现循环调度。
schedule方法会主要功能:尽可能给m找到可以运行的g,这其中主要是分为以下几种:
- 当前m已经指定了g。该情况下会将m与p解绑,然后m睡眠,等待被绑定的g被调度然后唤醒该m执行该g
- gc触发STW的时候,m直接睡眠
- gcmark(标记)阶段,大概有1/4的g用来并行标记,这里也会检测是否调度gc标记的g(gcBlackenEnabled!=0)
- 调度61次后会从全局的g队列中尝试获取g
- 全局队列中未获取到便去绑定p的本地任务队列获取g
- 还未获取便调用findrunnable()去尽可能获取,取不到便会睡眠,不返回。
- 获取到的g有绑定的m,交出当前的p和g,与指定的m绑定,唤醒指定的m,自己睡眠,等待唤醒。
- 执行获取到的g
综上,在该方法中,m在以下情况会休眠:
- 当m.lockedg != 0(m有绑定固定执行的g),m会在stoplockedm()解绑p并休眠,等待被绑定的g被其他m调度的时候来唤醒该m,直接被绑定的g
- sched.gcwaiting != 0(gc STW)m会休眠
- findrunnable()中想尽一切办法都没有获取到可执行的g的时候,m会休眠
- 获取到g的时候,g绑定了其他的m(gp.lockedm != 0),当前m会解绑p,休眠,然后唤醒g绑定的m,执行该g
当m休眠被唤醒的时候,并不会从固定的位置开始执行,会直接从休眠的位置开始执行。
以下是schedule方法的g获取流程(省略了lockm和lockg的处理以及gc STW的处理):
另外一个展示当m绑定了g即该m只能执行特定的g(m.lockedg != 0, g.lockedm != 0)以及检测gc STW的调度:
具体代码:
func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } // 如果当前M锁定了某个G,那么应该交出P,进入休眠 // 等待某个M调度拿到lockedg,然后唤醒lockedg的M if _g_.m.lockedg != 0 { stoplockedm() execute(_g_.m.lockedg.ptr(), false) // Never returns. } top: // 如果当前GC需要停止整个世界(STW), 则调用gcstopm休眠当前的M if sched.gcwaiting != 0 { gcstopm() goto top } var gp *g var inheritTime bool // 如果当前GC正在标记阶段, 则查找有没有待运行的GC Worker, GC Worker也是一个G if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) } if gp == nil { // 每隔61次调度,尝试从全局队列种获取G // ? 为何是61次? https://github.com/golang/go/issues/20168 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { // 从p的本地队列中获取 gp, inheritTime = runqget(_g_.m.p.ptr()) } if gp == nil { // 想尽办法找到可运行的G,找不到就不用返回了 gp, inheritTime = findrunnable() // blocks until work is available } // M即将要执行G,如果M还是spinning,那么重置为false if _g_.m.spinning { // 重置为非自旋,并根据需要唤醒或新建一个M来运行 resetspinning() } // 如果找到的G已经锁定M了,dolockOSThread和cgo会将G和M绑定 // 则用startlockedm执行,将P和G都交给对方lockedm,唤醒绑定M-lockedm,自己回空闲队列。 if gp.lockedm != 0 { startlockedm(gp) goto top } execute(gp, inheritTime) }
excute()方法将会去执行g
// 执行goroutine的任务函数 // 如果inheritTime=true,那么当前的G继承剩余的时间片,其实就是不让schedtick累加, // 这样的话就不会触发每61次从全局队列找G func execute(gp *g, inheritTime bool) { _g_ := getg() // 更改gp的状态为_Grunning casgstatus(gp, _Grunnable, _Grunning) // 置等待时间为0 gp.waitsince = 0 // 置可抢占标志为fasle gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard // 如果不是inheritTime,schedtick累加 if !inheritTime { _g_.m.p.ptr().schedtick++ } // 当前的M的G改为gp _g_.m.curg = gp // gp的M改为当前的M gp.m = _g_.m ... // gogo由汇编实现, runtime/asm_amd64.s // 实现当前的G切换到gp,然后用JMP跳转到G的任务函数 // 当任务函数执行完后会调用 goexit gogo(&gp.sched) }
gogo由汇编实现,主要是由g0切换到g栈,然后执行函数。
// 从g0栈切换到G栈,然后JMP到任务函数代码 TEXT runtime·gogo(SB), NOSPLIT, $16-8 MOVQ buf+0(FP), BX // gobuf MOVQ gobuf_g(BX), DX //G MOVQ 0(DX), CX // make sure g != nil get_tls(CX) MOVQ DX, g(CX) MOVQ gobuf_sp(BX), SP // restore SP 恢复sp寄存器值切换到g栈 MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ $0, gobuf_sp(BX) // clear to help garbage collector MOVQ $0, gobuf_ret(BX) MOVQ $0, gobuf_ctxt(BX) MOVQ $0, gobuf_bp(BX) MOVQ gobuf_pc(BX), BX // 获取G任务函数的地址 JMP BX // 转到任务函数执行
当调用任务函数结束返回的时候,会执行到我们在创建g流程中就初始化好的指令:goexit
TEXT runtime·goexit(SB),NOSPLIT,$0-0 BYTE $0x90 // NOP CALL runtime·goexit1(SB) // does not return 调用goexit1函数 // traceback from goexit1 must hit code range of goexit BYTE $0x90 // NOP
下面是goexit1()函数:
// 当goroutine结束后,会调用这个函数 func goexit1() { // 切换到g0执行goexit0 mcall(goexit0) } func goexit0(gp *g) { _g_ := getg() // gp的状态置为_Gdead casgstatus(gp, _Grunning, _Gdead) // 状态重置 gp.m = nil // G和M是否锁定 locked := gp.lockedm != 0 // G和M解除锁定 gp.lockedm = 0 _g_.m.lockedg = 0 ... // 处理G和M的清除工作 dropg() if _g_.m.lockedInt != 0 { print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n") throw("internal lockOSThread error") } _g_.m.lockedExt = 0 // 将G放入P的G空闲链表 gfput(_g_.m.p.ptr(), gp) // 再次进入调度 schedule() }
注意:无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从“g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈将初始化,从头开始。
至此一个m的调度流程已经清晰:
- 由创建m绑定的mstart()函数或者notewakeup()唤醒的执行位置(一般都在schedule()方法中休眠,详细请看上方有介绍)
- 进入schedule()方法,开始获取可执行的g,获取不到就休眠,等待wakep()调度,获取到p后调用execute()启动执行
- execute()调用gogo执行g
- gogo切换到g栈,并执行fn
- 结束后调用goexit()->goexit1()->goexit0()->sechedule()重新调度。
无论是mcall systemstack还是gogo都不会更新g0.sched栈现场,需要切换到g0的时候,直接从
“g_sched+gobuf_sp”读取地址恢复sp。在调用goexit0/schedule时,g0栈从头开始。mstart1中保存了g0的gobuf信息。
findrunable
该方法会想尽一切办法找到可以执行的任务,核心调度函数
这里逻辑较为复杂,下面将以代码中的两个标签top和stop将流程分开:
top label:
stop label:
上面是top和stop标签内的流程图,结合在一起便是findrunnable的全部流程,其中gcmark的调度部分有省略,将会在gc中详细描述。
结合代码看看这个方法:
// 找到一个可以运行的G,不找到就让M休眠,然后等待唤醒,直到找到一个G返回 func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() // 此处和handoffp中的条件必须一致:如果findrunnable将返回G运行,则handoffp必须启动M. top: _p_ := _g_.m.p.ptr() // 如果gc正等着运行,停止M,也就是STW if sched.gcwaiting != 0 { gcstopm() goto top } if _p_.runSafePointFn != 0 { runSafePointFn() } // fing是执行finalizer的goroutine if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0, true) } } if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } // local runq // 再尝试从本地队列中获取G if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 尝试从全局队列中获取G if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 从网络IO轮询器中找到就绪的G,把这个G变为可运行的G if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if gp := netpoll(false); gp != nil { // non-blocking // netpoll returns list of goroutines linked by schedlink. // 如果找到的可运行的网络IO的G列表,则把相关的G插入全局队列 injectglist(gp.schedlink.ptr()) // 更改G的状态为_Grunnable,以便下次M能找到这些G来执行 casgstatus(gp, _Gwaiting, _Grunnable) // goroutine trace事件记录-unpark if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // Steal work from other P's. procs := uint32(gomaxprocs) // 如果其他P都是空闲的,就不从其他P哪里偷取G了 if atomic.Load(&sched.npidle) == procs-1 { goto stop } // 如果当前的M没在自旋 且 正在自旋的M数量大于等于正在使用的P的数量,那么block // 当GOMAXPROCS远大于1,但程序并行度低时,防止过多的CPU消耗。 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } // 如果M为非自旋,那么设置为自旋状态 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 随机选一个P,尝试从这P中偷取一些G for i := 0; i < 4; i++ { // 尝试四次 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g // 从allp[enum.position()]偷去一半的G,并返回其中的一个 if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: // 当前的M找不到G来运行。如果此时P处于 GC mark 阶段 // 那么此时可以安全的扫描和黑化对象,和返回 gcBgMarkWorker 来运行 if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { // 设置gcMarkWorkerMode 为 gcMarkWorkerIdleMode _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode // 获取gcBgMarkWorker goroutine gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } allpSnapshot := allp // return P and block lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } // 再次从全局队列中获取G if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } // 将当前对M和P解绑 if releasep() != _p_ { throw("findrunnable: wrong p") } // 将p放入p空闲链表 pidleput(_p_) unlock(&sched.lock) wasSpinning := _g_.m.spinning // M取消自旋状态 if _g_.m.spinning { _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again // 再次检查所有的P,有没有可以运行的G for _, _p_ := range allpSnapshot { // 如果p的本地队列有G if !runqempty(_p_) { lock(&sched.lock) // 获取另外一个空闲P _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { // 如果P不是nil,将M绑定P acquirep(_p_) // 如果是自旋,设置M为自旋 if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 返回到函数开头,从本地p获取G goto top } break } } // gcmark的goroutine,这里会控制这类g的数量 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) { lock(&sched.lock) _p_ = pidleget() if _p_ != nil && _p_.gcBgMarkWorker == 0 { pidleput(_p_) _p_ = nil } unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto stop } } // poll network // 再次检查netpoll if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { gp := netpoll(true) // block until new work is available if gp != nil { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) acquirep(_p_) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } } // 实在找不到G,那就休眠吧 // 且此时的M一定不是自旋状态 stopm() goto top }
5. 系统调用
系统调用有用户态到内核态的切换,并且部分系统调用甚至会阻塞,当goroutine在处理系统调用的时候,如果不采取措施的话,会导致一个goroutine占用p时间过长,导致p中其他的goroutine无法及时调度。针对系统调用调度器做了一些操作,保证系统调用阻塞的同时,其他的goroutine可以被合理调度。
golang封装的系统到到最后都会调用到 Syscall() 或者 RawSyscall()这两个方法,RawSyscall是一个直接进行系统调用,而Syscall方法是做了部分处理,来配合go的调度器工作,以下代码省略了部分无关内容,展示了golang系统调用的过程:
EXT ·Syscall(SB),NOSPLIT,$0-56 CALL runtime·entersyscall(SB) //调用entersyscall进行调用前准备 MOVQ trap+0(FP), AX // syscall entry SYSCALL // linux amd64平台下的系统调用指令 CMPQ AX, $0xfffffffffffff001 JLS ok CALL runtime·exitsyscall(SB) //结束系统调用 RET ok: CALL runtime·exitsyscall(SB) RET // func RawSyscall(trap, a1, a2, a3 uintptr) (r1, r2, err uintptr) TEXT ·RawSyscall(SB),NOSPLIT,$0-56 MOVQ trap+0(FP), AX // syscall entry SYSCALL //linux amd64平台下的系统调用指令 CMPQ AX, $0xfffffffffffff001 JLS ok1 RET ok1: RET
明显SysCall比RawSyscall多调用了两个方法,entersyscall和exitsyscall,增加这两个函数的调用,让调度器有机会去对即将要进入系统调用的goroutine进行调整,方便调度。
entersyscall():
// 系统调用的时候调用该函数 // 进入系统调用,G将会进入_Gsyscall状态,也就是会被暂时挂起,直到系统调用结束。 // 此时M进入系统调用,那么P也会放弃该M。但是,此时M还指向P,在M从系统调用返回后还能找到P func entersyscall(dummy int32) { reentersyscall(getcallerpc(), getcallersp(unsafe.Pointer(&dummy))) } func reentersyscall(pc, sp uintptr) { _g_ := getg() _g_.m.locks++ // 让G进入_Gsyscall状态,此时G已经被挂起了,直到系统调用结束,才会让G重新写进入running casgstatus(_g_, _Grunning, _Gsyscall) //唤醒 sysmon m,这个监控长时间执行的g if atomic.Load(&sched.sysmonwait) != 0 { systemstack(entersyscall_sysmon) save(pc, sp) } // 这里很关键:P的M已经陷入系统调用,于是P忍痛放弃该M // 但是请注意:此时M还指向P,在M从系统调用返回后还能找到P _g_.m.mcache = nil _g_.m.p.ptr().m = 0 // P的状态变为Psyscall atomic.Store(&_g_.m.p.ptr().status, _Psyscall) }
该方法主要是为系统调用前做了准备工作:
- 修改g的状态为_Gsyscall
- 检查sysmon线程是否在执行,睡眠需要唤醒
- p放弃m,但是m依旧持有p的指针,结束调用后优先选择p
- 修改p的状态为_Psyscal
做好这些准备工作便可以真正的执行系统调用了。当该线程m长时间阻塞在系统调用的时候,一直在运行的sysmon线程会检测到该p的状态,并将其剥离,驱动其他的m(新建或获取)来调度执行该p上的任务,这其中主要是在retake方法中实现的,该方法还处理了goroutine抢占调度,这里省略,后面介绍抢占调度在介绍:
//实现go调度系统的抢占 func retake(now int64) uint32 { n := 0 lock(&allpLock) for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status //p在系统调用中 if s == _Psyscall { t := int64(_p_.syscalltick) if int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue } //没有可以调度的任务且时间阻塞时间未到阀值,直接跳过 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // 这里出发了系统调用长时间阻塞的调度 unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { n++ _p_.syscalltick++ //关键方法,将对长时间阻塞的p进行重新调度 handoffp(_p_) } incidlelocked(1) lock(&allpLock) } else if s == _Prunning { //暂时省略 } } unlock(&allpLock) return uint32(n) }
当系统调用时间过长的时候,会调用handoffp()方法:
// p的切换,系统调用或者绑定M时使用 func handoffp(_p_ *p) { //当前p有任务或者全局任务队列有任务,触发一次调度 //startm()上文有描述,会获取一个m来调度当前p的任务,当前p为nil时,会调度其他p任务队列 if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } //gc标记阶段且当前p有标记任务,触发调度 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { startm(_p_, false) return } //有自旋m或空闲p,触发调度 if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } ... //全局队列不为空 if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } //实在没任务,放入空闲队列 pidleput(_p_) unlock(&sched.lock) }
可以看到,通过handoffp方法,阻塞在系统调用的p会被重新调度,不会阻塞其他任务的执行。
没有空闲m的时候,这里有可能会创建出新的m来进行调度。
回到Syscall的执行流程中,当系统Syscall返回的时,会调用exitsyscall方法恢复调度:
// goroutine g退出系统调用。安排它再次在cpu上运行。 // 这个函数只能从go syscall库中调用,而不是从运行时使用的低级系统调用中调用 func exitsyscall(dummy int32) { _g_ := getg() // 重新获取p if exitsyscallfast() { casgstatus(_g_, _Gsyscall, _Grunning) return } // 没有获取到p,只能解绑当前g,重新调度该m了 mcall(exitsyscall0) }
exitsyscall会尝试重新绑定p,优先选择之前m绑定的p(进入系统的调用的时候,p只是单方面解绑了和m的关系,通过m依旧可以找到p):
func exitsyscallfast() bool { _g_ := getg() //stw,直接解绑p,然后退出 if sched.stopwait == freezeStopWait { _g_.m.mcache = nil _g_.m.p = 0 return false } // 如果之前附属的P尚未被其他M,尝试绑定该P if _g_.m.p != 0 && _g_.m.p.ptr().status == _Psyscall && atomic.Cas(&_g_.m.p.ptr().status, _Psyscall, _Prunning) { exitsyscallfast_reacquired() return true } // 否则从空闲P列表中取出一个来 oldp := _g_.m.p.ptr() _g_.m.mcache = nil _g_.m.p = 0 if sched.pidle != 0 { var ok bool systemstack(func() { ok = exitsyscallfast_pidle() }) if ok { return true } } return false }
当获取p失败的时候,只能选择重新调度:
func exitsyscall0(gp *g) { _g_ := getg() //修改g状态为 _Grunable casgstatus(gp, _Gsyscall, _Grunnable) dropg() //解绑 lock(&sched.lock) //尝试获取p _p_ := pidleget() if _p_ == nil { //未获取到p,g进入全局队列等待调度 globrunqput(gp) } else if atomic.Load(&sched.sysmonwait) != 0 { atomic.Store(&sched.sysmonwait, 0) notewakeup(&sched.sysmonnote) } unlock(&sched.lock) //获取到p,绑定,然后执行 if _p_ != nil { acquirep(_p_) execute(gp, false) // Never returns. } // m有绑定的g,解绑p然后绑定的g来唤醒,执行 if _g_.m.lockedg != 0 { stoplockedm() execute(gp, false) // Never returns. } //关联p失败了,休眠,等待唤醒,在进行调度。 stopm() schedule() // Never returns. }
上述便是golang系统调用的整个流程,大致如下:
- 业务调用封装好的系统调用函数,编译器翻译到Syscall
- 执行entersyscall()方法,修改g,p的状态,p单方面解绑m,并检查唤醒sysmon线程,检测系统调用。
- 当sysmon线程检测到系统调用阻塞时间过长的时候,调用retake,重新调度该p,让p上可执行的得以执行,不浪费资源
- 系统调用返回,进入exitsyscall方法,优先获取之前的p,如果该p已经被占有,重新获取空闲的p,绑定,然后继续执行该g。当获取不到p的时候,调用exitsyscall0,解绑g,休眠,等待下次唤醒调度。
6. 抢占调度
golang调度高效秘诀之一是它的抢占式调度。当任务函数执行的时间超过了一定的时间,
sysmon方法会不断的检测所有p上任务的执行情况,当有超过预定执行时间的g时,会发起抢占。这一切也是在retake函数中实现的,上文描述了该函数在系统调用中的功能,这里讲下该函数如何执行抢占。
// retake()函数会遍历所有的P,如果一个P处于执行状态, // 且已经连续执行了较长时间,就会被抢占。 // retake()调用preemptone()将P的stackguard0设为 // stackPreempt(关于stackguard的详细内容,可以参考 Split Stacks), // 这将导致该P中正在执行的G进行下一次函数调用时, // 导致栈空间检查失败。进而触发morestack()(汇编代码,位于asm_XXX.s中) // 然后进行一连串的函数调用,主要的调用过程如下: // morestack()(汇编代码)-> newstack() -> gopreempt_m() -> goschedImpl() -> schedule() // http://ga0.github.io/golang/2015/09/20/golang-runtime-scheduler.html func retake(now int64) uint32 { n := 0 lock(&allpLock) for i := 0; i < len(allp); i++ { _p_ := allp[i] pd := &_p_.sysmontick s := _p_.status if s == _Psyscall { //系统调用部分可看系统调用的分析 ... } else if s == _Prunning { // 超时抢占 if pd.schedwhen+forcePreemptNS > now { continue } preemptone(_p_) } } unlock(&allpLock) return uint32(n) }
当检测到某个p上的任务执行超过一定时间后,调用preemptone对当前g进行抢占:
func preemptone(_p_ *p) bool { // 标记可抢占 gp.preempt = true // gorotuine 中的每个调用都会通过将当前堆栈指针与 gp->stackguard0 进行比较来检查堆栈溢出。 // 将 gp->stackguard0 设置为 stackPreempt 会将抢占折叠为正常的堆栈溢出检查。 gp.stackguard0 = stackPreempt return true }
可以看到只是设置了两个参数,并没有执行实际的抢占工作,事实上这个过程是异步的,将在其他的地方执行真正的抢占操作。
stackguard0本身是用来检测goroutine的栈是否需要扩充的,当设置为stackPreempt时,在执行函数的时候,便会触发栈扩充,调用morestack()方法,morestack会调用newstack,该方法会扩充g的栈空间,也兼职了goroutine的抢占功能。
preempt 为抢占的备用手段,在stackguard0设置stackPreempt且在newstack中未能被抢占时,该标记也会在其他地方设置stackguard0的值为stackPreempt,再次触发抢占。
func newstack() { thisg := getg() gp := thisg.m.curg // 注意:如果另一个线程即将尝试抢占gp,则stackguard0可能会在发生变化。 // 所以现在读一次,判断是否被抢占。 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt if preempt { //以下情况不会被抢占 if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning { // Let the goroutine keep running for now. // gp->preempt is set, so it will be preempted next time. gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) // never return } } if preempt { casgstatus(gp, _Grunning, _Gwaiting) //gc扫描抢占 if gp.preemptscan { for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) { } if !gp.gcscandone { //扫描当前gp栈 gcw := &gp.m.p.ptr().gcw scanstack(gp, gcw) if gcBlackenPromptly { gcw.dispose() } gp.gcscandone = true } gp.preemptscan = false gp.preempt = false casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting) // This clears gcscanvalid. casgstatus(gp, _Gwaiting, _Grunning) gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) // 恢复后继续执行 } //转换状态为 _Gwaiting casgstatus(gp, _Gwaiting, _Grunning) gopreempt_m(gp) // never return } ... }
以上关于gc的抢占可以先忽略,关注一下普通抢占:
func gopreempt_m(gp *g) { goschedImpl(gp) } // 将当前的G入全剧队列,然后调用调度器 func goschedImpl(gp *g) { status := readgstatus(gp) // 将gp的状态改为_Grunnable casgstatus(gp, _Grunning, _Grunnable) // 解除与当前M的关联 dropg() lock(&sched.lock) // 入全局队列 globrunqput(gp) unlock(&sched.lock) // 启动调度 xx() }
这里最终会取消m和g的绑定,并将g放入全局队列中,然后开始调度m执行新的任务
以上是golang抢占调度的基本内容,总结如下:
- 正常goroutine的抢占都时由监控线程的sysmon发起的,超时执行的goroutine会被打上可抢占的标志。(gc scan阶段也会发生抢占,主要是为了扫描正在运行的g的栈空间)
- 在任务的每个函数中,编译器会加上栈空间检测代码,有需要栈空间扩容或者抢占便会进入morestack,然后调用newstack方法
- newstack中会检测是否抢占和抢占类型。gc扫描触发的抢占回扫描当前g栈上的内容,然后继续执行当前g。而普通抢占则会解绑当前g,将g放入全局队列,然后继续调度。
sysmon
上文中的系统调用和抢占调度都离不开这个函数。现在简单介绍下,在以后分析完内存,gc后会做详细的介绍
sysmon独立的运行在一个特殊的m上,它定期执行一次,每次会做以下事情:
- 2分钟没有gc则触发一次gc
- 系统调用和抢占调度的实现
- 处理长时间未返回结果的
channel 可以让一个 goroutine 发送特定值到另一个 gouroutine 的通信机制。也就是说,gouroutine 之间能够通过 channel 进行通信。 1. chan 使用范例 ...