1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9 codecs::{
10 BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11 NewlineDelimitedDecoderConfig,
12 decoding::{DeserializerConfig, FramingConfig},
13 },
14 config::{DataType, LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 lookup::{PathPrefix, lookup_v2::OptionalValuePath, owned_value_path, path},
17 schema::Definition,
18};
19use vrl::{
20 path::ValuePath as _,
21 value::{Kind, ObjectMap, kind::Collection},
22};
23use warp::http::HeaderMap;
24
25use crate::{
26 codecs::{Decoder, DecodingConfig},
27 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28 config::{
29 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30 SourceOutput,
31 },
32 event::Event,
33 http::KeepaliveConfig,
34 serde::{bool_or_struct, default_decoding},
35 sources::util::{
36 Encoding, HttpSource,
37 http::{HttpMethod, add_headers, add_query_parameters},
38 },
39 tls::TlsEnableableConfig,
40};
41
/// Configuration for the deprecated `http` source.
///
/// This is a newtype around [`SimpleHttpConfig`]: the `GenerateConfig` and `SourceConfig`
/// implementations for this type forward every call to the wrapped `http_server` configuration.
#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
#[configurable(metadata(deprecated))]
#[derive(Clone, Debug)]
pub struct HttpConfig(SimpleHttpConfig);
47
48impl GenerateConfig for HttpConfig {
49 fn generate_config() -> toml::Value {
50 <SimpleHttpConfig as GenerateConfig>::generate_config()
51 }
52}
53
54#[async_trait::async_trait]
55#[typetag::serde(name = "http")]
56impl SourceConfig for HttpConfig {
57 async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
58 self.0.build(cx).await
59 }
60
61 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
62 self.0.outputs(global_log_namespace)
63 }
64
65 fn resources(&self) -> Vec<Resource> {
66 self.0.resources()
67 }
68
69 fn can_acknowledge(&self) -> bool {
70 self.0.can_acknowledge()
71 }
72}
73
/// Configuration for the `http_server` source.
#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
#[derive(Clone, Debug)]
pub struct SimpleHttpConfig {
    /// The socket address to listen for connections on.
    ///
    /// It must include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// The expected encoding of received data.
    ///
    /// Deprecated: it cannot be combined with `framing` or `decoding`; configure those instead.
    #[configurable(deprecated)]
    #[serde(default)]
    encoding: Option<Encoding>,

    /// A list of HTTP headers to include in the log event.
    ///
    /// Entries containing the wildcard (`*`) character are compiled as glob patterns; all other
    /// entries are kept as exact names. Duplicate entries are removed with a warning.
    ///
    /// Specifying "*" results in all headers being included in the log event.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "User-Agent"))]
    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
    #[configurable(metadata(docs::examples = "X-*"))]
    #[configurable(metadata(docs::examples = "*"))]
    headers: Vec<String>,

    /// A list of URL query parameters to include in the log event.
    ///
    /// Entries containing the wildcard (`*`) character are compiled as glob patterns; all other
    /// entries are kept as exact names. Duplicate entries are removed with a warning.
    ///
    /// Specifying "*" results in all query parameters being included in the log event.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    // Optional authentication settings, handed to the HTTP server when the source is built.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    /// Whether the request path must match the configured `path` exactly.
    ///
    /// When set to `false`, requests whose path merely starts with `path` are accepted as well
    /// (for example `/event/path1` when `path` is `/event`).
    #[serde(default = "crate::serde::default_true")]
    strict_path: bool,

    /// The URL path on which event requests are accepted.
    #[serde(default = "default_path")]
    #[configurable(metadata(docs::examples = "/event/path"))]
    #[configurable(metadata(docs::examples = "/logs"))]
    path: String,

    /// The event key in which the requested URL path is stored.
    #[serde(default = "default_path_key")]
    #[configurable(metadata(docs::examples = "vector_http_path"))]
    path_key: OptionalValuePath,

    /// If set, the event key in which the IP address of the remote client is stored.
    #[serde(default = "default_host_key")]
    #[configurable(metadata(docs::examples = "hostname"))]
    host_key: OptionalValuePath,

    /// The HTTP method the source is configured for.
    #[serde(default = "default_http_method")]
    method: HttpMethod,

