Skip to main content

vector/sources/util/http/
prelude.rs

1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    config::SourceAcknowledgementsConfig,
12    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use vrl::value::ObjectMap;
15use warp::{
16    Filter,
17    filters::{
18        BoxedFilter,
19        path::{FullPath, Tail},
20    },
21    http::{HeaderMap, StatusCode},
22    reject::Rejection,
23};
24
25use super::encoding::{decompress_body, limited_body, max_decompressed_size_bytes};
26use crate::{
27    SourceSender,
28    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
29    config::SourceContext,
30    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
31    internal_events::{
32        HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
33    },
34    sources::util::http::HttpMethod,
35    tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
36};
37
38pub trait HttpSource: Clone + Send + Sync + 'static {
39    // This function can be defined to enrich events with additional HTTP
40    // metadata. This function should be used rather than internal enrichment so
41    // that accurate byte count metrics can be emitted.
42    fn enrich_events(
43        &self,
44        _events: &mut [Event],
45        _request_path: &str,
46        _headers_config: &HeaderMap,
47        _query_parameters: &HashMap<String, String>,
48        _source_ip: Option<&SocketAddr>,
49    ) {
50    }
51
52    fn build_events(
53        &self,
54        body: Bytes,
55        header_map: &HeaderMap,
56        query_parameters: &HashMap<String, String>,
57        path: &str,
58    ) -> Result<Vec<Event>, ErrorMessage>;
59
60    /// Called after `enrich_events` when `custom` auth returned metadata enrichment fields.
61    /// Sources that do not override this will emit a warning and drop the enrichment.
62    fn inject_auth_enrichment(&self, _events: &mut [Event], enrichment: ObjectMap) {
63        if !enrichment.is_empty() {
64            warn!(
65                message = "Auth metadata enrichment is not supported by this source and will be dropped. \
66                           Remove %field writes from the custom auth VRL program or switch to a source that supports enrichment.",
67                fields = ?enrichment.keys().collect::<Vec<_>>(),
68            );
69        }
70    }
71
72    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
73        decompress_body(encoding_header, body)
74    }
75
76    #[allow(clippy::too_many_arguments)]
77    fn run(
78        self,
79        address: SocketAddr,
80        path: &str,
81        method: HttpMethod,
82        response_code: StatusCode,
83        strict_path: bool,
84        tls: Option<&TlsEnableableConfig>,
85        auth: Option<&HttpServerAuthConfig>,
86        cx: SourceContext,
87        acknowledgements: SourceAcknowledgementsConfig,
88        keepalive_settings: KeepaliveConfig,
89    ) -> crate::Result<crate::sources::Source> {
90        let tls = MaybeTlsSettings::from_config(tls, true)?;
91        let protocol = tls.http_protocol_name();
92        let auth_matcher = auth
93            .map(|a| a.build(&cx.enrichment_tables, &cx.metrics_storage))
94            .transpose()?;
95        let path = path.to_owned();
96        let acknowledgements = cx.do_acknowledgements(acknowledgements);
97        let enable_source_ip = self.enable_source_ip();
98
99        Ok(Box::pin(async move {
100            let mut filter: BoxedFilter<()> = match method {
101                HttpMethod::Head => warp::head().boxed(),
102                HttpMethod::Get => warp::get().boxed(),
103                HttpMethod::Put => warp::put().boxed(),
104                HttpMethod::Post => warp::post().boxed(),
105                HttpMethod::Patch => warp::patch().boxed(),
106                HttpMethod::Delete => warp::delete().boxed(),
107                HttpMethod::Options => warp::options().boxed(),
108            };
109
110            // https://github.com/rust-lang/rust-clippy/issues/8148
111            #[allow(clippy::unnecessary_to_owned)]
112            for s in path.split('/').filter(|&x| !x.is_empty()) {
113                filter = filter.and(warp::path(s.to_string())).boxed()
114            }
115            let body_filter = limited_body(max_decompressed_size_bytes());
116
117            let svc = filter
118                .and(warp::path::tail())
119                .and_then(move |tail: Tail| async move {
120                    if !strict_path || tail.as_str().is_empty() {
121                        Ok(())
122                    } else {
123                        emit!(HttpInternalError {
124                            message: "Path not found."
125                        });
126                        Err(warp::reject::custom(ErrorMessage::new(
127                            StatusCode::NOT_FOUND,
128                            "Not found".to_string(),
129                        )))
130                    }
131                })
132                .untuple_one()
133                .and(warp::path::full())
134                .and(warp::header::optional::<String>("content-encoding"))
135                .and(warp::header::headers_cloned())
136                .and(body_filter)
137                .and(warp::query::<HashMap<String, String>>())
138                .and(warp::filters::ext::optional())
139                .and_then(
140                    move |path: FullPath,
141                          encoding_header: Option<String>,
142                          headers: HeaderMap,
143                          body: Bytes,
144                          query_parameters: HashMap<String, String>,
145                          addr: Option<PeerAddr>| {
146                        debug!(message = "Handling HTTP request.", headers = ?headers);
147                        let http_path = path.as_str();
148                        let events = auth_matcher
149                            .as_ref()
150                            .map_or(Ok(None), |a| {
151                                a.handle_auth(
152                                    addr.as_ref().map(|a| a.0).as_ref(),
153                                    &headers,
154                                    path.as_str(),
155                                )
156                            })
157                            .and_then(|auth_enrichment| {
158                                self.decode(encoding_header.as_deref(), body)
159                                    .map(|body| (body, auth_enrichment))
160                            })
161                            .and_then(|(body, auth_enrichment)| {
162                                emit!(HttpBytesReceived {
163                                    byte_size: body.len(),
164                                    http_path,
165                                    protocol,
166                                });
167                                self.build_events(body, &headers, &query_parameters, path.as_str())
168                                    .map(|events| (events, auth_enrichment))
169                            })
170                            .map(|(mut events, auth_enrichment)| {
171                                emit!(HttpEventsReceived {
172                                    count: events.len(),
173                                    byte_size: events.estimated_json_encoded_size_of(),
174                                    http_path,
175                                    protocol,
176                                });
177
178                                self.enrich_events(
179                                    &mut events,
180                                    path.as_str(),
181                                    &headers,
182                                    &query_parameters,
183                                    addr.and_then(|a| enable_source_ip.then_some(a))
184                                        .map(|PeerAddr(inner_addr)| inner_addr)
185                                        .as_ref(),
186                                );
187
188                                if let Some(enrichment) = auth_enrichment {
189                                    self.inject_auth_enrichment(&mut events, enrichment);
190                                }
191
192                                events
193                            });
194
195                        handle_request(events, acknowledgements, response_code, cx.out.clone())
196                    },
197                );
198
199            let ping = warp::get().and(warp::path("ping")).map(|| "pong");
200            let routes = svc.or(ping).recover(|r: Rejection| async move {
201                if let Some(e_msg) = r.find::<ErrorMessage>() {
202                    let json = warp::reply::json(e_msg);
203                    Ok(warp::reply::with_status(json, e_msg.status_code()))
204                } else {
205                    //other internal error - will return 500 internal server error
206                    emit!(HttpInternalError {
207                        message: &format!("Internal error: {r:?}")
208                    });
209                    Err(r)
210                }
211            });
212
213            let span = Span::current();
214            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
215                let remote_addr = conn.peer_addr();
216                let svc = ServiceBuilder::new()
217                    .layer(build_http_trace_layer(span.clone()))
218                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
219                        MaxConnectionAgeLayer::new(
220                            Duration::from_secs(secs),
221                            keepalive_settings.max_connection_age_jitter_factor,
222                            remote_addr,
223                        )
224                    }))
225                    .map_request(move |mut request: hyper::Request<_>| {
226                        request.extensions_mut().insert(PeerAddr::new(remote_addr));
227
228                        request
229                    })
230                    .service(warp::service(routes.clone()));
231                futures_util::future::ok::<_, Infallible>(svc)
232            });
233
234            info!(message = "Building HTTP server.", address = %address);
235
236            let listener = tls.bind(&address).await.map_err(|err| {
237                error!("An error occurred: {:?}.", err);
238            })?;
239
240            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
241                .serve(make_svc)
242                .with_graceful_shutdown(cx.shutdown.map(|_| ()))
243                .await
244                .map_err(|err| {
245                    error!("An error occurred: {:?}.", err);
246                })?;
247
248            Ok(())
249        }))
250    }
251
252    fn enable_source_ip(&self) -> bool {
253        false
254    }
255}
256
257#[derive(Clone)]
258#[repr(transparent)]
259struct PeerAddr(SocketAddr);
260
261impl PeerAddr {
262    const fn new(addr: SocketAddr) -> Self {
263        Self(addr)
264    }
265}
266
267struct RejectShuttingDown;
268
269impl fmt::Debug for RejectShuttingDown {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.write_str("shutting down")
272    }
273}
274
275impl warp::reject::Reject for RejectShuttingDown {}
276
277async fn handle_request(
278    events: Result<Vec<Event>, ErrorMessage>,
279    acknowledgements: bool,
280    response_code: StatusCode,
281    mut out: SourceSender,
282) -> Result<impl warp::Reply, Rejection> {
283    match events {
284        Ok(mut events) => {
285            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
286
287            let count = events.len();
288            out.send_batch(events)
289                .map_err(|_| {
290                    // can only fail if receiving end disconnected, so we are shutting down,
291                    // probably not gracefully.
292                    emit!(StreamClosedError { count });
293                    warp::reject::custom(RejectShuttingDown)
294                })
295                .and_then(|_| handle_batch_status(response_code, receiver))
296                .await
297        }
298        Err(error) => {
299            emit!(HttpBadRequest::new(error.code(), error.message()));
300            Err(warp::reject::custom(error))
301        }
302    }
303}
304
305async fn handle_batch_status(
306    success_response_code: StatusCode,
307    receiver: Option<BatchStatusReceiver>,
308) -> Result<impl warp::Reply, Rejection> {
309    match receiver {
310        None => Ok(success_response_code),
311        Some(receiver) => match receiver.await {
312            BatchStatus::Delivered => Ok(success_response_code),
313            BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
314                StatusCode::INTERNAL_SERVER_ERROR,
315                "Error delivering contents to sink".into(),
316            ))),
317            BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
318                StatusCode::BAD_REQUEST,
319                "Contents failed to deliver to sink".into(),
320            ))),
321        },
322    }
323}