v / vlib / v2 / builder / gen_cleanc_parallel.v
196 lines · 176 sloc · 6.5 KB · e7738c112c787d477501fa4a87edd0e1d72159bd
Raw
1// Copyright (c) 2026 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module builder
5
6import runtime
7import time
8import v2.gen.cleanc
9
// Upper bound on the number of pass 5 jobs: cleanc_parallel_pass5_job_count
// clamps the runtime-reported job count to this value.
10const max_cleanc_pass5_jobs = 16
11
$if !windows {
	// GenCleancChunkArgs is the argument bundle passed to one pthread worker.
	// Both fields are type-erased pointers; gen_cleanc_chunk_thread casts them
	// back to their concrete types.
	struct GenCleancChunkArgs {
		worker         voidptr // &cleanc.Gen — pre-cloned worker
		work_items_ptr voidptr // &[]cleanc.Pass5WorkItem — work items to process
	}

	@[typedef]
	struct C.pthread_t {}

	// The pthread entry points used by gen_cleanc_parallel.
	fn C.pthread_create(thread &C.pthread_t, attr voidptr, start_routine fn (voidptr) voidptr, arg voidptr) int
	fn C.pthread_join(thread C.pthread_t, retval voidptr) int
	fn C.pthread_attr_init(attr voidptr) int
	fn C.pthread_attr_setstacksize(attr voidptr, stacksize usize) int
	fn C.pthread_attr_destroy(attr voidptr) int

	// gen_cleanc_chunk_thread is the pthread start routine for one worker.
	// `arg` must point to a GenCleancChunkArgs; the worker it references
	// generates every work item in the referenced list. Always returns nil.
	fn gen_cleanc_chunk_thread(arg voidptr) voidptr {
		chunk := unsafe { &GenCleancChunkArgs(arg) }
		pending := unsafe { &[]cleanc.Pass5WorkItem(chunk.work_items_ptr) }
		mut worker_gen := unsafe { &cleanc.Gen(chunk.worker) }
		worker_gen.gen_pass5_work_items(*pending)
		return unsafe { nil }
	}
}
35
// print_cleanc_parallel_step_time prints one timing line for `step`, with
// `elapsed` shown in whole milliseconds. It prints nothing when
// `stats_enabled` is false.
fn print_cleanc_parallel_step_time(stats_enabled bool, step string, elapsed time.Duration) {
	if stats_enabled {
		println(' - C Gen/full ${step}: ${elapsed.milliseconds()}ms')
	}
}
42
// mark_cleanc_parallel_step closes the stage that began at `stage_start`:
// it reads the stopwatch, prints the time spent since `stage_start` under
// the label `step`, and returns the reading to use as the next stage's start.
// When `stats_enabled` is false the stopwatch is not read and `stage_start`
// is returned unchanged.
fn mark_cleanc_parallel_step(stats_enabled bool, mut sw time.StopWatch, stage_start time.Duration, step string) time.Duration {
	if stats_enabled {
		stage_end := sw.elapsed()
		spent := time.Duration(stage_end - stage_start)
		print_cleanc_parallel_step_time(true, step, spent)
		return stage_end
	}
	return stage_start
}
51
// gen_cleanc_parallel runs cleanc pass 5 for `gen`: the pre step, the per-file
// generation, and the post step. When `b.pref.stats` is set, the time spent in
// each stage is printed.
//
// On Windows, or when there is at most one work item or one job, generation is
// sequential (gen_pass5_files). Otherwise the work items are distributed over
// up to `n_jobs` pre-cloned workers, each worker runs on its own pthread, and
// the worker results are merged back into `gen` in chunk order.
//
// If a worker thread cannot be started (attribute setup or pthread_create
// reports an error), that worker's items are generated on the calling thread
// instead, so no work item is dropped and no unstarted thread is joined.
fn (mut b Builder) gen_cleanc_parallel(mut gen cleanc.Gen) {
	stats_enabled := b.pref != unsafe { nil } && b.pref.stats
	mut stats_sw := time.new_stopwatch()
	mut stage_start := stats_sw.elapsed()
	emit_indices := gen.gen_pass5_pre()
	stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start, 'pass 5 pre')

	// Split large files into sub-file work items so no single huge file
	// (e.g. ssa/builder.v) pins the whole parallel phase.
	work_items := gen.build_pass5_work_items(emit_indices)
	n_items := work_items.len
	n_runtime_jobs := runtime.nr_jobs()
	n_jobs := cleanc_parallel_pass5_job_count(n_runtime_jobs, n_items)

	$if windows {
		gen.gen_pass5_files(emit_indices)
		stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
			'pass 5 files')
		gen.gen_pass5_post()
		_ = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start, 'pass 5 post')
		return
	} $else {
		if n_items <= 1 || n_jobs <= 1 {
			// Fallback to sequential
			gen.gen_pass5_files(emit_indices)
			stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
				'pass 5 files')
			gen.gen_pass5_post()
			_ = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start, 'pass 5 post')
			return
		}

		mut thread_ids := []C.pthread_t{len: n_jobs}
		mut args := []GenCleancChunkArgs{cap: n_jobs}
		mut workers := []voidptr{cap: n_jobs}
		// chunk_items: work items assigned to each worker.
		// chunk_indices: unique file indices each worker touches (for
		// new_pass5_worker's owned-file / cross-worker dedup bookkeeping).
		mut chunk_items := [][]cleanc.Pass5WorkItem{cap: n_jobs}
		mut chunk_indices := [][]int{cap: n_jobs}
		mut chunk_file_seen := []map[int]bool{cap: n_jobs}
		mut chunk_costs := []int{cap: n_jobs}

		// Number of chunks actually used: never more than there are items.
		mut chunk_idx := n_jobs
		if chunk_idx > n_items {
			chunk_idx = n_items
		}
		for ci := 0; ci < chunk_idx; ci++ {
			chunk_items << []cleanc.Pass5WorkItem{}
			chunk_indices << []int{}
			chunk_file_seen << map[int]bool{}
			chunk_costs << 0
		}
		// Order the items by descending cost. This is a hand-written insertion
		// sort with a strict comparison, so items of equal cost keep their
		// original relative order and the assignment below stays reproducible.
		mut sorted_items := work_items.clone()
		for i := 1; i < sorted_items.len; i++ {
			mut j := i
			for j > 0 && sorted_items[j - 1].cost < sorted_items[j].cost {
				sorted_items[j - 1], sorted_items[j] = sorted_items[j], sorted_items[j - 1]
				j--
			}
		}
		// Greedy balancing: each item goes to the chunk with the lowest
		// accumulated cost so far (the lowest index wins ties).
		for item in sorted_items {
			mut target := 0
			for ci := 1; ci < chunk_idx; ci++ {
				if chunk_costs[ci] < chunk_costs[target] {
					target = ci
				}
			}
			chunk_items[target] << item
			// Only the worker that emits a file's globals (a whole-file item, or the
			// first slice of a split file — both carry emit_globals) takes file-level
			// dedup ownership. A split file's later slices deliberately do NOT, so the
			// file's lazily/transitively emitted fns stay blocked in those workers and
			// only the owning worker can emit them; the explicit slice still emits via
			// the owner-scoped bypass in gen_file_range. This closes the duplicate /
			// reordered-emission hole that file-level ownership left open for files
			// split across workers.
			if item.emit_globals && item.file_idx !in chunk_file_seen[target] {
				chunk_file_seen[target][item.file_idx] = true
				chunk_indices[target] << item.file_idx
			}
			chunk_costs[target] += item.cost
		}
		stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
			'pass 5 chunk split')
		for ci := 0; ci < chunk_idx; ci++ {
			w := gen.new_pass5_worker(chunk_indices[ci], ci)
			workers << voidptr(w)
		}
		stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
			'pass 5 worker setup')

		// Set up args after all chunk_items are stable
		for ci := 0; ci < chunk_idx; ci++ {
			args << GenCleancChunkArgs{
				worker:         workers[ci]
				work_items_ptr: unsafe { voidptr(&chunk_items[ci]) }
			}
		}

		// Opaque storage for the thread attributes.
		// NOTE(review): assumes 64 bytes is large enough (and suitably aligned)
		// for pthread_attr_t on every supported target — confirm.
		attr_buf := [64]u8{}
		attr := unsafe { voidptr(&attr_buf[0]) }
		attr_inited := C.pthread_attr_init(attr) == 0
		// Workers are only spawned when the requested 64 MiB stack was accepted;
		// otherwise every chunk takes the inline path below.
		can_spawn := attr_inited && C.pthread_attr_setstacksize(attr, 64 * 1024 * 1024) == 0

		// launched[ci] is true only when pthread_create succeeded for worker ci,
		// i.e. thread_ids[ci] holds a joinable thread.
		mut launched := []bool{len: chunk_idx}
		if can_spawn {
			for ci := 0; ci < chunk_idx; ci++ {
				rc := C.pthread_create(unsafe { &thread_ids[ci] }, attr, gen_cleanc_chunk_thread,
					unsafe { voidptr(&args[ci]) })
				launched[ci] = rc == 0
			}
		}
		if attr_inited {
			C.pthread_attr_destroy(attr)
		}

		// Chunks without a thread are generated here, on the calling thread (as
		// the sequential fallback does), after the other workers have been
		// started so they keep running meanwhile.
		for ci := 0; ci < chunk_idx; ci++ {
			if !launched[ci] {
				_ = gen_cleanc_chunk_thread(unsafe { voidptr(&args[ci]) })
			}
		}

		// Wait for all workers that were actually started
		for ci := 0; ci < chunk_idx; ci++ {
			if launched[ci] {
				C.pthread_join(thread_ids[ci], unsafe { nil })
			}
		}
		stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
			'pass 5 worker run')

		// Merge worker results in order
		for ci := 0; ci < chunk_idx; ci++ {
			w := unsafe { &cleanc.Gen(workers[ci]) }
			gen.merge_pass5_worker(w)
		}
		stage_start = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start,
			'pass 5 merge')
		gen.print_pass5_file_times(8)

		gen.gen_pass5_post()
		_ = mark_cleanc_parallel_step(stats_enabled, mut stats_sw, stage_start, 'pass 5 post')
	}
}
183
// cleanc_parallel_pass5_job_count returns how many pass 5 jobs to use: the
// smallest of `n_runtime_jobs`, max_cleanc_pass5_jobs and `n_files`.
// Returns 0 when either argument is zero or negative.
fn cleanc_parallel_pass5_job_count(n_runtime_jobs int, n_files int) int {
	if n_runtime_jobs < 1 || n_files < 1 {
		return 0
	}
	capped := if n_runtime_jobs > max_cleanc_pass5_jobs {
		max_cleanc_pass5_jobs
	} else {
		n_runtime_jobs
	}
	return if capped > n_files { n_files } else { capped }
}
197