    /// The HTTP status code the source is configured to respond with.
    #[configurable(metadata(docs::examples = 202))]
    #[configurable(metadata(docs::numeric_type = "uint"))]
    #[serde(with = "http_serde::status_code")]
    #[serde(default = "default_http_response_code")]
    response_code: StatusCode,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // When unset, the default stream framing of the selected decoder is used.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // When unset, `default_decoding()` is used.
    #[configurable(derived)]
    decoding: Option<DeserializerConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
190
191impl SimpleHttpConfig {
192 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
194 let mut schema_definition = self
195 .decoding
196 .as_ref()
197 .unwrap_or(&default_decoding())
198 .schema_definition(log_namespace)
199 .with_source_metadata(
200 SimpleHttpConfig::NAME,
201 self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
202 &owned_value_path!("path"),
203 Kind::bytes(),
204 None,
205 )
206 .with_source_metadata(
208 SimpleHttpConfig::NAME,
209 None,
210 &owned_value_path!("headers"),
211 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
212 None,
213 )
214 .with_source_metadata(
216 SimpleHttpConfig::NAME,
217 None,
218 &owned_value_path!("query_parameters"),
219 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
220 None,
221 )
222 .with_source_metadata(
223 SimpleHttpConfig::NAME,
224 self.host_key.path.clone().map(LegacyKey::Overwrite),
225 &owned_value_path!("host"),
226 Kind::bytes().or_undefined(),
227 None,
228 )
229 .with_standard_vector_source_metadata();
230
231 if log_namespace == LogNamespace::Legacy {
233 let unknown_kind = if matches!(self.auth, Some(HttpServerAuthConfig::Custom { .. })) {
236 Kind::any()
237 } else {
238 Kind::bytes()
239 };
240 schema_definition = schema_definition.unknown_fields(unknown_kind);
241 }
242
243 schema_definition
244 }
245
246 fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
247 if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
248 return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
249 }
250
251 let (framing, decoding) = if let Some(encoding) = self.encoding {
252 match encoding {
253 Encoding::Text => (
254 NewlineDelimitedDecoderConfig::new().into(),
255 BytesDeserializerConfig::new().into(),
256 ),
257 Encoding::Json => (
258 BytesDecoderConfig::new().into(),
259 JsonDeserializerConfig::default().into(),
260 ),
261 Encoding::Ndjson => (
262 NewlineDelimitedDecoderConfig::new().into(),
263 JsonDeserializerConfig::default().into(),
264 ),
265 Encoding::Binary => (
266 BytesDecoderConfig::new().into(),
267 BytesDeserializerConfig::new().into(),
268 ),
269 }
270 } else {
271 let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
272 let framing = self
273 .framing
274 .clone()
275 .unwrap_or_else(|| decoding.default_stream_framing());
276 (framing, decoding)
277 };
278
279 Ok(DecodingConfig::new(
280 framing,
281 decoding,
282 self.log_namespace.unwrap_or(false).into(),
283 ))
284 }
285}
286
287impl Default for SimpleHttpConfig {
288 fn default() -> Self {
289 Self {
290 address: "0.0.0.0:8080".parse().unwrap(),
291 encoding: None,
292 headers: Vec::new(),
293 query_parameters: Vec::new(),
294 tls: None,
295 auth: None,
296 path: default_path(),
297 path_key: default_path_key(),
298 host_key: default_host_key(),
299 method: default_http_method(),
300 response_code: default_http_response_code(),
301 strict_path: true,
302 framing: None,
303 decoding: Some(default_decoding()),
304 acknowledgements: SourceAcknowledgementsConfig::default(),
305 log_namespace: None,
306 keepalive: KeepaliveConfig::default(),
307 }
308 }
309}
310
// Presumably implements `GenerateConfig` for `SimpleHttpConfig` from its `Default` value; the
// macro is defined elsewhere in the crate.
impl_generate_config_from_default!(SimpleHttpConfig);
312
/// Default value for `method`: `POST`.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Post
}
316
/// Default value for `path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
320
321fn default_path_key() -> OptionalValuePath {
322 OptionalValuePath::from(owned_value_path!("path"))
323}
324
/// Default value for `host_key`: no path, which leaves source IP collection disabled.
fn default_host_key() -> OptionalValuePath {
    OptionalValuePath::none()
}
328
/// Default value for `response_code`: `200 OK`.
const fn default_http_response_code() -> StatusCode {
    StatusCode::OK
}
332
333pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
335 list.sort();
336
337 let mut dedup = false;
338 for (idx, name) in list.iter().enumerate() {
339 if idx < list.len() - 1 && list[idx] == list[idx + 1] {
340 warn!(
341 "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
342 list_name, name
343 );
344 dedup = true;
345 }
346 }
347
348 if dedup {
349 list.dedup();
350 }
351 list
352}
353
/// Formats only the IP part of `addr`, dropping the port.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    match addr {
        SocketAddr::V4(v4) => v4.ip().to_string(),
        SocketAddr::V6(v6) => v6.ip().to_string(),
    }
}
358
/// A single configured header or query parameter name, as produced by `build_param_matcher`.
#[derive(Clone)]
pub enum HttpConfigParamKind {
    /// A name that contained `*` and was compiled into a glob pattern.
    Glob(glob::Pattern),
    /// A name without wildcard, kept verbatim.
    Exact(String),
}
364
365pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
366 list.iter()
367 .map(|s| match s.contains('*') {
368 true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
369 false => Ok(HttpConfigParamKind::Exact(s.to_string())),
370 })
371 .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
372}
373
374#[async_trait::async_trait]
375#[typetag::serde(name = "http_server")]
376impl SourceConfig for SimpleHttpConfig {
377 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
378 let log_namespace = cx.log_namespace(self.log_namespace);
379 let decoder = self
380 .get_decoding_config()?
381 .build()?
382 .with_log_namespace(log_namespace);
383
384 let source = SimpleHttpSource {
385 headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
386 query_parameters: build_param_matcher(&remove_duplicates(
387 self.query_parameters.clone(),
388 "query_parameters",
389 ))?,
390 path_key: self.path_key.clone(),
391 host_key: self.host_key.clone(),
392 decoder,
393 log_namespace,
394 };
395 source.run(
396 self.address,
397 self.path.as_str(),
398 self.method,
399 self.response_code,
400 self.strict_path,
401 self.tls.as_ref(),
402 self.auth.as_ref(),
403 cx,
404 self.acknowledgements,
405 self.keepalive.clone(),
406 )
407 }
408
409 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
410 let log_namespace = global_log_namespace.merge(self.log_namespace);
413
414 let schema_definition = self.schema_definition(log_namespace);
415
416 vec![SourceOutput::new_maybe_logs(
417 self.decoding
418 .as_ref()
419 .map(|d| d.output_type())
420 .unwrap_or(DataType::Log),
421 schema_definition,
422 )]
423 }
424
425 fn resources(&self) -> Vec<Resource> {
426 vec![Resource::tcp(self.address)]
427 }
428
429 fn can_acknowledge(&self) -> bool {
430 true
431 }
432}
433
/// Runtime state of the `http_server` source, assembled by `SimpleHttpConfig::build`.
#[derive(Clone)]
struct SimpleHttpSource {
    // Matchers for the header names to copy onto events.
    headers: Vec<HttpConfigParamKind>,
    // Matchers for the query parameter names to copy onto events.
    query_parameters: Vec<HttpConfigParamKind>,
    // Legacy key under which the request path is stored.
    path_key: OptionalValuePath,
    // Legacy key under which the remote IP is stored; also gates source IP collection.
    host_key: OptionalValuePath,
    // Decoder cloned for every request body.
    decoder: Decoder,
    log_namespace: LogNamespace,
}
443
444impl HttpSource for SimpleHttpSource {
445 fn enrich_events(
448 &self,
449 events: &mut [Event],
450 request_path: &str,
451 headers: &HeaderMap,
452 query_parameters: &HashMap<String, String>,
453 source_ip: Option<&SocketAddr>,
454 ) {
455 let now = Utc::now();
456 for event in events.iter_mut() {
457 match event {
458 Event::Log(log) => {
459 self.log_namespace.insert_source_metadata(
461 SimpleHttpConfig::NAME,
462 log,
463 self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
464 path!("path"),
465 request_path.to_owned(),
466 );
467
468 self.log_namespace.insert_standard_vector_source_metadata(
469 log,
470 SimpleHttpConfig::NAME,
471 now,
472 );
473
474 if let Some(addr) = source_ip {
475 self.log_namespace.insert_source_metadata(
476 SimpleHttpConfig::NAME,
477 log,
478 self.host_key.path.as_ref().map(LegacyKey::Overwrite),
479 path!("host"),
480 socket_addr_to_ip_string(addr),
481 );
482 }
483 }
484 _ => {
485 continue;
486 }
487 }
488 }
489
490 add_headers(
491 events,
492 &self.headers,
493 headers,
494 self.log_namespace,
495 SimpleHttpConfig::NAME,
496 );
497
498 add_query_parameters(
499 events,
500 &self.query_parameters,
501 query_parameters,
502 self.log_namespace,
503 SimpleHttpConfig::NAME,
504 );
505 }
506
507 fn build_events(
508 &self,
509 body: Bytes,
510 _header_map: &HeaderMap,
511 _query_parameters: &HashMap<String, String>,
512 _request_path: &str,
513 ) -> Result<Vec<Event>, ErrorMessage> {
514 let mut decoder = self.decoder.clone();
515 let mut events = Vec::new();
516 let mut bytes = BytesMut::new();
517 bytes.extend_from_slice(&body);
518
519 loop {
520 match decoder.decode_eof(&mut bytes) {
521 Ok(Some((next, _))) => {
522 events.extend(next);
523 }
524 Ok(None) => break,
525 Err(error) => {
526 return Err(ErrorMessage::new(
529 StatusCode::BAD_REQUEST,
530 format!("Failed decoding body: {error}"),
531 ));
532 }
533 }
534 }
535
536 Ok(events)
537 }
538
539 fn enable_source_ip(&self) -> bool {
540 self.host_key.path.is_some()
541 }
542
543 fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
551 for event in events.iter_mut() {
552 match self.log_namespace {
553 LogNamespace::Vector => {
554 let meta = event.metadata_mut().value_mut();
557 for (key, value) in &enrichment {
558 let key_str = key.as_str();
559 let name_part = path!(SimpleHttpConfig::NAME);
560 let key_part = path!(key_str);
561 let full_path = name_part.concat(key_part);
562 if meta.get(full_path.clone()).is_none() {
563 meta.insert(full_path, value.clone());
564 }
565 }
566 }
567 LogNamespace::Legacy => {
568 if let Event::Log(log) = event {
570 for (key, value) in &enrichment {
571 log.try_insert((PathPrefix::Event, path!(key.as_str())), value.clone());
572 }
573 }
574 }
575 }
576 }
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use std::{io::Write, net::SocketAddr, str::FromStr};
583
584 use flate2::{
585 Compression,
586 write::{GzEncoder, ZlibEncoder},
587 };
588 use futures::Stream;
589 use headers::{Authorization, authorization::Credentials};
590 use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
591 use similar_asserts::assert_eq;
592 use vector_lib::{
593 codecs::{
594 BytesDecoderConfig, JsonDeserializerConfig,
595 decoding::{DeserializerConfig, FramingConfig},
596 },
597 config::LogNamespace,
598 event::LogEvent,
599 lookup::{
600 OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath,
601 owned_value_path, path,
602 },
603 schema::Definition,
604 };
605 use vrl::{
606 path::ValuePath as _,
607 value::{Kind, ObjectMap, kind::Collection},
608 };
609
610 use super::{SimpleHttpConfig, remove_duplicates};
611 use crate::{
612 SourceSender,
613 common::http::server_auth::HttpServerAuthConfig,
614 components::validation::prelude::*,
615 config::{SourceConfig, SourceContext, log_schema},
616 event::{Event, EventStatus, Value},
617 sources::http_server::HttpMethod,
618 test_util::{
619 addr::next_addr,
620 components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
621 spawn_collect_n, wait_for_tcp,
622 },
623 };
624
    /// Runs the shared `test_generate_config` helper against `SimpleHttpConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SimpleHttpConfig>();
    }
