| 1 | // Copyright (c) 2019-2026 Alexander Medvednikov. All rights reserved. |
| 2 | // Use of this source code is governed by an MIT license |
| 3 | // that can be found in the LICENSE file. |
| 4 | module s3 |
| 5 | |
| 6 | import os |
| 7 | import strings |
| 8 | import time |
| 9 | |
// Multipart upload limits imposed by the S3 protocol. Every part except the
// last must be at least `min_part_size`, no part may exceed `max_part_size`,
// and a single upload may contain at most `max_parts` parts.
// The sizes are built by shifting an i64, so the 5 GiB value is never formed
// in plain `int` arithmetic.
pub const min_part_size = i64(5) << 20 // 5 MiB
pub const max_part_size = i64(5) << 30 // 5 GiB
pub const max_parts = 10000 // hard S3 limit
| 14 | |
// upload_file sends the file at `local_path` to S3 under `key`.
//
// A file of at most `min_part_size` bytes is read into memory and sent with a
// single `put`. Anything larger is handed to `upload_file_multipart`, which
// reads the file in `client.part_size` chunks (default 5 MiB) instead of
// loading it whole. Prefer this over `put` for big files or anything you do
// not want to hold in memory.
pub fn (c &Client) upload_file(key string, local_path string, opts PutOptions) ! {
	info := os.stat(local_path) or {
		return new_error('LocalFileNotFound', 'Cannot stat local file: ${local_path} (${err.msg()})')
	}
	total := i64(info.size)
	if total > min_part_size {
		c.upload_file_multipart(key, local_path, total, opts)!
		return
	}
	contents := os.read_bytes(local_path) or {
		return new_error('LocalReadFailed', 'Cannot read ${local_path}: ${err.msg()}')
	}
	c.put(key, contents, opts)!
}
| 35 | |
// MultipartUploader is a stateful handle to an in-flight multipart upload,
// produced by `Client.start_multipart`. Use it when you want to push parts
// generated on-the-fly (network sources, decompressors, anything you don't
// want to materialise on disk):
//
//   mut up := c.start_multipart('key', s3.PutOptions{ content_type: 'application/octet-stream' })!
//   for chunk in chunks { up.upload(chunk)! }
//   up.complete()!
//
// On error, `complete()` / `upload()` return without aborting. The caller
// must invoke `abort()` themselves so that `defer { up.abort() or {} }`
// remains an explicit, visible cleanup hook.
pub struct MultipartUploader {
mut:
	client      &Client    // client used for every part / complete / abort request
	key         string     // destination object key
	upload_id   string     // UploadId returned by initiate_multipart
	opts        PutOptions // options forwarded to every request of this upload
	parts       []PartRef  // part numbers and ETags uploaded so far, in upload order
	part_number int        // number the next uploaded part will get (starts at 1)
	completed   bool       // set once complete_multipart has succeeded
	aborted     bool       // set once abort_multipart has succeeded
}
| 59 | |
// start_multipart opens a new multipart upload for `key` and returns a
// MultipartUploader whose next part number is 1. `opts` is used for the
// initiate request and is kept for every later part / complete / abort call.
// Each `upload(chunk)` call sends that chunk before returning; the uploader
// itself keeps only the part numbers and ETags, not the chunk data.
pub fn (c &Client) start_multipart(key string, opts PutOptions) !MultipartUploader {
	id := c.initiate_multipart(key, opts)!
	// `unsafe { c }` is needed to keep the receiver pointer inside the
	// returned struct: V refuses to do that implicitly, even though `Client`
	// is `@[heap]` and therefore outlives this call.
	uploader := MultipartUploader{
		client:      unsafe { c }
		key:         key
		upload_id:   id
		opts:        opts
		parts:       []PartRef{}
		part_number: 1
	}
	return uploader
}
| 77 | |
// upload sends `data` as the next part of the upload, numbering parts
// sequentially from 1, and records the returned ETag for `complete()`.
//
// Every chunk except the last must be at least `min_part_size` (5 MiB). The
// server enforces this, and nothing here buffers or merges small chunks.
// Fails with InvalidState once the upload was completed or aborted, and with
// TooManyParts once `max_parts` parts have been sent.
pub fn (mut u MultipartUploader) upload(data []u8) ! {
	finished := u.completed || u.aborted
	if finished {
		return new_error('InvalidState', 'multipart upload is already finalised')
	}
	num := u.part_number
	if num > max_parts {
		return new_error('TooManyParts', 'Exceeded ${max_parts} parts — increase part_size')
	}
	tag := u.client.upload_part(u.key, u.upload_id, num, data, u.opts)!
	u.parts << PartRef{
		part_number: num
		etag:        tag
	}
	u.part_number = num + 1
}
| 95 | |
// complete finalises the upload by sending the recorded parts in a
// CompleteMultipartUpload request. After it succeeds, the object is visible.
//
// Calling it again after a successful completion is a no-op. Calling it after
// `abort()` fails with InvalidState instead of sending a request for an
// upload the server has already discarded (this matches `upload()`). On
// failure the upload is NOT aborted: the caller decides whether to retry
// `complete()` or to call `abort()`.
pub fn (mut u MultipartUploader) complete() ! {
	if u.completed {
		return
	}
	if u.aborted {
		return new_error('InvalidState', 'multipart upload was aborted')
	}
	u.client.complete_multipart(u.key, u.upload_id, u.parts, u.opts)!
	u.completed = true
}
| 104 | |
// abort cancels the multipart upload on the server. It is idempotent: once
// the upload has been aborted or completed, further calls do nothing, so it
// is safe to call it from a `defer`. The aborted flag is set only after the
// server call succeeds, so a failed abort can be retried.
pub fn (mut u MultipartUploader) abort() ! {
	if !(u.aborted || u.completed) {
		u.client.abort_multipart(u.key, u.upload_id, u.opts)!
		u.aborted = true
	}
}
| 113 | |
// upload_bytes_multipart uploads the in-memory buffer `data` to `key` as a
// multipart upload. It uses up to `Client.queue_size` parallel part uploads
// (see `run_parallel_parts`) and is meant for large generated payloads, such
// as test fixtures up to 5 GiB.
//
// If any part fails, or the final CompleteMultipartUpload call fails, the
// multipart upload is aborted before the error is returned. S3 keeps the parts
// of an unfinished upload until it is completed or aborted, so skipping the
// abort would leave orphaned parts behind.
pub fn (c &Client) upload_bytes_multipart(key string, data []u8, opts PutOptions) ! {
	upload_id := c.initiate_multipart(key, opts)!
	parts := c.run_parallel_parts(key, upload_id, opts, fn [data] (offset i64, want int) ![]u8 {
		// Chunks are slices of `data`, not copies. This assumes the caller does
		// not mutate `data` concurrently while the upload runs.
		end := if offset + i64(want) > i64(data.len) { i64(data.len) } else { offset + i64(want) }
		return data[offset..end]
	}, i64(data.len)) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// Best-effort cleanup; the complete error is the one worth reporting.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}
| 128 | |
// upload_file_multipart uploads the local file at `local_path`, expected to be
// `size` bytes long, as a multipart upload. The file is read part by part,
// with up to `Client.queue_size` concurrent part uploads.
//
// Peak memory is a small multiple of `queue_size * part_size`: chunks waiting
// in the worker queue plus the chunks currently being uploaded.
//
// If any part fails, or the final CompleteMultipartUpload call fails, the
// multipart upload is aborted before the error is returned, so a failed call
// does not leave already-uploaded parts on the server.
pub fn (c &Client) upload_file_multipart(key string, local_path string, size i64, opts PutOptions) ! {
	mut f := os.open(local_path) or {
		return new_error('LocalReadFailed', 'open ${local_path}: ${err.msg()}')
	}
	defer {
		f.close()
	}
	upload_id := c.initiate_multipart(key, opts)!
	// run_parallel_parts calls the producer sequentially from its own
	// goroutine, so reading through `f` needs no locking. A fresh buffer is
	// allocated per part because workers may still hold earlier chunks.
	parts := c.run_parallel_parts(key, upload_id, opts, fn [mut f] (offset i64, want int) ![]u8 {
		mut buf := []u8{len: want}
		n := f.read_bytes_into(u64(offset), mut buf) or {
			return new_error('LocalReadFailed', 'read_bytes_into: ${err.msg()}')
		}
		if n <= 0 {
			return new_error('LocalReadFailed', 'unexpected EOF at offset ${offset}')
		}
		return buf[..n]
	}, size) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// Best-effort cleanup; the complete error is the one worth reporting.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}
| 155 | |
// PartJob is a single upload-part task that run_parallel_parts sends to a
// worker over the `jobs` channel.
struct PartJob {
	part_number int  // part number assigned by the dispatcher (first part is 1)
	data        []u8 // chunk returned by the producer; uploaded as-is
}
| 161 | |
// PartResult carries one worker outcome back to the dispatcher over the
// `results` channel. part_worker sends exactly one of these per PartJob.
struct PartResult {
	part_number int     // part number copied from the originating PartJob
	etag        string  // ETag from upload_part; left empty when `err` is set
	err         ?IError // upload error, or none on success
}
| 168 | |
// part_worker is the body of one upload goroutine. It receives jobs from
// `jobs` until that channel is closed and uploads each job's data as the given
// part. For every job it sends exactly one PartResult to `results`, carrying
// either the ETag (success) or the error (failure).
fn (c &Client) part_worker(key string, upload_id string, opts PutOptions, jobs chan PartJob, results chan PartResult) {
	for {
		job := <-jobs or { return }
		if tag := c.upload_part(key, upload_id, job.part_number, job.data, opts) {
			results <- PartResult{
				part_number: job.part_number
				etag:        tag
			}
		} else {
			results <- PartResult{
				part_number: job.part_number
				err:         err
			}
		}
	}
}
| 187 | |
// run_parallel_parts uploads the byte range [0, total) as multipart parts and
// returns a PartRef for every uploaded part. The work is spread over a pool of
// `Client.queue_size` worker goroutines (at least 1).
//
// The caller owns the upload lifecycle: it must call `initiate_multipart`
// first and `complete_multipart` / `abort_multipart` afterwards.
//
// Producer contract:
// - `producer(offset, want)` is called sequentially from this goroutine and
//   must return a chunk of at most `want` bytes starting at `offset`.
// - Workers may still hold a chunk after the next producer call, so each
//   chunk must not be overwritten by later calls.
// - An empty chunk before `total` bytes have been produced is an error. It is
//   not treated as end-of-data, which would silently complete a truncated
//   object.
//
// Part size is `Client.part_size`, clamped to [`min_part_size`,
// `max_part_size`]. If `total` would need more than `max_parts` parts at that
// size, TooManyParts is returned before anything is uploaded or any worker is
// started.
//
// Parts are returned in completion order; `complete_multipart` re-sorts them.
// On error, the first error seen is returned, but only after every dispatched
// part has reported back, so no worker stays blocked on `results`.
fn (c &Client) run_parallel_parts(key string, upload_id string, opts PutOptions, producer fn (offset i64, want int) ![]u8, total i64) ![]PartRef {
	workers := if c.queue_size < 1 { 1 } else { c.queue_size }
	part_size := if c.part_size < min_part_size {
		min_part_size
	} else if c.part_size > max_part_size {
		// S3 rejects parts larger than max_part_size.
		max_part_size
	} else {
		c.part_size
	}
	// Check the part limit up front. Otherwise the error would only surface
	// after max_parts parts had already been uploaded.
	needed := (total + part_size - 1) / part_size
	if needed > i64(max_parts) {
		return new_error('TooManyParts',
			'Exceeded ${max_parts} parts (need ${needed} at part_size ${part_size}) — increase part_size')
	}
	jobs := chan PartJob{cap: workers}
	results := chan PartResult{cap: workers}
	for _ in 0 .. workers {
		spawn c.part_worker(key, upload_id, opts, jobs, results)
	}

	mut parts := []PartRef{}
	mut first_err := ?IError(none)
	mut sent := 0
	mut received := 0
	mut offset := i64(0)
	mut part_number := 1

	for offset < total && first_err == none {
		// Still needed as a backstop: a producer returning short chunks can
		// use more parts than the up-front estimate.
		if part_number > max_parts {
			first_err = new_error('TooManyParts',
				'Exceeded ${max_parts} parts — increase part_size')
			break
		}
		want := if offset + part_size > total { int(total - offset) } else { int(part_size) }
		chunk := producer(offset, want) or {
			first_err = err
			break
		}
		if chunk.len == 0 {
			// Completing the upload with what we have would produce a
			// silently truncated object.
			first_err = new_error('ShortRead',
				'producer returned no data at offset ${offset} of ${total}')
			break
		}
		jobs <- PartJob{
			part_number: part_number
			data:        chunk
		}
		sent++
		offset += i64(chunk.len)
		part_number++
		// Drain ready results without blocking. This surfaces errors early so
		// we can stop dispatching new parts.
		for {
			select {
				r := <-results {
					received++
					if e := r.err {
						if first_err == none {
							first_err = e
						}
					} else {
						parts << PartRef{
							part_number: r.part_number
							etag:        r.etag
						}
					}
				}
				else {
					break
				}
			}
		}
	}
	// Closing `jobs` lets the workers exit once the queue is empty. Then wait
	// for every dispatched part so none is left unaccounted for.
	jobs.close()
	for received < sent {
		r := <-results
		received++
		if e := r.err {
			if first_err == none {
				first_err = e
			}
		} else {
			parts << PartRef{
				part_number: r.part_number
				etag:        r.etag
			}
		}
	}
	results.close()
	if e := first_err {
		return e
	}
	return parts
}
| 276 | |
// PartRef is one PartNumber/ETag pair recorded during a multipart upload.
// complete_multipart sends the list of these, sorted by part number, as the
// CompleteMultipartUpload manifest.
pub struct PartRef {
pub:
	part_number int    // part number the chunk was uploaded as (first part is 1)
	etag        string // ETag from UploadPart with surrounding quotes trimmed; re-quoted in the manifest
}
| 283 | |
// initiate_multipart sends a CreateMultipartUpload request (`POST` on the
// object path with the `uploads=` query) and returns the UploadId parsed from
// the response body.
//
// Headers built from `opts` by `put_object_headers` (ACL, content-type, …)
// are sent with this request; they apply to the final object. Returns an HTTP
// error for any status other than 200, and InvalidResponse when the body
// carries no UploadId.
pub fn (c &Client) initiate_multipart(key string, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	object_path := build_object_path(creds, opts.bucket, key)!
	req := SignRequest{
		method:        'POST'
		path:          object_path
		query:         'uploads='
		payload_hash:  empty_sha256
		extra_headers: put_object_headers(opts)
	}
	signed := sign_request(creds, req)!
	resp := c.do_http(signed, '')!
	if resp.status_code != 200 {
		return new_http_error(resp.status_code, key, resp.body)
	}
	id := extract_xml_tag(resp.body, 'UploadId')
	if id.len == 0 {
		return new_error('InvalidResponse', 'CreateMultipartUploadResult missing UploadId')
	}
	return id
}
| 307 | |
// upload_part uploads `data` as part `part_number` of the multipart upload
// `upload_id` and returns the part's ETag with surrounding quotes removed.
//
// Transient failures are retried, up to `Client.retry` attempts in total (at
// least 1), with exponential backoff between attempts: 200ms, 400ms, 800ms, …,
// capped at 12.8s. A failure counts as transient when it is one of:
// - a transport error from `do_http`,
// - a 5xx, 408 or 429 status,
// - a 400 whose body carries S3's `RequestTimeout` code.
// Any other non-200 status (e.g. 403 AccessDenied, 404 NoSuchUpload) cannot
// succeed on retry and is returned immediately. After the last attempt, the
// last error seen is returned.
//
// The payload is always SHA-256-signed (not `UNSIGNED-PAYLOAD`) so the
// service validates byte-for-byte integrity end-to-end. Multipart uploads
// don't carry a Content-MD5 header by default; without payload signing a
// flipped bit in transit would silently produce a corrupt object that
// passes the multipart ETag check.
pub fn (c &Client) upload_part(key string, upload_id string, part_number int, data []u8, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	query := canonical_query_string({
		'partNumber': part_number.str()
		'uploadId':   upload_id
	})
	// Hash the payload once — multi-MiB chunks should not be re-hashed per attempt.
	// The signature itself still has to be recomputed on each retry because the
	// `x-amz-date` header advances.
	payload_hash := sha256_hex(data)
	body := data.bytestr()
	max_attempts := if c.retry < 1 { 1 } else { c.retry }
	mut last_err := IError(new_error('NetworkError', 'upload_part: no attempt was made'))
	for attempt in 0 .. max_attempts {
		if attempt > 0 {
			// Cap the shift so a large `retry` can neither overflow `1 << n`
			// nor produce absurdly long sleeps.
			shift := if attempt - 1 > 6 { 6 } else { attempt - 1 }
			time.sleep((1 << shift) * 200 * time.millisecond)
		}
		signed := sign_request(creds, SignRequest{
			method:       'PUT'
			path:         path
			query:        query
			payload_hash: payload_hash
		})!
		resp := c.do_http(signed, body) or {
			// Transport errors are always treated as transient.
			last_err = err
			continue
		}
		if resp.status_code == 200 {
			etag := resp.header.get(.etag) or { '' }
			if etag == '' {
				return new_error('InvalidResponse', 'UploadPart returned no ETag')
			}
			return etag.trim('"')
		}
		status := resp.status_code
		last_err = new_http_error(status, key, resp.body)
		retryable_status := status >= 500 || status == 408 || status == 429
		timed_out := status == 400 && resp.body.contains('RequestTimeout')
		if !retryable_status && !timed_out {
			// Permanent client error: retrying cannot change the outcome.
			return last_err
		}
	}
	return last_err
}
| 359 | |
// complete_multipart finalises a multipart upload by POSTing the
// CompleteMultipartUpload manifest for `parts`. S3 requires the parts in
// ascending `part_number` order; they are sorted here, on a copy, so callers
// may pass them in completion order.
//
// Errors:
// - InvalidArgument when `parts` is empty. S3 rejects an empty manifest, so
//   this fails locally without a round-trip.
// - An HTTP error for a non-200 status.
// - An HTTP error when a 200 response carries an `<Error>` body; S3 can report
//   failures that way for this call.
pub fn (c &Client) complete_multipart(key string, upload_id string, parts []PartRef, opts PutOptions) ! {
	if parts.len == 0 {
		return new_error('InvalidArgument', 'complete_multipart: no parts to complete for ${key}')
	}
	mut sorted := parts.clone()
	sorted.sort(a.part_number < b.part_number)
	mut body := strings.new_builder(1024)
	body.write_string('<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">')
	for p in sorted {
		// ETags are stored unquoted (see upload_part); the manifest wants them quoted.
		body.write_string('<Part><PartNumber>${p.part_number}</PartNumber><ETag>"${p.etag}"</ETag></Part>')
	}
	body.write_string('</CompleteMultipartUpload>')
	xml := body.str()
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	q := 'uploadId=' + uri_encode_query(upload_id)
	signed := sign_request(creds, SignRequest{
		method:        'POST'
		path:          path
		query:         q
		payload_hash:  sha256_hex(xml.bytes())
		extra_headers: {
			'content-type': 'application/xml'
		}
	})!
	resp := c.do_http(signed, xml)!
	if resp.status_code != 200 {
		return new_http_error(resp.status_code, key, resp.body)
	}
	// CompleteMultipartUpload may return HTTP 200 with an <Error> body — surface those.
	if resp.body.contains('<Error>') {
		return new_http_error(200, key, resp.body)
	}
}
| 393 | |
// abort_multipart cancels the multipart upload `upload_id` with a DELETE
// request. Statuses 200 and 204 count as success. 404 also counts as success,
// meaning there is no such upload any more. Any other status is returned as an
// HTTP error. Callers usually invoke this on an error path as best-effort
// cleanup and ignore the result.
pub fn (c &Client) abort_multipart(key string, upload_id string, opts PutOptions) ! {
	creds := c.creds_for(opts.bucket)
	object_path := build_object_path(creds, opts.bucket, key)!
	req := SignRequest{
		method:       'DELETE'
		path:         object_path
		query:        'uploadId=' + uri_encode_query(upload_id)
		payload_hash: empty_sha256
	}
	signed := sign_request(creds, req)!
	resp := c.do_http(signed, '')!
	accepted := resp.status_code in [200, 204, 404]
	if !accepted {
		return new_http_error(resp.status_code, key, resp.body)
	}
}
| 411 | |