v2 / examples / 1brc / solution / main.v
202 lines · 185 sloc · 4.38 KB · e2e5cf8db56f3562c7baa735061690be936bdf3e
Raw
// vtest build: !windows // requires mmap
2import flag
3import math
4import os
5
6#include <sys/mman.h>
7
// Bindings for the memory-mapping calls made available by the
// `#include <sys/mman.h>` directive in this file.
// C.mmap maps `len` bytes of the file behind `fd`, starting at `offset`,
// and returns the address of the mapping.
fn C.mmap(addr voidptr, len u64, prot i32, flags i32, fd i32, offset i64) voidptr

// C.munmap releases a mapping of `len` bytes starting at `addr`.
// This file treats any non-zero return value as failure.
fn C.munmap(addr voidptr, len u64) i32
10
// MemoryMappedFile bundles an open file with the read-only mapping of its
// contents that `mmap_file` creates and `unmap` releases.
struct MemoryMappedFile {
	// size is the length of the file in bytes, taken from os.file_size when
	// the mapping is created; it is also the length passed to mmap/munmap.
	size u64
mut:
	// data points at the first mapped byte.
	data &u8
	// file is the handle whose descriptor backs the mapping.
	file os.File
}
17
// mmap_file opens `path` read-only and maps its whole contents into memory.
//
// The returned value owns both the file handle and the mapping; release
// them with `unmap()`.
//
// Panics when the file cannot be opened or when mmap() reports failure.
// An empty file is not mapped at all (mmap() rejects a zero length); the
// result then has `size == 0` and a nil `data` pointer.
fn mmap_file(path string) MemoryMappedFile {
	mut mf := MemoryMappedFile{
		file: os.open_file(path, 'r', 0) or { panic('failed to open ${path}: ${err}') }
		size: os.file_size(path)
		data: C.NULL
	}
	if mf.size == 0 {
		// Nothing to map; callers iterate over the empty range [0, 0).
		return mf
	}

	mapped := C.mmap(C.NULL, mf.size, C.PROT_READ, C.MAP_SHARED, mf.file.fd, 0)
	// mmap() signals failure with MAP_FAILED, i.e. (void *)-1, not with NULL.
	if i64(mapped) == -1 {
		mf.file.close()
		panic('(${C.errno}) mmap() failed for ${path}')
	}
	mf.data = &u8(mapped)
	return mf
}
28
// unmap releases the mapping (if there is one) and closes the file.
//
// A zero-length mapping is skipped: munmap() rejects a length of 0, so an
// empty input file would otherwise end the program with a panic. After a
// successful munmap() the data pointer is cleared, so a second call only
// closes the file again instead of unmapping the same address twice.
//
// Panics when munmap() returns a non-zero status.
fn (mut mf MemoryMappedFile) unmap() {
	if mf.size > 0 && !isnil(mf.data) {
		if C.munmap(mf.data, mf.size) != 0 {
			panic('(${C.errno}) munmap() failed')
		}
		mf.data = unsafe { nil }
	}
	mf.file.close()
}
35
// ReadState says which part of a `city;temperature` line the byte parser
// in `process_chunk` is currently reading.
enum ReadState {
	// city: bytes before the `;` separator.
	city
	// temp: bytes after the `;` up to the end of the line.
	temp
}
40
// Result holds the running statistics for one city.
//
// All temperatures are stored as integers in tenths of a degree, which is
// how `process_chunk` parses them (the decimal point is skipped).
struct Result {
pub mut:
	// min and max are the smallest / largest reading seen so far.
	min i32
	max i32
	// sum is the total of all readings. It is 64-bit because a 32-bit sum of
	// tenths of a degree can overflow once one city has a few million
	// readings, which is possible with inputs of a billion rows.
	sum i64
	// count is the number of readings folded into this entry.
	count u32
}
48
// format_value renders a temperature given in tenths of a degree with
// exactly one decimal place, e.g. 123 -> '12.3' and -5 -> '-0.5'.
//
// The sign is emitted separately: integer division truncates toward zero,
// so for values between -9 and -1 the whole part is 0 and would otherwise
// lose the minus sign.
fn format_value(value i32) string {
	sign := if value < 0 { '-' } else { '' }
	return '${sign}${math.abs(value / 10)}.${math.abs(value % 10)}'
}
52
// print_results writes one `city=min/mean/max` entry per city to stdout,
// with the cities in sorted order.
//
// When `print_nicely` is true every entry goes on its own line; otherwise
// the entries are joined with ', ' and wrapped in a single pair of braces.
fn print_results(results map[string]Result, print_nicely bool) {
	mut names := results.keys()
	names.sort()

	mut entries := []string{cap: results.len}
	for name in names {
		stats := results[name]
		// sum is kept in tenths of a degree, hence the final division by 10.
		average := f64(stats.sum) / stats.count / 10
		entries << '${name}=${format_value(stats.min)}/${average:.1f}/${format_value(stats.max)}'
	}

	if !print_nicely {
		println('{' + entries.join(', ') + '}')
		return
	}
	println(entries.join('\n'))
}
68
// combine_results merges several per-chunk result maps into a single map.
//
// The first entry seen for a city is copied as is; every later entry for
// the same city widens the min/max range and adds to sum and count.
fn combine_results(results []map[string]Result) map[string]Result {
	mut merged := map[string]Result{}
	for partial in results {
		for name, stats in partial {
			if name in merged {
				if stats.min < merged[name].min {
					merged[name].min = stats.min
				}
				if stats.max > merged[name].max {
					merged[name].max = stats.max
				}
				merged[name].sum += stats.sum
				merged[name].count += stats.count
				continue
			}
			merged[name] = stats
		}
	}
	return merged
}
89
// process_chunk parses the bytes `addr[from]` .. `addr[to - 1]` as lines of
// the form `city;temperature\n` and returns the statistics per city.
//
// Temperatures are accumulated as integers in tenths of a degree: digits
// are appended to `temp`, the `.` is skipped and a `-` flips the sign.
//
// A final record that is not terminated by a newline inside the range
// (last line of a file without trailing newline, or a range that stops
// right on the newline) is still counted instead of being dropped.
//
// The range is expected to start at the beginning of a line; no input
// validation is performed.
@[direct_array_access]
fn process_chunk(addr &u8, from u64, to u64) map[string]Result {
	mut results := map[string]Result{}
	mut state := ReadState.city
	mut city := ''
	mut temp := i32(0)
	mut mod := i32(1) // sign of the temperature being read: 1 or -1
	mut j := int(0) // number of city-name bytes read on the current line
	for i in from .. to {
		c := unsafe { u8(addr[i]) }
		match state {
			.city {
				match c {
					`;` {
						state = .temp
						// The city name is the j bytes immediately before the `;`.
						// NOTE(review): tos() presumably wraps those bytes without
						// copying, so the keys stay valid only while the buffer
						// behind addr is alive - confirm.
						city = unsafe { tos(addr[i - u64(j)], j) }
					}
					else {
						j += 1
					}
				}
			}
			.temp {
				match c {
					`\n` {
						record_measurement(mut results, city, temp * mod)
						state = .city
						temp = 0
						mod = 1
						j = 0
					}
					`-` {
						mod = -1
					}
					`.` {}
					else {
						// ASCII 48 = '0' ... ASCII 57 = '9' => (ASCII value) - 48 = decimal value
						temp = temp * 10 + (c - 48)
					}
				}
			}
		}
	}
	// Still in the temperature part: the range ended before the newline, so
	// the record parsed so far has not been committed yet.
	if state == .temp {
		record_measurement(mut results, city, temp * mod)
	}
	return results
}