629
630 #[allow(clippy::too_many_arguments)]
631 async fn source<'a>(
632 headers: Vec<String>,
633 query_parameters: Vec<String>,
634 path_key: &'a str,
635 host_key: &'a str,
636 path: &'a str,
637 method: &'a str,
638 response_code: StatusCode,
639 auth: Option<HttpServerAuthConfig>,
640 strict_path: bool,
641 status: EventStatus,
642 acknowledgements: bool,
643 framing: Option<FramingConfig>,
644 decoding: Option<DeserializerConfig>,
645 ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
646 let (sender, recv) = SourceSender::new_test_finalize(status);
647 let (_guard, address) = next_addr();
648 let path = path.to_owned();
649 let host_key = OptionalValuePath::from(owned_value_path!(host_key));
650 let path_key = OptionalValuePath::from(owned_value_path!(path_key));
651 let context = SourceContext::new_test(sender, None);
652 let method = match Method::from_str(method).unwrap() {
653 Method::GET => HttpMethod::Get,
654 Method::POST => HttpMethod::Post,
655 _ => HttpMethod::Post,
656 };
657
658 tokio::spawn(async move {
659 SimpleHttpConfig {
660 address,
661 headers,
662 encoding: None,
663 query_parameters,
664 response_code,
665 tls: None,
666 auth,
667 strict_path,
668 path_key,
669 host_key,
670 path,
671 method,
672 framing,
673 decoding,
674 acknowledgements: acknowledgements.into(),
675 log_namespace: None,
676 keepalive: Default::default(),
677 }
678 .build(context)
679 .await
680 .unwrap()
681 .await
682 .unwrap();
683 });
684 wait_for_tcp(address).await;
685 (recv, address)
686 }
687
688 async fn send(address: SocketAddr, body: &str) -> u16 {
689 reqwest::Client::new()
690 .post(format!("http://{address}/"))
691 .body(body.to_owned())
692 .send()
693 .await
694 .unwrap()
695 .status()
696 .as_u16()
697 }
698
699 async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
700 reqwest::Client::new()
701 .post(format!("http://{address}/"))
702 .headers(headers)
703 .body(body.to_owned())
704 .send()
705 .await
706 .unwrap()
707 .status()
708 .as_u16()
709 }
710
711 async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
712 reqwest::Client::new()
713 .post(format!("http://{address}?{query}"))
714 .body(body.to_owned())
715 .send()
716 .await
717 .unwrap()
718 .status()
719 .as_u16()
720 }
721
722 async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
723 reqwest::Client::new()
724 .post(format!("http://{address}{path}"))
725 .body(body.to_owned())
726 .send()
727 .await
728 .unwrap()
729 .status()
730 .as_u16()
731 }
732
733 async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
734 let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
735 reqwest::Client::new()
736 .request(method, format!("http://{address}{path}"))
737 .body(body.to_owned())
738 .send()
739 .await
740 .unwrap()
741 .status()
742 .as_u16()
743 }
744
745 async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
746 reqwest::Client::new()
747 .post(format!("http://{address}/"))
748 .headers(headers)
749 .body(body)
750 .send()
751 .await
752 .unwrap()
753 .status()
754 .as_u16()
755 }
756
757 async fn spawn_ok_collect_n(
758 send: impl std::future::Future<Output = u16> + Send + 'static,
759 rx: impl Stream<Item = Event> + Unpin,
760 n: usize,
761 ) -> Vec<Event> {
762 spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
763 }
764
765 #[tokio::test]
766 async fn http_multiline_text() {
767 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
768 let body = "test body\ntest body 2";
769
770 let (rx, addr) = source(
771 vec![],
772 vec![],
773 "http_path",
774 "remote_ip",
775 "/",
776 "POST",
777 StatusCode::OK,
778 None,
779 true,
780 EventStatus::Delivered,
781 true,
782 None,
783 None,
784 )
785 .await;
786
787 spawn_ok_collect_n(send(addr, body), rx, 2).await
788 })
789 .await;
790
791 {
792 let event = events.remove(0);
793 let log = event.as_log();
794 assert_eq!(*log.get_message().unwrap(), "test body".into());
795 assert!(log.get_timestamp().is_some());
796 assert_eq!(
797 *log.get_source_type().unwrap(),
798 SimpleHttpConfig::NAME.into()
799 );
800 assert_eq!(log["http_path"], "/".into());
801 assert_event_metadata(log).await;
802 }
803 {
804 let event = events.remove(0);
805 let log = event.as_log();
806 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
807 assert_event_metadata(log).await;
808 }
809 }
810
811 #[tokio::test]
812 async fn http_multiline_text2() {
813 let body = "test body\ntest body 2\n";
815
816 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
817 let (rx, addr) = source(
818 vec![],
819 vec![],
820 "http_path",
821 "remote_ip",
822 "/",
823 "POST",
824 StatusCode::OK,
825 None,
826 true,
827 EventStatus::Delivered,
828 true,
829 None,
830 None,
831 )
832 .await;
833
834 spawn_ok_collect_n(send(addr, body), rx, 2).await
835 })
836 .await;
837
838 {
839 let event = events.remove(0);
840 let log = event.as_log();
841 assert_eq!(*log.get_message().unwrap(), "test body".into());
842 assert_event_metadata(log).await;
843 }
844 {
845 let event = events.remove(0);
846 let log = event.as_log();
847 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
848 assert_event_metadata(log).await;
849 }
850 }
851
852 #[tokio::test]
853 async fn http_bytes_codec_preserves_newlines() {
854 let body = "foo\nbar";
855
856 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
857 let (rx, addr) = source(
858 vec![],
859 vec![],
860 "http_path",
861 "remote_ip",
862 "/",
863 "POST",
864 StatusCode::OK,
865 None,
866 true,
867 EventStatus::Delivered,
868 true,
869 Some(BytesDecoderConfig::new().into()),
870 None,
871 )
872 .await;
873
874 spawn_ok_collect_n(send(addr, body), rx, 1).await
875 })
876 .await;
877
878 assert_eq!(events.len(), 1);
879
880 {
881 let event = events.remove(0);
882 let log = event.as_log();
883 assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
884 assert_event_metadata(log).await;
885 }
886 }
887
888 #[tokio::test]
889 async fn http_json_parsing() {
890 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
891 let (rx, addr) = source(
892 vec![],
893 vec![],
894 "http_path",
895 "remote_ip",
896 "/",
897 "POST",
898 StatusCode::OK,
899 None,
900 true,
901 EventStatus::Delivered,
902 true,
903 None,
904 Some(JsonDeserializerConfig::default().into()),
905 )
906 .await;
907
908 spawn_collect_n(
909 async move {
910 assert_eq!(400, send(addr, "{").await); assert_eq!(400, send(addr, r#"{"key"}"#).await); assert_eq!(200, send(addr, "{}").await); assert_eq!(200, send(addr, "[{},{},{}]").await);
915 },
916 rx,
917 2,
918 )
919 .await
920 })
921 .await;
922
923 assert!(events.remove(1).as_log().get_timestamp().is_some());
924 assert!(events.remove(0).as_log().get_timestamp().is_some());
925 }
926
927 #[tokio::test]
928 async fn http_json_values() {
929 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
930 let (rx, addr) = source(
931 vec![],
932 vec![],
933 "http_path",
934 "remote_ip",
935 "/",
936 "POST",
937 StatusCode::OK,
938 None,
939 true,
940 EventStatus::Delivered,
941 true,
942 None,
943 Some(JsonDeserializerConfig::default().into()),
944 )
945 .await;
946
947 spawn_collect_n(
948 async move {
949 assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
950 assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
951 },
952 rx,
953 2,
954 )
955 .await
956 })
957 .await;
958
959 {
960 let event = events.remove(0);
961 let log = event.as_log();
962 assert_eq!(log["key"], "value".into());
963 assert_event_metadata(log).await;
964 }
965 {
966 let event = events.remove(0);
967 let log = event.as_log();
968 assert_eq!(log["key2"], "value2".into());
969 assert_event_metadata(log).await;
970 }
971 }
972
973 #[tokio::test]
974 async fn http_json_dotted_keys() {
975 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
976 let (rx, addr) = source(
977 vec![],
978 vec![],
979 "http_path",
980 "remote_ip",
981 "/",
982 "POST",
983 StatusCode::OK,
984 None,
985 true,
986 EventStatus::Delivered,
987 true,
988 None,
989 Some(JsonDeserializerConfig::default().into()),
990 )
991 .await;
992
993 spawn_collect_n(
994 async move {
995 assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
996 assert_eq!(
997 200,
998 send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
999 );
1000 },
1001 rx,
1002 2,
1003 )
1004 .await
1005 })
1006 .await;
1007
1008 {
1009 let event = events.remove(0);
1010 let log = event.as_log();
1011 assert_eq!(
1012 log.get(event_path!("dotted.key")).unwrap(),
1013 &Value::from("value")
1014 );
1015 }
1016 {
1017 let event = events.remove(0);
1018 let log = event.as_log();
1019 let mut map = ObjectMap::new();
1020 map.insert("dotted.key2".into(), Value::from("value2"));
1021 assert_eq!(log["nested"], map.into());
1022 }
1023 }
1024
1025 #[tokio::test]
1026 async fn http_ndjson() {
1027 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1028 let (rx, addr) = source(
1029 vec![],
1030 vec![],
1031 "http_path",
1032 "remote_ip",
1033 "/",
1034 "POST",
1035 StatusCode::OK,
1036 None,
1037 true,
1038 EventStatus::Delivered,
1039 true,
1040 None,
1041 Some(JsonDeserializerConfig::default().into()),
1042 )
1043 .await;
1044
1045 spawn_collect_n(
1046 async move {
1047 assert_eq!(
1048 200,
1049 send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
1050 );
1051
1052 assert_eq!(
1053 200,
1054 send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
1055 );
1056 },
1057 rx,
1058 4,
1059 )
1060 .await
1061 })
1062 .await;
1063
1064 {
1065 let event = events.remove(0);
1066 let log = event.as_log();
1067 assert_eq!(log["key1"], "value1".into());
1068 assert_event_metadata(log).await;
1069 }
1070 {
1071 let event = events.remove(0);
1072 let log = event.as_log();
1073 assert_eq!(log["key2"], "value2".into());
1074 assert_event_metadata(log).await;
1075 }
1076 {
1077 let event = events.remove(0);
1078 let log = event.as_log();
1079 assert_eq!(log["key1"], "value1".into());
1080 assert_event_metadata(log).await;
1081 }
1082 {
1083 let event = events.remove(0);
1084 let log = event.as_log();
1085 assert_eq!(log["key2"], "value2".into());
1086 assert_event_metadata(log).await;
1087 }
1088 }
1089
1090 async fn assert_event_metadata(log: &LogEvent) {
1091 assert!(log.get_timestamp().is_some());
1092
1093 let source_type_key_value = log
1094 .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1095 .unwrap()
1096 .as_str()
1097 .unwrap();
1098 assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1099 assert_eq!(log["http_path"], "/".into());
1100 }
1101
1102 #[tokio::test]
1103 async fn http_headers() {
1104 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1105 let mut headers = HeaderMap::new();
1106 headers.insert("User-Agent", "test_client".parse().unwrap());
1107 headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1108 headers.insert("X-Test-Header", "true".parse().unwrap());
1109
1110 let (rx, addr) = source(
1111 vec![
1112 "User-Agent".to_string(),
1113 "Upgrade-Insecure-Requests".to_string(),
1114 "X-*".to_string(),
1115 "AbsentHeader".to_string(),
1116 ],
1117 vec![],
1118 "http_path",
1119 "remote_ip",
1120 "/",
1121 "POST",
1122 StatusCode::OK,
1123 None,
1124 true,
1125 EventStatus::Delivered,
1126 true,
1127 None,
1128 Some(JsonDeserializerConfig::default().into()),
1129 )
1130 .await;
1131
1132 spawn_ok_collect_n(
1133 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1134 rx,
1135 1,
1136 )
1137 .await
1138 })
1139 .await;
1140
1141 {
1142 let event = events.remove(0);
1143 let log = event.as_log();
1144 assert_eq!(log["key1"], "value1".into());
1145 assert_eq!(log["\"User-Agent\""], "test_client".into());
1146 assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1147 assert_eq!(log["\"x-test-header\""], "true".into());
1148 assert_eq!(log["AbsentHeader"], Value::Null);
1149 assert_event_metadata(log).await;
1150 }
1151 }
1152
1153 #[tokio::test]
1154 async fn http_headers_wildcard() {
1155 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1156 let mut headers = HeaderMap::new();
1157 headers.insert("User-Agent", "test_client".parse().unwrap());
1158 headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1159 headers.insert("key1", "value_from_header".parse().unwrap());
1161
1162 let (rx, addr) = source(
1163 vec!["*".to_string()],
1164 vec![],
1165 "http_path",
1166 "remote_ip",
1167 "/",
1168 "POST",
1169 StatusCode::OK,
1170 None,
1171 true,
1172 EventStatus::Delivered,
1173 true,
1174 None,
1175 Some(JsonDeserializerConfig::default().into()),
1176 )
1177 .await;
1178
1179 spawn_ok_collect_n(
1180 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1181 rx,
1182 1,
1183 )
1184 .await
1185 })
1186 .await;
1187
1188 {
1189 let event = events.remove(0);
1190 let log = event.as_log();
1191 assert_eq!(log["key1"], "value1".into());
1192 assert_eq!(log["\"user-agent\""], "test_client".into());
1193 assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1194 assert_event_metadata(log).await;
1195 }
1196 }
1197
1198 #[tokio::test]
1199 async fn http_query() {
1200 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1201 let (rx, addr) = source(
1202 vec![],
1203 vec![
1204 "source".to_string(),
1205 "region".to_string(),
1206 "absent".to_string(),
1207 ],
1208 "http_path",
1209 "remote_ip",
1210 "/",
1211 "POST",
1212 StatusCode::OK,
1213 None,
1214 true,
1215 EventStatus::Delivered,
1216 true,
1217 None,
1218 Some(JsonDeserializerConfig::default().into()),
1219 )
1220 .await;
1221
1222 spawn_ok_collect_n(
1223 send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"),
1224 rx,
1225 1,
1226 )
1227 .await
1228 })
1229 .await;
1230
1231 {
1232 let event = events.remove(0);
1233 let log = event.as_log();
1234 assert_eq!(log["key1"], "value1".into());
1235 assert_eq!(log["source"], "staging".into());
1236 assert_eq!(log["region"], "gb".into());
1237 assert_eq!(log["absent"], Value::Null);
1238 assert_event_metadata(log).await;
1239 }
1240 }
1241
1242 #[tokio::test]
1243 async fn http_query_wildcard() {
1244 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1245 let (rx, addr) = source(
1246 vec![],
1247 vec!["*".to_string()],
1248 "http_path",
1249 "remote_ip",
1250 "/",
1251 "POST",
1252 StatusCode::OK,
1253 None,
1254 true,
1255 EventStatus::Delivered,
1256 true,
1257 None,
1258 Some(JsonDeserializerConfig::default().into()),
1259 )
1260 .await;
1261
1262 spawn_ok_collect_n(
1263 send_with_query(
1264 addr,
1265 "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1266 "source=staging®ion=gb&key1=value_from_query",
1267 ),
1268 rx,
1269 1,
1270 )
1271 .await
1272 })
1273 .await;
1274
1275 {
1276 let event = events.remove(0);
1277 let log = event.as_log();
1278 assert_eq!(log["key1"], "value_from_query".into());
1279 assert_eq!(log["key2"], "value2".into());
1280 assert_eq!(log["source"], "staging".into());
1281 assert_eq!(log["region"], "gb".into());
1282 assert_event_metadata(log).await;
1283 }
1284 }
1285
1286 #[tokio::test]
1287 async fn http_gzip_deflate() {
1288 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1289 let body = "test body";
1290
1291 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1292 encoder.write_all(body.as_bytes()).unwrap();
1293 let body = encoder.finish().unwrap();
1294
1295 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1296 encoder.write_all(body.as_slice()).unwrap();
1297 let body = encoder.finish().unwrap();
1298
1299 let mut headers = HeaderMap::new();
1300 headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1301
1302 let (rx, addr) = source(
1303 vec![],
1304 vec![],
1305 "http_path",
1306 "remote_ip",
1307 "/",
1308 "POST",
1309 StatusCode::OK,
1310 None,
1311 true,
1312 EventStatus::Delivered,
1313 true,
1314 None,
1315 None,
1316 )
1317 .await;
1318
1319 spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1320 })
1321 .await;
1322
1323 {
1324 let event = events.remove(0);
1325 let log = event.as_log();
1326 assert_eq!(*log.get_message().unwrap(), "test body".into());
1327 assert_event_metadata(log).await;
1328 }
1329 }
1330
1331 #[tokio::test]
1332 async fn http_rejects_gzip_bomb_with_413() {
1333 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1336 let chunk = [0u8; 8 * 1024];
1337 for _ in 0..(200 * 1024 * 1024 / chunk.len()) {
1338 encoder.write_all(&chunk).unwrap();
1339 }
1340 let body = encoder.finish().unwrap();
1341
1342 let mut headers = HeaderMap::new();
1343 headers.insert("Content-Encoding", "gzip".parse().unwrap());
1344
1345 components::init_test();
1346 let (_rx, addr) = source(
1347 vec![],
1348 vec![],
1349 "http_path",
1350 "remote_ip",
1351 "/",
1352 "POST",
1353 StatusCode::OK,
1354 None,
1355 true,
1356 EventStatus::Delivered,
1357 true,
1358 None,
1359 None,
1360 )
1361 .await;
1362
1363 assert_eq!(413, send_bytes(addr, body, headers).await);
1364 }
1365
1366 #[tokio::test]
1367 async fn http_path() {
1368 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1369 let (rx, addr) = source(
1370 vec![],
1371 vec![],
1372 "vector_http_path",
1373 "vector_remote_ip",
1374 "/event/path",
1375 "POST",
1376 StatusCode::OK,
1377 None,
1378 true,
1379 EventStatus::Delivered,
1380 true,
1381 None,
1382 Some(JsonDeserializerConfig::default().into()),
1383 )
1384 .await;
1385
1386 spawn_ok_collect_n(
1387 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1388 rx,
1389 1,
1390 )
1391 .await
1392 })
1393 .await;
1394
1395 {
1396 let event = events.remove(0);
1397 let log = event.as_log();
1398 assert_eq!(log["key1"], "value1".into());
1399 assert_eq!(log["vector_http_path"], "/event/path".into());
1400 assert!(log.get_timestamp().is_some());
1401 assert_eq!(
1402 *log.get_source_type().unwrap(),
1403 SimpleHttpConfig::NAME.into()
1404 );
1405 }
1406 }
1407
1408 #[tokio::test]
1409 async fn http_path_no_restriction() {
1410 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1411 let (rx, addr) = source(
1412 vec![],
1413 vec![],
1414 "vector_http_path",
1415 "vector_remote_ip",
1416 "/event",
1417 "POST",
1418 StatusCode::OK,
1419 None,
1420 false,
1421 EventStatus::Delivered,
1422 true,
1423 None,
1424 Some(JsonDeserializerConfig::default().into()),
1425 )
1426 .await;
1427
1428 spawn_collect_n(
1429 async move {
1430 assert_eq!(
1431 200,
1432 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1433 );
1434 assert_eq!(
1435 200,
1436 send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1437 );
1438 },
1439 rx,
1440 2,
1441 )
1442 .await
1443 })
1444 .await;
1445
1446 {
1447 let event = events.remove(0);
1448 let log = event.as_log();
1449 assert_eq!(log["key1"], "value1".into());
1450 assert_eq!(log["vector_http_path"], "/event/path1".into());
1451 assert!(log.get_timestamp().is_some());
1452 assert_eq!(
1453 *log.get_source_type().unwrap(),
1454 SimpleHttpConfig::NAME.into()
1455 );
1456 }
1457 {
1458 let event = events.remove(0);
1459 let log = event.as_log();
1460 assert_eq!(log["key2"], "value2".into());
1461 assert_eq!(log["vector_http_path"], "/event/path2".into());
1462 assert!(log.get_timestamp().is_some());
1463 assert_eq!(
1464 *log.get_source_type().unwrap(),
1465 SimpleHttpConfig::NAME.into()
1466 );
1467 }
1468 }
1469
1470 #[tokio::test]
1471 async fn http_wrong_path() {
1472 components::init_test();
1473 let (_rx, addr) = source(
1474 vec![],
1475 vec![],
1476 "vector_http_path",
1477 "vector_remote_ip",
1478 "/",
1479 "POST",
1480 StatusCode::OK,
1481 None,
1482 true,
1483 EventStatus::Delivered,
1484 true,
1485 None,
1486 Some(JsonDeserializerConfig::default().into()),
1487 )
1488 .await;
1489
1490 assert_eq!(
1491 404,
1492 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1493 );
1494 }
1495
1496 #[tokio::test]
1497 async fn http_status_code() {
1498 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1499 let (rx, addr) = source(
1500 vec![],
1501 vec![],
1502 "http_path",
1503 "remote_ip",
1504 "/",
1505 "POST",
1506 StatusCode::ACCEPTED,
1507 None,
1508 true,
1509 EventStatus::Delivered,
1510 true,
1511 None,
1512 None,
1513 )
1514 .await;
1515
1516 spawn_collect_n(
1517 async move {
1518 assert_eq!(
1519 StatusCode::ACCEPTED,
1520 send(addr, "{\"key1\":\"value1\"}").await
1521 );
1522 },
1523 rx,
1524 1,
1525 )
1526 .await;
1527 })
1528 .await;
1529 }
1530
1531 #[tokio::test]
1532 async fn http_delivery_failure() {
1533 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1534 let (rx, addr) = source(
1535 vec![],
1536 vec![],
1537 "http_path",
1538 "remote_ip",
1539 "/",
1540 "POST",
1541 StatusCode::OK,
1542 None,
1543 true,
1544 EventStatus::Rejected,
1545 true,
1546 None,
1547 None,
1548 )
1549 .await;
1550
1551 spawn_collect_n(
1552 async move {
1553 assert_eq!(400, send(addr, "test body\n").await);
1554 },
1555 rx,
1556 1,
1557 )
1558 .await;
1559 })
1560 .await;
1561 }
1562
1563 #[tokio::test]
1564 async fn ignores_disabled_acknowledgements() {
1565 let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1566 let (rx, addr) = source(
1567 vec![],
1568 vec![],
1569 "http_path",
1570 "remote_ip",
1571 "/",
1572 "POST",
1573 StatusCode::OK,
1574 None,
1575 true,
1576 EventStatus::Rejected,
1577 false,
1578 None,
1579 None,
1580 )
1581 .await;
1582
1583 spawn_collect_n(
1584 async move {
1585 assert_eq!(200, send(addr, "test body\n").await);
1586 },
1587 rx,
1588 1,
1589 )
1590 .await
1591 })
1592 .await;
1593
1594 assert_eq!(events.len(), 1);
1595 }
1596
1597 #[tokio::test]
1598 async fn http_get_method() {
1599 components::init_test();
1600 let (_rx, addr) = source(
1601 vec![],
1602 vec![],
1603 "http_path",
1604 "remote_ip",
1605 "/",
1606 "GET",
1607 StatusCode::OK,
1608 None,
1609 true,
1610 EventStatus::Delivered,
1611 true,
1612 None,
1613 None,
1614 )
1615 .await;
1616
1617 assert_eq!(200, send_request(addr, "GET", "", "/").await);
1618 }
1619
1620 #[tokio::test]
1621 async fn returns_401_when_required_auth_is_missing() {
1622 components::init_test();
1623 let (_rx, addr) = source(
1624 vec![],
1625 vec![],
1626 "http_path",
1627 "remote_ip",
1628 "/",
1629 "GET",
1630 StatusCode::OK,
1631 Some(HttpServerAuthConfig::Basic {
1632 username: "test".to_string(),
1633 password: "test".to_string().into(),
1634 }),
1635 true,
1636 EventStatus::Delivered,
1637 true,
1638 None,
1639 None,
1640 )
1641 .await;
1642
1643 assert_eq!(401, send_request(addr, "GET", "", "/").await);
1644 }
1645
1646 #[tokio::test]
1647 async fn returns_401_when_required_auth_is_wrong() {
1648 components::init_test();
1649 let (_rx, addr) = source(
1650 vec![],
1651 vec![],
1652 "http_path",
1653 "remote_ip",
1654 "/",
1655 "POST",
1656 StatusCode::OK,
1657 Some(HttpServerAuthConfig::Basic {
1658 username: "test".to_string(),
1659 password: "test".to_string().into(),
1660 }),
1661 true,
1662 EventStatus::Delivered,
1663 true,
1664 None,
1665 None,
1666 )
1667 .await;
1668
1669 let mut headers = HeaderMap::new();
1670 headers.insert(
1671 AUTHORIZATION,
1672 Authorization::basic("wrong", "test").0.encode(),
1673 );
1674 assert_eq!(401, send_with_headers(addr, "", headers).await);
1675 }
1676
1677 #[tokio::test]
1678 async fn http_get_with_correct_auth() {
1679 components::init_test();
1680 let (_rx, addr) = source(
1681 vec![],
1682 vec![],
1683 "http_path",
1684 "remote_ip",
1685 "/",
1686 "POST",
1687 StatusCode::OK,
1688 Some(HttpServerAuthConfig::Basic {
1689 username: "test".to_string(),
1690 password: "test".to_string().into(),
1691 }),
1692 true,
1693 EventStatus::Delivered,
1694 true,
1695 None,
1696 None,
1697 )
1698 .await;
1699
1700 let mut headers = HeaderMap::new();
1701 headers.insert(
1702 AUTHORIZATION,
1703 Authorization::basic("test", "test").0.encode(),
1704 );
1705 assert_eq!(200, send_with_headers(addr, "", headers).await);
1706 }
1707
/// Compares the schema definition of the first output, requested for the
/// `Vector` log namespace, against a hand-built expected definition.
#[test]
fn output_schema_definition_vector_namespace() {
    let mut config = SimpleHttpConfig::default();
    config.log_namespace = Some(true);

    // Kind shared by the `headers` and `query_parameters` metadata fields.
    let optional_bytes_map =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    let expected =
        Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
            .with_meaning(OwnedTargetPath::event_root(), "message")
            .with_metadata_field(
                &owned_value_path!("vector", "source_type"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "path"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
                optional_bytes_map(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
                optional_bytes_map(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "host"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("vector", "ingest_timestamp"),
                Kind::timestamp(),
                None,
            );

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    assert_eq!(actual, Some(expected));
}
1756
/// Compares the schema definition of the first output of a default config,
/// requested for the `Legacy` log namespace, against a hand-built expected
/// definition.
#[test]
fn output_schema_definition_legacy_namespace() {
    let root_kind = Kind::object(Collection::empty());
    let expected = Definition::new_with_default_metadata(root_kind, [LogNamespace::Legacy])
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            None,
        )
        .unknown_fields(Kind::bytes());

    let mut outputs = SimpleHttpConfig::default().outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    assert_eq!(actual, Some(expected));
}
1787
/// Checks `remove_duplicates` on a list without repeats (returned unchanged)
/// and on the same list with a trailing repeated `"b"` (the repeat is dropped
/// and the original order is kept).
#[test]
fn validate_remove_duplicates() {
    let unique: Vec<String> = ["a", "b", "c", "d"]
        .into_iter()
        .map(str::to_owned)
        .collect();

    // No repeated entries: the output equals the input.
    let untouched = remove_duplicates(unique.clone(), "foo");
    assert_eq!(unique, untouched);

    // One repeated entry appended at the end: only the first occurrence stays.
    let mut with_repeat = unique.clone();
    with_repeat.push("b".to_owned());
    let deduplicated = remove_duplicates(with_repeat, "foo");
    assert_eq!(unique, deduplicated);
}
1820
/// Seeds a log event with `<NAME>.path` metadata, injects auth enrichment
/// containing both `path` and `tenant_id`, and checks that the seeded `path`
/// is kept while `tenant_id` is added next to it.
#[test]
fn inject_auth_enrichment_does_not_clobber_vector_namespace_builtin_fields() {
    use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
    use vector_lib::codecs::BytesDeserializerConfig;
    use vrl::value::KeyString;

    let http_source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder: DecodingConfig::new(
            BytesDecoderConfig::new().into(),
            BytesDeserializerConfig::new().into(),
            LogNamespace::Vector,
        )
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector),
        log_namespace: LogNamespace::Vector,
    };

    // Metadata that is already present before enrichment runs.
    let mut seeded = LogEvent::default();
    seeded.insert(
        (
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("path")),
        ),
        "/real/path",
    );

    // One colliding key (`path`) and one new key (`tenant_id`).
    let mut auth_fields = ObjectMap::new();
    auth_fields.insert(KeyString::from("path"), Value::from("/clobbered"));
    auth_fields.insert(KeyString::from("tenant_id"), Value::from("t-123"));

    let mut events = vec![Event::Log(seeded)];
    http_source.inject_auth_enrichment(&mut events, auth_fields);

    let log = match &events[0] {
        Event::Log(log) => log,
        _ => panic!("expected log event"),
    };
    assert_eq!(
        log.get((
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("path")),
        )),
        Some(&Value::from("/real/path")),
        "auth enrichment must not overwrite built-in source metadata"
    );
    assert_eq!(
        log.get((
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
        )),
        Some(&Value::from("t-123")),
        "new auth enrichment field must be injected"
    );
}
1883
/// Seeds a log event with `<NAME>.tenant_id` metadata, injects auth
/// enrichment for the same key, and checks that the seeded value survives.
#[test]
fn inject_auth_enrichment_does_not_overwrite_existing_metadata_in_vector_namespace() {
    use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
    use vector_lib::codecs::BytesDeserializerConfig;
    use vrl::value::KeyString;

    let http_source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder: DecodingConfig::new(
            BytesDecoderConfig::new().into(),
            BytesDeserializerConfig::new().into(),
            LogNamespace::Vector,
        )
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector),
        log_namespace: LogNamespace::Vector,
    };

    // The key the enrichment will try to write is already populated.
    let mut seeded = LogEvent::default();
    seeded.insert(
        (
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
        ),
        "existing",
    );

    let mut auth_fields = ObjectMap::new();
    auth_fields.insert(KeyString::from("tenant_id"), Value::from("auth-value"));

    let mut events = vec![Event::Log(seeded)];
    http_source.inject_auth_enrichment(&mut events, auth_fields);

    let log = match &events[0] {
        Event::Log(log) => log,
        _ => panic!("expected log event"),
    };
    assert_eq!(
        log.get((
            PathPrefix::Metadata,
            path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
        )),
        Some(&Value::from("existing")),
        "auth enrichment must not overwrite already-present metadata keys"
    );
}
1936
/// Injects auth enrichment into a metric event and checks that `tenant_id`
/// lands under `<NAME>.tenant_id` in the metric's metadata value.
#[test]
fn inject_auth_enrichment_applies_to_non_log_events_in_vector_namespace() {
    use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
    use vector_lib::{
        codecs::BytesDeserializerConfig,
        event::{Metric, MetricKind, MetricValue},
    };
    use vrl::value::KeyString;

    let http_source = super::SimpleHttpSource {
        headers: vec![],
        query_parameters: vec![],
        path_key: OptionalValuePath::none(),
        host_key: OptionalValuePath::none(),
        decoder: DecodingConfig::new(
            BytesDecoderConfig::new().into(),
            BytesDeserializerConfig::new().into(),
            LogNamespace::Vector,
        )
        .build()
        .unwrap()
        .with_log_namespace(LogNamespace::Vector),
        log_namespace: LogNamespace::Vector,
    };

    let counter = Metric::new(
        "requests",
        MetricKind::Incremental,
        MetricValue::Counter { value: 1.0 },
    );

    let mut auth_fields = ObjectMap::new();
    auth_fields.insert(KeyString::from("tenant_id"), Value::from("t-456"));

    let mut events = vec![Event::Metric(counter)];
    http_source.inject_auth_enrichment(&mut events, auth_fields);

    let metric = match &events[0] {
        Event::Metric(metric) => metric,
        _ => panic!("expected metric event"),
    };
    let injected = metric
        .metadata()
        .value()
        .get(path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")));
    assert_eq!(
        injected,
        Some(&Value::from("t-456")),
        "auth enrichment must be written to non-log event metadata"
    );
}
1988
1989 impl ValidatableComponent for SimpleHttpConfig {
1990 fn validation_configuration() -> ValidationConfiguration {
1991 let config = Self {
1992 decoding: Some(DeserializerConfig::Json(Default::default())),
1993 ..Default::default()
1994 };
1995
1996 let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1997
1998 let listen_addr_http = format!("http://{}/", config.address);
1999 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2000
2001 let external_resource = ExternalResource::new(
2002 ResourceDirection::Push,
2003 HttpResourceConfig::from_parts(uri, Some(config.method.into())),
2004 config
2005 .get_decoding_config()
2006 .expect("should not fail to get decoding config"),
2007 );
2008
2009 ValidationConfiguration::from_source(
2010 Self::NAME,
2011 log_namespace,
2012 vec![ComponentTestCaseConfig::from_source(
2013 config,
2014 None,
2015 Some(external_resource),
2016 )],
2017 )
2018 }
2019 }
2020
2021 register_validatable_component!(SimpleHttpConfig);
2022}