| 1 | module sync |
| 2 | |
| 3 | import time |
| 4 | import rand |
| 5 | |
// Upper bounds on the number of non-blocking attempts the push/pop paths make
// before they fall back to a blocking semaphore wait.
const spinloops = u32(750)
const spinloops_sem = u32(4000)
| 9 | |
// BufferElemStat is the per-slot state kept in `Channel.statusbuf` for
// buffered channels. Pushers move a slot `unused -> writing -> written`,
// poppers move it `written -> reading -> unused`.
enum BufferElemStat {
	unused = 0 // slot is empty; a pusher may claim it
	writing // a pusher has claimed the slot and is copying its object in
	written // slot holds a complete object; a popper may claim it
	reading // a popper has claimed the slot and is copying the object out
}
| 16 | |
// Subscription is one node in a channel's list of `select` waiters.
struct Subscription {
mut:
	// semaphore that is posted to wake the waiting `select`
	sem &Semaphore = unsafe { nil }
	// address of the link that points at this node: either the list head in
	// the channel or the `nxt` field of the preceding node
	prev &&Subscription = unsafe { nil }
	// following waiter in the list, nil for the last one
	nxt &Subscription = unsafe { nil }
}
| 23 | |
// append_subscription links `sub` in at the tail of the waiter list that
// starts at `head`. Because new waiters always go last, the list stays in
// arrival order and the longest-waiting `select` sits at the front, which is
// the entry that gets woken when the channel becomes ready.
// The caller in this file holds the matching subscriber lock while calling.
fn append_subscription(head &&Subscription, sub &Subscription) {
	unsafe {
		// Find the first nil link: `head` itself for an empty list, otherwise
		// the `nxt` field of the current tail.
		mut tail_link := head
		for {
			if *tail_link == 0 {
				break
			}
			tail_link = &(*tail_link).nxt
		}
		sub.nxt = nil
		sub.prev = tail_link
		*tail_link = sub
	}
}
| 37 | |
// Direction tells `channel_select` which operation to attempt on a channel.
pub enum Direction {
	pop // receive an object from the channel
	push // send an object into the channel
}
| 42 | |
// C.atomic_uintptr_t declares the C type `atomic_uintptr_t` for use from V.
// The `Channel` address fields of this type are only accessed through the
// `C.atomic_*_ptr` helpers.
@[typedef]
pub struct C.atomic_uintptr_t {}
| 45 | |
// Channel is the runtime object behind V channels.
// With `cap > 0` objects are queued in `ringbuf`, with one u16 status word per
// slot in `statusbuf`. A popper that finds nothing to read advertises its
// destination in `write_adr` so that a pusher can copy straight into it; on
// unbuffered channels (`cap == 0`) a waiting pusher likewise advertises its
// source in `read_adr`.
pub struct Channel {
	ringbuf &u8 = unsafe { nil } // queue for buffered channels
	statusbuf &u8 = unsafe { nil } // flags to synchronize write/read in ringbuf
	objsize u32 // number of bytes copied per object
mut:
	// atomic
	writesem Semaphore // to wake thread that wanted to write, but buffer was full
	readsem Semaphore // to wake thread that wanted to read, but buffer was empty
	writesem_im Semaphore // posted by a popper after it has taken an object directly from a pusher
	readsem_im Semaphore // posted by a pusher after it has written directly into a popper's destination
	write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
	read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
	adr_read C.atomic_uintptr_t // used to identify origin of writesem
	adr_written C.atomic_uintptr_t // used to identify origin of readsem; set to -1 by close()
	write_free u32 // for queue state: number of free slots
	read_avail u32 // for queue state: number of objects ready to be read
	buf_elem_write_idx u32 // ring buffer slot the next push will use
	buf_elem_read_idx u32 // ring buffer slot the next pop will use
	// for select
	write_subscriber &Subscription = unsafe { nil } // head of the list of `select` waiters that want to push
	read_subscriber &Subscription = unsafe { nil } // head of the list of `select` waiters that want to pop
	write_sub_mtx &SpinLock // held while the `write_subscriber` list is read or changed
	read_sub_mtx &SpinLock // held while the `read_subscriber` list is read or changed
	closed u16 // 0 while open, set to 1 atomically by close()
	close_err IError = none // optional error passed to close()
pub:
	cap u32 // queue length in #objects
}
| 74 | |
// new_channel creates a channel for objects of type `T` with room for `n`
// queued objects (`n == 0` gives an unbuffered channel).
// Zero-sized types are stored as one byte. Types that contain references get
// a ring buffer from the scanning allocator path, all others use the
// `_noscan` constructor.
pub fn new_channel[T](n u32) &Channel {
	mut elem_size := sizeof(T)
	if elem_size == 0 {
		elem_size = 1
	}
	if isreftype(T) {
		return new_channel_st(n, elem_size)
	}
	return new_channel_st_noscan(n, elem_size)
}
| 83 | |
// new_channel_with_ringbuf builds the `Channel` for `n` slots of `st` bytes
// each around the already allocated ring buffer `rbuf` (nil when `n == 0`).
// It holds everything the two public-facing constructors have in common, so
// that they only differ in how the ring buffer itself is allocated.
fn new_channel_with_ringbuf(n u32, st u32, rbuf &u8) &Channel {
	// A buffered channel starts with `n` write tokens and no read token; an
	// unbuffered one starts with a single token on each side.
	wsem := if n > 0 { n } else { 1 }
	rsem := if n > 0 { u32(0) } else { 1 }
	// One u16 status word per slot; zeroed memory means every slot is `unused`.
	sbuf := if n > 0 { vcalloc_noscan(int(n * 2)) } else { &u8(unsafe { nil }) }
	mut ch := Channel{
		objsize:          st
		cap:              n
		write_free:       n
		read_avail:       0
		ringbuf:          rbuf
		statusbuf:        sbuf
		write_subscriber: unsafe { nil }
		read_subscriber:  unsafe { nil }
		write_sub_mtx:    new_spin_lock()
		read_sub_mtx:     new_spin_lock()
	}
	ch.writesem.init(wsem)
	ch.readsem.init(rsem)
	ch.writesem_im.init(0)
	ch.readsem_im.init(0)
	return &ch
}

