| 1 | // Snappy framing format — streaming wrapper around raw Snappy blocks. |
| 2 | // |
| 3 | // Spec: https://github.com/google/snappy/blob/main/framing_format.txt |
| 4 | // |
| 5 | // A framed stream is a sequence of typed chunks, each with a 4-byte header: |
| 6 | // |
| 7 | // [type: 1 byte] [length: 3 bytes, little-endian] |
| 8 | // |
| 9 | // Chunk types: |
| 10 | // 0xff Stream identifier — must be the first chunk; payload = "sNaPpY" |
| 11 | // 0x00 Compressed data — 4-byte masked CRC32C + Snappy-compressed block |
| 12 | // 0x01 Uncompressed data — 4-byte masked CRC32C + raw block |
| 13 | // 0xfe Padding — payload is ignored |
//   0x80..0xfd  Reserved skippable — may be silently ignored
| 15 | // 0x02..0x7f Reserved unskippable — must be treated as an error |
| 16 | // |
| 17 | // Public API: |
| 18 | // encode_stream(input []u8) []u8 |
| 19 | // decode_stream(input []u8) ![]u8 |
| 20 | // |
| 21 | // For incremental (push-based) I/O see StreamEncoder and StreamDecoder. |
| 22 | |
| 23 | module snappy |
| 24 | |
| 25 | import math |
| 26 | import io |
| 27 | |
| 28 | // --------------------------------------------------------------------------- |
| 29 | // Constants |
| 30 | // --------------------------------------------------------------------------- |
| 31 | |
// Payload of the stream identifier chunk: the six ASCII bytes "sNaPpY".
const stream_identifier_magic = 'sNaPpY'.bytes()
// Chunk type bytes, i.e. the first byte of every 4-byte chunk header.
const chunk_type_stream_id = u8(0xff)
const chunk_type_compressed = u8(0x00)
const chunk_type_uncompressed = u8(0x01)
const chunk_type_padding = u8(0xfe)

// Pre-encoded stream identifier chunk: the type byte, the payload length as
// three little-endian bytes, then the six magic bytes (10 bytes in total).
const stream_header = [
	chunk_type_stream_id,
	u8(stream_identifier_magic.len & 0xff),
	u8((stream_identifier_magic.len >> 8) & 0xff),
	u8((stream_identifier_magic.len >> 16) & 0xff),
	stream_identifier_magic[0],
	stream_identifier_magic[1],
	stream_identifier_magic[2],
	stream_identifier_magic[3],
	stream_identifier_magic[4],
	stream_identifier_magic[5],
]

