// vector/sources/util/http/encoding.rs
use std::{io::Read, sync::OnceLock};

use bytes::{Buf, Bytes};
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
use bytes::{BufMut, BytesMut};
use flate2::read::{MultiGzDecoder, ZlibDecoder};
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
use futures_util::StreamExt;
use snap::raw::Decoder as SnappyDecoder;
use warp::http::StatusCode;
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry"
))]
use warp::{Filter, filters::BoxedFilter};

use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};

/// Cap, in bytes, applied to decompressed request bodies when no explicit value has been
/// installed via `set_max_decompressed_size_bytes`: 100 MiB.
pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 << 20;

/// Process-wide override for the decompressed-body cap; empty until it is set once.
static MAX_DECOMPRESSED_BODY_SIZE: OnceLock<usize> = OnceLock::new();

34pub fn set_max_decompressed_size_bytes(size: usize) {
36 MAX_DECOMPRESSED_BODY_SIZE
37 .set(size)
38 .expect("max_decompressed_size_bytes already set");
39}
40
41pub(crate) fn max_decompressed_size_bytes() -> usize {
43 *MAX_DECOMPRESSED_BODY_SIZE
44 .get()
45 .unwrap_or(&DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)
46}
47
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry"
))]
/// Builds a warp filter that extracts the whole request body, rejecting with
/// `413 Payload Too Large` when it exceeds `max_body_size` bytes.
///
/// A `Content-Length` header that already declares more than the limit is rejected before any
/// of the body is read. Otherwise the streamed body is counted chunk by chunk, so a missing or
/// understated header cannot bypass the cap.
pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> {
    // Content-Length is parsed as u64; a limit too large for u64 can never be exceeded by it.
    let header_limit = u64::try_from(max_body_size).unwrap_or(u64::MAX);

    let reject_declared_oversize = warp::header::optional::<u64>("content-length").and_then(
        move |declared: Option<u64>| async move {
            match declared {
                Some(len) if len > header_limit => Err(warp::reject::custom(
                    request_body_too_large_error(max_body_size),
                )),
                _ => Ok(()),
            }
        },
    );

    reject_declared_oversize
        .untuple_one()
        .and(warp::body::stream())
        .and_then(move |body| async move {
            collect_body_with_limit(body, max_body_size)
                .await
                .map_err(warp::reject::custom)
        })
        .boxed()
}

76pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
82 decompress_body_with_limit(header, body, Some(max_decompressed_size_bytes()))
83}
84
85pub(crate) fn decompress_body_with_limit(
89 header: Option<&str>,
90 mut body: Bytes,
91 max_decompressed_size: Option<usize>,
92) -> Result<Bytes, ErrorMessage> {
93 if let Some(encodings) = header {
94 for encoding in encodings.rsplit(',').map(str::trim) {
95 body = match encoding {
96 "identity" => body,
97 "gzip" => decompress_reader(
98 MultiGzDecoder::new(body.reader()),
99 encoding,
100 max_decompressed_size,
101 )?,
102 "deflate" => decompress_reader(
103 ZlibDecoder::new(body.reader()),
104 encoding,
105 max_decompressed_size,
106 )?,
107 "snappy" => decompress_snappy(&body, max_decompressed_size)?,
108 "zstd" => {
109 let mut decoder = zstd::stream::read::Decoder::new(body.reader())
110 .map_err(|error| emit_decompress_error(encoding, error))?;
111 if let Some(max) = max_decompressed_size
112 && let Some(window_log_max) = zstd_window_log_max(max)
113 {
114 decoder
115 .window_log_max(window_log_max)
116 .map_err(|error| emit_decompress_error(encoding, error))?;
117 }
118 decompress_reader(decoder, encoding, max_decompressed_size)?
119 }
120 encoding => {
121 return Err(ErrorMessage::new(
122 StatusCode::UNSUPPORTED_MEDIA_TYPE,
123 format!("Unsupported encoding {encoding}"),
124 ));
125 }
126 }
127 }
128 }
129
130 ensure_body_within_limit(&body, "identity", max_decompressed_size)?;
131 Ok(body)
132}
133
134fn decompress_reader<R: Read>(
135 reader: R,
136 encoding: &str,
137 max_decompressed_size: Option<usize>,
138) -> Result<Bytes, ErrorMessage> {
139 let mut decoded = Vec::new();
140 match max_decompressed_size {
141 Some(max) => {
142 let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1);
144 reader
145 .take(limit)
146 .read_to_end(&mut decoded)
147 .map_err(|error| emit_decompress_error(encoding, error))?;
148 if decoded.len() > max {
149 return Err(decompressed_too_large_error(encoding, max));
150 }
151 }
152 None => {
153 let mut reader = reader;
154 reader
155 .read_to_end(&mut decoded)
156 .map_err(|error| emit_decompress_error(encoding, error))?;
157 }
158 }
159 Ok(decoded.into())
160}
161
162fn decompress_snappy(
163 body: &Bytes,
164 max_decompressed_size: Option<usize>,
165) -> Result<Bytes, ErrorMessage> {
166 if let Some(max) = max_decompressed_size {
169 let len = snap::raw::decompress_len(body)
170 .map_err(|error| emit_decompress_error("snappy", error))?;
171 if len > max {
172 return Err(decompressed_too_large_error("snappy", max));
173 }
174 }
175 let decoded = SnappyDecoder::new()
176 .decompress_vec(body)
177 .map_err(|error| emit_decompress_error("snappy", error))?;
178 Ok(decoded.into())
179}
180
#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
/// Buffers a streamed request body into a single `Bytes`, failing as soon as the accumulated
/// size would exceed `max_body_size`.
///
/// # Errors
///
/// - `400 Bad Request` if the stream yields an error.
/// - `413 Payload Too Large` when the next chunk would push the total past `max_body_size`;
///   that chunk is not copied.
async fn collect_body_with_limit<S, B>(body: S, max_body_size: usize) -> Result<Bytes, ErrorMessage>
where
    S: futures_util::Stream<Item = Result<B, warp::Error>>,
    B: Buf,
{
    futures_util::pin_mut!(body);

    let mut collected = BytesMut::new();
    while let Some(next) = body.next().await {
        let chunk = match next {
            Ok(chunk) => chunk,
            Err(error) => {
                return Err(ErrorMessage::new(
                    StatusCode::BAD_REQUEST,
                    format!("Failed reading request body: {error}"),
                ));
            }
        };

        let room_left = max_body_size.saturating_sub(collected.len());
        if chunk.remaining() > room_left {
            return Err(request_body_too_large_error(max_body_size));
        }
        collected.put(chunk);
    }

    Ok(collected.freeze())
}

