Skip to main content

vector/sources/
http_server.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9    codecs::{
10        BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11        NewlineDelimitedDecoderConfig,
12        decoding::{DeserializerConfig, FramingConfig},
13    },
14    config::{DataType, LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    lookup::{PathPrefix, lookup_v2::OptionalValuePath, owned_value_path, path},
17    schema::Definition,
18};
19use vrl::{
20    path::ValuePath as _,
21    value::{Kind, ObjectMap, kind::Collection},
22};
23use warp::http::HeaderMap;
24
25use crate::{
26    codecs::{Decoder, DecodingConfig},
27    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28    config::{
29        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30        SourceOutput,
31    },
32    event::Event,
33    http::KeepaliveConfig,
34    serde::{bool_or_struct, default_decoding},
35    sources::util::{
36        Encoding, HttpSource,
37        http::{HttpMethod, add_headers, add_query_parameters},
38    },
39    tls::TlsEnableableConfig,
40};
41
42/// Configuration for the `http` source.
43#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
44#[configurable(metadata(deprecated))]
45#[derive(Clone, Debug)]
46pub struct HttpConfig(SimpleHttpConfig);
47
48impl GenerateConfig for HttpConfig {
49    fn generate_config() -> toml::Value {
50        <SimpleHttpConfig as GenerateConfig>::generate_config()
51    }
52}
53
54#[async_trait::async_trait]
55#[typetag::serde(name = "http")]
56impl SourceConfig for HttpConfig {
57    async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
58        self.0.build(cx).await
59    }
60
61    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
62        self.0.outputs(global_log_namespace)
63    }
64
65    fn resources(&self) -> Vec<Resource> {
66        self.0.resources()
67    }
68
69    fn can_acknowledge(&self) -> bool {
70        self.0.can_acknowledge()
71    }
72}
73
74/// Configuration for the `http_server` source.
75#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
76#[derive(Clone, Debug)]
77pub struct SimpleHttpConfig {
78    /// The socket address to listen for connections on.
79    ///
80    /// It _must_ include a port.
81    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
82    #[configurable(metadata(docs::examples = "localhost:80"))]
83    address: SocketAddr,
84
85    /// The expected encoding of received data.
86    ///
87    /// For `json` and `ndjson` encodings, the fields of the JSON objects are output as separate fields.
88    #[configurable(deprecated)]
89    #[serde(default)]
90    encoding: Option<Encoding>,
91
92    /// A list of HTTP headers to include in the log event.
93    ///
94    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
95    ///
96    /// Specifying "*" results in all headers included in the log event.
97    ///
98    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
99    #[serde(default)]
100    #[configurable(metadata(docs::examples = "User-Agent"))]
101    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
102    #[configurable(metadata(docs::examples = "X-*"))]
103    #[configurable(metadata(docs::examples = "*"))]
104    headers: Vec<String>,
105
106    /// A list of URL query parameters to include in the log event.
107    ///
108    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
109    ///
110    /// Specifying "*" results in all query parameters included in the log event.
111    ///
112    /// These override any values included in the body with conflicting names.
113    #[serde(default)]
114    #[configurable(metadata(docs::examples = "application"))]
115    #[configurable(metadata(docs::examples = "source"))]
116    #[configurable(metadata(docs::examples = "param*"))]
117    #[configurable(metadata(docs::examples = "*"))]
118    query_parameters: Vec<String>,
119
120    /// HTTP authentication configuration.
121    ///
122    /// Use HTTP authentication with HTTPS only. The authentication credentials are passed as an
123    /// HTTP header without any additional encryption beyond what is provided by the transport itself.
124    ///
125    /// When using the `custom` strategy, the VRL program may write `%field = value` to enrich
126    /// authenticated events. These metadata fields are injected into the event body (legacy
127    /// namespace) or under `http_server.<field>` in event metadata (Vector namespace).
128    #[configurable(derived)]
129    auth: Option<HttpServerAuthConfig>,
130
131    /// Whether or not to treat the configured `path` as an absolute path.
132    ///
133    /// If set to `true`, only requests using the exact URL path specified in `path` are accepted. Otherwise,
134    /// requests sent to a URL path that starts with the value of `path` are accepted.
135    ///
136    /// With `strict_path` set to `false` and `path` set to `""`, the configured HTTP source accepts requests from
137    /// any URL path.
138    #[serde(default = "crate::serde::default_true")]
139    strict_path: bool,
140
141    /// The URL path on which log event POST requests are sent.
142    #[serde(default = "default_path")]
143    #[configurable(metadata(docs::examples = "/event/path"))]
144    #[configurable(metadata(docs::examples = "/logs"))]
145    path: String,
146
147    /// The event key in which the requested URL path used to send the request is stored.
148    #[serde(default = "default_path_key")]
149    #[configurable(metadata(docs::examples = "vector_http_path"))]
150    path_key: OptionalValuePath,
151
152    /// If set, the name of the log field used to add the remote IP to each event
153    #[serde(default = "default_host_key")]
154    #[configurable(metadata(docs::examples = "hostname"))]
155    host_key: OptionalValuePath,
156
157    /// Specifies the action of the HTTP request.
158    #[serde(default = "default_http_method")]
159    method: HttpMethod,
160
161    /// Specifies the HTTP response status code that will be returned on successful requests.
162    #[configurable(metadata(docs::examples = 202))]
163    #[configurable(metadata(docs::numeric_type = "uint"))]
164    #[serde(with = "http_serde::status_code")]
165    #[serde(default = "default_http_response_code")]
166    response_code: StatusCode,
167
168    #[configurable(derived)]
169    tls: Option<TlsEnableableConfig>,
170
171    #[configurable(derived)]
172    framing: Option<FramingConfig>,
173
174    #[configurable(derived)]
175    decoding: Option<DeserializerConfig>,
176
177    #[configurable(derived)]
178    #[serde(default, deserialize_with = "bool_or_struct")]
179    acknowledgements: SourceAcknowledgementsConfig,
180
181    /// The namespace to use for logs. This overrides the global setting.
182    #[configurable(metadata(docs::hidden))]
183    #[serde(default)]
184    log_namespace: Option<bool>,
185
186    #[configurable(derived)]
187    #[serde(default)]
188    keepalive: KeepaliveConfig,
189}
190
191impl SimpleHttpConfig {
192    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
193    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
194        let mut schema_definition = self
195            .decoding
196            .as_ref()
197            .unwrap_or(&default_decoding())
198            .schema_definition(log_namespace)
199            .with_source_metadata(
200                SimpleHttpConfig::NAME,
201                self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
202                &owned_value_path!("path"),
203                Kind::bytes(),
204                None,
205            )
206            // for metadata that is added to the events dynamically from the self.headers
207            .with_source_metadata(
208                SimpleHttpConfig::NAME,
209                None,
210                &owned_value_path!("headers"),
211                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
212                None,
213            )
214            // for metadata that is added to the events dynamically from the self.query_parameters
215            .with_source_metadata(
216                SimpleHttpConfig::NAME,
217                None,
218                &owned_value_path!("query_parameters"),
219                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
220                None,
221            )
222            .with_source_metadata(
223                SimpleHttpConfig::NAME,
224                self.host_key.path.clone().map(LegacyKey::Overwrite),
225                &owned_value_path!("host"),
226                Kind::bytes().or_undefined(),
227                None,
228            )
229            .with_standard_vector_source_metadata();
230
231        // For metadata that is added to the events dynamically from config options.
232        if log_namespace == LogNamespace::Legacy {
233            // Custom auth programs can inject any VRL value, not just bytes; widen the unknown
234            // field kind accordingly so schema-aware downstream components don't reject events.
235            let unknown_kind = if matches!(self.auth, Some(HttpServerAuthConfig::Custom { .. })) {
236                Kind::any()
237            } else {
238                Kind::bytes()
239            };
240            schema_definition = schema_definition.unknown_fields(unknown_kind);
241        }
242
243        schema_definition
244    }
245
246    fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
247        if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
248            return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
249        }
250
251        let (framing, decoding) = if let Some(encoding) = self.encoding {
252            match encoding {
253                Encoding::Text => (
254                    NewlineDelimitedDecoderConfig::new().into(),
255                    BytesDeserializerConfig::new().into(),
256                ),
257                Encoding::Json => (
258                    BytesDecoderConfig::new().into(),
259                    JsonDeserializerConfig::default().into(),
260                ),
261                Encoding::Ndjson => (
262                    NewlineDelimitedDecoderConfig::new().into(),
263                    JsonDeserializerConfig::default().into(),
264                ),
265                Encoding::Binary => (
266                    BytesDecoderConfig::new().into(),
267                    BytesDeserializerConfig::new().into(),
268                ),
269            }
270        } else {
271            let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
272            let framing = self
273                .framing
274                .clone()
275                .unwrap_or_else(|| decoding.default_stream_framing());
276            (framing, decoding)
277        };
278
279        Ok(DecodingConfig::new(
280            framing,
281            decoding,
282            self.log_namespace.unwrap_or(false).into(),
283        ))
284    }
285}
286
287impl Default for SimpleHttpConfig {
288    fn default() -> Self {
289        Self {
290            address: "0.0.0.0:8080".parse().unwrap(),
291            encoding: None,
292            headers: Vec::new(),
293            query_parameters: Vec::new(),
294            tls: None,
295            auth: None,
296            path: default_path(),
297            path_key: default_path_key(),
298            host_key: default_host_key(),
299            method: default_http_method(),
300            response_code: default_http_response_code(),
301            strict_path: true,
302            framing: None,
303            decoding: Some(default_decoding()),
304            acknowledgements: SourceAcknowledgementsConfig::default(),
305            log_namespace: None,
306            keepalive: KeepaliveConfig::default(),
307        }
308    }
309}
310
311impl_generate_config_from_default!(SimpleHttpConfig);
312
313const fn default_http_method() -> HttpMethod {
314    HttpMethod::Post
315}
316
/// Serde default for `SimpleHttpConfig::path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
320
321fn default_path_key() -> OptionalValuePath {
322    OptionalValuePath::from(owned_value_path!("path"))
323}
324
325fn default_host_key() -> OptionalValuePath {
326    OptionalValuePath::none()
327}
328
329const fn default_http_response_code() -> StatusCode {
330    StatusCode::OK
331}
332
333/// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed.
334pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
335    list.sort();
336
337    let mut dedup = false;
338    for (idx, name) in list.iter().enumerate() {
339        if idx < list.len() - 1 && list[idx] == list[idx + 1] {
340            warn!(
341                "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
342                list_name, name
343            );
344            dedup = true;
345        }
346    }
347
348    if dedup {
349        list.dedup();
350    }
351    list
352}
353
/// Renders only the IP portion of a [`SocketAddr`] as a string; the port is dropped.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
358
359#[derive(Clone)]
360pub enum HttpConfigParamKind {
361    Glob(glob::Pattern),
362    Exact(String),
363}
364
365pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
366    list.iter()
367        .map(|s| match s.contains('*') {
368            true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
369            false => Ok(HttpConfigParamKind::Exact(s.to_string())),
370        })
371        .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
372}
373
374#[async_trait::async_trait]
375#[typetag::serde(name = "http_server")]
376impl SourceConfig for SimpleHttpConfig {
377    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
378        let log_namespace = cx.log_namespace(self.log_namespace);
379        let decoder = self
380            .get_decoding_config()?
381            .build()?
382            .with_log_namespace(log_namespace);
383
384        let source = SimpleHttpSource {
385            headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
386            query_parameters: build_param_matcher(&remove_duplicates(
387                self.query_parameters.clone(),
388                "query_parameters",
389            ))?,
390            path_key: self.path_key.clone(),
391            host_key: self.host_key.clone(),
392            decoder,
393            log_namespace,
394        };
395        source.run(
396            self.address,
397            self.path.as_str(),
398            self.method,
399            self.response_code,
400            self.strict_path,
401            self.tls.as_ref(),
402            self.auth.as_ref(),
403            cx,
404            self.acknowledgements,
405            self.keepalive.clone(),
406        )
407    }
408
409    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
410        // There is a global and per-source `log_namespace` config.
411        // The source config overrides the global setting and is merged here.
412        let log_namespace = global_log_namespace.merge(self.log_namespace);
413
414        let schema_definition = self.schema_definition(log_namespace);
415
416        vec![SourceOutput::new_maybe_logs(
417            self.decoding
418                .as_ref()
419                .map(|d| d.output_type())
420                .unwrap_or(DataType::Log),
421            schema_definition,
422        )]
423    }
424
425    fn resources(&self) -> Vec<Resource> {
426        vec![Resource::tcp(self.address)]
427    }
428
429    fn can_acknowledge(&self) -> bool {
430        true
431    }
432}
433
434#[derive(Clone)]
435struct SimpleHttpSource {
436    headers: Vec<HttpConfigParamKind>,
437    query_parameters: Vec<HttpConfigParamKind>,
438    path_key: OptionalValuePath,
439    host_key: OptionalValuePath,
440    decoder: Decoder,
441    log_namespace: LogNamespace,
442}
443
444impl HttpSource for SimpleHttpSource {
445    /// Enriches the log events with metadata for the `request_path` and for each of the headers.
446    /// Non-log events are skipped.
447    fn enrich_events(
448        &self,
449        events: &mut [Event],
450        request_path: &str,
451        headers: &HeaderMap,
452        query_parameters: &HashMap<String, String>,
453        source_ip: Option<&SocketAddr>,
454    ) {
455        let now = Utc::now();
456        for event in events.iter_mut() {
457            match event {
458                Event::Log(log) => {
459                    // add request_path to each event
460                    self.log_namespace.insert_source_metadata(
461                        SimpleHttpConfig::NAME,
462                        log,
463                        self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
464                        path!("path"),
465                        request_path.to_owned(),
466                    );
467
468                    self.log_namespace.insert_standard_vector_source_metadata(
469                        log,
470                        SimpleHttpConfig::NAME,
471                        now,
472                    );
473
474                    if let Some(addr) = source_ip {
475                        self.log_namespace.insert_source_metadata(
476                            SimpleHttpConfig::NAME,
477                            log,
478                            self.host_key.path.as_ref().map(LegacyKey::Overwrite),
479                            path!("host"),
480                            socket_addr_to_ip_string(addr),
481                        );
482                    }
483                }
484                _ => {
485                    continue;
486                }
487            }
488        }
489
490        add_headers(
491            events,
492            &self.headers,
493            headers,
494            self.log_namespace,
495            SimpleHttpConfig::NAME,
496        );
497
498        add_query_parameters(
499            events,
500            &self.query_parameters,
501            query_parameters,
502            self.log_namespace,
503            SimpleHttpConfig::NAME,
504        );
505    }
506
507    fn build_events(
508        &self,
509        body: Bytes,
510        _header_map: &HeaderMap,
511        _query_parameters: &HashMap<String, String>,
512        _request_path: &str,
513    ) -> Result<Vec<Event>, ErrorMessage> {
514        let mut decoder = self.decoder.clone();
515        let mut events = Vec::new();
516        let mut bytes = BytesMut::new();
517        bytes.extend_from_slice(&body);
518
519        loop {
520            match decoder.decode_eof(&mut bytes) {
521                Ok(Some((next, _))) => {
522                    events.extend(next);
523                }
524                Ok(None) => break,
525                Err(error) => {
526                    // Error is logged / emitted by `vector_lib::codecs::Decoder`, no further
527                    // handling is needed here
528                    return Err(ErrorMessage::new(
529                        StatusCode::BAD_REQUEST,
530                        format!("Failed decoding body: {error}"),
531                    ));
532                }
533            }
534        }
535
536        Ok(events)
537    }
538
539    fn enable_source_ip(&self) -> bool {
540        self.host_key.path.is_some()
541    }
542
543    /// Injects `%field` enrichment from a `custom` auth VRL program into events.
544    /// Both namespaces use insert-if-empty semantics so auth enrichment never
545    /// overwrites built-in source metadata (`path`, `host`, `headers`, …) that
546    /// `enrich_events` already populated.
547    /// Vector namespace: inserted into event metadata under `http_server.<field>` for
548    ///   all event types (Log, Metric, Trace).
549    /// Legacy namespace: inserted into the Log event body only (Metric/Trace are skipped).
550    fn inject_auth_enrichment(&self, events: &mut [Event], enrichment: ObjectMap) {
551        for event in events.iter_mut() {
552            match self.log_namespace {
553                LogNamespace::Vector => {
554                    // metadata_mut() dispatches to Log, Metric, and Trace so every
555                    // decoded event type receives the auth enrichment fields.
556                    let meta = event.metadata_mut().value_mut();
557                    for (key, value) in &enrichment {
558                        let key_str = key.as_str();
559                        let name_part = path!(SimpleHttpConfig::NAME);
560                        let key_part = path!(key_str);
561                        let full_path = name_part.concat(key_part);
562                        if meta.get(full_path.clone()).is_none() {
563                            meta.insert(full_path, value.clone());
564                        }
565                    }
566                }
567                LogNamespace::Legacy => {
568                    // Legacy enrichment targets the event body; only Log events have one.
569                    if let Event::Log(log) = event {
570                        for (key, value) in &enrichment {
571                            log.try_insert((PathPrefix::Event, path!(key.as_str())), value.clone());
572                        }
573                    }
574                }
575            }
576        }
577    }
578}
579
580#[cfg(test)]
581mod tests {
582    use std::{io::Write, net::SocketAddr, str::FromStr};
583
584    use flate2::{
585        Compression,
586        write::{GzEncoder, ZlibEncoder},
587    };
588    use futures::Stream;
589    use headers::{Authorization, authorization::Credentials};
590    use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
591    use similar_asserts::assert_eq;
592    use vector_lib::{
593        codecs::{
594            BytesDecoderConfig, JsonDeserializerConfig,
595            decoding::{DeserializerConfig, FramingConfig},
596        },
597        config::LogNamespace,
598        event::LogEvent,
599        lookup::{
600            OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath,
601            owned_value_path, path,
602        },
603        schema::Definition,
604    };
605    use vrl::{
606        path::ValuePath as _,
607        value::{Kind, ObjectMap, kind::Collection},
608    };
609
610    use super::{SimpleHttpConfig, remove_duplicates};
611    use crate::{
612        SourceSender,
613        common::http::server_auth::HttpServerAuthConfig,
614        components::validation::prelude::*,
615        config::{SourceConfig, SourceContext, log_schema},
616        event::{Event, EventStatus, Value},
617        sources::http_server::HttpMethod,
618        test_util::{
619            addr::next_addr,
620            components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
621            spawn_collect_n, wait_for_tcp,
622        },
623    };
624
625    #[test]
626    fn generate_config() {
627        crate::test_util::test_generate_config::<SimpleHttpConfig>();
628    }
629
630    #[allow(clippy::too_many_arguments)]
631    async fn source<'a>(
632        headers: Vec<String>,
633        query_parameters: Vec<String>,
634        path_key: &'a str,
635        host_key: &'a str,
636        path: &'a str,
637        method: &'a str,
638        response_code: StatusCode,
639        auth: Option<HttpServerAuthConfig>,
640        strict_path: bool,
641        status: EventStatus,
642        acknowledgements: bool,
643        framing: Option<FramingConfig>,
644        decoding: Option<DeserializerConfig>,
645    ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
646        let (sender, recv) = SourceSender::new_test_finalize(status);
647        let (_guard, address) = next_addr();
648        let path = path.to_owned();
649        let host_key = OptionalValuePath::from(owned_value_path!(host_key));
650        let path_key = OptionalValuePath::from(owned_value_path!(path_key));
651        let context = SourceContext::new_test(sender, None);
652        let method = match Method::from_str(method).unwrap() {
653            Method::GET => HttpMethod::Get,
654            Method::POST => HttpMethod::Post,
655            _ => HttpMethod::Post,
656        };
657
658        tokio::spawn(async move {
659            SimpleHttpConfig {
660                address,
661                headers,
662                encoding: None,
663                query_parameters,
664                response_code,
665                tls: None,
666                auth,
667                strict_path,
668                path_key,
669                host_key,
670                path,
671                method,
672                framing,
673                decoding,
674                acknowledgements: acknowledgements.into(),
675                log_namespace: None,
676                keepalive: Default::default(),
677            }
678            .build(context)
679            .await
680            .unwrap()
681            .await
682            .unwrap();
683        });
684        wait_for_tcp(address).await;
685        (recv, address)
686    }
687
688    async fn send(address: SocketAddr, body: &str) -> u16 {
689        reqwest::Client::new()
690            .post(format!("http://{address}/"))
691            .body(body.to_owned())
692            .send()
693            .await
694            .unwrap()
695            .status()
696            .as_u16()
697    }
698
699    async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
700        reqwest::Client::new()
701            .post(format!("http://{address}/"))
702            .headers(headers)
703            .body(body.to_owned())
704            .send()
705            .await
706            .unwrap()
707            .status()
708            .as_u16()
709    }
710
711    async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
712        reqwest::Client::new()
713            .post(format!("http://{address}?{query}"))
714            .body(body.to_owned())
715            .send()
716            .await
717            .unwrap()
718            .status()
719            .as_u16()
720    }
721
722    async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
723        reqwest::Client::new()
724            .post(format!("http://{address}{path}"))
725            .body(body.to_owned())
726            .send()
727            .await
728            .unwrap()
729            .status()
730            .as_u16()
731    }
732
733    async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
734        let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
735        reqwest::Client::new()
736            .request(method, format!("http://{address}{path}"))
737            .body(body.to_owned())
738            .send()
739            .await
740            .unwrap()
741            .status()
742            .as_u16()
743    }
744
745    async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
746        reqwest::Client::new()
747            .post(format!("http://{address}/"))
748            .headers(headers)
749            .body(body)
750            .send()
751            .await
752            .unwrap()
753            .status()
754            .as_u16()
755    }
756
757    async fn spawn_ok_collect_n(
758        send: impl std::future::Future<Output = u16> + Send + 'static,
759        rx: impl Stream<Item = Event> + Unpin,
760        n: usize,
761    ) -> Vec<Event> {
762        spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
763    }
764
765    #[tokio::test]
766    async fn http_multiline_text() {
767        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
768            let body = "test body\ntest body 2";
769
770            let (rx, addr) = source(
771                vec![],
772                vec![],
773                "http_path",
774                "remote_ip",
775                "/",
776                "POST",
777                StatusCode::OK,
778                None,
779                true,
780                EventStatus::Delivered,
781                true,
782                None,
783                None,
784            )
785            .await;
786
787            spawn_ok_collect_n(send(addr, body), rx, 2).await
788        })
789        .await;
790
791        {
792            let event = events.remove(0);
793            let log = event.as_log();
794            assert_eq!(*log.get_message().unwrap(), "test body".into());
795            assert!(log.get_timestamp().is_some());
796            assert_eq!(
797                *log.get_source_type().unwrap(),
798                SimpleHttpConfig::NAME.into()
799            );
800            assert_eq!(log["http_path"], "/".into());
801            assert_event_metadata(log).await;
802        }
803        {
804            let event = events.remove(0);
805            let log = event.as_log();
806            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
807            assert_event_metadata(log).await;
808        }
809    }
810
811    #[tokio::test]
812    async fn http_multiline_text2() {
813        //same as above test but with a newline at the end
814        let body = "test body\ntest body 2\n";
815
816        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
817            let (rx, addr) = source(
818                vec![],
819                vec![],
820                "http_path",
821                "remote_ip",
822                "/",
823                "POST",
824                StatusCode::OK,
825                None,
826                true,
827                EventStatus::Delivered,
828                true,
829                None,
830                None,
831            )
832            .await;
833
834            spawn_ok_collect_n(send(addr, body), rx, 2).await
835        })
836        .await;
837
838        {
839            let event = events.remove(0);
840            let log = event.as_log();
841            assert_eq!(*log.get_message().unwrap(), "test body".into());
842            assert_event_metadata(log).await;
843        }
844        {
845            let event = events.remove(0);
846            let log = event.as_log();
847            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
848            assert_event_metadata(log).await;
849        }
850    }
851
852    #[tokio::test]
853    async fn http_bytes_codec_preserves_newlines() {
854        let body = "foo\nbar";
855
856        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
857            let (rx, addr) = source(
858                vec![],
859                vec![],
860                "http_path",
861                "remote_ip",
862                "/",
863                "POST",
864                StatusCode::OK,
865                None,
866                true,
867                EventStatus::Delivered,
868                true,
869                Some(BytesDecoderConfig::new().into()),
870                None,
871            )
872            .await;
873
874            spawn_ok_collect_n(send(addr, body), rx, 1).await
875        })
876        .await;
877
878        assert_eq!(events.len(), 1);
879
880        {
881            let event = events.remove(0);
882            let log = event.as_log();
883            assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
884            assert_event_metadata(log).await;
885        }
886    }
887
888    #[tokio::test]
889    async fn http_json_parsing() {
890        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
891            let (rx, addr) = source(
892                vec![],
893                vec![],
894                "http_path",
895                "remote_ip",
896                "/",
897                "POST",
898                StatusCode::OK,
899                None,
900                true,
901                EventStatus::Delivered,
902                true,
903                None,
904                Some(JsonDeserializerConfig::default().into()),
905            )
906            .await;
907
908            spawn_collect_n(
909                async move {
910                    assert_eq!(400, send(addr, "{").await); //malformed
911                    assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value
912
913                    assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects
914                    assert_eq!(200, send(addr, "[{},{},{}]").await);
915                },
916                rx,
917                2,
918            )
919            .await
920        })
921        .await;
922
923        assert!(events.remove(1).as_log().get_timestamp().is_some());
924        assert!(events.remove(0).as_log().get_timestamp().is_some());
925    }
926
927    #[tokio::test]
928    async fn http_json_values() {
929        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
930            let (rx, addr) = source(
931                vec![],
932                vec![],
933                "http_path",
934                "remote_ip",
935                "/",
936                "POST",
937                StatusCode::OK,
938                None,
939                true,
940                EventStatus::Delivered,
941                true,
942                None,
943                Some(JsonDeserializerConfig::default().into()),
944            )
945            .await;
946
947            spawn_collect_n(
948                async move {
949                    assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
950                    assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
951                },
952                rx,
953                2,
954            )
955            .await
956        })
957        .await;
958
959        {
960            let event = events.remove(0);
961            let log = event.as_log();
962            assert_eq!(log["key"], "value".into());
963            assert_event_metadata(log).await;
964        }
965        {
966            let event = events.remove(0);
967            let log = event.as_log();
968            assert_eq!(log["key2"], "value2".into());
969            assert_event_metadata(log).await;
970        }
971    }
972
973    #[tokio::test]
974    async fn http_json_dotted_keys() {
975        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
976            let (rx, addr) = source(
977                vec![],
978                vec![],
979                "http_path",
980                "remote_ip",
981                "/",
982                "POST",
983                StatusCode::OK,
984                None,
985                true,
986                EventStatus::Delivered,
987                true,
988                None,
989                Some(JsonDeserializerConfig::default().into()),
990            )
991            .await;
992
993            spawn_collect_n(
994                async move {
995                    assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
996                    assert_eq!(
997                        200,
998                        send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
999                    );
1000                },
1001                rx,
1002                2,
1003            )
1004            .await
1005        })
1006        .await;
1007
1008        {
1009            let event = events.remove(0);
1010            let log = event.as_log();
1011            assert_eq!(
1012                log.get(event_path!("dotted.key")).unwrap(),
1013                &Value::from("value")
1014            );
1015        }
1016        {
1017            let event = events.remove(0);
1018            let log = event.as_log();
1019            let mut map = ObjectMap::new();
1020            map.insert("dotted.key2".into(), Value::from("value2"));
1021            assert_eq!(log["nested"], map.into());
1022        }
1023    }
1024
1025    #[tokio::test]
1026    async fn http_ndjson() {
1027        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1028            let (rx, addr) = source(
1029                vec![],
1030                vec![],
1031                "http_path",
1032                "remote_ip",
1033                "/",
1034                "POST",
1035                StatusCode::OK,
1036                None,
1037                true,
1038                EventStatus::Delivered,
1039                true,
1040                None,
1041                Some(JsonDeserializerConfig::default().into()),
1042            )
1043            .await;
1044
1045            spawn_collect_n(
1046                async move {
1047                    assert_eq!(
1048                        200,
1049                        send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
1050                    );
1051
1052                    assert_eq!(
1053                        200,
1054                        send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
1055                    );
1056                },
1057                rx,
1058                4,
1059            )
1060            .await
1061        })
1062        .await;
1063
1064        {
1065            let event = events.remove(0);
1066            let log = event.as_log();
1067            assert_eq!(log["key1"], "value1".into());
1068            assert_event_metadata(log).await;
1069        }
1070        {
1071            let event = events.remove(0);
1072            let log = event.as_log();
1073            assert_eq!(log["key2"], "value2".into());
1074            assert_event_metadata(log).await;
1075        }
1076        {
1077            let event = events.remove(0);
1078            let log = event.as_log();
1079            assert_eq!(log["key1"], "value1".into());
1080            assert_event_metadata(log).await;
1081        }
1082        {
1083            let event = events.remove(0);
1084            let log = event.as_log();
1085            assert_eq!(log["key2"], "value2".into());
1086            assert_event_metadata(log).await;
1087        }
1088    }
1089
1090    async fn assert_event_metadata(log: &LogEvent) {
1091        assert!(log.get_timestamp().is_some());
1092
1093        let source_type_key_value = log
1094            .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1095            .unwrap()
1096            .as_str()
1097            .unwrap();
1098        assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1099        assert_eq!(log["http_path"], "/".into());
1100    }
1101
1102    #[tokio::test]
1103    async fn http_headers() {
1104        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1105            let mut headers = HeaderMap::new();
1106            headers.insert("User-Agent", "test_client".parse().unwrap());
1107            headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1108            headers.insert("X-Test-Header", "true".parse().unwrap());
1109
1110            let (rx, addr) = source(
1111                vec![
1112                    "User-Agent".to_string(),
1113                    "Upgrade-Insecure-Requests".to_string(),
1114                    "X-*".to_string(),
1115                    "AbsentHeader".to_string(),
1116                ],
1117                vec![],
1118                "http_path",
1119                "remote_ip",
1120                "/",
1121                "POST",
1122                StatusCode::OK,
1123                None,
1124                true,
1125                EventStatus::Delivered,
1126                true,
1127                None,
1128                Some(JsonDeserializerConfig::default().into()),
1129            )
1130            .await;
1131
1132            spawn_ok_collect_n(
1133                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1134                rx,
1135                1,
1136            )
1137            .await
1138        })
1139        .await;
1140
1141        {
1142            let event = events.remove(0);
1143            let log = event.as_log();
1144            assert_eq!(log["key1"], "value1".into());
1145            assert_eq!(log["\"User-Agent\""], "test_client".into());
1146            assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1147            assert_eq!(log["\"x-test-header\""], "true".into());
1148            assert_eq!(log["AbsentHeader"], Value::Null);
1149            assert_event_metadata(log).await;
1150        }
1151    }
1152
1153    #[tokio::test]
1154    async fn http_headers_wildcard() {
1155        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1156            let mut headers = HeaderMap::new();
1157            headers.insert("User-Agent", "test_client".parse().unwrap());
1158            headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1159            // Header that conflicts with an existing field.
1160            headers.insert("key1", "value_from_header".parse().unwrap());
1161
1162            let (rx, addr) = source(
1163                vec!["*".to_string()],
1164                vec![],
1165                "http_path",
1166                "remote_ip",
1167                "/",
1168                "POST",
1169                StatusCode::OK,
1170                None,
1171                true,
1172                EventStatus::Delivered,
1173                true,
1174                None,
1175                Some(JsonDeserializerConfig::default().into()),
1176            )
1177            .await;
1178
1179            spawn_ok_collect_n(
1180                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1181                rx,
1182                1,
1183            )
1184            .await
1185        })
1186        .await;
1187
1188        {
1189            let event = events.remove(0);
1190            let log = event.as_log();
1191            assert_eq!(log["key1"], "value1".into());
1192            assert_eq!(log["\"user-agent\""], "test_client".into());
1193            assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1194            assert_event_metadata(log).await;
1195        }
1196    }
1197
1198    #[tokio::test]
1199    async fn http_query() {
1200        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1201            let (rx, addr) = source(
1202                vec![],
1203                vec![
1204                    "source".to_string(),
1205                    "region".to_string(),
1206                    "absent".to_string(),
1207                ],
1208                "http_path",
1209                "remote_ip",
1210                "/",
1211                "POST",
1212                StatusCode::OK,
1213                None,
1214                true,
1215                EventStatus::Delivered,
1216                true,
1217                None,
1218                Some(JsonDeserializerConfig::default().into()),
1219            )
1220            .await;
1221
1222            spawn_ok_collect_n(
1223                send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
1224                rx,
1225                1,
1226            )
1227            .await
1228        })
1229        .await;
1230
1231        {
1232            let event = events.remove(0);
1233            let log = event.as_log();
1234            assert_eq!(log["key1"], "value1".into());
1235            assert_eq!(log["source"], "staging".into());
1236            assert_eq!(log["region"], "gb".into());
1237            assert_eq!(log["absent"], Value::Null);
1238            assert_event_metadata(log).await;
1239        }
1240    }
1241
1242    #[tokio::test]
1243    async fn http_query_wildcard() {
1244        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1245            let (rx, addr) = source(
1246                vec![],
1247                vec!["*".to_string()],
1248                "http_path",
1249                "remote_ip",
1250                "/",
1251                "POST",
1252                StatusCode::OK,
1253                None,
1254                true,
1255                EventStatus::Delivered,
1256                true,
1257                None,
1258                Some(JsonDeserializerConfig::default().into()),
1259            )
1260            .await;
1261
1262            spawn_ok_collect_n(
1263                send_with_query(
1264                    addr,
1265                    "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1266                    "source=staging&region=gb&key1=value_from_query",
1267                ),
1268                rx,
1269                1,
1270            )
1271            .await
1272        })
1273        .await;
1274
1275        {
1276            let event = events.remove(0);
1277            let log = event.as_log();
1278            assert_eq!(log["key1"], "value_from_query".into());
1279            assert_eq!(log["key2"], "value2".into());
1280            assert_eq!(log["source"], "staging".into());
1281            assert_eq!(log["region"], "gb".into());
1282            assert_event_metadata(log).await;
1283        }
1284    }
1285
1286    #[tokio::test]
1287    async fn http_gzip_deflate() {
1288        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1289            let body = "test body";
1290
1291            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1292            encoder.write_all(body.as_bytes()).unwrap();
1293            let body = encoder.finish().unwrap();
1294
1295            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1296            encoder.write_all(body.as_slice()).unwrap();
1297            let body = encoder.finish().unwrap();
1298
1299            let mut headers = HeaderMap::new();
1300            headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1301
1302            let (rx, addr) = source(
1303                vec![],
1304                vec![],
1305                "http_path",
1306                "remote_ip",
1307                "/",
1308                "POST",
1309                StatusCode::OK,
1310                None,
1311                true,
1312                EventStatus::Delivered,
1313                true,
1314                None,
1315                None,
1316            )
1317            .await;
1318
1319            spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1320        })
1321        .await;
1322
1323        {
1324            let event = events.remove(0);
1325            let log = event.as_log();
1326            assert_eq!(*log.get_message().unwrap(), "test body".into());
1327            assert_event_metadata(log).await;
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn http_rejects_gzip_bomb_with_413() {
1333        // A modestly-sized gzipped blob of zeros that would expand past the default
1334        // 100 MiB cap if decompression were unbounded.
1335        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1336        let chunk = [0u8; 8 * 1024];
1337        for _ in 0..(200 * 1024 * 1024 / chunk.len()) {
1338            encoder.write_all(&chunk).unwrap();
1339        }
1340        let body = encoder.finish().unwrap();
1341
1342        let mut headers = HeaderMap::new();
1343        headers.insert("Content-Encoding", "gzip".parse().unwrap());
1344
1345        components::init_test();
1346        let (_rx, addr) = source(
1347            vec![],
1348            vec![],
1349            "http_path",
1350            "remote_ip",
1351            "/",
1352            "POST",
1353            StatusCode::OK,
1354            None,
1355            true,
1356            EventStatus::Delivered,
1357            true,
1358            None,
1359            None,
1360        )
1361        .await;
1362
1363        assert_eq!(413, send_bytes(addr, body, headers).await);
1364    }
1365
1366    #[tokio::test]
1367    async fn http_path() {
1368        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1369            let (rx, addr) = source(
1370                vec![],
1371                vec![],
1372                "vector_http_path",
1373                "vector_remote_ip",
1374                "/event/path",
1375                "POST",
1376                StatusCode::OK,
1377                None,
1378                true,
1379                EventStatus::Delivered,
1380                true,
1381                None,
1382                Some(JsonDeserializerConfig::default().into()),
1383            )
1384            .await;
1385
1386            spawn_ok_collect_n(
1387                send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1388                rx,
1389                1,
1390            )
1391            .await
1392        })
1393        .await;
1394
1395        {
1396            let event = events.remove(0);
1397            let log = event.as_log();
1398            assert_eq!(log["key1"], "value1".into());
1399            assert_eq!(log["vector_http_path"], "/event/path".into());
1400            assert!(log.get_timestamp().is_some());
1401            assert_eq!(
1402                *log.get_source_type().unwrap(),
1403                SimpleHttpConfig::NAME.into()
1404            );
1405        }
1406    }
1407
1408    #[tokio::test]
1409    async fn http_path_no_restriction() {
1410        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1411            let (rx, addr) = source(
1412                vec![],
1413                vec![],
1414                "vector_http_path",
1415                "vector_remote_ip",
1416                "/event",
1417                "POST",
1418                StatusCode::OK,
1419                None,
1420                false,
1421                EventStatus::Delivered,
1422                true,
1423                None,
1424                Some(JsonDeserializerConfig::default().into()),
1425            )
1426            .await;
1427
1428            spawn_collect_n(
1429                async move {
1430                    assert_eq!(
1431                        200,
1432                        send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1433                    );
1434                    assert_eq!(
1435                        200,
1436                        send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1437                    );
1438                },
1439                rx,
1440                2,
1441            )
1442            .await
1443        })
1444        .await;
1445
1446        {
1447            let event = events.remove(0);
1448            let log = event.as_log();
1449            assert_eq!(log["key1"], "value1".into());
1450            assert_eq!(log["vector_http_path"], "/event/path1".into());
1451            assert!(log.get_timestamp().is_some());
1452            assert_eq!(
1453                *log.get_source_type().unwrap(),
1454                SimpleHttpConfig::NAME.into()
1455            );
1456        }
1457        {
1458            let event = events.remove(0);
1459            let log = event.as_log();
1460            assert_eq!(log["key2"], "value2".into());
1461            assert_eq!(log["vector_http_path"], "/event/path2".into());
1462            assert!(log.get_timestamp().is_some());
1463            assert_eq!(
1464                *log.get_source_type().unwrap(),
1465                SimpleHttpConfig::NAME.into()
1466            );
1467        }
1468    }
1469
1470    #[tokio::test]
1471    async fn http_wrong_path() {
1472        components::init_test();
1473        let (_rx, addr) = source(
1474            vec![],
1475            vec![],
1476            "vector_http_path",
1477            "vector_remote_ip",
1478            "/",
1479            "POST",
1480            StatusCode::OK,
1481            None,
1482            true,
1483            EventStatus::Delivered,
1484            true,
1485            None,
1486            Some(JsonDeserializerConfig::default().into()),
1487        )
1488        .await;
1489
1490        assert_eq!(
1491            404,
1492            send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1493        );
1494    }
1495
1496    #[tokio::test]
1497    async fn http_status_code() {
1498        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1499            let (rx, addr) = source(
1500                vec![],
1501                vec![],
1502                "http_path",
1503                "remote_ip",
1504                "/",
1505                "POST",
1506                StatusCode::ACCEPTED,
1507                None,
1508                true,
1509                EventStatus::Delivered,
1510                true,
1511                None,
1512                None,
1513            )
1514            .await;
1515
1516            spawn_collect_n(
1517                async move {
1518                    assert_eq!(
1519                        StatusCode::ACCEPTED,
1520                        send(addr, "{\"key1\":\"value1\"}").await
1521                    );
1522                },
1523                rx,
1524                1,
1525            )
1526            .await;
1527        })
1528        .await;
1529    }
1530
1531    #[tokio::test]
1532    async fn http_delivery_failure() {
1533        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1534            let (rx, addr) = source(
1535                vec![],
1536                vec![],
1537                "http_path",
1538                "remote_ip",
1539                "/",
1540                "POST",
1541                StatusCode::OK,
1542                None,
1543                true,
1544                EventStatus::Rejected,
1545                true,
1546                None,
1547                None,
1548            )
1549            .await;
1550
1551            spawn_collect_n(
1552                async move {
1553                    assert_eq!(400, send(addr, "test body\n").await);
1554                },
1555                rx,
1556                1,
1557            )
1558            .await;
1559        })
1560        .await;
1561    }
1562
1563    #[tokio::test]
1564    async fn ignores_disabled_acknowledgements() {
1565        let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1566            let (rx, addr) = source(
1567                vec![],
1568                vec![],
1569                "http_path",
1570                "remote_ip",
1571                "/",
1572                "POST",
1573                StatusCode::OK,
1574                None,
1575                true,
1576                EventStatus::Rejected,
1577                false,
1578                None,
1579                None,
1580            )
1581            .await;
1582
1583            spawn_collect_n(
1584                async move {
1585                    assert_eq!(200, send(addr, "test body\n").await);
1586                },
1587                rx,
1588                1,
1589            )
1590            .await
1591        })
1592        .await;
1593
1594        assert_eq!(events.len(), 1);
1595    }
1596
1597    #[tokio::test]
1598    async fn http_get_method() {
1599        components::init_test();
1600        let (_rx, addr) = source(
1601            vec![],
1602            vec![],
1603            "http_path",
1604            "remote_ip",
1605            "/",
1606            "GET",
1607            StatusCode::OK,
1608            None,
1609            true,
1610            EventStatus::Delivered,
1611            true,
1612            None,
1613            None,
1614        )
1615        .await;
1616
1617        assert_eq!(200, send_request(addr, "GET", "", "/").await);
1618    }
1619
1620    #[tokio::test]
1621    async fn returns_401_when_required_auth_is_missing() {
1622        components::init_test();
1623        let (_rx, addr) = source(
1624            vec![],
1625            vec![],
1626            "http_path",
1627            "remote_ip",
1628            "/",
1629            "GET",
1630            StatusCode::OK,
1631            Some(HttpServerAuthConfig::Basic {
1632                username: "test".to_string(),
1633                password: "test".to_string().into(),
1634            }),
1635            true,
1636            EventStatus::Delivered,
1637            true,
1638            None,
1639            None,
1640        )
1641        .await;
1642
1643        assert_eq!(401, send_request(addr, "GET", "", "/").await);
1644    }
1645
1646    #[tokio::test]
1647    async fn returns_401_when_required_auth_is_wrong() {
1648        components::init_test();
1649        let (_rx, addr) = source(
1650            vec![],
1651            vec![],
1652            "http_path",
1653            "remote_ip",
1654            "/",
1655            "POST",
1656            StatusCode::OK,
1657            Some(HttpServerAuthConfig::Basic {
1658                username: "test".to_string(),
1659                password: "test".to_string().into(),
1660            }),
1661            true,
1662            EventStatus::Delivered,
1663            true,
1664            None,
1665            None,
1666        )
1667        .await;
1668
1669        let mut headers = HeaderMap::new();
1670        headers.insert(
1671            AUTHORIZATION,
1672            Authorization::basic("wrong", "test").0.encode(),
1673        );
1674        assert_eq!(401, send_with_headers(addr, "", headers).await);
1675    }
1676
1677    #[tokio::test]
1678    async fn http_get_with_correct_auth() {
1679        components::init_test();
1680        let (_rx, addr) = source(
1681            vec![],
1682            vec![],
1683            "http_path",
1684            "remote_ip",
1685            "/",
1686            "POST",
1687            StatusCode::OK,
1688            Some(HttpServerAuthConfig::Basic {
1689                username: "test".to_string(),
1690                password: "test".to_string().into(),
1691            }),
1692            true,
1693            EventStatus::Delivered,
1694            true,
1695            None,
1696            None,
1697        )
1698        .await;
1699
1700        let mut headers = HeaderMap::new();
1701        headers.insert(
1702            AUTHORIZATION,
1703            Authorization::basic("test", "test").0.encode(),
1704        );
1705        assert_eq!(200, send_with_headers(addr, "", headers).await);
1706    }
1707
// Pins the schema definition of the first output when the Vector log namespace
// is enabled: a bytes event root with the "message" meaning, plus the metadata
// fields listed below.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = SimpleHttpConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual_definition = outputs.remove(0).schema_definition(true);

    // Kind shared by `headers` and `query_parameters`: an object whose unknown
    // fields are bytes, or undefined.
    let optional_bytes_object =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    let expected_definition =
        Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
            .with_meaning(OwnedTargetPath::event_root(), "message")
            .with_metadata_field(
                &owned_value_path!("vector", "source_type"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "path"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
                optional_bytes_object(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
                optional_bytes_object(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!(SimpleHttpConfig::NAME, "host"),
                Kind::bytes().or_undefined(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("vector", "ingest_timestamp"),
                Kind::timestamp(),
                None,
            );

    assert_eq!(actual_definition, Some(expected_definition));
}
1756
// Pins the schema definition of the first output for the default config under
// the legacy log namespace: an object event with the fields listed below and
// bytes for any unknown field.
#[test]
fn output_schema_definition_legacy_namespace() {
    let config = SimpleHttpConfig::default();

    let mut outputs = config.outputs(LogNamespace::Legacy);
    let actual_definition = outputs.remove(0).schema_definition(true);

    let base_definition = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected_definition = base_definition
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            None,
        )
        .unknown_fields(Kind::bytes());

    assert_eq!(actual_definition, Some(expected_definition));
}
1787
// `remove_duplicates` must return a duplicate-free list unchanged, and must
// reduce a list with a repeated entry back to its distinct entries.
#[test]
fn validate_remove_duplicates() {
    let distinct: Vec<String> = ["a", "b", "c", "d"]
        .into_iter()
        .map(str::to_owned)
        .collect();

    // no duplicates should be identical
    let untouched = remove_duplicates(distinct.clone(), "foo");
    assert_eq!(distinct, untouched);

    // remove duplicate "b"
    let mut with_repeat = distinct.clone();
    with_repeat.push("b".to_owned());
    let deduplicated = remove_duplicates(with_repeat, "foo");
    assert_eq!(distinct, deduplicated);
}
1820
1821    #[test]
1822    fn inject_auth_enrichment_does_not_clobber_vector_namespace_builtin_fields() {
1823        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1824        use vector_lib::codecs::BytesDeserializerConfig;
1825        use vrl::value::KeyString;
1826
1827        let decoder = DecodingConfig::new(
1828            BytesDecoderConfig::new().into(),
1829            BytesDeserializerConfig::new().into(),
1830            LogNamespace::Vector,
1831        )
1832        .build()
1833        .unwrap()
1834        .with_log_namespace(LogNamespace::Vector);
1835
1836        let source = super::SimpleHttpSource {
1837            headers: vec![],
1838            query_parameters: vec![],
1839            path_key: OptionalValuePath::none(),
1840            host_key: OptionalValuePath::none(),
1841            decoder,
1842            log_namespace: LogNamespace::Vector,
1843        };
1844
1845        let mut log = LogEvent::default();
1846        // Pre-populate %http_server.path as enrich_events would.
1847        log.insert(
1848            (
1849                PathPrefix::Metadata,
1850                path!(SimpleHttpConfig::NAME).concat(path!("path")),
1851            ),
1852            "/real/path",
1853        );
1854
1855        let mut events = vec![Event::Log(log)];
1856        let mut enrichment = ObjectMap::new();
1857        // Attempt to clobber the built-in `path` field and inject a new field.
1858        enrichment.insert(KeyString::from("path"), Value::from("/clobbered"));
1859        enrichment.insert(KeyString::from("tenant_id"), Value::from("t-123"));
1860
1861        source.inject_auth_enrichment(&mut events, enrichment);
1862
1863        let Event::Log(log) = &events[0] else {
1864            panic!("expected log event");
1865        };
1866        assert_eq!(
1867            log.get((
1868                PathPrefix::Metadata,
1869                path!(SimpleHttpConfig::NAME).concat(path!("path")),
1870            )),
1871            Some(&Value::from("/real/path")),
1872            "auth enrichment must not overwrite built-in source metadata"
1873        );
1874        assert_eq!(
1875            log.get((
1876                PathPrefix::Metadata,
1877                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1878            )),
1879            Some(&Value::from("t-123")),
1880            "new auth enrichment field must be injected"
1881        );
1882    }
1883
1884    #[test]
1885    fn inject_auth_enrichment_does_not_overwrite_existing_metadata_in_vector_namespace() {
1886        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1887        use vector_lib::codecs::BytesDeserializerConfig;
1888        use vrl::value::KeyString;
1889
1890        let decoder = DecodingConfig::new(
1891            BytesDecoderConfig::new().into(),
1892            BytesDeserializerConfig::new().into(),
1893            LogNamespace::Vector,
1894        )
1895        .build()
1896        .unwrap()
1897        .with_log_namespace(LogNamespace::Vector);
1898
1899        let source = super::SimpleHttpSource {
1900            headers: vec![],
1901            query_parameters: vec![],
1902            path_key: OptionalValuePath::none(),
1903            host_key: OptionalValuePath::none(),
1904            decoder,
1905            log_namespace: LogNamespace::Vector,
1906        };
1907
1908        let mut log = LogEvent::default();
1909        // Pre-populate a key (e.g. already written by enrich_events or the decoded event).
1910        log.insert(
1911            (
1912                PathPrefix::Metadata,
1913                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1914            ),
1915            "existing",
1916        );
1917
1918        let mut events = vec![Event::Log(log)];
1919        let mut enrichment = ObjectMap::new();
1920        enrichment.insert(KeyString::from("tenant_id"), Value::from("auth-value"));
1921
1922        source.inject_auth_enrichment(&mut events, enrichment);
1923
1924        let Event::Log(log) = &events[0] else {
1925            panic!("expected log event");
1926        };
1927        assert_eq!(
1928            log.get((
1929                PathPrefix::Metadata,
1930                path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),
1931            )),
1932            Some(&Value::from("existing")),
1933            "auth enrichment must not overwrite already-present metadata keys"
1934        );
1935    }
1936
1937    #[test]
1938    fn inject_auth_enrichment_applies_to_non_log_events_in_vector_namespace() {
1939        use crate::{codecs::DecodingConfig, sources::util::HttpSource as _};
1940        use vector_lib::{
1941            codecs::BytesDeserializerConfig,
1942            event::{Metric, MetricKind, MetricValue},
1943        };
1944        use vrl::value::KeyString;
1945
1946        let decoder = DecodingConfig::new(
1947            BytesDecoderConfig::new().into(),
1948            BytesDeserializerConfig::new().into(),
1949            LogNamespace::Vector,
1950        )
1951        .build()
1952        .unwrap()
1953        .with_log_namespace(LogNamespace::Vector);
1954
1955        let source = super::SimpleHttpSource {
1956            headers: vec![],
1957            query_parameters: vec![],
1958            path_key: OptionalValuePath::none(),
1959            host_key: OptionalValuePath::none(),
1960            decoder,
1961            log_namespace: LogNamespace::Vector,
1962        };
1963
1964        let metric = Metric::new(
1965            "requests",
1966            MetricKind::Incremental,
1967            MetricValue::Counter { value: 1.0 },
1968        );
1969        let mut events = vec![Event::Metric(metric)];
1970
1971        let mut enrichment = ObjectMap::new();
1972        enrichment.insert(KeyString::from("tenant_id"), Value::from("t-456"));
1973
1974        source.inject_auth_enrichment(&mut events, enrichment);
1975
1976        let Event::Metric(metric) = &events[0] else {
1977            panic!("expected metric event");
1978        };
1979        assert_eq!(
1980            metric
1981                .metadata()
1982                .value()
1983                .get(path!(SimpleHttpConfig::NAME).concat(path!("tenant_id")),),
1984            Some(&Value::from("t-456")),
1985            "auth enrichment must be written to non-log event metadata"
1986        );
1987    }
1988
1989    impl ValidatableComponent for SimpleHttpConfig {
1990        fn validation_configuration() -> ValidationConfiguration {
1991            let config = Self {
1992                decoding: Some(DeserializerConfig::Json(Default::default())),
1993                ..Default::default()
1994            };
1995
1996            let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1997
1998            let listen_addr_http = format!("http://{}/", config.address);
1999            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2000
2001            let external_resource = ExternalResource::new(
2002                ResourceDirection::Push,
2003                HttpResourceConfig::from_parts(uri, Some(config.method.into())),
2004                config
2005                    .get_decoding_config()
2006                    .expect("should not fail to get decoding config"),
2007            );
2008
2009            ValidationConfiguration::from_source(
2010                Self::NAME,
2011                log_namespace,
2012                vec![ComponentTestCaseConfig::from_source(
2013                    config,
2014                    None,
2015                    Some(external_resource),
2016                )],
2017            )
2018        }
2019    }
2020
2021    register_validatable_component!(SimpleHttpConfig);
2022}