Skip to main content

vector/sources/util/http/
encoding.rs

1use std::{io::Read, sync::OnceLock};
2
3use bytes::{Buf, Bytes};
4#[cfg(any(
5    feature = "sources-utils-http-prelude",
6    feature = "sources-opentelemetry",
7    test
8))]
9use bytes::{BufMut, BytesMut};
10use flate2::read::{MultiGzDecoder, ZlibDecoder};
11#[cfg(any(
12    feature = "sources-utils-http-prelude",
13    feature = "sources-opentelemetry",
14    test
15))]
16use futures_util::StreamExt;
17use snap::raw::Decoder as SnappyDecoder;
18use warp::http::StatusCode;
19#[cfg(any(
20    feature = "sources-utils-http-prelude",
21    feature = "sources-opentelemetry"
22))]
23use warp::{Filter, filters::BoxedFilter};
24
25use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
26
/// Fallback ceiling, in bytes, for the output of [`decompress_body`]: 100 MiB.
///
/// Bounding the decoded size keeps a small but highly compressed ("bomb") request from
/// forcing unbounded memory growth.
pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 << 20;

/// Process-wide override for the decompressed body cap; while unset, the default applies.
static MAX_DECOMPRESSED_BODY_SIZE: OnceLock<usize> = OnceLock::new();

/// Override the global decompressed body size cap. Must be called before any sources start.
///
/// # Panics
///
/// Panics if the cap has already been set: it can be installed only once per process.
pub fn set_max_decompressed_size_bytes(size: usize) {
    let outcome = MAX_DECOMPRESSED_BODY_SIZE.set(size);
    outcome.expect("max_decompressed_size_bytes already set");
}

