v / vlib / compress / snappy / snappy_framing.v
444 lines · 389 sloc · 13.84 KB · 37513121a31460af1448cbe957f474d30e6f0516
Raw
1// Snappy framing format — streaming wrapper around raw Snappy blocks.
2//
3// Spec: https://github.com/google/snappy/blob/main/framing_format.txt
4//
5// A framed stream is a sequence of typed chunks, each with a 4-byte header:
6//
7// [type: 1 byte] [length: 3 bytes, little-endian]
8//
9// Chunk types:
10// 0xff Stream identifier — must be the first chunk; payload = "sNaPpY"
11// 0x00 Compressed data — 4-byte masked CRC32C + Snappy-compressed block
12// 0x01 Uncompressed data — 4-byte masked CRC32C + raw block
13// 0xfe Padding — payload is ignored
// 0x80..0xfd Reserved skippable — may be silently ignored
15// 0x02..0x7f Reserved unskippable — must be treated as an error
16//
17// Public API:
18// encode_stream(input []u8) []u8
19// decode_stream(input []u8) ![]u8
20//
21// For incremental (push-based) I/O see StreamEncoder and StreamDecoder.
22
23module snappy
24
25import math
26import io
27
28// ---------------------------------------------------------------------------
29// Constants
30// ---------------------------------------------------------------------------
31
// stream_identifier_magic is the payload carried by the stream identifier chunk.
const stream_identifier_magic = 'sNaPpY'.bytes()
// Chunk type bytes; the type is the first byte of every 4-byte chunk header.
const chunk_type_stream_id = u8(0xff)
const chunk_type_compressed = u8(0x00)
const chunk_type_uncompressed = u8(0x01)
const chunk_type_padding = u8(0xfe)

// stream_header is the pre-encoded stream identifier chunk: the type byte,
// the magic's length as a 3-byte little-endian integer, then the magic itself.
const stream_header = [
	chunk_type_stream_id,
	u8(stream_identifier_magic.len & 0xff),
	u8((stream_identifier_magic.len >> 8) & 0xff),
	u8((stream_identifier_magic.len >> 16) & 0xff),
	stream_identifier_magic[0],
	stream_identifier_magic[1],
	stream_identifier_magic[2],
	stream_identifier_magic[3],
	stream_identifier_magic[4],
	stream_identifier_magic[5],
]

