v / vlib / net / s3 / multipart.v
410 lines · 389 sloc · 13.21 KB · 4142432483c4e8de44ab7b0d6ac944f3251e03c8
Raw
1// Copyright (c) 2019-2026 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module s3
5
6import os
7import strings
8import time
9
// Multipart upload constants — the limits S3 enforces on multipart uploads.
// The byte sizes are built from an i64 operand so the products are never
// evaluated in 32-bit `int` arithmetic (5 GiB does not fit in an int).
pub const min_part_size = i64(5) * 1024 * 1024 // 5 MiB: minimum size of every part except the last
pub const max_part_size = i64(5) * 1024 * 1024 * 1024 // 5 GiB: maximum size of a single part
pub const max_parts = 10000 // hard S3 limit on the number of parts per upload
14
// upload_file uploads the local file at `local_path` to object `key`.
//
// A file of at most `min_part_size` bytes is read into memory and sent with a
// single `put`. Anything larger goes through `upload_file_multipart`, which
// reads the file part by part instead of loading it whole. Prefer this over
// `put` for large files, or whenever the file should not be slurped into memory.
//
// Errors: LocalFileNotFound when the file cannot be stat'ed, LocalReadFailed
// when a small file cannot be read, plus anything `put` /
// `upload_file_multipart` return.
pub fn (c &Client) upload_file(key string, local_path string, opts PutOptions) ! {
	info := os.stat(local_path) or {
		return new_error('LocalFileNotFound',
			'Cannot stat local file: ${local_path} (${err.msg()})')
	}
	total := i64(info.size)
	if total > min_part_size {
		c.upload_file_multipart(key, local_path, total, opts)!
		return
	}
	contents := os.read_bytes(local_path) or {
		return new_error('LocalReadFailed', 'Cannot read ${local_path}: ${err.msg()}')
	}
	c.put(key, contents, opts)!
}
35
// MultipartUploader is a stateful handle to an in-flight multipart upload,
// produced by `Client.start_multipart`. Use it to push parts generated
// on the fly (network sources, decompressors, anything you don't want to
// materialise on disk):
//
// mut up := c.start_multipart('key', s3.PutOptions{ content_type: 'application/octet-stream' })!
// defer { up.abort() or {} }
// for chunk in chunks { up.upload(chunk)! }
// up.complete()!
//
// Neither `upload()` nor `complete()` aborts on failure. Cleanup is left to
// the caller so that it stays explicit and visible. `abort()` returns
// immediately once `complete()` has succeeded. A `defer { up.abort() or {} }`
// placed right after `start_multipart` therefore only cancels uploads that
// did not finish.
pub struct MultipartUploader {
mut:
	// Client that issued the upload; stored via `unsafe { c }` in start_multipart.
	client &Client
	// Destination object key.
	key string
	// UploadId returned by initiate_multipart.
	upload_id string
	// Options forwarded to every upload_part / complete_multipart / abort_multipart call.
	opts PutOptions
	// (part number, ETag) pairs recorded by successful `upload()` calls.
	parts []PartRef
	// Number the next `upload()` will use; start_multipart initialises it to 1.
	part_number int
	// Set once `complete()` has succeeded.
	completed bool
	// Set once `abort()` has succeeded.
	aborted bool
}
59
// start_multipart initiates a multipart upload for `key` and returns a
// MultipartUploader bound to it, with part numbering starting at 1.
// The uploader keeps no data buffer of its own. Each `upload(chunk)` call
// sends that chunk as one part and returns once S3 has acknowledged it.
pub fn (c &Client) start_multipart(key string, opts PutOptions) !MultipartUploader {
	id := c.initiate_multipart(key, opts)!
	// Storing the receiver pointer in the returned struct needs `unsafe`,
	// because V's checker will not let the `&Client` receiver escape
	// implicitly. This relies on `Client` being `@[heap]` so the pointer
	// outlives this call (per the original note; verify in the Client definition).
	return MultipartUploader{
		part_number: 1
		opts:        opts
		upload_id:   id
		key:         key
		client:      unsafe { c }
	}
}
77
// upload sends `data` as the next part and records its ETag.
//
// S3 requires every part except the last to be at least `min_part_size`
// (5 MiB). This type does no buffering, so the caller must size the chunks.
//
// Errors:
//   - InvalidState once the upload has been completed or aborted.
//   - TooManyParts past `max_parts`.
// A failed part upload leaves the part number unchanged, so the same chunk
// can be retried.
pub fn (mut u MultipartUploader) upload(data []u8) ! {
	finalised := u.completed || u.aborted
	if finalised {
		return new_error('InvalidState', 'multipart upload is already finalised')
	}
	if u.part_number > max_parts {
		return new_error('TooManyParts', 'Exceeded ${max_parts} parts — increase part_size')
	}
	number := u.part_number
	tag := u.client.upload_part(u.key, u.upload_id, number, data, u.opts)!
	u.parts << PartRef{
		part_number: number
		etag:        tag
	}
	u.part_number = number + 1
}
95
// complete finalises the upload; once it succeeds the object is visible.
// Calling it again after success is a no-op.
//
// It fails locally, without a network round trip, in two cases:
//   - the upload was already aborted (the server no longer knows it);
//   - no part was uploaded (S3 rejects a CompleteMultipartUpload manifest
//     with zero parts).
// Any other failure comes from complete_multipart. In that case the upload is
// left in-flight; call `abort()` to clean it up.
pub fn (mut u MultipartUploader) complete() ! {
	if u.completed {
		return
	}
	if u.aborted {
		return new_error('InvalidState', 'multipart upload was aborted')
	}
	if u.parts.len == 0 {
		return new_error('InvalidState', 'cannot complete a multipart upload with no parts')
	}
	u.client.complete_multipart(u.key, u.upload_id, u.parts, u.opts)!
	u.completed = true
}
104
// abort cancels the in-flight upload. It is idempotent.
//
// It does nothing once the upload has been aborted or completed, so it is
// safe to call from a `defer` that also runs after a successful `complete()`.
// If the abort request fails, `aborted` stays false and a later call retries it.
pub fn (mut u MultipartUploader) abort() ! {
	if !u.aborted && !u.completed {
		u.client.abort_multipart(u.key, u.upload_id, u.opts)!
		u.aborted = true
	}
}
113
// upload_bytes_multipart uploads an in-memory byte buffer as a multipart
// upload, running up to `Client.queue_size` part uploads concurrently.
// It is useful for large generated payloads; the size is bounded by the int
// length of a V array.
//
// Parts are slices of `data` (no copy on the producer side).
//
// If any part upload or the final CompleteMultipartUpload fails, the upload
// is aborted (best-effort) so no parts are left behind on the server. The
// original error is then returned.
pub fn (c &Client) upload_bytes_multipart(key string, data []u8, opts PutOptions) ! {
	upload_id := c.initiate_multipart(key, opts)!
	parts := c.run_parallel_parts(key, upload_id, opts, fn [data] (offset i64, want int) ![]u8 {
		// Slices share memory with `data`, which is never mutated here, so
		// workers may safely keep uploading a slice after the next call.
		end := if offset + i64(want) > i64(data.len) { i64(data.len) } else { offset + i64(want) }
		return data[offset..end]
	}, i64(data.len)) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// A failed completion (including a 200 carrying <Error>) leaves the
		// upload open on the server; abort it instead of leaking its parts.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}
128
// upload_file_multipart uploads the first `size` bytes of the local file at
// `local_path` as a multipart upload. It reads the file part by part and runs
// up to `Client.queue_size` part uploads concurrently.
//
// Memory: up to `queue_size` parts may be uploading, up to `queue_size` more
// may wait in the job queue, and one more is being read. Peak usage is
// therefore roughly `(2 * queue_size + 1) * part_size`, plus the per-request
// string copy that upload_part makes of each in-flight part.
//
// If any part upload or the final CompleteMultipartUpload fails, the upload
// is aborted (best-effort) so no parts are left behind on the server. The
// original error is then returned.
pub fn (c &Client) upload_file_multipart(key string, local_path string, size i64, opts PutOptions) ! {
	mut f := os.open(local_path) or {
		return new_error('LocalReadFailed', 'open ${local_path}: ${err.msg()}')
	}
	defer {
		f.close()
	}
	upload_id := c.initiate_multipart(key, opts)!
	parts := c.run_parallel_parts(key, upload_id, opts, fn [mut f] (offset i64, want int) ![]u8 {
		// A fresh buffer per call: workers may still be uploading earlier chunks.
		mut buf := []u8{len: want}
		n := f.read_bytes_into(u64(offset), mut buf) or {
			return new_error('LocalReadFailed', 'read_bytes_into: ${err.msg()}')
		}
		if n <= 0 {
			return new_error('LocalReadFailed', 'unexpected EOF at offset ${offset}')
		}
		return buf[..n]
	}, size) or {
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
	c.complete_multipart(key, upload_id, parts, opts) or {
		// A failed completion (including a 200 carrying <Error>) leaves the
		// upload open on the server; abort it instead of leaking its parts.
		c.abort_multipart(key, upload_id, opts) or {}
		return err
	}
}
155
// PartJob is a single upload-part task. run_parallel_parts pushes it onto the
// worker queue and part_worker consumes it.
struct PartJob {
	// 1-based S3 part number assigned by the dispatcher.
	part_number int
	// Chunk returned by the producer; uploaded as-is by upload_part.
	data []u8
}
161
// PartResult carries one worker outcome back to the dispatcher. It holds
// either an ETag (success) or `err` (the upload_part failure, with `etag`
// left empty).
struct PartResult {
	// Part number of the job this result answers.
	part_number int
	// ETag returned by upload_part (quotes stripped); empty when `err` is set.
	etag string
	// Error from upload_part, or none on success.
	err ?IError
}
168
// part_worker is the body of one upload thread in run_parallel_parts.
//
// It receives PartJobs from `jobs` until that channel is closed and drained,
// and uploads each one through upload_part. For every job it sends exactly
// one PartResult on `results`, with `err` set when the upload failed. A failed
// part does not stop the worker; the dispatcher decides what to do with errors.
fn (c &Client) part_worker(key string, upload_id string, opts PutOptions, jobs chan PartJob, results chan PartResult) {
	for {
		task := <-jobs or { break }
		tag := c.upload_part(key, upload_id, task.part_number, task.data, opts) or {
			results <- PartResult{
				part_number: task.part_number
				err:         err
			}
			continue
		}
		results <- PartResult{
			part_number: task.part_number
			etag:        tag
		}
	}
}
187
// run_parallel_parts dispatches `producer` chunks across a pool of
// `Client.queue_size` worker threads and gathers the resulting PartRefs. The
// caller issues `initiate_multipart` / `abort_multipart` /
// `complete_multipart` around this call.
//
// Producer contract:
//   - `producer(offset, want)` returns a chunk of 1..`want` bytes starting
//     at `offset`.
//   - The chunk must stay valid after the next call, because workers may
//     still be uploading it.
//   - An empty chunk before `total` bytes have been produced is reported as
//     an error, rather than silently truncating the object.
//
// Part size:
//   - It starts from `Client.part_size`, raised to at least `min_part_size`.
//   - It is capped by both S3's `max_part_size` and the int length of a V
//     array.
//   - It is grown when needed so that `total` fits into `max_parts` parts.
//   - If even the largest allowed part cannot make `total` fit, TooManyParts
//     is returned before anything is uploaded.
//
// Parts are returned in completion order; `complete_multipart` re-sorts them
// by part number before sending the manifest. On error, jobs already queued
// are still drained (and may have been uploaded) before the first error is
// returned.
fn (c &Client) run_parallel_parts(key string, upload_id string, opts PutOptions, producer fn (offset i64, want int) ![]u8, total i64) ![]PartRef {
	workers := if c.queue_size < 1 { 1 } else { c.queue_size }
	// A single chunk can be neither larger than S3's per-part limit nor longer
	// than a V array, whose length (like `want`) is an int.
	part_cap := if max_part_size < i64(2147483647) { max_part_size } else { i64(2147483647) }
	mut part_size := if c.part_size < min_part_size { min_part_size } else { i64(c.part_size) }
	if part_size > part_cap {
		part_size = part_cap
	}
	// Size parts up front so `total` fits in `max_parts`. Otherwise a large
	// upload would push all 10000 parts and only then fail with TooManyParts.
	needed := (total + i64(max_parts) - 1) / i64(max_parts)
	if needed > part_size {
		if needed > part_cap {
			return new_error('TooManyParts',
				'Cannot fit ${total} bytes into ${max_parts} parts of at most ${part_cap} bytes')
		}
		part_size = needed
	}

	jobs := chan PartJob{cap: workers}
	results := chan PartResult{cap: workers}
	for _ in 0 .. workers {
		spawn c.part_worker(key, upload_id, opts, jobs, results)
	}

	mut parts := []PartRef{}
	mut first_err := ?IError(none)
	mut sent := 0
	mut received := 0
	mut offset := i64(0)
	mut part_number := 1

	for offset < total && first_err == none {
		// Safety net: only reachable if the producer returns short chunks.
		if part_number > max_parts {
			first_err = new_error('TooManyParts',
				'Exceeded ${max_parts} parts — increase part_size')
			break
		}
		want := if offset + part_size > total { int(total - offset) } else { int(part_size) }
		chunk := producer(offset, want) or {
			first_err = err
			break
		}
		if chunk.len == 0 {
			// Completing now would silently upload a truncated object.
			first_err = new_error('LocalReadFailed',
				'producer returned no data at offset ${offset} of ${total}')
			break
		}
		jobs <- PartJob{
			part_number: part_number
			data:        chunk
		}
		sent++
		offset += i64(chunk.len)
		part_number++
		// Drain ready results without blocking. This surfaces errors early so
		// we can stop dispatching new parts.
		for {
			select {
				r := <-results {
					received++
					if e := r.err {
						if first_err == none {
							first_err = e
						}
					} else {
						parts << PartRef{
							part_number: r.part_number
							etag:        r.etag
						}
					}
				}
				else {
					break
				}
			}
		}
	}
	// No more jobs: let the workers exit once the queue is empty, and wait
	// for every outstanding result.
	jobs.close()
	for received < sent {
		r := <-results
		received++
		if e := r.err {
			if first_err == none {
				first_err = e
			}
		} else {
			parts << PartRef{
				part_number: r.part_number
				etag:        r.etag
			}
		}
	}
	results.close()
	if e := first_err {
		return e
	}
	return parts
}
276
// PartRef is one PartNumber/ETag pair recorded during a multipart upload and
// sent in the CompleteMultipartUpload manifest.
pub struct PartRef {
pub:
	// 1-based part number.
	part_number int
	// ETag as returned by upload_part, without surrounding quotes
	// (complete_multipart adds them back when building the manifest).
	etag string
}
283
// initiate_multipart sends CreateMultipartUpload (POST with `uploads=`) for
// `key` and returns the UploadId parsed from the response.
//
// The headers built from `opts` by put_object_headers (ACL, content type, …)
// are sent on this request, because that is where they attach to the final
// object.
//
// Errors: an HTTP error for any non-200 status, and InvalidResponse when the
// body carries no UploadId.
pub fn (c &Client) initiate_multipart(key string, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	object_path := build_object_path(creds, opts.bucket, key)!
	req := SignRequest{
		method:        'POST'
		path:          object_path
		query:         'uploads='
		payload_hash:  empty_sha256
		extra_headers: put_object_headers(opts)
	}
	signed := sign_request(creds, req)!
	resp := c.do_http(signed, '')!
	if resp.status_code == 200 {
		id := extract_xml_tag(resp.body, 'UploadId')
		if id != '' {
			return id
		}
		return new_error('InvalidResponse', 'CreateMultipartUploadResult missing UploadId')
	}
	return new_http_error(resp.status_code, key, resp.body)
}
307
// upload_part uploads one chunk as part `part_number` of `upload_id` and
// returns the server-side ETag with its surrounding quotes stripped.
//
// Retries: up to `Client.retry` attempts in total (at least one), with
// exponential backoff between attempts (200ms, 400ms, 800ms, …). Only
// failures that may succeed on a retry are retried:
//   - transport errors from do_http;
//   - HTTP 5xx, 408 and 429;
//   - S3's `RequestTimeout` error, which arrives as a 400.
// Any other non-200 status (bad credentials, unknown upload, …) is returned
// immediately rather than retried.
//
// The payload is always SHA-256-signed (not `UNSIGNED-PAYLOAD`) so the
// service validates byte-for-byte integrity end-to-end. No Content-MD5
// header is sent here. Without payload signing, a flipped bit in transit
// would silently produce a corrupt object that still passes the multipart
// ETag check.
pub fn (c &Client) upload_part(key string, upload_id string, part_number int, data []u8, opts PutOptions) !string {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	query := canonical_query_string({
		'partNumber': part_number.str()
		'uploadId':   upload_id
	})
	// Hash and stringify the payload once, since multi-MiB chunks should not
	// be re-processed per attempt. The signature itself is recomputed on each
	// attempt because the `x-amz-date` header advances.
	payload_hash := sha256_hex(data)
	body := data.bytestr()
	max_attempts := if c.retry < 1 { 1 } else { c.retry }
	mut last_err := IError(new_error('NetworkError', 'upload_part: no attempt was made'))
	for attempt in 0 .. max_attempts {
		if attempt > 0 {
			// Backoff before each retry: 200ms, 400ms, 800ms, …
			time.sleep((1 << (attempt - 1)) * 200 * time.millisecond)
		}
		signed := sign_request(creds, SignRequest{
			method:       'PUT'
			path:         path
			query:        query
			payload_hash: payload_hash
		})!
		resp := c.do_http(signed, body) or {
			// Transport-level failure: treat as transient and retry.
			last_err = err
			continue
		}
		if resp.status_code == 200 {
			etag := resp.header.get(.etag) or { '' }
			if etag == '' {
				return new_error('InvalidResponse', 'UploadPart returned no ETag')
			}
			return etag.trim('"')
		}
		last_err = new_http_error(resp.status_code, key, resp.body)
		server_side := resp.status_code >= 500 || resp.status_code in [408, 429]
		timed_out := resp.body.contains('<Code>RequestTimeout</Code>')
		if !server_side && !timed_out {
			// Permanent client-side error: retrying cannot help.
			return last_err
		}
	}
	return last_err
}
359
// complete_multipart finalises multipart upload `upload_id` for `key` from
// the given PartNumber/ETag pairs. S3 wants the manifest in ascending part
// order, so a sorted copy is built and `parts` itself is left untouched.
//
// Errors: any non-200 status, and also a 200 response whose body carries an
// `<Error>` element (CompleteMultipartUpload can report failure that way).
pub fn (c &Client) complete_multipart(key string, upload_id string, parts []PartRef, opts PutOptions) ! {
	mut ordered := parts.clone()
	ordered.sort(a.part_number < b.part_number)
	mut manifest := strings.new_builder(1024)
	manifest.write_string('<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">')
	for part in ordered {
		// ETags are expected unquoted (upload_part strips the quotes); the
		// manifest wraps them in quotes again.
		manifest.write_string('<Part><PartNumber>${part.part_number}</PartNumber><ETag>"${part.etag}"</ETag></Part>')
	}
	manifest.write_string('</CompleteMultipartUpload>')
	payload := manifest.str()
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	signed := sign_request(creds, SignRequest{
		method:        'POST'
		path:          path
		query:         'uploadId=${uri_encode_query(upload_id)}'
		payload_hash:  sha256_hex(payload.bytes())
		extra_headers: {
			'content-type': 'application/xml'
		}
	})!
	resp := c.do_http(signed, payload)!
	if resp.status_code != 200 || resp.body.contains('<Error>') {
		return new_http_error(resp.status_code, key, resp.body)
	}
}
393
// abort_multipart cancels in-flight upload `upload_id` for `key`.
//
// Statuses 200, 204 and 404 all count as success; 404 presumably means the
// upload no longer exists. Any other status is returned as an HTTP error.
// This is best-effort by design: callers on error paths usually discard its
// result with `or {}`.
pub fn (c &Client) abort_multipart(key string, upload_id string, opts PutOptions) ! {
	creds := c.creds_for(opts.bucket)
	path := build_object_path(creds, opts.bucket, key)!
	signed := sign_request(creds, SignRequest{
		method:       'DELETE'
		path:         path
		query:        'uploadId=${uri_encode_query(upload_id)}'
		payload_hash: empty_sha256
	})!
	resp := c.do_http(signed, '')!
	match resp.status_code {
		200, 204, 404 {}
		else {
			return new_http_error(resp.status_code, key, resp.body)
		}
	}
}
411