v / vlib / goroutines / scheduler.v
642 lines · 574 sloc · 15.0 KB · e21acca549c0df34c98beb389088dbf7cee4c4d0
Raw
1// Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4//
5// GMP Scheduler - translated from Go's runtime/proc.go.
6//
7// The scheduler distributes ready-to-run goroutines over worker threads.
8// Key functions translated from Go:
9// - schedule() -> scheduler_loop()
10// - findRunnable() -> find_runnable()
11// - execute() -> execute()
12// - newproc() -> goroutine_create()
13// - wakep() -> wake_p()
14// - runqput() -> runq_put()
15// - runqget() -> runq_get()
16// - stealWork() -> steal_work()
17module goroutines
18
19// import sync as _
20// import runtime as _
21
// goroutine_create starts a new goroutine that will call `f(arg)`.
// It mirrors Go's `newproc`; the compiler lowers `go expr()` to a call here.
// `arg_size` is forwarded to newproc1 unchanged.
pub fn goroutine_create(f voidptr, arg voidptr, arg_size int) {
	new_g := newproc1(f, arg, arg_size)
	// Enqueue on the global run queue instead of a per-P queue: the caller
	// may be the main (non-scheduler) thread, which has no P, and any M can
	// pull from the global queue without relying on work stealing.
	glob_runq_put(new_g)
	// Give an idle P (if there is one) a chance to run the new goroutine.
	wake_p()
}
34
// newproc1 obtains a G and prepares it to run `f(arg)`.
// Translated from Go's newproc1 in proc.go.
//
// Lookup order for the G:
//   1. the current P's free list (only when this thread has a P);
//   2. the global free list;
//   3. a fresh allocation.
// A recycled G keeps its stack; a stack is allocated only when the G has none.
//
// Only freshly allocated G's are appended to `allgs`. A recycled G was
// registered when it was first created. Appending it again on every reuse
// would make `allgs` grow without bound and list the same G several times.
// Go's newproc1 likewise calls allgadd only for newly allocated G's.
//
// `arg_size` is not used by this function.
fn newproc1(f voidptr, arg voidptr, arg_size int) &Goroutine {
	// Try to get a dead G from the local P's free list first
	mut pp := get_current_p()
	mut gp := if pp != unsafe { nil } {
		gfget(mut pp)
	} else {
		unsafe { &Goroutine(nil) }
	}
	if gp == unsafe { nil } {
		// Try global free list
		gp = gfget_global()
	}
	mut is_new := false
	if gp == unsafe { nil } {
		// Allocate a new G
		gp = &Goroutine{}
		is_new = true
	}

	// Allocate a stack only if this G does not already own one
	if gp.stack == unsafe { nil } {
		stack_size := default_stack_size
		gp.stack = unsafe { malloc(stack_size) }
		gp.stack_size = stack_size
	}

	gp.fn_ptr = f
	gp.fn_arg = arg
	gp.status = .runnable
	gp.preempt = false

	// Assign goroutine ID
	gp.id = assign_goid()

	// Initialize context so the first switch into gp starts goroutine_trampoline(gp)
	context_init(mut &gp.context, gp.stack, gp.stack_size, goroutine_trampoline, voidptr(gp))

	// Track all goroutines: register each G exactly once, when it is first allocated
	if is_new {
		allgs_mu.acquire()
		allgs << gp
		allgs_mu.release()
	}

	return gp
}
79
// goroutine_trampoline is the first function every goroutine executes:
// newproc1 installs it as the context entry point, with the G itself as the
// argument. It runs the user function, if one is set, and then hands the G
// to goexit0 for cleanup and the switch back to the scheduler. This is the
// analogue of Go's goexit -> goexit0 path.
fn goroutine_trampoline(arg voidptr) {
	mut cur := unsafe { &Goroutine(arg) }
	if cur.fn_ptr == unsafe { nil } {
		// Nothing to run: retire the goroutine straight away.
		goexit0(mut cur)
		return
	}
	call_goroutine_fn(cur.fn_ptr, cur.fn_arg)
	// The user function returned, so the goroutine is finished.
	goexit0(mut cur)
}
94
// call_goroutine_fn reinterprets `fn_ptr` as a GoFn and invokes it with `arg`.
// The cast is unchecked: fn_ptr must really point to a function of that shape.
fn call_goroutine_fn(fn_ptr voidptr, arg voidptr) {
	entry := unsafe { GoFn(fn_ptr) }
	unsafe {
		entry(arg)
	}
}
102
// goexit0 handles goroutine cleanup after the user function returns.
// Translated from Go's goexit0 in proc.go.
//
// Steps:
//   1. mark gp dead and clear its per-run fields;
//   2. detach it from the current M;
//   3. push it onto a free list (the current P's if there is one, otherwise
//      the global list);
//   4. jump to the M's g0 context.
//
// NOTE(review): this runs on gp's own stack, yet gp is published to a free
// list before the context_set below. On the per-P path only this M touches
// pp.g_free. On the gfput_global path, however, another thread could pop gp
// via newproc1 and re-initialise its context while we are still executing on
// its stack. Go avoids this by running goexit0 on g0 (mcall). Confirm whether
// the global path is reachable while a goroutine is running.
// NOTE(review): when there is no current M, or it has no g0, no switch
// happens and this function returns into goroutine_trampoline, which then
// returns from the context entry point. What that does depends on
// context_init. TODO confirm.
fn goexit0(mut gp Goroutine) {
	mut mp := get_current_m()

	gp.status = .dead
	gp.m = unsafe { nil }
	gp.fn_ptr = unsafe { nil }
	gp.fn_arg = unsafe { nil }
	gp.wait_reason = ''
	gp.preempt = false

	// Dissociate G from M
	if mp != unsafe { nil } {
		mp.curg = unsafe { nil }
	}

	// Put the dead G on the free list for reuse (its stack is kept for the next user)
	mut pp := get_current_p()
	if pp != unsafe { nil } {
		gfput(mut pp, gp)
	} else {
		gfput_global(gp)
	}

	// Switch back to the scheduler (g0 context).
	// execute() saved g0's context in its context_switch, so this resumes
	// execute() right after that call, which then returns to its caller's loop.
	if mp != unsafe { nil } && mp.g0 != unsafe { nil } {
		context_set(&mp.g0.context)
	}
}
134
// schedule runs one round of scheduling for the calling thread's M.
// Translated from Go's schedule() in proc.go.
//
// It bumps the P's scheduling tick and asks find_runnable for work. It then
// either executes the goroutine it found or, when nothing is runnable, parks
// the M.
//
// Unlike Go's schedule(), this function returns:
//   - immediately, when the thread has no M or the M holds no P;
//   - otherwise, once execute() or park_m() returns.
// Worker threads loop via schedule_loop instead.
pub fn schedule() {
	mut mp := get_current_m()
	if mp == unsafe { nil } {
		return
	}
	mut pp := mp.p
	if pp == unsafe { nil } {
		return
	}
	pp.sched_tick++

	gp, _ := find_runnable(mut mp, mut pp)
	if gp != unsafe { nil } {
		execute(mut mp, mut pp, gp)
		return
	}
	// Nothing runnable anywhere: put this M to sleep.
	park_m(mut mp)
}
160
// find_runnable looks for a goroutine for `pp` to run.
// Translated from Go's findRunnable() in proc.go.
//
// Search order:
//   1. on every global_queue_check_interval-th tick, the global run queue
//      first, so its goroutines are not starved by a busy local queue;
//   2. the P's local run queue (runnext, then the ring buffer);
//   3. the global run queue;
//   4. stealing from other P's.
//
// Returns the goroutine (nil if none was found) and the flag from runq_get,
// which is true only when the G came from runnext. `mp` is not used.
fn find_runnable(mut mp Machine, mut pp Processor) (&Goroutine, bool) {
	if pp.sched_tick % global_queue_check_interval == 0 {
		fair_g := glob_runq_get()
		if fair_g != unsafe { nil } {
			return fair_g, false
		}
	}

	local_g, from_next := runq_get(mut pp)
	if local_g != unsafe { nil } {
		return local_g, from_next
	}

	global_g := glob_runq_get()
	if global_g != unsafe { nil } {
		return global_g, false
	}

	// Last resort; steal_work itself returns nil when nothing could be taken.
	return steal_work(mut pp), false
}
193
// execute binds `gp` to the M `mp`, marks it running and switches to it.
// Translated from Go's execute() in proc.go.
//
// The scheduler's own context is saved into mp.g0.context by context_switch.
// Control therefore comes back here when the goroutine switches to g0 again;
// goexit0 does this via context_set. `pp` is not used.
fn execute(mut mp Machine, mut pp Processor, gp &Goroutine) {
	mut next_g := unsafe { gp }
	mp.curg = next_g
	next_g.m = mp
	next_g.status = .running

	context_switch(mut &mp.g0.context, &next_g.context)
	// Back on g0: the goroutine gave up the CPU; the caller's loop continues.
}
208
// wake_p starts (or wakes) an M for an idle P so newly queued work gets run.
// Translated from Go's wakep() in proc.go.
//
// Nothing is done when no P is idle, or when some M is already spinning:
// that M is expected to find the work. The idle P is taken under gsched.mu,
// but start_m is called only after the lock is dropped, because start_m
// acquires gsched.mu itself.
fn wake_p() {
	mut idle_p := unsafe { &Processor(nil) }

	gsched.mu.acquire()
	if gsched.npidle != 0 && gsched.nmspinning <= 0 {
		idle_p = pid_get()
	}
	gsched.mu.release()

	if idle_p == unsafe { nil } {
		return
	}
	start_m(idle_p)
}
234
// park_m puts the current M to sleep until start_m hands it work.
//
// Under gsched.mu the M gives back its P (if it holds one) and joins the
// idle-M list, then blocks on mp.park. start_m wires a P to a parked M before
// posting its semaphore, so on wake-up mp.p is normally already set.
fn park_m(mut mp Machine) {
	gsched.mu.acquire()
	// Re-check the global run queue before giving up the P. glob_runq_put and
	// wake_p both run under gsched.mu, so there are two possible orderings:
	//   - a goroutine queued after this M last searched is visible here; or
	//   - the P released below is visible to that goroutine's wake_p.
	// Without this check, work queued while wake_p still saw no idle P could
	// sit in the queue with every M asleep (lost wakeup).
	if mp.p != unsafe { nil } && !gsched.runq.empty() {
		// Keep the P and let the caller's scheduling loop pick the work up.
		gsched.mu.release()
		return
	}
	// Release P and add M to idle list under lock
	if mp.p != unsafe { nil } {
		pid_put(mp.p)
		mp.p = unsafe { nil }
	}
	mp.sched_link = gsched.midle
	gsched.midle = mp
	gsched.nmidle++
	gsched.mu.release()

	// Sleep until woken
	mp.park.wait()

	// start_m normally wires a P before waking us. Only look for another one if
	// it did not. Calling acquire_p unconditionally would overwrite mp.p with a
	// second idle P, leaving the first one marked running, owned by this M, and
	// never used again.
	if mp.p == unsafe { nil } {
		acquire_p(mut mp)
	}
}
254
// acquire_p takes a P off the idle list (under gsched.mu) and wires it to `mp`.
// When no P is idle, `mp` is left unchanged.
fn acquire_p(mut mp Machine) {
	gsched.mu.acquire()
	mut idle := pid_get()
	gsched.mu.release()

	if idle == unsafe { nil } {
		return
	}
	wire_p(mut mp, mut idle)
}
264
// wire_p links P `pp` and M `mp` to each other and marks the P as running.
fn wire_p(mut mp Machine, mut pp Processor) {
	pp.m = mp
	mp.p = pp
	pp.status = .running
}
271
// start_m gets an M running P `pp`. It reuses a parked M when one is
// available; otherwise it creates a new one.
// Translated from Go's startm() in proc.go.
fn start_m(pp &Processor) {
	gsched.mu.acquire()
	mut idle_m := gsched.midle
	if idle_m == unsafe { nil } {
		// Nobody is parked: reserve an M id while holding the lock, then
		// create the thread after releasing it.
		new_id := gsched.mnext
		gsched.mnext++
		gsched.mu.release()
		new_m(new_id, pp)
		return
	}

	// Pop the parked M off the idle list.
	gsched.midle = idle_m.sched_link
	gsched.nmidle--
	gsched.mu.release()

	// Wire the P before posting so it is in place when the M resumes.
	mut target := unsafe { pp }
	wire_p(mut idle_m, mut target)
	idle_m.park.post()
}
297
// new_m creates a Machine with id `id` and its own g0, wires `pp` to it, and
// spawns the OS thread that runs m_thread_entry for it.
fn new_m(id i64, pp &Processor) {
	sched_g := &Goroutine{
		status: .running
	}
	mut mach := &Machine{
		id: id
		g0: sched_g
	}
	mut owned_p := unsafe { pp }
	wire_p(mut mach, mut owned_p)
	mach.thread = spawn m_thread_entry(mut mach)
}
310
// m_thread_entry is the body of every worker OS thread spawned by new_m.
// It first publishes `mp` as this thread's Machine, so that get_current_m()
// and get_current_p() resolve on the worker. It then runs schedule_loop
// until that loop returns, which happens once gsched.stopped is set.
fn m_thread_entry(mut mp Machine) {
	set_current_m(mp)
	schedule_loop(mut mp)
}
319
// schedule_loop is the main loop for an M. It repeatedly finds and runs goroutines.
// When find_runnable comes up empty, the M spins for up to 20 more searches
// (counted in gsched.nmspinning) and then parks. The loop returns only when
// gsched.stopped is set.
fn schedule_loop(mut mp Machine) {
	for {
		// NOTE(review): gsched.stopped is read without an atomic load or the lock.
		if gsched.stopped {
			return
		}
		mut pp := mp.p
		if pp == unsafe { nil } {
			// This M holds no P: try to grab an idle one before looking for work.
			acquire_p(mut mp)
			pp = mp.p
			if pp == unsafe { nil } {
				// No P available, park
				park_m(mut mp)
				continue
			}
		}
		// find_runnable uses this tick to decide when to poll the global queue first.
		pp.sched_tick++

		gp, _ := find_runnable(mut mp, mut pp)
		if gp == unsafe { nil } {
			// No work - try spinning briefly before parking.
			// While this M counts in gsched.nmspinning, wake_p will not wake another M.
			if !mp.spinning {
				mp.spinning = true
				C.goroutines_atomic_fetch_add_i32(&gsched.nmspinning, 1)
			}
			// Spin a bit: up to 20 more searches, with a short CPU pause between them
			mut found := false
			for _ in 0 .. 20 {
				gp2, _ := find_runnable(mut mp, mut pp)
				if gp2 != unsafe { nil } {
					// Leave the spinning state before running the goroutine
					mp.spinning = false
					C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1)
					execute(mut mp, mut pp, gp2)
					found = true
					break
				}
				// Yield to avoid burning CPU
				proc_yield(10)
			}
			if !found {
				// Stop counting as spinning before going to sleep
				if mp.spinning {
					mp.spinning = false
					C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1)
				}
				park_m(mut mp)
			}
			continue
		}

		// Defensive: both exits of the spinning branch above clear mp.spinning,
		// so this is expected to be a no-op.
		if mp.spinning {
			mp.spinning = false
			C.goroutines_atomic_fetch_sub_i32(&gsched.nmspinning, 1)
		}

		execute(mut mp, mut pp, gp)
	}
}
377
// steal_work visits the other P's, starting from a random index, and returns
// the first goroutine runq_steal manages to take from one of them.
// Translated from Go's stealWork() in proc.go.
// Returns nil when there is at most one P or nothing could be stolen.
fn steal_work(mut thisp Processor) &Goroutine {
	count := u32(gsched.allp.len)
	if count <= 1 {
		return unsafe { nil }
	}
	// A random starting victim spreads concurrent thieves across P's.
	offset := u32(C.rand()) % count
	for k := u32(0); k < count; k++ {
		victim := gsched.allp[(offset + k) % count]
		if victim == thisp {
			continue
		}
		mut target := unsafe { victim }
		stolen := runq_steal(mut target, mut thisp)
		if stolen != unsafe { nil } {
			return stolen
		}
	}
	return unsafe { nil }
}
402
// runq_steal tries to take work from `pp` on behalf of the thief `thisp`.
// Translated from Go's runqgrab/runqsteal in proc.go.
//
// When pp's ring buffer is empty, it tries to take pp.runnext instead.
// Otherwise it claims the older half (rounded up) of pp's queued goroutines.
// The first one is returned to the caller; the rest are appended to thisp's
// local run queue.
//
// The claim is committed with a CAS on pp.runq_head, like runq_get does, so
// each slot is consumed by at most one of the owner and the thieves. Slots
// are copied out before the CAS, because after it the owner may overwrite
// them. Nothing is handed to thisp until the CAS has succeeded.
// Returns nil when there was nothing to steal.
fn runq_steal(mut pp Processor, mut thisp Processor) &Goroutine {
	for {
		// Load head before tail. With tail loaded first, the owner could move
		// head past the stale tail in between, and `t - h` would wrap around.
		h := C.goroutines_atomic_load_u32(&pp.runq_head)
		t := C.goroutines_atomic_load_u32(&pp.runq_tail)
		n := t - h
		if n == 0 {
			// Try runnext
			next := pp.runnext
			if next != unsafe { nil } {
				if C.goroutines_atomic_cas_ptr(voidptr(&pp.runnext), voidptr(&next), unsafe { nil }) {
					return next
				}
			}
			return unsafe { nil }
		}
		if n > local_queue_size {
			// Inconsistent snapshot: head moved on between the two loads. Retry.
			continue
		}
		// Steal half (rounded up)
		steal := n - n / 2
		mut batch := []&Goroutine{cap: int(steal)}
		for i := u32(0); i < steal; i++ {
			batch << pp.runq[(h + i) % local_queue_size]
		}
		if !C.goroutines_atomic_cas_u32(&pp.runq_head, &h, h + steal) {
			// The owner or another thief consumed some of these slots; retry.
			continue
		}
		// The claim succeeded: keep the first G and queue the rest locally.
		for i in 1 .. batch.len {
			runq_put(mut thisp, batch[i], false)
		}
		return batch[0]
	}
	return unsafe { nil }
}
434
// Global run queue operations (translated from Go's globrunqput/get)