210fn ensure_body_within_limit(
211 body: &Bytes,
212 encoding: &str,
213 max_decompressed_size: Option<usize>,
214) -> Result<(), ErrorMessage> {
215 if let Some(max) = max_decompressed_size
216 && body.len() > max
217 {
218 return Err(decompressed_too_large_error(encoding, max));
219 }
220 Ok(())
221}
222
/// Returns the largest zstd window log (log2 of the window size in bytes) the decoder should
/// accept when decompressed output is capped at `max_decompressed_size` bytes. Returns `None`
/// when the cap is zero.
///
/// The value is `ceil(log2(max_decompressed_size))`, clamped to the range zstd accepts for its
/// `windowLogMax` decoder parameter. The lower bound is 10 (`ZSTD_WINDOWLOG_ABSOLUTEMIN`). The
/// upper bound is `ZSTD_WINDOWLOG_MAX`: 31 on 64-bit targets, 30 on 32-bit targets. zstd
/// rejects larger values for that parameter, so exceeding it would make every zstd request
/// fail to decode.
fn zstd_window_log_max(max_decompressed_size: usize) -> Option<u32> {
    const MIN_ZSTD_WINDOW_LOG: u32 = 10;
    // Mirrors zstd's ZSTD_WINDOWLOG_MAX, which depends on the width of `size_t`.
    const MAX_ZSTD_WINDOW_LOG: u32 = if usize::BITS == 32 { 30 } else { 31 };

    max_decompressed_size.checked_sub(1).map(|max_index| {
        // Bits needed to represent `max - 1` == ceil(log2(max)) for every max >= 1.
        (usize::BITS - max_index.leading_zeros()).clamp(MIN_ZSTD_WINDOW_LOG, MAX_ZSTD_WINDOW_LOG)
    })
}

#[cfg(any(
    feature = "sources-utils-http-prelude",
    feature = "sources-opentelemetry",
    test
))]
/// Builds the `413 Payload Too Large` error for a raw request body larger than `max` bytes.
fn request_body_too_large_error(max: usize) -> ErrorMessage {
    let message = format!("Request body exceeds limit of {max} bytes.");
    ErrorMessage::new(StatusCode::PAYLOAD_TOO_LARGE, message)
}

246fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage {
247 ErrorMessage::new(
248 StatusCode::PAYLOAD_TOO_LARGE,
249 format!("Decompressed {encoding} body exceeds limit of {max} bytes."),
250 )
251}
252
253pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
254 emit!(HttpDecompressError {
255 encoding,
256 error: &error
257 });
258 ErrorMessage::new(
259 StatusCode::UNPROCESSABLE_ENTITY,
260 format!("Failed decompressing payload with {encoding} decoder."),
261 )
262}
263
#[cfg(test)]
mod tests {
    use std::io::Write;

    use flate2::{Compression, write::GzEncoder};
    use futures_util::stream;
    use zstd::stream::Encoder as ZstdEncoder;

    use super::*;

    /// Gzip-compresses `plaintext` at the default compression level.
    fn gzip_payload(plaintext: &[u8]) -> Bytes {
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(plaintext).unwrap();
        Bytes::from(gz.finish().unwrap())
    }

    /// Zstd-compresses `plaintext` with an explicit window log so tests control the frame window.
    fn zstd_payload_with_window_log(plaintext: &[u8], window_log: u32) -> Bytes {
        let mut zst = ZstdEncoder::new(Vec::new(), 0).unwrap();
        zst.window_log(window_log).unwrap();
        zst.write_all(plaintext).unwrap();
        Bytes::from(zst.finish().unwrap())
    }

    /// Two-chunk body stream whose concatenation is `b"hello world"` (11 bytes).
    fn hello_world_chunks() -> impl futures_util::Stream<Item = Result<Bytes, warp::Error>> {
        stream::iter([
            Ok::<_, warp::Error>(Bytes::from_static(b"hello")),
            Ok::<_, warp::Error>(Bytes::from_static(b" world")),
        ])
    }

    /// Asserts that `outcome` failed with `413 Payload Too Large`.
    fn assert_payload_too_large<T: std::fmt::Debug>(outcome: Result<T, ErrorMessage>) {
        let err = outcome.expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn gzip_within_limit_succeeds() {
        let plaintext = vec![0u8; 10_000];
        let decoded =
            decompress_body_with_limit(Some("gzip"), gzip_payload(&plaintext), Some(100_000))
                .unwrap();
        assert_eq!(decoded.len(), plaintext.len());
    }

    #[test]
    fn gzip_exceeding_limit_returns_413() {
        // A megabyte of zeros compresses to almost nothing but inflates far past the 1 KiB cap.
        let plaintext = vec![0u8; 1_000_000];
        assert_payload_too_large(decompress_body_with_limit(
            Some("gzip"),
            gzip_payload(&plaintext),
            Some(1024),
        ));
    }

    #[test]
    fn snappy_exceeding_limit_returns_413_before_allocating() {
        let plaintext = vec![0u8; 2 * 1024 * 1024];
        let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap();
        assert_payload_too_large(decompress_body_with_limit(
            Some("snappy"),
            Bytes::from(compressed),
            Some(1024),
        ));
    }

    #[test]
    fn zstd_exceeding_limit_returns_413() {
        let plaintext = vec![0u8; 10_000];
        // Window log 10 matches the window cap for a 1 KiB limit, so the frame is accepted and
        // the rejection comes from the output-size check.
        let compressed = zstd_payload_with_window_log(&plaintext, 10);
        assert_payload_too_large(decompress_body_with_limit(Some("zstd"), compressed, Some(1024)));
    }

    #[test]
    fn identity_passes_through() {
        let body = Bytes::from_static(b"hello world");
        let decoded = decompress_body(Some("identity"), body.clone()).unwrap();
        assert_eq!(decoded, body);
    }

    #[test]
    fn identity_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");
        assert_payload_too_large(decompress_body_with_limit(Some("identity"), body, Some(5)));
    }

    #[test]
    fn missing_content_encoding_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");
        assert_payload_too_large(decompress_body_with_limit(None, body, Some(5)));
    }

    #[test]
    fn zstd_window_log_tracks_limit() {
        let cases = [
            (0, None),
            (1, Some(10)),
            (1024, Some(10)),
            (1025, Some(11)),
            (DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, Some(27)),
        ];
        for (limit, expected) in cases {
            assert_eq!(zstd_window_log_max(limit), expected, "limit = {limit}");
        }
    }

    #[tokio::test]
    async fn collect_body_with_limit_succeeds_within_limit() {
        let collected = collect_body_with_limit(hello_world_chunks(), 11)
            .await
            .unwrap();
        assert_eq!(collected, Bytes::from_static(b"hello world"));
    }

    #[tokio::test]
    async fn collect_body_with_limit_rejects_oversized_stream() {
        assert_payload_too_large(collect_body_with_limit(hello_world_chunks(), 5).await);
    }
}
387}