/// Returns the decompressed body cap in effect: the override installed through
/// [`set_max_decompressed_size_bytes`] if any, otherwise
/// [`DEFAULT_MAX_DECOMPRESSED_BODY_SIZE`].
pub(crate) fn max_decompressed_size_bytes() -> usize {
    MAX_DECOMPRESSED_BODY_SIZE
        .get()
        .copied()
        .unwrap_or(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)
}
47
/// Builds a warp filter that collects a request body into [`Bytes`] while enforcing an
/// in-memory size cap of `max_body_size` bytes.
///
/// Requests whose declared `Content-Length` already exceeds the cap are rejected before
/// any body bytes are read. Otherwise the body stream is accumulated chunk by chunk and
/// rejected as soon as it would grow past the cap. This covers chunked bodies and bodies
/// that under-declare their length.
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry"
))]
pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> {
    // Content-Length is parsed as u64; saturate where usize could be wider.
    let header_limit = u64::try_from(max_body_size).unwrap_or(u64::MAX);

    let reject_declared_oversize = warp::header::optional::<u64>("content-length").and_then(
        move |declared: Option<u64>| async move {
            match declared {
                Some(len) if len > header_limit => Err(warp::reject::custom(
                    request_body_too_large_error(max_body_size),
                )),
                _ => Ok(()),
            }
        },
    );

    let collect_streamed = warp::body::stream().and_then(move |body| async move {
        collect_body_with_limit(body, max_body_size)
            .await
            .map_err(warp::reject::custom)
    });

    reject_declared_oversize
        .untuple_one()
        .and(collect_streamed)
        .boxed()
}
75
76/// Decompresses the body based on the Content-Encoding header.
77///
78/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
79///
80/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks.
81pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
82    decompress_body_with_limit(header, body, Some(max_decompressed_size_bytes()))
83}
84
85/// Like [`decompress_body`], but allows the caller to control the decompressed size cap.
86///
87/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input).
88pub(crate) fn decompress_body_with_limit(
89    header: Option<&str>,
90    mut body: Bytes,
91    max_decompressed_size: Option<usize>,
92) -> Result<Bytes, ErrorMessage> {
93    if let Some(encodings) = header {
94        for encoding in encodings.rsplit(',').map(str::trim) {
95            body = match encoding {
96                "identity" => body,
97                "gzip" => decompress_reader(
98                    MultiGzDecoder::new(body.reader()),
99                    encoding,
100                    max_decompressed_size,
101                )?,
102                "deflate" => decompress_reader(
103                    ZlibDecoder::new(body.reader()),
104                    encoding,
105                    max_decompressed_size,
106                )?,
107                "snappy" => decompress_snappy(&body, max_decompressed_size)?,
108                "zstd" => {
109                    let mut decoder = zstd::stream::read::Decoder::new(body.reader())
110                        .map_err(|error| emit_decompress_error(encoding, error))?;
111                    if let Some(max) = max_decompressed_size
112                        && let Some(window_log_max) = zstd_window_log_max(max)
113                    {
114                        decoder
115                            .window_log_max(window_log_max)
116                            .map_err(|error| emit_decompress_error(encoding, error))?;
117                    }
118                    decompress_reader(decoder, encoding, max_decompressed_size)?
119                }
120                encoding => {
121                    return Err(ErrorMessage::new(
122                        StatusCode::UNSUPPORTED_MEDIA_TYPE,
123                        format!("Unsupported encoding {encoding}"),
124                    ));
125                }
126            }
127        }
128    }
129
130    ensure_body_within_limit(&body, "identity", max_decompressed_size)?;
131    Ok(body)
132}
133
134fn decompress_reader<R: Read>(
135    reader: R,
136    encoding: &str,
137    max_decompressed_size: Option<usize>,
138) -> Result<Bytes, ErrorMessage> {
139    let mut decoded = Vec::new();
140    match max_decompressed_size {
141        Some(max) => {
142            // Read one byte beyond the cap so we can detect overflow without ambiguity.
143            let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1);
144            reader
145                .take(limit)
146                .read_to_end(&mut decoded)
147                .map_err(|error| emit_decompress_error(encoding, error))?;
148            if decoded.len() > max {
149                return Err(decompressed_too_large_error(encoding, max));
150            }
151        }
152        None => {
153            let mut reader = reader;
154            reader
155                .read_to_end(&mut decoded)
156                .map_err(|error| emit_decompress_error(encoding, error))?;
157        }
158    }
159    Ok(decoded.into())
160}
161
162fn decompress_snappy(
163    body: &Bytes,
164    max_decompressed_size: Option<usize>,
165) -> Result<Bytes, ErrorMessage> {
166    // Snappy stores the decompressed length in the frame header, so reject oversized
167    // payloads before allocating the output buffer.
168    if let Some(max) = max_decompressed_size {
169        let len = snap::raw::decompress_len(body)
170            .map_err(|error| emit_decompress_error("snappy", error))?;
171        if len > max {
172            return Err(decompressed_too_large_error("snappy", max));
173        }
174    }
175    let decoded = SnappyDecoder::new()
176        .decompress_vec(body)
177        .map_err(|error| emit_decompress_error("snappy", error))?;
178    Ok(decoded.into())
179}
180
/// Buffers a streamed request body into a single [`Bytes`], failing as soon as the
/// accumulated size would exceed `max_body_size`.
///
/// A stream error becomes `400 Bad Request`; exceeding the limit becomes
/// `413 Payload Too Large`. The limit is checked before each chunk is copied, so no more
/// than `max_body_size` bytes are ever buffered.
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
async fn collect_body_with_limit<S, B>(body: S, max_body_size: usize) -> Result<Bytes, ErrorMessage>
where
    S: futures_util::Stream<Item = Result<B, warp::Error>>,
    B: Buf,
{
    futures_util::pin_mut!(body);

    let mut collected = BytesMut::new();
    while let Some(next) = body.next().await {
        let chunk = match next {
            Ok(chunk) => chunk,
            Err(error) => {
                return Err(ErrorMessage::new(
                    StatusCode::BAD_REQUEST,
                    format!("Failed reading request body: {error}"),
                ));
            }
        };

        // `collected` never exceeds the limit, so this is the exact remaining budget.
        let headroom = max_body_size.saturating_sub(collected.len());
        if chunk.remaining() > headroom {
            return Err(request_body_too_large_error(max_body_size));
        }
        collected.put(chunk);
    }

    Ok(collected.freeze())
}
209
210fn ensure_body_within_limit(
211    body: &Bytes,
212    encoding: &str,
213    max_decompressed_size: Option<usize>,
214) -> Result<(), ErrorMessage> {
215    if let Some(max) = max_decompressed_size
216        && body.len() > max
217    {
218        return Err(decompressed_too_large_error(encoding, max));
219    }
220    Ok(())
221}
222
/// Maps a decompressed-size budget to a zstd decoder `window_log_max`.
///
/// `window_log_max` is a base-2 logarithm, so this picks the smallest window that can
/// cover `max_decompressed_size` bytes. The result is clamped to the range libzstd
/// accepts for this decoder parameter:
/// - at least 10 (`ZSTD_WINDOWLOG_ABSOLUTEMIN`);
/// - at most `ZSTD_WINDOWLOG_MAX`, which is 31 on 64-bit targets and 30 on 32-bit targets.
///
/// Returns `None` for a zero budget.
fn zstd_window_log_max(max_decompressed_size: usize) -> Option<u32> {
    const MIN_ZSTD_WINDOW_LOG: u32 = 10;
    // libzstd rejects window logs above 30 when `size_t` is 32 bits wide. Clamping to 31
    // there would make every zstd request fail once the cap exceeds 1 GiB.
    const MAX_ZSTD_WINDOW_LOG: u32 = if usize::BITS <= 32 { 30 } else { 31 };

    // `window_log_max` is expressed as a power-of-two log. Use the smallest zstd
    // window capable of representing the configured byte budget.
    max_decompressed_size.checked_sub(1).map(|max_index| {
        (usize::BITS - max_index.leading_zeros()).clamp(MIN_ZSTD_WINDOW_LOG, MAX_ZSTD_WINDOW_LOG)
    })
}
233
/// Builds the `413 Payload Too Large` response for a request body received larger than
/// `max` bytes, before any content coding is undone.
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
fn request_body_too_large_error(max: usize) -> ErrorMessage {
    let message = format!("Request body exceeds limit of {max} bytes.");
    ErrorMessage::new(StatusCode::PAYLOAD_TOO_LARGE, message)
}
245
246fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage {
247    ErrorMessage::new(
248        StatusCode::PAYLOAD_TOO_LARGE,
249        format!("Decompressed {encoding} body exceeds limit of {max} bytes."),
250    )
251}
252
253pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
254    emit!(HttpDecompressError {
255        encoding,
256        error: &error
257    });
258    ErrorMessage::new(
259        StatusCode::UNPROCESSABLE_ENTITY,
260        format!("Failed decompressing payload with {encoding} decoder."),
261    )
262}
263
#[cfg(test)]
mod tests {
    use std::io::Write;