// glob_runq_put appends `gp` to the back of the global run queue while
// holding gsched.mu.
fn glob_runq_put(gp &Goroutine) {
	gsched.mu.acquire()
	defer {
		gsched.mu.release()
	}
	gsched.runq.push_back(gp)
}
441
// glob_runq_get removes a G from the global run queue while holding gsched.mu.
// Callers treat a nil result as "queue empty".
fn glob_runq_get() &Goroutine {
	gsched.mu.acquire()
	defer {
		gsched.mu.release()
	}
	return gsched.runq.pop()
}
448
// Local run queue operations (translated from Go's runqput/runqget)

// runq_put puts a G on pp's local run queue.
// If `next` is true, the G goes into pp.runnext for immediate scheduling, and
// any G previously in runnext moves to the ring buffer.
// If the ring buffer is full, runq_put_slow moves half of it, plus gp, to the
// global run queue.
// NOTE(review): presumably only the M owning pp calls this; the tail is read
// without an atomic load, which is only safe for a single producer.
// Translated from Go's runqput() in proc.go.
fn runq_put(mut pp Processor, gp &Goroutine, next bool) {
	if next {
		// Thieves take runnext with a CAS (runq_steal), so swap it with a CAS
		// too. A plain read-then-write could re-queue a G that a thief had
		// just taken, and that G would then run twice.
		for {
			old := pp.runnext
			if C.goroutines_atomic_cas_ptr(voidptr(&pp.runnext), voidptr(&old), voidptr(gp)) {
				if old != unsafe { nil } {
					// Kick old runnext to the regular queue
					runq_put(mut pp, old, false)
				}
				return
			}
		}
	}

	// Regular path: put on the ring buffer
	h := C.goroutines_atomic_load_u32(&pp.runq_head)
	t := pp.runq_tail
	if t - h < local_queue_size {
		pp.runq[t % local_queue_size] = unsafe { gp }
		C.goroutines_atomic_store_u32(&pp.runq_tail, t + 1)
		return
	}
	// Queue is full - put half on global queue
	runq_put_slow(mut pp, gp, h, t)
}
478
// runq_put_slow moves the older half of pp's full local run queue, followed
// by gp, to the global run queue.
// Translated from Go's runqputslow() in proc.go.
//
// `h` and `t` are the head/tail snapshot taken by runq_put. The half is
// claimed with a CAS on pp.runq_head rather than an unconditional fetch_add,
// so this function never consumes slots that runq_get or a thief has already
// claimed. If the CAS fails, entries were removed in the meantime and the ring
// has room again, so gp is simply re-submitted through runq_put.
fn runq_put_slow(mut pp Processor, gp &Goroutine, h u32, t u32) {
	n := (t - h) / 2
	head := h
	if !C.goroutines_atomic_cas_u32(&pp.runq_head, &head, h + n) {
		runq_put(mut pp, gp, false)
		return
	}
	// Slots [h, h+n) are now ours alone. Nobody else can claim them any more,
	// and only the owning P writes into pp.runq, so reading them here is safe.
	mut batch := GoroutineQueue{}
	for i := u32(0); i < n; i++ {
		batch.push_back(pp.runq[(h + i) % local_queue_size])
	}
	batch.push_back(gp)

	gsched.mu.acquire()
	for !batch.empty() {
		gsched.runq.push_back(batch.pop())
	}
	gsched.mu.release()
}
497
// runq_get takes the next G from pp's local run queue.
// Translated from Go's runqget() in proc.go.
//
// pp.runnext is tried first; the second return value is true only when the
// G came from runnext. Otherwise the oldest ring-buffer entry is claimed with
// a CAS on pp.runq_head, retrying when a thief moved the head in between.
// Returns (nil, false) when the queue is empty.
fn runq_get(mut pp Processor) (&Goroutine, bool) {
	// Thieves take runnext with a CAS (runq_steal), so the owner must as well.
	// A plain read followed by a plain clear could hand the same G to both.
	// If the CAS loses, a thief just took runnext; fall through to the ring.
	next := pp.runnext
	if next != unsafe { nil } {
		if C.goroutines_atomic_cas_ptr(voidptr(&pp.runnext), voidptr(&next), unsafe { nil }) {
			return next, true
		}
	}

	// Regular queue
	for {
		h := C.goroutines_atomic_load_u32(&pp.runq_head)
		t := pp.runq_tail
		if t == h {
			return unsafe { nil }, false
		}
		gp := pp.runq[h % local_queue_size]
		if C.goroutines_atomic_cas_u32(&pp.runq_head, &h, h + 1) {
			return gp, false
		}
	}
	return unsafe { nil }, false
}
522
// Idle P list operations

