| 1 | // Copyright (c) 2019-2024 Alexander Medvednikov. All rights reserved. |
| 2 | // Use of this source code is governed by an MIT license |
| 3 | // that can be found in the LICENSE file. |
| 4 | module sync |
| 5 | |
| 6 | import time |
| 7 | |
| 8 | #flag -lpthread |
| 9 | |
// Declarations of the pthread C functions used by this file.
// Every pointer argument is passed as an untyped `voidptr`, and each function
// returns an `i32` result code.

// Plain mutexes.
@[trusted]
fn C.pthread_mutex_init(voidptr, voidptr) i32
fn C.pthread_mutex_lock(voidptr) i32
fn C.pthread_mutex_trylock(voidptr) i32
fn C.pthread_mutex_unlock(voidptr) i32
fn C.pthread_mutex_destroy(voidptr) i32

// Read/write locks and their attribute objects.
fn C.pthread_rwlockattr_init(voidptr) i32
fn C.pthread_rwlockattr_setkind_np(voidptr, i32) i32
fn C.pthread_rwlock_init(voidptr, voidptr) i32
fn C.pthread_rwlock_rdlock(voidptr) i32
fn C.pthread_rwlock_wrlock(voidptr) i32
fn C.pthread_rwlock_tryrdlock(voidptr) i32
fn C.pthread_rwlock_trywrlock(voidptr) i32
fn C.pthread_rwlock_unlock(voidptr) i32
fn C.pthread_rwlock_destroy(voidptr) i32

// Condition variables and their attribute objects.
fn C.pthread_condattr_init(voidptr) i32
fn C.pthread_condattr_setpshared(voidptr, i32) i32
fn C.pthread_condattr_destroy(voidptr) i32
fn C.pthread_cond_init(voidptr, voidptr) i32
fn C.pthread_cond_signal(voidptr) i32
fn C.pthread_cond_wait(voidptr, voidptr) i32
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) i32
fn C.pthread_cond_destroy(voidptr) i32
| 33 | |
// Opaque V-side declarations of C types. No fields are declared here, so V code
// only ever stores these values and passes their addresses to the C functions.

// C.pthread_mutex_t is the C mutex type stored in `Mutex` and `Semaphore`.
@[typedef]
pub struct C.pthread_mutex_t {}

// C.pthread_cond_t is the C condition variable type stored in `Semaphore`.
@[typedef]
pub struct C.pthread_cond_t {}

// C.pthread_rwlock_t is the C read/write lock type stored in `RwMutex`.
@[typedef]
pub struct C.pthread_rwlock_t {}

// C.pthread_rwlockattr_t is the C attribute type stored in `RwMutexAttr`.
@[typedef]
pub struct C.pthread_rwlockattr_t {}

// C.sem_t is declared here, but nothing in the visible part of this file uses it.
@[typedef]
pub struct C.sem_t {}
| 48 | |
// Mutex wraps a C `pthread_mutex_t`.
// The wrapped mutex is set up by `init()`, which `new_mutex()` calls for you.
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
@[heap]
pub struct Mutex {
	mutex C.pthread_mutex_t // passed by address to the C.pthread_mutex_* functions
}
| 54 | |
// RwMutex wraps a C `pthread_rwlock_t`, together with a flag that records
// whether the wrapped lock has been set up.
@[heap]
pub struct RwMutex {
	mutex  C.pthread_rwlock_t // passed by address to the C.pthread_rwlock_* functions
	inited u32 // 0 while the rwlock has not been set up yet; updated atomically by init() and lazy_init()
}
| 60 | |
// RwMutexAttr holds the C attribute object that is filled in and then handed
// to `C.pthread_rwlock_init` when a RwMutex is set up.
struct RwMutexAttr {
	attr C.pthread_rwlockattr_t
}
| 64 | |
// C.pthread_condattr_t is the opaque C attribute type for condition variables.
@[typedef]
pub struct C.pthread_condattr_t {}

