// Copyright (c) 2019-2026 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
module s3

import os
import strings
import time

// Multipart upload constants — defaults aligned with the S3 protocol limits.
pub const min_part_size = i64(5 * 1024 * 1024) // 5 MiB
pub const max_part_size = i64(5 * 1024 * 1024 * 1024) // 5 GiB
pub const max_parts = 10000 // hard S3 limit

// upload_file sends the local file at `local_path` to S3 under `key`.
//
// Files of at most `min_part_size` bytes are read into memory in one go and
// sent with a single `put`. Anything bigger is handed to
// `upload_file_multipart`, which reads the file in `client.part_size` chunks
// instead of loading it whole. Prefer this function for large files or for
// any file you do not want to hold in memory.
pub fn (c &Client) upload_file(key string, local_path string, opts PutOptions) ! {
	info := os.stat(local_path) or {
		return new_error('LocalFileNotFound', 'Cannot stat local file: ${local_path} (${err.msg()})')
	}
	file_size := i64(info.size)
	// Guard clause: large files take the streaming multipart path.
	if file_size > min_part_size {
		c.upload_file_multipart(key, local_path, file_size, opts)!
		return
	}
	// Small file: a single buffered PUT is cheaper than a multipart round-trip.
	payload := os.read_bytes(local_path) or {
		return new_error('LocalReadFailed', 'Cannot read ${local_path}: ${err.msg()}')
	}
	return c.put(key, payload, opts)
}

// MultipartUploader is a stateful handle to an in-flight multipart upload,
// obtained from `Client.start_multipart`. Reach for it when parts are
// produced on the fly (network sources, decompressors, anything you would
// rather not write to disk first):
//
//   mut up := c.start_multipart('key', s3.PutOptions{ content_type: 'application/octet-stream' })!
//   for chunk in chunks { up.upload(chunk)! }
//   up.complete()!
//
// `complete()` and `upload()` never abort on error. Cleanup is the caller's
// job, for example via `defer { up.abort() or {} }`, so that it stays an
// explicit, visible hook.
pub struct MultipartUploader {
mut:
	client      &Client
	key         string
	upload_id   string
	opts        PutOptions
	parts       []PartRef // ETags acknowledged so far, in upload order
	part_number int       // number assigned to the next `upload` call
	completed   bool
	aborted     bool
}

// start_multipart begins a multipart upload and returns a MultipartUploader.
// The uploader buffers no part data. Each `upload(chunk)` call sends the
// caller's chunk and returns once S3 has acknowledged it. The only state
// that grows is the list of part-number/ETag pairs needed for `complete()`.
pub fn (c &Client) start_multipart(key string, opts PutOptions) !MultipartUploader {
	upload_id := c.initiate_multipart(key, opts)!
	return MultipartUploader{
		// `unsafe { c }` is required to store the receiver's pointer in the
		// returned struct — V's borrow checker forbids this implicitly, even
		// though `Client` is `@[heap]` so the lifetime is safe.
		client:      unsafe { c }
		key:         key
		upload_id:   upload_id
		opts:        opts
		parts:       []PartRef{}
		part_number: 1
	}
}

// upload pushes one chunk as the next part. Each chunk MUST be at least
// `min_part_size` (5 MiB) except the last one — that's an S3 invariant
// the server enforces; we do not buffer for you.
// Returns `InvalidState` after `complete()`/`abort()` and `TooManyParts` once
// `max_parts` parts have been sent.
pub fn (mut u MultipartUploader) upload(data []u8) ! {
	if u.completed || u.aborted {
		return new_error('InvalidState', 'multipart upload is already finalised')
	}
	if u.part_number > max_parts {
		return new_error('TooManyParts', 'Exceeded ${max_parts} parts — increase part_size')
	}
	etag := u.client.upload_part(u.key, u.upload_id, u.part_number, data, u.opts)!
	u.parts << PartRef{
		part_number: u.part_number
		etag:        etag
	}
	u.part_number++
}

// complete finalises the upload. After it succeeds the object is visible.
// Calling it again after a successful completion does nothing.
// It returns `InvalidState` without contacting the server in two cases:
// - the upload was aborted, so the UploadId is no longer valid;
// - no part was ever uploaded, because S3 requires at least one part in the
//   CompleteMultipartUpload manifest.
pub fn (mut u MultipartUploader) complete() ! {
	if u.completed {
		return
	}
	if u.aborted {
		return new_error('InvalidState', 'multipart upload was aborted')
	}
	if u.parts.len == 0 {
		return new_error('InvalidState', 'multipart upload has no parts to complete')
	}
	u.client.complete_multipart(u.key, u.upload_id, u.parts, u.opts)!
	u.completed = true
}