// new_channel_st creates a channel with `n` slots of `st` bytes each whose
// ring buffer is allocated with `malloc`.
fn new_channel_st(n u32, st u32) &Channel {
	rbuf := if n > 0 { unsafe { malloc(int(n * st)) } } else { &u8(unsafe { nil }) }
	return new_channel_with_ringbuf(n, st, rbuf)
}

// new_channel_st_noscan is the variant of `new_channel_st` for element types
// without references. When compiled with `gcboehm_opt` the ring buffer is
// allocated with `malloc_noscan`; otherwise it simply forwards to
// `new_channel_st`.
fn new_channel_st_noscan(n u32, st u32) &Channel {
	$if gcboehm_opt ? {
		rbuf := if n > 0 { unsafe { malloc_noscan(int(n * st)) } } else { &u8(unsafe { nil }) }
		return new_channel_with_ringbuf(n, st, rbuf)
	} $else {
		return new_channel_st(n, st)
	}
}
| 135 | |
// close closes the channel and optionally stores an error that will be
// returned by receive operations that use `or {}` or `?` after the
// buffered values have been drained.
// Only the call that flips `closed` from 0 to 1 does any work; every other
// call returns immediately. Of `errs`, only the first element is used.
pub fn (mut ch Channel) close(errs ...IError) {
	open_val := u16(0)
	// Atomically claim the right to close; a failed exchange means the channel
	// was already closed (or is being closed) by someone else.
	if !C.atomic_compare_exchange_strong_u16(&ch.closed, &open_val, 1) {
		return
	}
	// NOTE(review): `closed` is already visible to other threads here, so a
	// receiver could observe the closed state before `close_err` is stored —
	// confirm whether that window matters to callers of closed_error().
	if errs.len > 0 {
		ch.close_err = errs[0]
	}
	// Wait until `adr_written` is nil and then park the sentinel -1 in it;
	// poppers that find -1 there return `.closed`.
	mut nulladr := unsafe { nil }
	for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_written), voidptr(&nulladr), isize(-1)) {
		nulladr = unsafe { nil }
	}
	// Wake a popper blocked on either read semaphore.
	ch.readsem_im.post()
	ch.readsem.post()
	// Wake the first `select` waiter on each side so it can notice the close.
	ch.read_sub_mtx.lock()
	if ch.read_subscriber != unsafe { nil } {
		ch.read_subscriber.sem.post()
	}
	ch.read_sub_mtx.unlock()
	ch.write_sub_mtx.lock()
	if ch.write_subscriber != unsafe { nil } {
		ch.write_subscriber.sem.post()
	}
	ch.write_sub_mtx.unlock()
	// Wake a pusher blocked on the write semaphore.
	ch.writesem.post()
	if ch.cap == 0 {
		// Unbuffered: withdraw any source address a pusher has advertised.
		C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, unsafe { nil })
	}
	ch.writesem_im.post()

	// Do not destroy `read_sub_mtx` and `write_sub_mtx` here,
	// because we can read from a closed channel later.
}
| 172 | |
// closed_error produces the error reported for a closed channel: the error
// that was handed to `close()` if there was one, otherwise a generic
// 'channel closed' error.
@[inline]
fn (ch &Channel) closed_error() IError {
	if ch.close_err is None__ {
		return error('channel closed')
	}
	return ch.close_err
}
| 180 | |
// len reports how many objects are currently queued, as an atomic read of
// the `read_avail` counter.
@[inline]
pub fn (mut ch Channel) len() int {
	queued := C.atomic_load_u32(&ch.read_avail)
	return int(queued)
}
| 185 | |
// closed reports whether `close()` has been called on the channel, as an
// atomic read of the `closed` flag.
@[inline]
pub fn (mut ch Channel) closed() bool {
	flag := C.atomic_load_u16(&ch.closed)
	return flag != 0
}
| 190 | |
// push sends the object that `src` points to, waiting as long as necessary.
// It panics when the channel is closed.
@[inline]
pub fn (mut ch Channel) push(src voidptr) {
	state := ch.try_push_priv(src, false)
	if state == .closed {
		panic('push on closed channel')
	}
}
| 197 | |
// try_push attempts to send the object that `src` points to without waiting
// and returns the resulting `ChanState`.
@[inline]
pub fn (mut ch Channel) try_push(src voidptr) ChanState {
	state := ch.try_push_priv(src, true)
	return state
}
| 202 | |
// try_push_priv is the shared implementation behind `push` (`no_block ==
// false`) and `try_push`/`select` (`no_block == true`). It copies
// `ch.objsize` bytes from `src` into the channel.
// It returns `.success` once the object has been handed over or queued,
// `.closed` when the channel is closed, and - only with `no_block` set -
// `.not_ready` when the push cannot complete without waiting.
fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
	if C.atomic_load_u16(&ch.closed) != 0 {
		return .closed
	}
	// NOTE(review): the names look crossed - in blocking mode `spinloops_sem_`
	// receives `spinloops` and `spinloops_` receives `spinloops_sem`. Confirm
	// whether that is intended before touching the spin counts.
	spinloops_sem_, spinloops_ := if no_block { u32(1), u32(1) } else { spinloops, spinloops_sem }
	mut have_swapped := false
	for {
		mut got_sem := false
		// Fast path: a popper has advertised its destination in `write_adr`.
		mut wradr := C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
		for wradr != C.NULL {
			if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.write_adr), voidptr(&wradr),
				isize(0))
			{
				// there is a reader waiting for us
				unsafe { C.memcpy(wradr, src, ch.objsize) }
				// Publish which destination was filled, waiting until the
				// previous `adr_written` value has been consumed.
				mut nulladr := unsafe { nil }
				for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_written),
					voidptr(&nulladr), isize(wradr)) {
					nulladr = unsafe { nil }
				}
				ch.readsem_im.post()
				return .success
			}
		}
		if no_block && ch.cap == 0 {
			return .not_ready
		}
		// get token to write
		for _ in 0 .. spinloops_sem_ {
			if got_sem {
				break
			}
			got_sem = ch.writesem.try_wait()
		}
		if !got_sem {
			if no_block {
				return .not_ready
			}
			ch.writesem.wait()
		}
		if C.atomic_load_u16(&ch.closed) != 0 {
			// Pass the token on so that other blocked pushers wake up too.
			ch.writesem.post()
			return .closed
		}
		if ch.cap == 0 {
			// try to advertise current object as readable
			mut read_in_progress := false
			C.atomic_store_ptr(unsafe { &voidptr(&ch.read_adr) }, src)
			wradr = C.atomic_load_ptr(unsafe { &voidptr(&ch.write_adr) })
			if wradr != C.NULL {
				// A popper advertised at the same time. Try to withdraw our
				// own advertisement and restart to use the fast path above.
				mut src2 := src
				if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.read_adr), voidptr(&src2),
					isize(0))
				{
					ch.writesem.post()
					continue
				} else {
					// `read_adr` no longer holds `src`: a popper has taken it.
					read_in_progress = true
				}
			}
			if !read_in_progress {
				// Nobody is reading yet: wake the first `select` waiter that
				// wants to pop.
				ch.read_sub_mtx.lock()
				if ch.read_subscriber != unsafe { nil } {
					ch.read_subscriber.sem.post()
				}
				ch.read_sub_mtx.unlock()
			}
			// Spin until a popper reports in `adr_read` that it has copied
			// from `src`; this loop does not give up while a read is known to
			// be in progress.
			mut src2 := src
			for sp := u32(0); sp < spinloops_ || read_in_progress; sp++ {
				if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2),
					isize(0))
				{
					have_swapped = true
					read_in_progress = true
					break
				}
				src2 = src
			}
			// Then collect the `writesem_im` post that belongs to that read,
			// first by polling and finally by blocking.
			mut got_im_sem := false
			for sp := u32(0); sp < spinloops_sem_ || read_in_progress; sp++ {
				got_im_sem = ch.writesem_im.try_wait()
				if got_im_sem {
					break
				}
			}
			for {
				if got_im_sem {
					got_im_sem = false
				} else {
					ch.writesem_im.wait()
				}
				if C.atomic_load_u16(&ch.closed) != 0 {
					// Closed while waiting: the push still counts if a popper
					// has confirmed reading `src`.
					if have_swapped
						|| C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2), isize(0)) {
						ch.writesem.post()
						return .success
					} else {
						return .closed
					}
				}
				if have_swapped
					|| C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_read), voidptr(&src2), isize(0)) {
					ch.writesem.post()
					break
				} else {
					// this semaphore was not for us - repost it
					ch.writesem_im.post()
					if src2 == voidptr(-1) {
						ch.readsem.post()
						return .closed
					}
					src2 = src
				}
			}
			return .success
		} else {
			// buffered channel
			// Reserve one free slot by decrementing `write_free`.
			mut space_in_queue := false
			mut wr_free := C.atomic_load_u32(&ch.write_free)
			for wr_free > 0 {
				space_in_queue = C.atomic_compare_exchange_weak_u32(&ch.write_free, &wr_free,
					wr_free - 1)
				if space_in_queue {
					break
				}
			}
			if space_in_queue {
				// Claim slot `wr_idx` and advance the write index, wrapping
				// around at `cap`.
				mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
				for {
					mut new_wr_idx := wr_idx + 1
					for new_wr_idx >= ch.cap {
						new_wr_idx -= ch.cap
					}
					if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx,
						new_wr_idx)
					{
						break
					}
				}
				mut wr_ptr := ch.ringbuf
				mut status_adr := ch.statusbuf
				unsafe {
					wr_ptr += (wr_idx * ch.objsize)
					status_adr += wr_idx * sizeof(u16)
				}
				// Wait for the slot to become `unused`, then mark it `writing`.
				mut expected_status := u16(BufferElemStat.unused)
				for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
					u16(BufferElemStat.writing)) {
					expected_status = u16(BufferElemStat.unused)
				}
				unsafe {
					C.memcpy(wr_ptr, src, ch.objsize)
				}
				C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.written))
				C.atomic_fetch_add_u32(voidptr(&ch.read_avail), 1)
				ch.readsem.post()
				// Wake the first `select` waiter that wants to pop.
				ch.read_sub_mtx.lock()
				if ch.read_subscriber != unsafe { nil } {
					ch.read_subscriber.sem.post()
				}
				ch.read_sub_mtx.unlock()
				return .success
			} else {
				// NOTE(review): in `no_block` mode this returns without
				// reposting the `writesem` token obtained above - confirm that
				// this cannot leak a token.
				if no_block {
					return .not_ready
				}
				ch.writesem.post()
			}
		}
	}
	// we should not get here but the V compiler wants to see a return statement
	panic('unknown `try_push_priv` state')
}
| 376 | |
// pop receives one object into `dest`, waiting as long as necessary.
// It returns true if an object was received and false otherwise.
@[inline]
pub fn (mut ch Channel) pop(dest voidptr) bool {
	state := ch.try_pop_priv(dest, false)
	return state == .success
}
| 381 | |
// try_pop attempts to receive one object into `dest` without waiting and
// returns `.success` if an object was popped.
// Pass the destination as `mut`: `ch.try_pop(mut value)`, not `ch.try_pop(&value)`.
@[inline]
pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
	state := ch.try_pop_priv(dest, true)
	return state
}
| 388 | |
// try_pop_select_priv is the non-blocking pop used for `select ... else`.
// A channel that is already closed is reported as `.closed` straight away,
// without attempting a pop; an open channel gets a normal non-blocking pop.
@[inline]
fn (mut ch Channel) try_pop_select_priv(dest voidptr) ChanState {
	if C.atomic_load_u16(&ch.closed) == 0 {
		return ch.try_pop_priv(dest, true)
	}
	return .closed
}
| 397 | |
// try_pop_priv is the shared implementation behind `pop` (`no_block ==
// false`) and `try_pop`/`select` (`no_block == true`). It copies
// `ch.objsize` bytes from the channel into `dest`.
// It returns `.success` once an object has been received, `.closed` when the
// channel is closed and has nothing left to deliver, and `.not_ready` when
// `no_block` is set and nothing can be received without waiting.
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
	// NOTE(review): the names look crossed - in blocking mode `spinloops_sem_`
	// receives `spinloops` and `spinloops_` receives `spinloops_sem`. Confirm
	// whether that is intended before touching the spin counts.
	spinloops_sem_, spinloops_ := if no_block { u32(1), u32(1) } else { spinloops, spinloops_sem }
	mut have_swapped := false
	mut write_in_progress := false
	for {
		mut got_sem := false
		if ch.cap == 0 {
			// unbuffered channel - first see if a `push()` has advertised
			mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
			for rdadr != C.NULL {
				if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.read_adr), voidptr(&rdadr),
					isize(0))
				{
					// there is a writer waiting for us
					unsafe { C.memcpy(dest, rdadr, ch.objsize) }
					// Publish which source was consumed, waiting until the
					// previous `adr_read` value has been collected.
					mut nulladr := unsafe { nil }
					for !C.atomic_compare_exchange_weak_ptr(voidptr(&ch.adr_read),
						voidptr(&nulladr), isize(rdadr)) {
						nulladr = unsafe { nil }
					}
					ch.writesem_im.post()
					return .success
				}
			}
			if no_block {
				if C.atomic_load_u16(&ch.closed) == 0 {
					return .not_ready
				} else {
					return .closed
				}
			}
		}
		// get token to read
		for _ in 0 .. spinloops_sem_ {
			if got_sem {
				break
			}
			got_sem = ch.readsem.try_wait()
		}
		if !got_sem {
			if no_block {
				if C.atomic_load_u16(&ch.closed) == 0 {
					return .not_ready
				} else {
					return .closed
				}
			}
			ch.readsem.wait()
		}
		if ch.cap > 0 {
			// try to get buffer token
			mut obj_in_queue := false
			mut rd_avail := C.atomic_load_u32(&ch.read_avail)
			for rd_avail > 0 {
				obj_in_queue = C.atomic_compare_exchange_weak_u32(&ch.read_avail, &rd_avail,
					rd_avail - 1)
				if obj_in_queue {
					break
				}
			}
			if obj_in_queue {
				// Claim slot `rd_idx` and advance the read index, wrapping
				// around at `cap`.
				mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
				for {
					mut new_rd_idx := rd_idx + 1
					for new_rd_idx >= ch.cap {
						new_rd_idx -= ch.cap
					}
					if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx,
						new_rd_idx)
					{
						break
					}
				}
				mut rd_ptr := ch.ringbuf
				mut status_adr := ch.statusbuf
				unsafe {
					rd_ptr += rd_idx * ch.objsize
					status_adr += rd_idx * sizeof(u16)
				}
				// Wait for the slot to become `written`, then mark it `reading`.
				mut expected_status := u16(BufferElemStat.written)
				for !C.atomic_compare_exchange_weak_u16(status_adr, &expected_status,
					u16(BufferElemStat.reading)) {
					expected_status = u16(BufferElemStat.written)
				}
				unsafe {
					C.memcpy(dest, rd_ptr, ch.objsize)
				}
				C.atomic_store_u16(unsafe { &u16(status_adr) }, u16(BufferElemStat.unused))
				C.atomic_fetch_add_u32(voidptr(&ch.write_free), 1)
				ch.writesem.post()
				// Wake the first `select` waiter that wants to push.
				ch.write_sub_mtx.lock()
				if ch.write_subscriber != unsafe { nil } {
					ch.write_subscriber.sem.post()
				}
				ch.write_sub_mtx.unlock()
				return .success
			}
		}
		// try to advertise `dest` as writable
		C.atomic_store_ptr(unsafe { &voidptr(&ch.write_adr) }, dest)
		if ch.cap == 0 {
			mut rdadr := C.atomic_load_ptr(unsafe { &voidptr(&ch.read_adr) })
			if rdadr != C.NULL {
				// A pusher advertised at the same time. Try to withdraw our
				// own advertisement and restart to read from the pusher.
				mut dest2 := dest
				if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.write_adr), voidptr(&dest2),
					isize(0))
				{
					ch.readsem.post()
					continue
				} else {
					// `write_adr` no longer holds `dest`: a pusher has taken it.
					write_in_progress = true
				}
			}
		}
		if ch.cap == 0 && !write_in_progress {
			// Nobody is writing yet: wake the first `select` waiter that
			// wants to push.
			ch.write_sub_mtx.lock()
			if ch.write_subscriber != unsafe { nil } {
				ch.write_subscriber.sem.post()
			}
			ch.write_sub_mtx.unlock()
		}
		// Spin until a pusher reports in `adr_written` that it has filled
		// `dest`. The value -1 is the marker that close() leaves there.
		mut dest2 := dest
		for sp := u32(0); sp < spinloops_ || write_in_progress; sp++ {
			if C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_written), voidptr(&dest2),
				isize(0))
			{
				have_swapped = true
				break
			} else if dest2 == voidptr(-1) {
				ch.readsem.post()
				return .closed
			}
			dest2 = dest
		}
		// Then collect the `readsem_im` post that belongs to that write,
		// first by polling and finally by blocking.
		mut got_im_sem := false
		for sp := u32(0); sp < spinloops_sem_ || write_in_progress; sp++ {
			got_im_sem = ch.readsem_im.try_wait()
			if got_im_sem {
				break
			}
		}
		for {
			if got_im_sem {
				got_im_sem = false
			} else {
				ch.readsem_im.wait()
			}
			if have_swapped
				|| C.atomic_compare_exchange_strong_ptr(voidptr(&ch.adr_written), voidptr(&dest2), isize(0)) {
				ch.readsem.post()
				break
			} else {
				// this semaphore was not for us - repost it
				ch.readsem_im.post()
				if dest2 == voidptr(-1) {
					ch.readsem.post()
					return .closed
				}
				dest2 = dest
			}
		}
		break
	}
	return .success
}
| 563 | |
// channel_select performs at most one operation out of a set: for every
// index `i` it may push (`dir[i] == .push`) or pop (`dir[i] == .pop`) the
// object referenced by `objrefs[i]` on `channels[i]`.
// `timeout` controls waiting: `time.infinite` waits without limit, a positive
// duration waits at most that long, and `timeout <= 0` returns immediately if
// no operation can be performed without waiting. With a negative `timeout`,
// pops on already closed channels are additionally not attempted.
// The result is the index of the selected channel, `-1` on timeout, and `-2`
// when all channels are closed.
pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
	if timeout < 0 {
		// Negative timeouts are normalised to a single non-blocking attempt.
		return channel_select_priv(mut channels, dir, mut objrefs, time.Duration(0),
			SelectClosedPopMode.skip)
	}
	return channel_select_priv(mut channels, dir, mut objrefs, timeout, SelectClosedPopMode.closed)
}
| 580 | |
// SelectClosedPopMode selects how `channel_select_priv` treats a pop on a
// channel that reports `.closed`.
enum SelectClosedPopMode {
	closed // only count the channel as closed; `-2` is returned once all channels are closed
	ready // select the closed pop case, after zeroing its destination object
	skip // do not attempt the pop on an already closed channel; count it as closed
}
| 586 | |
// channel_select_lang is the entry point used by the language-level `select`.
// For blocking and timed selects a closed receive case remains selectable, in
// line with plain `<-ch`; a negative `timeout` (the `select ... else` form)
// is turned into a single non-blocking attempt that skips closed receives.
fn channel_select_lang(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration) int {
	if timeout < 0 {
		return channel_select_priv(mut channels, dir, mut objrefs, time.Duration(0),
			SelectClosedPopMode.skip)
	}
	return channel_select_priv(mut channels, dir, mut objrefs, timeout, SelectClosedPopMode.ready)
}
| 600 | |
// channel_select_priv implements `channel_select` and `channel_select_lang`.
// It registers one subscription per channel, then repeatedly tries every
// channel without blocking and sleeps on a shared semaphore between rounds
// until an operation succeeds, all channels are closed, or `timeout` expires.
// `timeout <= 0` limits it to a single round. `closed_pop_mode` decides how
// closed channels are treated for pops.
// Returns the index of the selected channel, `-1` on timeout, or `-2` when
// every channel reported `.closed`.
fn channel_select_priv(mut channels []&Channel, dir []Direction, mut objrefs []voidptr, timeout time.Duration, closed_pop_mode SelectClosedPopMode) int {
	$if debug_channels ? {
		assert channels.len == dir.len
		assert dir.len == objrefs.len
	}
	// All subscriptions share one semaphore, so a post from any channel wakes
	// this select.
	mut subscr := []Subscription{len: channels.len}
	mut sem := unsafe { Semaphore{} }
	sem.init(0)
	for i, ch in channels {
		subscr[i].sem = unsafe { &sem }
		// Pick the push-side or pop-side waiter list together with its lock.
		sub_mtx, subscriber := if dir[i] == .push {
			ch.write_sub_mtx, &ch.write_subscriber
		} else {
			ch.read_sub_mtx, &ch.read_subscriber
		}
		sub_mtx.lock()
		unsafe {
			append_subscription(subscriber, &subscr[i])
		}
		sub_mtx.unlock()
	}
	// The stopwatch is only started for finite, positive timeouts.
	stopwatch := if timeout == time.infinite || timeout <= 0 {
		time.StopWatch{}
	} else {
		time.new_stopwatch()
	}
	mut event_idx := -1 // negative index means `timed out`

	outer: for {
		// Start each round at a random channel.
		rnd := rand.intn(channels.len) or { 0 }
		mut num_closed := 0
		mut ready_closed_idx := -1
		for j, _ in channels {
			mut i := j + rnd
			if i >= channels.len {
				i -= channels.len
			}
			stat := if dir[i] == .push {
				channels[i].try_push_priv(objrefs[i], true)
			} else if closed_pop_mode == .skip {
				channels[i].try_pop_select_priv(objrefs[i])
			} else {
				channels[i].try_pop_priv(objrefs[i], true)
			}
			if stat == .success {
				event_idx = i
				break outer
			} else if stat == .closed && dir[i] == .pop && closed_pop_mode == .ready {
				// A closed receive counts as ready: zero the destination and
				// remember the first such channel, but keep scanning in case
				// another channel can really complete.
				unsafe {
					C.memset(objrefs[i], 0, channels[i].objsize)
				}
				if ready_closed_idx == -1 {
					ready_closed_idx = i
				}
				num_closed++
			} else if stat == .closed {
				num_closed++
			}
		}
		if ready_closed_idx >= 0 {
			event_idx = ready_closed_idx
			break outer
		}
		if num_closed == channels.len {
			event_idx = -2
			break outer
		}
		if timeout <= 0 {
			// Non-blocking select: one round only.
			break outer
		}
		// Sleep until a channel posts the semaphore or the timeout runs out.
		if timeout != time.infinite {
			remaining := timeout - stopwatch.elapsed()
			if !sem.timed_wait(remaining) {
				break outer
			}
		} else {
			sem.wait()
		}
	}
	// reset subscribers
	for i, ch in channels {
		sub_mtx := if dir[i] == .push {
			ch.write_sub_mtx
		} else {
			ch.read_sub_mtx
		}
		sub_mtx.lock()
		// Unlink this subscription: the link that pointed at it now points at
		// its successor.
		unsafe {
			*subscr[i].prev = subscr[i].nxt
		}
		if unsafe { subscr[i].nxt != 0 } {
			subscr[i].nxt.prev = subscr[i].prev
			// Pass a wake-up on to the next waiter in the list.
			subscr[i].nxt.sem.post()
		}
		sub_mtx.unlock()
	}
	sem.destroy()
	return event_idx
}
| 700 | |