// max_chunk_data_size is the largest block of uncompressed data per chunk.
// The encoders in this file split their input into blocks of at most this size.
const max_chunk_data_size = 65536
54
55// ---------------------------------------------------------------------------
56// One-shot API
57// ---------------------------------------------------------------------------
58
// encode_stream returns `input` encoded in the Snappy framing format.
// The result starts with the stream identifier chunk, followed by one data
// chunk for each slice of at most max_chunk_data_size input bytes.
// Empty input produces just the identifier chunk.
pub fn encode_stream(input []u8) []u8 {
	// Capacity estimate: 10 bytes of stream header, plus for every block a
	// 4-byte chunk header, a 4-byte CRC and the worst-case compressed size.
	per_block_bound := 8 + max_compressed_length(max_chunk_data_size)
	block_count := (input.len + max_chunk_data_size - 1) / max_chunk_data_size
	mut out := []u8{cap: 10 + block_count * per_block_bound}
	out << stream_header

	for start := 0; start < input.len; start += max_chunk_data_size {
		stop := math.min(start + max_chunk_data_size, input.len)
		write_data_chunk(mut out, input[start..stop])
	}
	return out
}
79
// decode_stream decodes a complete Snappy framing stream held in `input`.
// It returns the concatenated uncompressed data of all data chunks.
//
// An error is returned when:
// - the stream does not begin with a valid stream identifier chunk,
// - a chunk header or chunk body is truncated,
// - a data chunk fails to decode or its CRC32C does not match,
// - a later stream identifier chunk (as found in concatenated streams)
//   does not carry the expected magic,
// - a reserved unskippable chunk type (0x02..0x7f) is encountered.
pub fn decode_stream(input []u8) ![]u8 {
	// The compressed stream is typically smaller than the uncompressed data,
	// so start with a larger capacity estimate.
	mut out := []u8{cap: input.len * 2}

	// The very first chunk must be the stream identifier.
	mut pos := read_stream_identifier(input, 0)!

	for pos < input.len {
		chunk_type, chunk_len, body_start := read_chunk_header(input, pos)!

		// Compare against the bytes that are left instead of computing
		// `body_start + chunk_len` first, so the check itself cannot overflow.
		if chunk_len > input.len - body_start {
			return error('snappy framing: chunk body truncated at pos ${body_start}')
		}
		chunk_data := input[body_start..body_start + chunk_len]
		pos = body_start + chunk_len

		if chunk_type == chunk_type_compressed {
			uncompressed := decode_compressed_chunk(chunk_data)!
			out << uncompressed
		} else if chunk_type == chunk_type_uncompressed {
			raw := decode_uncompressed_chunk(chunk_data)!
			out << raw
		} else if chunk_type == chunk_type_stream_id {
			// A repeated identifier (concatenated streams) may only be
			// ignored when its length and contents are correct.
			if chunk_data != stream_identifier_magic {
				return error('snappy framing: bad repeated stream identifier chunk')
			}
		} else if chunk_type == chunk_type_padding || int(chunk_type) >= 0x80 {
			// 0xfe = padding, 0x80..0xfd = reserved skippable; ignore.
		} else {
			// 0x02..0x7f — reserved unskippable; must error.
			return error('snappy framing: unsupported unskippable chunk type 0x${chunk_type:02x}')
		}
	}

	return out
}
118
119// ---------------------------------------------------------------------------
120// Incremental encoder
121// ---------------------------------------------------------------------------
122
// StreamEncoder holds state for incremental framing-format encoding.
// Call write() one or more times, then close() to flush the final chunk.
// Take the encoded bytes using read().
pub struct StreamEncoder {
mut:
	buf []u8 // pending uncompressed bytes that have not been emitted as a chunk yet
	out []u8 = stream_header // encoded bytes not yet taken by read(); starts out holding the stream identifier chunk
	closed bool // set by close(); write() returns an error once this is true
}
132
// write appends `buf` to the encoder and emits a data chunk for every
// complete max_chunk_data_size block that is now buffered. Any remainder
// stays buffered until more data arrives or close() is called.
// It returns the number of bytes accepted (always `buf.len`), or an error
// if the encoder has already been closed.
pub fn (mut enc StreamEncoder) write(buf []u8) !int {
	if enc.closed {
		return error('snappy: write on a closed stream encoder')
	}

	enc.buf << buf

	// Walk the buffer with an offset and drop the consumed prefix once at the
	// end; cloning the remainder after every chunk would copy the tail of a
	// large write over and over.
	mut consumed := 0
	for enc.buf.len - consumed >= max_chunk_data_size {
		write_data_chunk(mut enc.out, enc.buf[consumed..consumed + max_chunk_data_size])
		consumed += max_chunk_data_size
	}
	if consumed > 0 {
		enc.buf = enc.buf[consumed..].clone()
	}

	return buf.len
}
148
// read moves encoded bytes from the encoder into `buf` and returns how many
// bytes were copied. When no encoded output is pending it returns 0 while the
// encoder is still open, and io.Eof{} once the encoder has been closed.
pub fn (mut enc StreamEncoder) read(mut buf []u8) !int {
	if enc.out.len > 0 {
		copied := copy(mut buf, enc.out)
		if copied < enc.out.len {
			// Keep only what the caller has not taken yet.
			enc.out = enc.out[copied..].clone()
		} else {
			enc.out = []u8{}
		}
		return copied
	}

	if !enc.closed {
		// Not finished yet, just no data ready; the caller should feed more.
		return 0
	}
	return io.Eof{}
}
167
// close emits any remaining buffered bytes as a final data chunk and marks
// the encoder as closed. It returns nothing; the encoded output is still
// retrieved through read(). Calling close() again is a no-op, and write()
// fails after the encoder has been closed.
pub fn (mut enc StreamEncoder) close() {
	if !enc.closed {
		if enc.buf.len > 0 {
			write_data_chunk(mut enc.out, enc.buf)
			enc.buf = []
		}
		enc.closed = true
	}
}
181
// peek returns the encoded bytes produced so far that read() has not yet
// taken, without flushing buffered input and without consuming anything.
// NOTE(review): the returned array is `enc.out` itself, not a copy — callers
// presumably should not modify it; confirm.
pub fn (enc StreamEncoder) peek() []u8 {
	return enc.out
}
186
187// ---------------------------------------------------------------------------
188// Incremental decoder
189// ---------------------------------------------------------------------------
190
// StreamDecoder holds state for incremental framing-format decoding.
// Call write() with successive chunks of compressed input, and call read() to read the decoded bytes.
// The writer must call close() when it finishes writing the bytes.
pub struct StreamDecoder {
mut:
	buf []u8 // compressed input that has been written but not fully parsed yet
	offset int // read position within buf; bytes before it have already been parsed
	identified bool // true once a valid stream identifier chunk has been seen
	err ?IError // error recorded by write() or close(), if any
	output []u8 // decoded bytes waiting to be handed out by read()
	closed bool // set by close(); write() is rejected afterwards
}
203
// write appends `buf` to the decoder's input buffer and decodes every chunk
// that is completely available; decoded bytes become readable through read().
// An incomplete trailing chunk stays buffered until more input arrives.
//
// It returns `buf.len` on success. On a malformed stream it returns an error,
// which is also recorded so that every later write() fails with it. Writing
// to a closed decoder is an error as well.
pub fn (mut dec StreamDecoder) write(buf []u8) !int {
	if dec.closed {
		return error('snappy: write to closed stream decoder')
	}
	if err := dec.err {
		return err
	}
	dec.buf << buf

	for {
		remaining := dec.buf.len - dec.offset
		// Need at least 4 bytes for the chunk header.
		if remaining < 4 {
			break
		}
		chunk_type, chunk_len, hdr_end := read_chunk_header(dec.buf, dec.offset) or {
			dec_err := error('snappy: failed while reading chunk header: ${err}')
			dec.err = dec_err
			return dec_err
		}
		if hdr_end + chunk_len > dec.buf.len {
			break // wait for more data
		}
		chunk_data := dec.buf[hdr_end..hdr_end + chunk_len]

		if !dec.identified {
			// The first chunk of the stream must be a valid stream identifier.
			if chunk_type != chunk_type_stream_id {
				dec_err := error('snappy framing: stream does not begin with identifier chunk')
				dec.err = dec_err
				return dec_err
			}
			if chunk_len != stream_identifier_magic.len {
				dec_err := error('snappy framing: bad identifier chunk length ${chunk_len}')
				dec.err = dec_err
				return dec_err
			}
			if chunk_data != stream_identifier_magic {
				dec_err := error('snappy framing: bad stream identifier payload')
				dec.err = dec_err
				return dec_err
			}
			dec.identified = true
		} else if chunk_type == chunk_type_compressed {
			uncompressed := decode_compressed_chunk(chunk_data) or {
				dec_err := error('snappy: failed while decoding compressed chunk: ${err}')
				dec.err = dec_err
				return dec_err
			}
			dec.output << uncompressed
		} else if chunk_type == chunk_type_uncompressed {
			raw := decode_uncompressed_chunk(chunk_data) or {
				dec_err := error('snappy: failed while decoding uncompressed chunk: ${err}')
				dec.err = dec_err
				return dec_err
			}
			dec.output << raw
		} else if chunk_type == chunk_type_stream_id {
			// A repeated identifier (concatenated streams) may only be
			// ignored when its length and contents are correct.
			if chunk_data != stream_identifier_magic {
				dec_err := error('snappy framing: bad repeated stream identifier chunk')
				dec.err = dec_err
				return dec_err
			}
		} else if chunk_type == chunk_type_padding || int(chunk_type) >= 0x80 {
			// 0xfe = padding, 0x80..0xfd = reserved skippable; ignore.
		} else {
			// 0x02..0x7f — reserved unskippable; must error.
			dec_err := error('snappy framing: unsupported unskippable chunk type 0x${chunk_type:02x}')
			dec.err = dec_err
			return dec_err
		}

		dec.offset = hdr_end + chunk_len
	}

	// Compact the buffer once per write() call, after all complete chunks
	// have been parsed, instead of after every single chunk.
	if dec.offset > 0 {
		if dec.offset == dec.buf.len {
			dec.buf = []u8{}
		} else {
			dec.buf = dec.buf[dec.offset..].clone()
		}
		dec.offset = 0
	}

	return buf.len
}
287
// read copies decoded bytes into `buf`, removes them from the decoder and
// returns how many bytes were copied.
// Once the output buffer is empty it surfaces any error recorded during
// write()/close(), whether or not the decoder has been closed yet.
// Without an error it returns io.Eof{} for a closed decoder, and 0 while the
// decoder is still open and simply has no decoded data ready.
pub fn (mut dec StreamDecoder) read(mut buf []u8) !int {
	// Always drain already-decoded output first so the caller never loses data.
	if dec.output.len > 0 {
		n := copy(mut buf, dec.output)
		dec.output = dec.output[n..]
		return n
	}

	// Report a failed stream as soon as the good data is drained; a failed
	// decoder accepts no further input, so returning 0 here would leave a
	// caller polling forever.
	if err := dec.err {
		return err
	}

	if dec.closed {
		return io.Eof{}
	}

	return 0
}
309
// close marks the stream finished and validates its terminal state.
// If write() already recorded an error, that error is returned unchanged.
// Otherwise an error is returned when unparsed bytes are left in the input
// buffer, i.e. the stream ended in the middle of a chunk.
// The error is also stored internally so read() will surface it after
// draining output. Calling close() again is a no-op.
pub fn (mut dec StreamDecoder) close() ! {
	if dec.closed {
		return
	}
	dec.closed = true

	// Keep the first failure: after a malformed chunk the offending bytes are
	// still buffered, and reporting them as a truncation would hide the cause.
	if err := dec.err {
		return err
	}

	remaining := dec.buf.len - dec.offset

	if remaining > 0 {
		err := error('snappy framing: stream closed with ${remaining} unprocessed bytes (truncated chunk)')
		dec.err = err
		return err
	}

	// NOTE(review): with remaining == 0 this branch looks unreachable, since
	// write() leaves `buf` empty whenever everything was consumed — confirm
	// whether closing a stream that never received any input should be an error.
	if !dec.identified && dec.buf.len > 0 {
		err := error('snappy framing: stream closed before identifier chunk was received')
		dec.err = err
		return err
	}
}
334
335// ---------------------------------------------------------------------------
336// Chunk writers
337// ---------------------------------------------------------------------------
338
// write_data_chunk appends one data chunk for `block` to `out`: the chunk
// type, the 3-byte little-endian body length, the masked CRC32C of `block`,
// and the payload. The payload is the Snappy-compressed form when that is
// strictly smaller than `block`; otherwise `block` is stored as-is.
fn write_data_chunk(mut out []u8, block []u8) {
	compressed := compress(block)
	use_compressed := compressed.len < block.len
	masked_crc := mask_crc(crc32c(block))

	chunk_type := if use_compressed { chunk_type_compressed } else { chunk_type_uncompressed }
	payload_len := if use_compressed { compressed.len } else { block.len }

	// Header: type byte, then body length (4-byte CRC + payload).
	out << chunk_type
	write_u24le(mut out, 4 + payload_len)
	write_u32le(mut out, masked_crc)

	if use_compressed {
		out << compressed
	} else {
		out << block
	}
}
360
361// ---------------------------------------------------------------------------
362// Chunk readers
363// ---------------------------------------------------------------------------
364
// read_stream_identifier checks that the chunk starting at `pos` in `data` is
// a well-formed stream identifier chunk (correct type, length and magic).
// It returns the position of the first byte after that chunk, or an error
// describing what is wrong with it.
fn read_stream_identifier(data []u8, pos int) !int {
	chunk_type, chunk_len, body_start := read_chunk_header(data, pos)!

	if chunk_type != chunk_type_stream_id {
		return error('snappy framing: stream must begin with identifier chunk, got 0x${chunk_type:02x}')
	}
	if chunk_len != stream_identifier_magic.len {
		return error('snappy framing: bad identifier chunk length ${chunk_len}')
	}

	body_end := body_start + chunk_len
	if body_end > data.len {
		return error('snappy framing: identifier chunk truncated')
	}
	if data[body_start..body_end] != stream_identifier_magic {
		return error('snappy framing: bad stream identifier magic')
	}
	return body_end
}
384
// read_chunk_header parses the 4-byte chunk header located at `pos` in `data`.
// It returns the chunk type, the chunk body length (decoded from the 3-byte
// little-endian length field) and the position right after the header.
// An error is returned when fewer than 4 bytes are available at `pos`.
@[inline]
fn read_chunk_header(data []u8, pos int) !(u8, int, int) {
	body_start := pos + 4
	if body_start > data.len {
		return error('snappy framing: truncated chunk header at pos ${pos}')
	}
	len_low := u32(data[pos + 1])
	len_mid := u32(data[pos + 2]) << 8
	len_high := u32(data[pos + 3]) << 16
	return data[pos], int(len_low | len_mid | len_high), body_start
}
396
// decode_compressed_chunk decodes the body of a compressed data chunk.
// `payload` is the 4-byte little-endian masked CRC32C followed by the
// Snappy-compressed block. It returns the decompressed bytes, or an error if
// the payload is shorter than 4 bytes, decompression fails, or the masked
// CRC32C of the decompressed bytes differs from the stored value.
fn decode_compressed_chunk(payload []u8) ![]u8 {
	if payload.len < 4 {
		return error('snappy framing: compressed chunk too short')
	}

	// Assemble the stored checksum from its four little-endian bytes.
	mut stored_masked_crc := u32(0)
	for i in 0 .. 4 {
		stored_masked_crc |= u32(payload[i]) << u32(8 * i)
	}

	uncompressed := decompress(payload[4..])!
	if mask_crc(crc32c(uncompressed)) != stored_masked_crc {
		return error('snappy framing: CRC32C mismatch in compressed chunk')
	}
	return uncompressed
}
411
// decode_uncompressed_chunk decodes the body of an uncompressed data chunk.
// `payload` is the 4-byte little-endian masked CRC32C followed by the raw
// block. It returns a copy of the raw bytes, or an error if the payload is
// shorter than 4 bytes or the masked CRC32C of the raw bytes differs from the
// stored value.
fn decode_uncompressed_chunk(payload []u8) ![]u8 {
	if payload.len < 4 {
		return error('snappy framing: uncompressed chunk too short')
	}

	// Fold the four checksum bytes together, most significant byte first.
	mut stored_masked_crc := u32(0)
	for i := 3; i >= 0; i-- {
		stored_masked_crc = (stored_masked_crc << 8) | u32(payload[i])
	}

	raw := payload[4..].clone()
	if mask_crc(crc32c(raw)) != stored_masked_crc {
		return error('snappy framing: CRC32C mismatch in uncompressed chunk')
	}
	return raw
}
426
427// ---------------------------------------------------------------------------
428// Little-endian integer writers
429// ---------------------------------------------------------------------------
430
// write_u24le appends the low 24 bits of `v` to `out` as three bytes,
// least significant byte first. Bits above the low 24 are dropped.
@[inline]
fn write_u24le(mut out []u8, v int) {
	out << [u8(v & 0xff), u8((v >> 8) & 0xff), u8((v >> 16) & 0xff)]
}
437
// write_u32le appends `v` to `out` as four bytes, least significant byte first.
@[inline]
fn write_u32le(mut out []u8, v u32) {
	for shift := u32(0); shift < 32; shift += 8 {
		out << u8((v >> shift) & 0xff)
	}
}
445