// abort cancels the in-flight upload. Idempotent: once aborted or completed,
// further calls return immediately. If the server request fails, the
// uploader stays un-aborted, so the call can be retried.
pub fn (mut u MultipartUploader) abort() ! {
	if u.aborted || u.completed {
		return
	}
	u.client.abort_multipart(u.key, u.upload_id, u.opts)!
	u.aborted = true
}

// upload_bytes_multipart uploads an in-memory byte buffer using multipart.
// Useful for large generated payloads such as test fixtures. Uses up to
// `Client.queue_size` parallel part uploads. If uploading any part fails, or
// the final completion call fails, the multipart upload is aborted so that
// no orphaned parts stay stored on the server.
pub fn (c &Client) upload_bytes_multipart(key string, data []u8, opts PutOptions) ! {
	upload_id := c.initiate_multipart(key, opts)!
	// The producer hands out views into `data` rather than copies. This
	// function never modifies `data`, so workers may safely hold the views.
	parts := c.run_parallel_parts(key, upload_id, opts, fn [data] (offset i64, want int) ![]u8 {
		end := if offset + i64(want) > i64(data.len) { i64(data.len) } else { offset + i64(want) }
		return data[offset..end]
	}, i64(data.len)) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// Without an abort, the parts uploaded above would stay stored on the
		// server under an upload nobody can complete any more.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}

// upload_file_multipart streams a local file part-by-part with up to
// `Client.queue_size` concurrent uploads. Peak memory is a small multiple of
// `queue_size * part_size`: up to `queue_size` parts can be queued while
// another `queue_size` parts are in flight. On any failure, including a
// failed completion, the multipart upload is aborted.
pub fn (c &Client) upload_file_multipart(key string, local_path string, size i64, opts PutOptions) ! {
	mut f := os.open(local_path) or {
		return new_error('LocalReadFailed', 'open ${local_path}: ${err.msg()}')
	}
	defer {
		f.close()
	}
	upload_id := c.initiate_multipart(key, opts)!
	// Each producer call reads at an explicit offset into a freshly allocated
	// buffer, which a worker may keep after the next call.
	parts := c.run_parallel_parts(key, upload_id, opts, fn [mut f] (offset i64, want int) ![]u8 {
		mut buf := []u8{len: want}
		n := f.read_bytes_into(u64(offset), mut buf) or {
			return new_error('LocalReadFailed', 'read_bytes_into: ${err.msg()}')
		}
		if n <= 0 {
			return new_error('LocalReadFailed', 'unexpected EOF at offset ${offset}')
		}
		return buf[..n]
	}, size) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// Abort so that the already-uploaded parts are not left orphaned.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}

// PartJob is a single upload-part task pushed onto the worker queue.
struct PartJob {
	part_number int
	data        []u8
}

// PartResult carries the worker outcome back to the dispatcher. Exactly one
// of `etag` (success) or `err` (failure) is meaningful.
struct PartResult {
	part_number int
	etag        string
	err         ?IError
}

// part_worker pulls jobs off `jobs` and pushes exactly one result per job
// onto `results`. Exits when `jobs` is closed.
fn (c &Client) part_worker(key string, upload_id string, opts PutOptions, jobs chan PartJob, results chan PartResult) {
	for {
		job := <-jobs or { return }
		etag := c.upload_part(key, upload_id, job.part_number, job.data, opts) or {
			results <- PartResult{
				part_number: job.part_number
				err:         err
			}
			continue
		}
		results <- PartResult{
			part_number: job.part_number
			etag:        etag
		}
	}
}

// run_parallel_parts dispatches `producer` chunks across a worker pool of
// `Client.queue_size` goroutines and gathers the resulting PartRefs.
//
// Caller responsibilities:
// - Issue `initiate_multipart` / `abort_multipart` / `complete_multipart`
//   around this call.
// - `producer(offset, want)` must return a freshly allocated chunk of
//   1..`want` bytes (workers may hold the slice past the next call).
//
// Errors:
// - An empty chunk before `total` bytes have been produced is an error, so a
//   truncated object is never completed.
// - If `total` cannot fit in `max_parts` parts of the effective part size,
//   `TooManyParts` is returned before anything is uploaded.
//
// Returns parts in completion order; `complete_multipart` re-sorts by part
// number before sending the manifest.
fn (c &Client) run_parallel_parts(key string, upload_id string, opts PutOptions, producer fn (offset i64, want int) ![]u8, total i64) ![]PartRef {
	workers := if c.queue_size < 1 { 1 } else { c.queue_size }
	// Effective part size: at least the S3 minimum, at most the S3 maximum.
	// It is also capped at 2^31-1 so that `int(part_size)` (the producer's
	// `want`) cannot overflow where `int` is 32-bit.
	mut part_size := i64(c.part_size)
	if part_size < min_part_size {
		part_size = min_part_size
	}
	if part_size > max_part_size {
		part_size = max_part_size
	}
	if part_size > i64(2147483647) {
		part_size = i64(2147483647)
	}
	// Fail fast, before any goroutine starts or byte is sent. Without this
	// check the limit would only be hit after max_parts parts had uploaded.
	if total > 0 && (total + part_size - 1) / part_size > i64(max_parts) {
		return new_error('TooManyParts', 'Exceeded ${max_parts} parts — increase part_size')
	}

	jobs := chan PartJob{cap: workers}
	results := chan PartResult{cap: workers}
	for _ in 0 .. workers {
		spawn c.part_worker(key, upload_id, opts, jobs, results)
	}

	mut parts := []PartRef{}
	mut first_err := ?IError(none)
	mut sent := 0
	mut received := 0
	mut offset := i64(0)
	mut part_number := 1
	for offset < total && first_err == none {
		// Safety net for producers that return short chunks, which can push
		// the part count past the up-front estimate.
		if part_number > max_parts {
			first_err = new_error('TooManyParts', 'Exceeded ${max_parts} parts — increase part_size')
			break
		}
		want := if offset + part_size > total { int(total - offset) } else { int(part_size) }
		chunk := producer(offset, want) or {
			first_err = err
			break
		}
		if chunk.len == 0 {
			// The source ran dry before `total` bytes; completing now would
			// publish a truncated object.
			first_err = new_error('UnexpectedEOF', 'producer returned no data at offset ${offset} of ${total}')
			break
		}
		jobs <- PartJob{
			part_number: part_number
			data:        chunk
		}
		sent++
		offset += i64(chunk.len)
		part_number++
		// Drain ready results without blocking — surfaces errors early so we
		// can stop dispatching new parts.
		for {
			select {
				r := <-results {
					received++
					if e := r.err {
						if first_err == none {
							first_err = e
						}
					} else {
						parts << PartRef{
							part_number: r.part_number
							etag:        r.etag
						}
					}
				}
				else {
					break
				}
			}
		}
	}
	// Closing `jobs` lets workers exit once the queue is empty; then wait for
	// one result per dispatched job so no worker is left blocked on `results`.
	jobs.close()
	for received < sent {
		r := <-results
		received++
		if e := r.err {
			if first_err == none {
				first_err = e
			}
		} else {
			parts << PartRef{
				part_number: r.part_number
				etag:        r.etag
			}
		}
	}
	results.close()
	if e := first_err {
		return e
	}
	return parts
}

// PartRef is one ETag/PartNumber pair recorded during multipart upload.
pub struct PartRef {
pub:
	part_number int
	etag        string
}

// initiate_multipart starts an upload and returns the UploadId. ACL,
// content-type, and friends from `opts` are sent here — they apply to the
// final object.
pub fn (c &Client) initiate_multipart(key string, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	signed := sign_request(creds, SignRequest{
		method:        'POST'
		path:          path
		query:         'uploads='
		payload_hash:  empty_sha256
		extra_headers: put_object_headers(opts)
	})!
	resp := c.do_http(signed, '')!
	if resp.status_code != 200 {
		return new_http_error(resp.status_code, key, resp.body)
	}
	upload_id := extract_xml_tag(resp.body, 'UploadId')
	if upload_id == '' {
		return new_error('InvalidResponse', 'CreateMultipartUploadResult missing UploadId')
	}
	return upload_id
}

// upload_part_should_retry reports whether a failed UploadPart HTTP response
// is worth retrying. These cases are:
// - server-side errors (5xx);
// - request timeout (408);
// - throttling (429);
// - S3's `RequestTimeout` error code, which S3 sends with HTTP 400.
// Other client errors (bad signature, missing upload, …) fail identically
// on every attempt.
fn upload_part_should_retry(status int, body string) bool {
	return status >= 500 || status == 408 || status == 429
		|| body.contains('<Code>RequestTimeout</Code>')
}

// upload_part uploads a single chunk and returns the server-side ETag (with
// surrounding quotes removed). Network failures and transient HTTP errors
// (see `upload_part_should_retry`) are retried, for up to `Client.retry`
// attempts in total. Between attempts it backs off exponentially (200ms,
// 400ms, 800ms, …). Any other HTTP error is returned immediately.
//
// The payload is always SHA-256-signed (not `UNSIGNED-PAYLOAD`) so the
// service validates byte-for-byte integrity end-to-end. Multipart uploads
// don't carry a Content-MD5 header by default; without payload signing a
// flipped bit in transit would silently produce a corrupt object that
// passes the multipart ETag check.
pub fn (c &Client) upload_part(key string, upload_id string, part_number int, data []u8, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	query := canonical_query_string({
		'partNumber': part_number.str()
		'uploadId':   upload_id
	})
	// Hash the payload once — multi-MiB chunks should not be re-hashed per attempt.
	// The signature itself still has to be recomputed on each retry because the
	// `x-amz-date` header advances.
	payload_hash := sha256_hex(data)
	body := data.bytestr()
	max_attempts := if c.retry < 1 { 1 } else { c.retry }
	mut last_err := IError(new_error('NetworkError', 'upload_part: no attempt was made'))
	for attempt in 0 .. max_attempts {
		signed := sign_request(creds, SignRequest{
			method:       'PUT'
			path:         path
			query:        query
			payload_hash: payload_hash
		})!
		resp := c.do_http(signed, body) or {
			// Transport-level failure: always considered transient.
			last_err = err
			if attempt + 1 < max_attempts {
				time.sleep((1 << attempt) * 200 * time.millisecond)
			}
			continue
		}
		if resp.status_code == 200 {
			etag := resp.header.get(.etag) or { '' }
			if etag == '' {
				return new_error('InvalidResponse', 'UploadPart returned no ETag')
			}
			return etag.trim('"')
		}
		last_err = new_http_error(resp.status_code, key, resp.body)
		if !upload_part_should_retry(resp.status_code, resp.body) {
			return last_err
		}
		if attempt + 1 < max_attempts {
			time.sleep((1 << attempt) * 200 * time.millisecond)
		}
	}
	return last_err
}

// complete_multipart finalizes a multipart upload. Parts must be in ascending
// `part_number` order; we sort defensively (on a copy, `parts` is untouched).
pub fn (c &Client) complete_multipart(key string, upload_id string, parts []PartRef, opts PutOptions) ! {
	mut sorted := parts.clone()
	sorted.sort(a.part_number < b.part_number)
	// Manifest format per the S3 CompleteMultipartUpload API. ETags are sent
	// quoted, as S3 returns them; `upload_part` strips the quotes.
	mut body := strings.new_builder(1024)
	body.write_string('<CompleteMultipartUpload>')
	for p in sorted {
		body.write_string('<Part><PartNumber>${p.part_number}</PartNumber><ETag>"${p.etag}"</ETag></Part>')
	}
	body.write_string('</CompleteMultipartUpload>')
	xml := body.str()
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	q := 'uploadId=' + uri_encode_query(upload_id)
	signed := sign_request(creds, SignRequest{
		method:        'POST'
		path:          path
		query:         q
		payload_hash:  sha256_hex(xml.bytes())
		extra_headers: {
			'content-type': 'application/xml'
		}
	})!
	resp := c.do_http(signed, xml)!
	if resp.status_code != 200 {
		return new_http_error(resp.status_code, key, resp.body)
	}
	// CompleteMultipartUpload may return HTTP 200 with an <Error> body — surface those.
	if resp.body.contains('<Error>') {
		return new_http_error(200, key, resp.body)
	}
}

// abort_multipart cancels an in-flight upload. Best-effort — callers usually
// invoke it inside an error path and don't care about the result. A 404
// (upload already gone) is treated as success.
pub fn (c &Client) abort_multipart(key string, upload_id string, opts PutOptions) ! {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	q := 'uploadId=' + uri_encode_query(upload_id)
	signed := sign_request(creds, SignRequest{
		method:       'DELETE'
		path:         path
		query:        q
		payload_hash: empty_sha256
	})!
	resp := c.do_http(signed, '')!
	if resp.status_code !in [204, 200, 404] {
		return new_http_error(resp.status_code, key, resp.body)
	}
}