// max_chunk_data_size is the largest block of uncompressed data per chunk (64 KiB).
const max_chunk_data_size = 65536
| 54 | |
| 55 | // --------------------------------------------------------------------------- |
| 56 | // One-shot API |
| 57 | // --------------------------------------------------------------------------- |
| 58 | |
// encode_stream frames `input` in the Snappy framing format and returns the
// whole encoded stream. The input is cut into blocks of at most
// max_chunk_data_size bytes, each emitted as one data chunk. Empty input
// produces a stream holding only the identifier chunk.
pub fn encode_stream(input []u8) []u8 {
	// Capacity upper bound: the 10-byte identifier chunk plus, per block, a
	// 4-byte chunk header, a 4-byte CRC and the worst-case compressed size.
	chunk_count := (input.len + max_chunk_data_size - 1) / max_chunk_data_size
	per_chunk_bound := 8 + max_compressed_length(max_chunk_data_size)
	mut framed := []u8{cap: 10 + chunk_count * per_chunk_bound}

	framed << stream_header

	mut start := 0
	for start < input.len {
		stop := math.min(start + max_chunk_data_size, input.len)
		write_data_chunk(mut framed, input[start..stop])
		start = stop
	}

	return framed
}
| 79 | |
// decode_stream decodes a complete Snappy framing stream held in `input`.
// It returns the concatenation of every data chunk's uncompressed bytes.
// An error is returned when the stream does not start with a valid identifier
// chunk, a chunk header or body is truncated, a data chunk fails to decode,
// or a reserved unskippable chunk type (0x02..0x7f) is encountered.
pub fn decode_stream(input []u8) ![]u8 {
	// Decoded data is usually larger than its framed form, so reserve twice
	// the input size up front.
	mut decoded := []u8{cap: input.len * 2}

	// The stream must open with the identifier chunk.
	mut cursor := read_stream_identifier(input, 0)!

	for cursor < input.len {
		kind, body_len, body_start := read_chunk_header(input, cursor)!
		body_end := body_start + body_len
		if body_end > input.len {
			return error('snappy framing: chunk body truncated at pos ${body_start}')
		}
		body := input[body_start..body_end]
		cursor = body_end

		if kind == chunk_type_compressed {
			plain := decode_compressed_chunk(body)!
			decoded << plain
			continue
		}
		if kind == chunk_type_uncompressed {
			plain := decode_uncompressed_chunk(body)!
			decoded << plain
			continue
		}
		// Everything from 0x80 upwards (padding 0xfe included) is skippable;
		// what remains is 0x02..0x7f, which must be rejected.
		if kind != chunk_type_padding && int(kind) < 0x80 {
			return error('snappy framing: unsupported unskippable chunk type 0x${kind:02x}')
		}
	}

	return decoded
}
| 118 | |
| 119 | // --------------------------------------------------------------------------- |
| 120 | // Incremental encoder |
| 121 | // --------------------------------------------------------------------------- |
| 122 | |
// StreamEncoder holds state for incremental framing-format encoding.
// Call write() one or more times, then close() to flush the final (possibly
// short) chunk. Encoded bytes are collected with read(), or inspected with
// peek() without consuming them.
pub struct StreamEncoder {
mut:
	buf    []u8 // pending uncompressed bytes not yet emitted as a chunk
	out    []u8 = stream_header // encoded bytes not yet taken by read(); starts with the stream identifier chunk
	closed bool // set by close(); write() fails afterwards and read() reports io.Eof once `out` is drained
}
| 132 | |
// write appends `buf` to the encoder's pending input and emits one data chunk
// for every complete max_chunk_data_size (64 KiB) block now available. Any
// tail shorter than a full block stays buffered until a later write() or close().
//
// Returns buf.len on success, or an error if the encoder has been closed.
pub fn (mut enc StreamEncoder) write(buf []u8) !int {
	if enc.closed {
		return error('snappy: write on a closed stream encoder')
	}

	enc.buf << buf

	// Walk the buffer by offset and drop the consumed prefix once at the end.
	// Re-cloning the remainder after every emitted block would copy the tail
	// repeatedly, making large writes quadratic.
	mut consumed := 0
	for enc.buf.len - consumed >= max_chunk_data_size {
		write_data_chunk(mut enc.out, enc.buf[consumed..consumed + max_chunk_data_size])
		consumed += max_chunk_data_size
	}
	if consumed > 0 {
		enc.buf = enc.buf[consumed..].clone()
	}

	return buf.len
}
| 148 | |
// read moves encoded bytes from the encoder into `buf` and returns how many
// were copied (at most buf.len). Copied bytes are removed from the encoder.
// When nothing is queued it returns 0 while the encoder is still open, and
// io.Eof{} once close() has been called.
pub fn (mut enc StreamEncoder) read(mut buf []u8) !int {
	pending := enc.out.len
	if pending == 0 && enc.closed {
		return io.Eof{}
	}
	if pending == 0 {
		// Not finished yet, just nothing ready; the caller should feed more input.
		return 0
	}

	copied := copy(mut buf, enc.out)

	if copied < pending {
		// Keep only the bytes the caller has not taken yet.
		enc.out = enc.out[copied..].clone()
	} else {
		enc.out = []u8{}
	}

	return copied
}
| 167 | |
// close emits any remaining buffered bytes as a final data chunk and marks the
// encoder closed. It returns nothing; the encoded bytes are collected with
// read(). Calling close() again is a no-op, and write() fails after close().
pub fn (mut enc StreamEncoder) close() {
	if !enc.closed {
		if enc.buf.len != 0 {
			write_data_chunk(mut enc.out, enc.buf)
			enc.buf = []
		}
		enc.closed = true
	}
}
| 181 | |
// peek returns the encoded bytes currently queued in the encoder, i.e. those
// not yet taken by read(), without consuming them. Bytes still waiting in the
// uncompressed input buffer are not included.
// NOTE(review): this returns enc.out itself rather than a clone; callers
// should presumably treat the result as read-only — confirm.
pub fn (enc StreamEncoder) peek() []u8 {
	return enc.out
}
| 186 | |
| 187 | // --------------------------------------------------------------------------- |
| 188 | // Incremental decoder |
| 189 | // --------------------------------------------------------------------------- |
| 190 | |
// StreamDecoder holds state for incremental framing-format decoding.
// Call write() with successive pieces of framed input and read() to take the
// decoded bytes. The writer must call close() once all input has been written,
// so that read() can report end of stream.
pub struct StreamDecoder {
mut:
	buf        []u8 // framed input bytes not yet parsed into chunks
	offset     int  // parse position within buf while write() walks the chunks
	identified bool // set once the stream identifier chunk has been seen and validated
	err        ?IError // first error recorded by write() or close(), if any
	output     []u8 // decoded bytes ready for the caller
	closed     bool // set by close(); write() fails afterwards
}
| 203 | |
// write appends `buf` to the decoder's pending input and decodes every chunk
// that is now completely buffered, appending the decoded bytes to the output
// queue that read() drains.
//
// Returns buf.len on success. Writing after close() is an error. If the stream
// is malformed the error is returned and also stored in dec.err, so every later
// write() fails with that same error. A chunk whose header or body has not
// fully arrived stays buffered until more bytes are written.
pub fn (mut dec StreamDecoder) write(buf []u8) !int {
	if dec.closed {
		return error('snappy: write to closed stream decoder')
	}
	// Sticky failure: once an error has been recorded, refuse further input.
	if err := dec.err {
		return err
	}
	dec.buf << buf

	for {
		remaining := dec.buf.len - dec.offset
		// Need at least 4 bytes for the chunk header.
		if remaining < 4 {
			break
		}
		chunk_type, chunk_len, hdr_end := read_chunk_header(dec.buf, dec.offset) or {
			dec_err := error('snappy: failed while reading chunk header: ${err}')
			dec.err = dec_err
			return dec_err
		}
		if hdr_end + chunk_len > dec.buf.len {
			break // wait for more data
		}
		chunk_data := dec.buf[hdr_end..hdr_end + chunk_len]

		if !dec.identified {
			// The first complete chunk must be a well-formed stream identifier.
			if chunk_type != chunk_type_stream_id {
				dec_err := error('snappy framing: stream does not begin with identifier chunk')
				dec.err = dec_err
				return dec_err
			}
			if chunk_len != stream_identifier_magic.len {
				dec_err := error('snappy framing: bad identifier chunk length ${chunk_len}')
				dec.err = dec_err
				return dec_err
			}
			if chunk_data != stream_identifier_magic {
				dec_err := error('snappy framing: bad stream identifier payload')
				dec.err = dec_err
				return dec_err
			}
			dec.identified = true
		} else if chunk_type == chunk_type_compressed {
			uncompressed := decode_compressed_chunk(chunk_data) or {
				dec_err := error('snappy: failed while decoding compressed chunk: ${err}')
				dec.err = dec_err
				return dec_err
			}
			dec.output << uncompressed
		} else if chunk_type == chunk_type_uncompressed {
			raw := decode_uncompressed_chunk(chunk_data) or {
				dec_err := error('snappy: failed while decoding uncompressed chunk: ${err}')
				dec.err = dec_err
				return dec_err
			}
			dec.output << raw
		} else if chunk_type == chunk_type_padding || int(chunk_type) >= 0x80 {
			// Padding and every other type >= 0x80 are skipped without inspecting
			// the payload; this includes a repeated 0xff identifier chunk.
		} else {
			// Remaining types are 0x02..0x7f: reserved unskippable.
			dec_err :=
				error('snappy framing: unsupported unskippable chunk type 0x${chunk_type:02x}')
			dec.err = dec_err
			return dec_err
		}

		// Chunk fully handled; advance past its body.
		dec.offset = hdr_end + chunk_len
	}

	// Compact the buffer once per successful call (not once per chunk): drop
	// the bytes consumed above so that parsing restarts at offset 0 next time.
	// The error returns above skip this step and leave buf/offset as they were.
	if dec.offset > 0 {
		if dec.offset == dec.buf.len {
			dec.buf = []u8{}
		} else {
			dec.buf = dec.buf[dec.offset..].clone()
		}
		dec.offset = 0
	}

	return buf.len
}
| 287 | |
// read copies decoded bytes into `buf`, removes them from the decoder, and
// returns how many were copied (at most buf.len).
//
// Already-decoded output is always drained first so the caller never loses
// data. Once the output is empty, read() surfaces any error recorded by
// write() or close(), whether or not the decoder has been closed yet. With no
// error it returns io.Eof{} for a closed stream, or 0 when more input may
// still arrive.
pub fn (mut dec StreamDecoder) read(mut buf []u8) !int {
	if dec.output.len > 0 {
		n := copy(mut buf, dec.output)
		dec.output = dec.output[n..]
		return n
	}

	// Report a malformed or truncated stream as soon as the good data is
	// drained. Waiting for close() would leave a polling reader receiving 0
	// forever after a failed write().
	if err := dec.err {
		return err
	}

	if dec.closed {
		return io.Eof{}
	}

	return 0
}
| 309 | |
// close marks the stream finished and validates its terminal state.
//
// If write() already recorded an error, that error is returned unchanged. It
// is the root cause, and the bytes it failed on are still buffered, so it must
// not be replaced by a generic truncation error. Otherwise an error is
// returned, and stored so read() surfaces it after draining output, when
// unparsed bytes remain (truncated chunk). Calling close() again is a no-op.
pub fn (mut dec StreamDecoder) close() ! {
	if dec.closed {
		return
	}
	dec.closed = true

	// Preserve the first, more specific failure.
	if err := dec.err {
		return err
	}

	remaining := dec.buf.len - dec.offset

	if remaining > 0 {
		err :=
			error('snappy framing: stream closed with ${remaining} unprocessed bytes (truncated chunk)')
		dec.err = err
		return err
	}

	// NOTE(review): write() compacts the buffer on every successful return, so
	// this branch appears unreachable; it is kept as a defensive check.
	if !dec.identified && dec.buf.len > 0 {
		err := error('snappy framing: stream closed before identifier chunk was received')
		dec.err = err
		return err
	}
}
| 334 | |
| 335 | // --------------------------------------------------------------------------- |
| 336 | // Chunk writers |
| 337 | // --------------------------------------------------------------------------- |
| 338 | |
// write_data_chunk appends one data chunk for `block` to `out`: the chunk type
// byte, the 3-byte little-endian body length, the masked CRC32C of the
// uncompressed block, then the payload. The Snappy-compressed form is used
// only when it is strictly smaller than the raw block; otherwise the block is
// stored uncompressed.
fn write_data_chunk(mut out []u8, block []u8) {
	masked := mask_crc(crc32c(block))
	packed := compress(block)

	if packed.len >= block.len {
		// Compression did not help: type=0x01, length=4+block.len.
		out << chunk_type_uncompressed
		write_u24le(mut out, 4 + block.len)
		write_u32le(mut out, masked)
		out << block
		return
	}

	// Compressed form is smaller: type=0x00, length=4+packed.len.
	out << chunk_type_compressed
	write_u24le(mut out, 4 + packed.len)
	write_u32le(mut out, masked)
	out << packed
}
| 360 | |
| 361 | // --------------------------------------------------------------------------- |
| 362 | // Chunk readers |
| 363 | // --------------------------------------------------------------------------- |
| 364 | |
// read_stream_identifier validates the stream identifier chunk located at
// `pos` in `data` and returns the position just past it. It fails when the
// header is truncated, the chunk type is not 0xff, the body length differs
// from the magic length, the body is truncated, or the body is not "sNaPpY".
fn read_stream_identifier(data []u8, pos int) !int {
	kind, body_len, body_start := read_chunk_header(data, pos)!
	if kind != chunk_type_stream_id {
		return error('snappy framing: stream must begin with identifier chunk, got 0x${kind:02x}')
	}
	if body_len != stream_identifier_magic.len {
		return error('snappy framing: bad identifier chunk length ${body_len}')
	}
	body_end := body_start + body_len
	if body_end > data.len {
		return error('snappy framing: identifier chunk truncated')
	}
	if data[body_start..body_end] != stream_identifier_magic {
		return error('snappy framing: bad stream identifier magic')
	}
	return body_end
}
| 384 | |
// read_chunk_header parses the 4-byte chunk header at `pos`: one type byte
// followed by the body length as three little-endian bytes. It returns
// (chunk_type, chunk_body_length, pos_after_header), or an error when fewer
// than 4 bytes are available at `pos`.
@[inline]
fn read_chunk_header(data []u8, pos int) !(u8, int, int) {
	body_start := pos + 4
	if body_start > data.len {
		return error('snappy framing: truncated chunk header at pos ${pos}')
	}
	len_lo := u32(data[pos + 1])
	len_mid := u32(data[pos + 2]) << 8
	len_hi := u32(data[pos + 3]) << 16
	return data[pos], int(len_lo | len_mid | len_hi), body_start
}
| 396 | |
// decode_compressed_chunk handles the body of a compressed data chunk: a
// 4-byte little-endian masked CRC32C followed by Snappy-compressed data.
// It decompresses the data, checks the masked CRC32C of the result against
// the stored value, and returns the uncompressed bytes.
fn decode_compressed_chunk(payload []u8) ![]u8 {
	if payload.len < 4 {
		return error('snappy framing: compressed chunk too short')
	}
	// Assemble the little-endian checksum, most significant byte first.
	mut expected := u32(0)
	for i := 3; i >= 0; i-- {
		expected = (expected << 8) | u32(payload[i])
	}
	plain := decompress(payload[4..])!
	if mask_crc(crc32c(plain)) != expected {
		return error('snappy framing: CRC32C mismatch in compressed chunk')
	}
	return plain
}
| 411 | |
// decode_uncompressed_chunk handles the body of an uncompressed data chunk: a
// 4-byte little-endian masked CRC32C followed by the raw data. It returns a
// copy of the raw data after checking its masked CRC32C against the stored value.
fn decode_uncompressed_chunk(payload []u8) ![]u8 {
	if payload.len < 4 {
		return error('snappy framing: uncompressed chunk too short')
	}
	// Assemble the little-endian checksum, most significant byte first.
	mut expected := u32(0)
	for i := 3; i >= 0; i-- {
		expected = (expected << 8) | u32(payload[i])
	}
	body := payload[4..].clone()
	if mask_crc(crc32c(body)) != expected {
		return error('snappy framing: CRC32C mismatch in uncompressed chunk')
	}
	return body
}
| 426 | |
| 427 | // --------------------------------------------------------------------------- |
| 428 | // Little-endian integer writers |
| 429 | // --------------------------------------------------------------------------- |
| 430 | |
// write_u24le appends the low 24 bits of `v` to `out` as three bytes, least
// significant byte first.
@[inline]
fn write_u24le(mut out []u8, v int) {
	for shift := 0; shift < 24; shift += 8 {
		out << u8((v >> shift) & 0xff)
	}
}
| 437 | |
// write_u32le appends `v` to `out` as four bytes, least significant byte first.
@[inline]
fn write_u32le(mut out []u8, v u32) {
	for shift := 0; shift < 32; shift += 8 {
		out << u8((v >> shift) & 0xff)
	}
}
| 445 | |