// record_measurement folds one reading (in tenths of a degree) for `city`
// into `results`, creating the entry on first sight.
fn record_measurement(mut results map[string]Result, city string, temp i32) {
	if city !in results {
		results[city] = Result{
			min:   temp
			max:   temp
			sum:   temp
			count: 1
		}
		return
	}
	if temp > results[city].max {
		results[city].max = temp
	}
	if temp < results[city].min {
		results[city].min = temp
	}
	results[city].sum += temp
	results[city].count += 1
}
152
// process_in_parallel splits the mapped file into up to `thread_count`
// chunks that each end right after a newline, parses every chunk in its own
// thread with `process_chunk`, and merges the partial results.
//
// Chunk boundaries start at multiples of roughly size / thread_count and
// are moved forward to the next newline. The newline itself belongs to the
// chunk it terminates, so the last record of every chunk is seen complete
// by the parser. The scan for a newline never leaves the mapping: when no
// further newline exists, the remaining bytes go to the final chunk.
//
// A `thread_count` of 0 is treated as 1.
fn process_in_parallel(mf MemoryMappedFile, thread_count u32) map[string]Result {
	mut threads := []thread map[string]Result{}
	workers := if thread_count == 0 { u32(1) } else { thread_count }
	approx_chunk_size := mf.size / workers
	mut from := u64(0)
	for _ in 0 .. workers - 1 {
		if from >= mf.size {
			break
		}
		mut to := from + approx_chunk_size
		unsafe {
			for to < mf.size && mf.data[to] != `\n` {
				to += 1
			}
		}
		if to >= mf.size {
			// No newline left before the end of the file: the rest is
			// handled by the final chunk below.
			break
		}
		// `to` is the index of the newline; make it the exclusive end so
		// that the newline is part of this chunk.
		to += 1
		threads << spawn process_chunk(mf.data, from, to)
		from = to
	}
	threads << spawn process_chunk(mf.data, from, mf.size)
	res := threads.wait()
	return combine_results(res)
}
173
// main parses the command line, maps the input file, computes the per-city
// statistics (in parallel when more than one thread is requested) and
// prints them unless --quiet is given.
fn main() {
	mut fp := flag.new_flag_parser(os.args)
	fp.version('1brc v1.0.0')
	fp.skip_executable()
	fp.application('1 billion rows challenge')
	fp.description('The 1 billion rows challenge solved in V.\nFor details, see https://www.morling.dev/blog/one-billion-row-challenge/')
	requested_threads := fp.int('threads', `n`, 1, 'number of threads for parallel processing.')
	print_nicely := fp.bool('human-readable', `h`, false,
		'Print results with new lines rather than following challenge spec')
	quiet := fp.bool('quiet', `q`, false,
		'Suppress the results output (e.g., if you only care about timing)')
	fp.limit_free_args_to_exactly(1)!
	path := fp.remaining_parameters()[0]

	// Converting a zero or negative request straight to u32 would yield 0 or
	// a count in the billions; fall back to a single thread instead.
	thread_count := if requested_threads < 1 { u32(1) } else { u32(requested_threads) }

	mut mf := mmap_file(path)
	defer {
		mf.unmap()
	}

	results := if thread_count > 1 {
		process_in_parallel(mf, thread_count)
	} else {
		process_chunk(mf.data, 0, mf.size)
	}

	if !quiet {
		print_results(results, print_nicely)
	}
}
203