// CondAttr holds the C attribute object that is filled in and then handed
// to `C.pthread_cond_init` when a Semaphore is set up.
struct CondAttr {
	attr C.pthread_condattr_t
}
| 71 | |
/*
macOS has no unnamed semaphores and no `timed_wait()` at all,
so the behaviour is emulated here with other primitives:
a mutex, a condition variable and a counter.
*/
// Semaphore is a counting semaphore built on top of those primitives.
@[heap]
pub struct Semaphore {
	mtx  C.pthread_mutex_t // held around the blocking paths of post/wait/timed_wait
	cond C.pthread_cond_t // waiters block on it while the counter is 0
mut:
	count u32 // current counter value; read and modified with the C.atomic_* helpers
}
| 83 | |
// new_mutex allocates a Mutex on the heap, runs `init()` on it,
// and returns a reference to the ready-to-use instance.
pub fn new_mutex() &Mutex {
	mut mtx := &Mutex{}
	mtx.init()
	return mtx
}
| 90 | |
// init sets up the wrapped pthread mutex with the default attributes (`C.NULL`).
// Call it once, before the mutex is used for the first time.
// The result code of the C call is checked with `should_be_zero`.
pub fn (mut m Mutex) init() {
	rc := C.pthread_mutex_init(&m.mutex, C.NULL)
	should_be_zero(rc)
}
| 96 | |
// new_rwmutex allocates a RwMutex on the heap, runs `init()` on it,
// and returns a reference to the ready-to-use instance.
pub fn new_rwmutex() &RwMutex {
	mut rw := &RwMutex{}
	rw.init()
	return rw
}
| 103 | |
// init sets up the wrapped pthread rwlock and then marks the instance as
// initialised. Call it once, before the rw mutex is used for the first time.
// The attribute and rwlock initialisation results are checked with `should_be_zero`.
pub fn (mut m RwMutex) init() {
	attrs := RwMutexAttr{}
	attr_rc := C.pthread_rwlockattr_init(&attrs.attr)
	should_be_zero(attr_rc)
	// Ask for writers to get priority over readers (the result of this call is not checked).
	_ = C.pthread_rwlockattr_setkind_np(&attrs.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
	lock_rc := C.pthread_rwlock_init(&m.mutex, &attrs.attr)
	should_be_zero(lock_rc)
	// Publish the "initialised" state, so that lazy_init() becomes a no-op.
	C.atomic_store_u32(&m.inited, 1)
}
| 114 | |
// lazy_init sets up the wrapped rwlock on first use, so that a zero-value
// `RwMutex{}` can be locked without an explicit call to `init()`.
//
// `inited` is used as a small state machine:
//   0 - the rwlock has not been set up,
//   2 - one thread is currently setting it up,
//   1 - the rwlock is ready (the same value that `init()` stores).
// The "ready" value is only published after `pthread_rwlock_init` has returned,
// so a concurrent caller can never lock a half-initialised rwlock.
fn (mut m RwMutex) lazy_init() {
	// Fast path: already fully set up.
	if C.atomic_load_u32(&m.inited) == 1 {
		return
	}
	mut expected := u32(0)
	if C.atomic_compare_exchange_strong_u32(&m.inited, &expected, 2) {
		// This thread won the race, and is the only one that initialises the rwlock.
		a := RwMutexAttr{}
		should_be_zero(C.pthread_rwlockattr_init(&a.attr))
		// Give writer priority over readers
		C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
		should_be_zero(C.pthread_rwlock_init(&m.mutex, &a.attr))
		C.atomic_store_u32(&m.inited, 1)
		return
	}
	// Another thread is (or was) initialising the rwlock: wait until it is ready.
	// The set-up is only a few C calls, so a short busy wait is enough here.
	for C.atomic_load_u32(&m.inited) != 1 {
	}
}
| 126 | |
// lock acquires the mutex (`lock` is also a V keyword).
// When another thread already holds the mutex, the call blocks until it is released.
// The result code of the C call is not checked.
@[inline]
pub fn (mut m Mutex) lock() {
	_ = C.pthread_mutex_lock(&m.mutex)
}
| 133 | |
// try_lock attempts to acquire the mutex without blocking.
// It returns true when the mutex was acquired (the C call returned 0),
// and false otherwise, e.g. when the mutex was already locked.
@[inline]
pub fn (mut m Mutex) try_lock() bool {
	rc := C.pthread_mutex_trylock(&m.mutex)
	return rc == 0
}
| 140 | |
// unlock releases the mutex, so that one of the threads that were blocked
// in `lock()` can continue.
// The result code of the C call is not checked.
@[inline]
pub fn (mut m Mutex) unlock() {
	_ = C.pthread_mutex_unlock(&m.mutex)
}
| 147 | |
// destroy releases the resources associated with the wrapped pthread mutex.
// Note: the memory of the Mutex instance itself is not freed.
// The result code of the C call is checked with `should_be_zero`.
pub fn (mut m Mutex) destroy() {
	rc := C.pthread_mutex_destroy(&m.mutex)
	should_be_zero(rc)
}
| 153 | |
// rlock acquires the RwMutex for reading.
// If the lock can not be taken right away, the call blocks until it can,
// and returns once it succeeds.
// The wrapped rwlock is set up on first use, when that was not done before.
// Note: there may be several threads that are waiting for the same lock.
// Note: RwMutex has separate read and write locks.
@[inline]
pub fn (mut m RwMutex) rlock() {
	m.lazy_init()
	rc := C.pthread_rwlock_rdlock(&m.mutex)
	should_be_zero(rc)
}
| 165 | |
// lock acquires the RwMutex for writing.
// If the lock can not be taken right away, the call blocks until it can,
// and returns once it succeeds.
// The wrapped rwlock is set up on first use, when that was not done before.
// Note: there may be several threads that are waiting for the same lock.
// Note: RwMutex has separate read and write locks.
@[inline]
pub fn (mut m RwMutex) lock() {
	m.lazy_init()
	rc := C.pthread_rwlock_wrlock(&m.mutex)
	should_be_zero(rc)
}
| 177 | |
// try_rlock tries to lock the given RwMutex instance for reading, and returns immediately.
// It returns true when the read lock was acquired, and false otherwise
// (for example when the lock is currently held for writing).
// Like `rlock()`, it sets up the wrapped rwlock on first use, so that it is
// also safe to call on a zero-value `RwMutex{}`.
@[inline]
pub fn (mut m RwMutex) try_rlock() bool {
	m.lazy_init()
	return C.pthread_rwlock_tryrdlock(&m.mutex) == 0
}
| 184 | |
// try_wlock tries to lock the given RwMutex instance for writing, and returns immediately.
// It returns true when the write lock was acquired, and false otherwise
// (for example when the lock is already held).
// Like `lock()`, it sets up the wrapped rwlock on first use, so that it is
// also safe to call on a zero-value `RwMutex{}`.
@[inline]
pub fn (mut m RwMutex) try_wlock() bool {
	m.lazy_init()
	return C.pthread_rwlock_trywrlock(&m.mutex) == 0
}
| 191 | |
// destroy frees the resources associated with the rwmutex instance.
// Note: the mutex itself is not freed.
// Calling it on an instance whose rwlock was never set up is a no-op.
// After a successful destroy, the instance is marked as not set up again,
// so the `inited` flag never describes a destroyed rwlock as usable.
pub fn (mut m RwMutex) destroy() {
	// Nothing was created yet, so there is nothing to destroy.
	if C.atomic_load_u32(&m.inited) == 0 {
		return
	}
	should_be_zero(C.pthread_rwlock_destroy(&m.mutex))
	C.atomic_store_u32(&m.inited, 0)
}
| 197 | |
// runlock releases a RwMutex that was locked for reading.
// Note: Windows SRWLocks use different functions for releasing read and
// write locks. To keep the API portable, there are two unlocking methods
// here as well, even though they do the same thing on non-Windows platforms.
// The result code of the C call is not checked.
@[inline]
pub fn (mut m RwMutex) runlock() {
	_ = C.pthread_rwlock_unlock(&m.mutex)
}
| 207 | |
// unlock releases a RwMutex that was locked for writing.
// Note: Windows SRWLocks use different functions for releasing read and
// write locks. To keep the API portable, there are two unlocking methods
// here as well, even though they do the same thing on non-Windows platforms.
// The result code of the C call is not checked.
@[inline]
pub fn (mut m RwMutex) unlock() {
	_ = C.pthread_rwlock_unlock(&m.mutex)
}
| 217 | |
// new_semaphore returns a reference to a new, initialised, heap allocated
// Semaphore, whose counter starts at 0.
pub fn new_semaphore() &Semaphore {
	initial_count := u32(0)
	return new_semaphore_init(initial_count)
}
| 223 | |
// new_semaphore_init returns a reference to a new, initialised, heap allocated
// Semaphore, whose counter starts at `n`.
pub fn new_semaphore_init(n u32) &Semaphore {
	mut s := &Semaphore{}
	s.init(n)
	return s
}
| 231 | |
// init initialises the Semaphore instance with `n` as its initial counter value.
// It should be called once before the semaphore is used, since it creates the
// mutex and the condition variable, that the semaphore needs to work properly.
// The results of creating the mutex, the condition attributes and the condition
// variable are all checked with `should_be_zero`.
pub fn (mut sem Semaphore) init(n u32) {
	C.atomic_store_u32(&sem.count, n)
	should_be_zero(C.pthread_mutex_init(&sem.mtx, C.NULL))
	attr := CondAttr{}
	should_be_zero(C.pthread_condattr_init(&attr.attr))
	C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE)
	cond_res := C.pthread_cond_init(&sem.cond, &attr.attr)
	// The attribute object is only needed while creating the condition variable,
	// so release it before the result of the creation is checked.
	C.pthread_condattr_destroy(&attr.attr)
	// Every blocking wait depends on the condition variable, so its creation is
	// checked in the same way as the creation of the mutex above.
	should_be_zero(cond_res)
}
| 244 | |
// post increments the counter of the semaphore by 1.
// A thread that is blocked in `.wait()` or `.timed_wait()` can then take
// the new unit (decrementing the counter again) and continue running.
// See also .wait() .
pub fn (mut sem Semaphore) post() {
	// Fast path: while the observed counter is above 1, try to bump it
	// with a compare-and-swap, without taking the mutex.
	mut seen := C.atomic_load_u32(&sem.count)
	for seen > 1 {
		bumped := C.atomic_compare_exchange_weak_u32(&sem.count, &seen, seen + 1)
		if bumped {
			return
		}
	}

	// Slow path: increment under the mutex, and signal the condition variable
	// when the counter was 0 right before the increment.
	C.pthread_mutex_lock(&sem.mtx)
	seen = C.atomic_fetch_add_u32(&sem.count, 1)
	if seen == 0 {
		C.pthread_cond_signal(&sem.cond)
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
| 264 | |
// wait decrements the semaphore counter and returns, if the counter was positive.
// If it was not positive, wait blocks until the counter becomes positive;
// when that happens, it decrements the counter and returns.
// In effect, it lets you block a thread until the semaphore is posted by another thread.
// See also .post() .
pub fn (mut sem Semaphore) wait() {
	// Fast path: take a unit with a compare-and-swap, without touching the mutex.
	mut cur := C.atomic_load_u32(&sem.count)
	for cur > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &cur, cur - 1) {
			return
		}
	}

	// Slow path: block on the condition variable, while the counter is 0.
	C.pthread_mutex_lock(&sem.mtx)
	cur = C.atomic_load_u32(&sem.count)
	acquire: for {
		if cur == 0 {
			C.pthread_cond_wait(&sem.cond, &sem.mtx)
			cur = C.atomic_load_u32(&sem.count)
		}
		for cur > 0 {
			taken := C.atomic_compare_exchange_weak_u32(&sem.count, &cur, cur - 1)
			if !taken {
				// `cur` now holds the freshly observed counter value; try again.
				continue
			}
			// `cur` is still the value from before the decrement: when it was
			// above 1, units remain, so signal the condition variable once more.
			if cur > 1 {
				C.pthread_cond_signal(&sem.cond)
			}
			break acquire
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
| 296 | |
// try_wait tries to decrement the semaphore counter by 1, if it is positive.
// It returns true when the decrement succeeded, and false when the counter was 0.
// Note: it never blocks, and it only uses atomic operations on the counter;
// the mutex and the condition variable are not touched.
pub fn (mut sem Semaphore) try_wait() bool {
	mut cur := C.atomic_load_u32(&sem.count)
	for cur > 0 {
		swapped := C.atomic_compare_exchange_weak_u32(&sem.count, &cur, cur - 1)
		if swapped {
			return true
		}
	}
	return false
}
| 310 | |
// timed_wait is similar to .wait(), but it also accepts a timeout duration,
// thus it can return false early, if the timeout passed before the semaphore was posted.
// It returns true only when the counter was actually decremented by this call.
// It returns false when the wait timed out, or when the underlying
// `pthread_cond_timedwait` call failed for any other reason; in both cases
// the counter is left untouched.
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
	// Fast path: take a unit with a compare-and-swap, without touching the mutex.
	mut c := C.atomic_load_u32(&sem.count)
	for c > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
			return true
		}
	}
	C.pthread_mutex_lock(&sem.mtx)
	t_spec := timeout.timespec()
	// Tracks whether this call took a unit, so that the result does not depend
	// on the result code of an earlier pthread_cond_timedwait call.
	mut acquired := false
	c = C.atomic_load_u32(&sem.count)

	outer: for {
		if c == 0 {
			res := C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
			if res != 0 {
				// C.ETIMEDOUT, or another failure (for example an invalid timespec).
				// Retrying after such a failure would fail again immediately, and
				// turn this loop into a busy spin, so give up in both cases.
				break outer
			}
			c = C.atomic_load_u32(&sem.count)
		}
		for c > 0 {
			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
				// `c` is still the value from before the decrement: when it was
				// above 1, units remain, so signal the condition variable once more.
				if c > 1 {
					C.pthread_cond_signal(&sem.cond)
				}
				acquired = true
				break outer
			}
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
	return acquired
}
| 345 | |
// destroy releases the condition variable first, and then the mutex of the
// Semaphore instance. Both result codes are checked with `should_be_zero`.
// Note: the memory of the Semaphore instance itself is not freed.
pub fn (mut sem Semaphore) destroy() {
	cond_rc := C.pthread_cond_destroy(&sem.cond)
	should_be_zero(cond_rc)
	mtx_rc := C.pthread_mutex_destroy(&sem.mtx)
	should_be_zero(mtx_rc)
}
| 352 | |