// pid_get pops a P from the idle-P list, marks it running and unlinks it.
// Returns nil when no P is idle. It does not lock by itself; the callers in
// this file (wake_p, acquire_p) hold gsched.mu around it.
fn pid_get() &Processor {
	if gsched.pidle == unsafe { nil } {
		return unsafe { nil }
	}
	mut head := gsched.pidle
	gsched.pidle = head.link
	gsched.npidle--
	head.status = .running
	head.link = unsafe { nil }
	return head
}
535
// pid_put marks `pp` idle, detaches it from its M and pushes it onto the
// idle-P list. It does not lock by itself; park_m calls it with gsched.mu held.
fn pid_put(pp &Processor) {
	mut idle := unsafe { pp }
	idle.m = unsafe { nil }
	idle.status = .idle
	idle.link = gsched.pidle
	gsched.npidle++
	gsched.pidle = idle
}
544
// G free list operations (translated from Go's gfput/gfget)

// gfput resets `gp`'s per-run fields and pushes it onto pp's free list, from
// which newproc1 can later reuse it (together with its stack).
fn gfput(mut pp Processor, gp &Goroutine) {
	mut dead := unsafe { gp }
	dead.status = .dead
	dead.fn_ptr = unsafe { nil }
	dead.fn_arg = unsafe { nil }
	dead.sched_link = unsafe { nil }
	pp.g_free.push(dead)
}
554
// gfget pops a G from pp's free list. newproc1 treats a nil result as
// "list empty". No locking is done here.
fn gfget(mut pp Processor) &Goroutine {
	reused := pp.g_free.pop()
	return reused
}
558
// gfput_global pushes `gp` onto the scheduler-wide free list and bumps
// gsched.g_free_count. Both happen under gsched.g_free_mu.
//
// It resets the same fields gfput does before publishing the G. A G recycled
// through either free list therefore comes out in the same state, instead of
// keeping a stale sched_link (or fn/status) from its previous run.
fn gfput_global(gp &Goroutine) {
	mut dead := unsafe { gp }
	dead.status = .dead
	dead.fn_ptr = unsafe { nil }
	dead.fn_arg = unsafe { nil }
	dead.sched_link = unsafe { nil }

	gsched.g_free_mu.acquire()
	gsched.g_free.push(dead)
	gsched.g_free_count++
	gsched.g_free_mu.release()
}
565
// gfget_global pops a G from the global free list under gsched.g_free_mu.
// gsched.g_free_count is decremented only when a G was actually obtained.
// Callers treat a nil result as "list empty".
fn gfget_global() &Goroutine {
	gsched.g_free_mu.acquire()
	defer {
		gsched.g_free_mu.release()
	}
	reused := gsched.g_free.pop()
	if reused == unsafe { nil } {
		return reused
	}
	gsched.g_free_count--
	return reused
}
575
// assign_goid returns a fresh goroutine ID.
// IDs are reserved from gsched.goid_gen in blocks of 16. The block starts at
// the value returned by the atomic fetch_add. When the calling thread has a
// P, the unused remainder of the block is cached on it: pp.goid_cache up to,
// but excluding, pp.goid_cache_end. Later calls can then skip the atomic, in
// the spirit of Go's goidcache. Without a P, the remainder is discarded.
fn assign_goid() u64 {
	mut pp := get_current_p()
	has_p := pp != unsafe { nil }
	if has_p && pp.goid_cache < pp.goid_cache_end {
		cached := pp.goid_cache
		unsafe {
			pp.goid_cache++
		}
		return cached
	}

	// Cache empty (or no P): reserve a new block from the global counter.
	block_size := u64(16)
	first := C.goroutines_atomic_fetch_add_u64(&gsched.goid_gen, block_size)
	if has_p {
		unsafe {
			pp.goid_cache = first + 1
			pp.goid_cache_end = first + block_size
		}
	}
	return first
}
598
// proc_yield issues `count` CPU spin-wait hints: `pause` on amd64 and `yield`
// on arm64. On other architectures it only loops. schedule_loop calls it
// between searches while an M spins for work. A count of zero or less does
// nothing.
fn proc_yield(count int) {
	mut remaining := count
	for remaining > 0 {
		$if amd64 {
			asm volatile amd64 {
				pause
			}
		}
		$if arm64 {
			asm volatile arm64 {
				yield
			}
		}
		remaining--
	}
}
615
// get_current_m returns the Machine that set_current_m registered for the
// calling OS thread. The pointer lives in C thread-local storage (see tls.c).
// Callers treat nil as "this thread has no M".
fn get_current_m() &Machine {
	raw := C.goroutines_get_current_m()
	return unsafe { &Machine(raw) }
}
621
// set_current_m records `mp` as the calling OS thread's Machine, so that later
// get_current_m calls on this thread return it.
fn set_current_m(mp &Machine) {
	raw := voidptr(mp)
	C.goroutines_set_current_m(raw)
}
625
// get_current_p returns the P wired to the calling thread's M. The result is
// nil when the thread has no M, or when that M currently holds no P.
fn get_current_p() &Processor {
	mp := get_current_m()
	if mp != unsafe { nil } {
		return mp.p
	}
	return unsafe { nil }
}
634
// get_current_g returns the goroutine the calling thread's M is running
// (mp.curg). The result is nil when the thread has no M, or when the M is
// not running a goroutine.
pub fn get_current_g() &Goroutine {
	mp := get_current_m()
	if mp != unsafe { nil } {
		return mp.curg
	}
	return unsafe { nil }
}
643