    use flate2::{
        Compression,
        write::{GzEncoder, ZlibEncoder},
    };
    use futures_util::stream;
    use zstd::stream::Encoder as ZstdEncoder;

    use super::*;

    fn gzip_payload(plaintext: &[u8]) -> Bytes {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    /// HTTP `deflate` is the zlib-wrapped format, matching `ZlibDecoder` on the read side.
    fn deflate_payload(plaintext: &[u8]) -> Bytes {
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    fn zstd_payload_with_window_log(plaintext: &[u8], window_log: u32) -> Bytes {
        let mut encoder = ZstdEncoder::new(Vec::new(), 0).unwrap();
        encoder.window_log(window_log).unwrap();
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    #[test]
    fn gzip_within_limit_succeeds() {
        let plaintext = vec![0u8; 10_000];
        let body = gzip_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap();
        assert_eq!(decoded.len(), plaintext.len());
    }

    #[test]
    fn gzip_exceeding_limit_returns_413() {
        // Compress 1 MB of zeros, then cap at 1 KB.
        let plaintext = vec![0u8; 1_000_000];
        let body = gzip_payload(&plaintext);

        let err =
            decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn corrupt_gzip_returns_422() {
        let body = Bytes::from_static(b"definitely not gzip");

        let err = decompress_body_with_limit(Some("gzip"), body, Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::UNPROCESSABLE_ENTITY);
    }

    #[test]
    fn deflate_within_limit_succeeds() {
        let plaintext = vec![0u8; 10_000];
        let body = deflate_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("deflate"), body, Some(100_000)).unwrap();
        assert_eq!(&decoded[..], &plaintext[..]);
    }

    #[test]
    fn snappy_within_limit_succeeds() {
        let plaintext = b"hello snappy".repeat(100);
        let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap();

        let decoded =
            decompress_body_with_limit(Some("snappy"), compressed.into(), Some(100_000)).unwrap();
        assert_eq!(&decoded[..], &plaintext[..]);
    }

    #[test]
    fn snappy_exceeding_limit_returns_413_before_allocating() {
        // 2 MB of zeros. Raw snappy stores the uncompressed length in its preamble.
        let plaintext = vec![0u8; 2 * 1024 * 1024];
        let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap();

        let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn zstd_exceeding_limit_returns_413() {
        let plaintext = vec![0u8; 10_000];
        let compressed = zstd_payload_with_window_log(plaintext.as_slice(), 10);

        let err = decompress_body_with_limit(Some("zstd"), compressed, Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn stacked_encodings_are_undone_in_reverse_order() {
        // `gzip, zstd` means gzip was applied first, so zstd must be undone first.
        let plaintext = b"stacked".repeat(100);
        let body = zstd_payload_with_window_log(&gzip_payload(&plaintext), 10);

        let decoded =
            decompress_body_with_limit(Some("gzip, zstd"), body, Some(100_000)).unwrap();
        assert_eq!(&decoded[..], &plaintext[..]);
    }

    #[test]
    fn unsupported_encoding_returns_415() {
        let body = Bytes::from_static(b"hello world");

        let err =
            decompress_body_with_limit(Some("br"), body, Some(1024)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
    }

    #[test]
    fn identity_passes_through() {
        let body: Bytes = Bytes::from_static(b"hello world");
        let decoded = decompress_body(Some("identity"), body.clone()).unwrap();
        assert_eq!(decoded, body);
    }

    #[test]
    fn identity_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");

        let err =
            decompress_body_with_limit(Some("identity"), body, Some(5)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn missing_content_encoding_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");

        let err = decompress_body_with_limit(None, body, Some(5)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn zstd_window_log_tracks_limit() {
        assert_eq!(zstd_window_log_max(0), None);
        assert_eq!(zstd_window_log_max(1), Some(10));
        assert_eq!(zstd_window_log_max(1024), Some(10));
        assert_eq!(zstd_window_log_max(1025), Some(11));
        assert_eq!(
            zstd_window_log_max(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE),
            Some(27)
        );
    }

    #[tokio::test]
    async fn collect_body_with_limit_succeeds_within_limit() {
        let body = stream::iter([
            Ok::<_, warp::Error>(Bytes::from_static(b"hello")),
            Ok::<_, warp::Error>(Bytes::from_static(b" world")),
        ]);

        let collected = collect_body_with_limit(body, 11).await.unwrap();
        assert_eq!(collected, Bytes::from_static(b"hello world"));
    }

    #[tokio::test]
    async fn collect_body_with_limit_rejects_oversized_stream() {
        let body = stream::iter([
            Ok::<_, warp::Error>(Bytes::from_static(b"hello")),
            Ok::<_, warp::Error>(Bytes::from_static(b" world")),
        ]);

        let err = collect_body_with_limit(body, 5)
            .await
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }
}