vector/sources/util/http/
prelude.rs1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 config::SourceAcknowledgementsConfig,
12 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use vrl::value::ObjectMap;
15use warp::{
16 Filter,
17 filters::{
18 BoxedFilter,
19 path::{FullPath, Tail},
20 },
21 http::{HeaderMap, StatusCode},
22 reject::Rejection,
23};
24
25use super::encoding::{decompress_body, limited_body, max_decompressed_size_bytes};
26use crate::{
27 SourceSender,
28 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
29 config::SourceContext,
30 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
31 internal_events::{
32 HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
33 },
34 sources::util::http::HttpMethod,
35 tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
36};
37
38pub trait HttpSource: Clone + Send + Sync + 'static {
39 fn enrich_events(
43 &self,
44 _events: &mut [Event],
45 _request_path: &str,
46 _headers_config: &HeaderMap,
47 _query_parameters: &HashMap<String, String>,
48 _source_ip: Option<&SocketAddr>,
49 ) {
50 }
51
52 fn build_events(
53 &self,
54 body: Bytes,
55 header_map: &HeaderMap,
56 query_parameters: &HashMap<String, String>,
57 path: &str,
58 ) -> Result<Vec<Event>, ErrorMessage>;
59
60 fn inject_auth_enrichment(&self, _events: &mut [Event], enrichment: ObjectMap) {
63 if !enrichment.is_empty() {
64 warn!(
65 message = "Auth metadata enrichment is not supported by this source and will be dropped. \
66 Remove %field writes from the custom auth VRL program or switch to a source that supports enrichment.",
67 fields = ?enrichment.keys().collect::<Vec<_>>(),
68 );
69 }
70 }
71
72 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
73 decompress_body(encoding_header, body)
74 }
75
76 #[allow(clippy::too_many_arguments)]
77 fn run(
78 self,
79 address: SocketAddr,
80 path: &str,
81 method: HttpMethod,
82 response_code: StatusCode,
83 strict_path: bool,
84 tls: Option<&TlsEnableableConfig>,
85 auth: Option<&HttpServerAuthConfig>,
86 cx: SourceContext,
87 acknowledgements: SourceAcknowledgementsConfig,
88 keepalive_settings: KeepaliveConfig,
89 ) -> crate::Result<crate::sources::Source> {
90 let tls = MaybeTlsSettings::from_config(tls, true)?;
91 let protocol = tls.http_protocol_name();
92 let auth_matcher = auth
93 .map(|a| a.build(&cx.enrichment_tables, &cx.metrics_storage))
94 .transpose()?;
95 let path = path.to_owned();
96 let acknowledgements = cx.do_acknowledgements(acknowledgements);
97 let enable_source_ip = self.enable_source_ip();
98
99 Ok(Box::pin(async move {
100 let mut filter: BoxedFilter<()> = match method {
101 HttpMethod::Head => warp::head().boxed(),
102 HttpMethod::Get => warp::get().boxed(),
103 HttpMethod::Put => warp::put().boxed(),
104 HttpMethod::Post => warp::post().boxed(),
105 HttpMethod::Patch => warp::patch().boxed(),
106 HttpMethod::Delete => warp::delete().boxed(),
107 HttpMethod::Options => warp::options().boxed(),
108 };
109
110 #[allow(clippy::unnecessary_to_owned)]
112 for s in path.split('/').filter(|&x| !x.is_empty()) {
113 filter = filter.and(warp::path(s.to_string())).boxed()
114 }
115 let body_filter = limited_body(max_decompressed_size_bytes());
116
117 let svc = filter
118 .and(warp::path::tail())
119 .and_then(move |tail: Tail| async move {
120 if !strict_path || tail.as_str().is_empty() {
121 Ok(())
122 } else {
123 emit!(HttpInternalError {
124 message: "Path not found."
125 });
126 Err(warp::reject::custom(ErrorMessage::new(
127 StatusCode::NOT_FOUND,
128 "Not found".to_string(),
129 )))
130 }
131 })
132 .untuple_one()
133 .and(warp::path::full())
134 .and(warp::header::optional::<String>("content-encoding"))
135 .and(warp::header::headers_cloned())
136 .and(body_filter)
137 .and(warp::query::<HashMap<String, String>>())
138 .and(warp::filters::ext::optional())
139 .and_then(
140 move |path: FullPath,
141 encoding_header: Option<String>,
142 headers: HeaderMap,
143 body: Bytes,
144 query_parameters: HashMap<String, String>,
145 addr: Option<PeerAddr>| {
146 debug!(message = "Handling HTTP request.", headers = ?headers);
147 let http_path = path.as_str();
148 let events = auth_matcher
149 .as_ref()
150 .map_or(Ok(None), |a| {
151 a.handle_auth(
152 addr.as_ref().map(|a| a.0).as_ref(),
153 &headers,
154 path.as_str(),
155 )
156 })
157 .and_then(|auth_enrichment| {
158 self.decode(encoding_header.as_deref(), body)
159 .map(|body| (body, auth_enrichment))
160 })
161 .and_then(|(body, auth_enrichment)| {
162 emit!(HttpBytesReceived {
163 byte_size: body.len(),
164 http_path,
165 protocol,
166 });
167 self.build_events(body, &headers, &query_parameters, path.as_str())
168 .map(|events| (events, auth_enrichment))
169 })
170 .map(|(mut events, auth_enrichment)| {
171 emit!(HttpEventsReceived {
172 count: events.len(),
173 byte_size: events.estimated_json_encoded_size_of(),
174 http_path,
175 protocol,
176 });
177
178 self.enrich_events(
179 &mut events,
180 path.as_str(),
181 &headers,
182 &query_parameters,
183 addr.and_then(|a| enable_source_ip.then_some(a))
184 .map(|PeerAddr(inner_addr)| inner_addr)
185 .as_ref(),
186 );
187
188 if let Some(enrichment) = auth_enrichment {
189 self.inject_auth_enrichment(&mut events, enrichment);
190 }
191
192 events
193 });
194
195 handle_request(events, acknowledgements, response_code, cx.out.clone())
196 },
197 );
198
199 let ping = warp::get().and(warp::path("ping")).map(|| "pong");
200 let routes = svc.or(ping).recover(|r: Rejection| async move {
201 if let Some(e_msg) = r.find::<ErrorMessage>() {
202 let json = warp::reply::json(e_msg);
203 Ok(warp::reply::with_status(json, e_msg.status_code()))
204 } else {
205 emit!(HttpInternalError {
207 message: &format!("Internal error: {r:?}")
208 });
209 Err(r)
210 }
211 });
212
213 let span = Span::current();
214 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
215 let remote_addr = conn.peer_addr();
216 let svc = ServiceBuilder::new()
217 .layer(build_http_trace_layer(span.clone()))
218 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
219 MaxConnectionAgeLayer::new(
220 Duration::from_secs(secs),
221 keepalive_settings.max_connection_age_jitter_factor,
222 remote_addr,
223 )
224 }))
225 .map_request(move |mut request: hyper::Request<_>| {
226 request.extensions_mut().insert(PeerAddr::new(remote_addr));
227
228 request
229 })
230 .service(warp::service(routes.clone()));
231 futures_util::future::ok::<_, Infallible>(svc)
232 });
233
234 info!(message = "Building HTTP server.", address = %address);
235
236 let listener = tls.bind(&address).await.map_err(|err| {
237 error!("An error occurred: {:?}.", err);
238 })?;
239
240 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
241 .serve(make_svc)
242 .with_graceful_shutdown(cx.shutdown.map(|_| ()))
243 .await
244 .map_err(|err| {
245 error!("An error occurred: {:?}.", err);
246 })?;
247
248 Ok(())
249 }))
250 }
251
252 fn enable_source_ip(&self) -> bool {
253 false
254 }
255}
256
/// Socket address of the remote end of the connection a request arrived on.
///
/// `HttpSource::run` inserts one into each request's extensions and reads it
/// back in the request handler.
#[derive(Clone)]
#[repr(transparent)]
struct PeerAddr(SocketAddr);

impl PeerAddr {
    /// Wraps `addr` in a `PeerAddr`.
    const fn new(addr: SocketAddr) -> Self {
        PeerAddr(addr)
    }
}
266
/// Rejection raised by `handle_request` when the event batch could not be
/// sent downstream.
struct RejectShuttingDown;

impl fmt::Debug for RejectShuttingDown {
    /// Always renders as the fixed text `shutting down`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "shutting down")
    }
}
274
275impl warp::reject::Reject for RejectShuttingDown {}
276
277async fn handle_request(
278 events: Result<Vec<Event>, ErrorMessage>,
279 acknowledgements: bool,
280 response_code: StatusCode,
281 mut out: SourceSender,
282) -> Result<impl warp::Reply, Rejection> {
283 match events {
284 Ok(mut events) => {
285 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
286
287 let count = events.len();
288 out.send_batch(events)
289 .map_err(|_| {
290 emit!(StreamClosedError { count });
293 warp::reject::custom(RejectShuttingDown)
294 })
295 .and_then(|_| handle_batch_status(response_code, receiver))
296 .await
297 }
298 Err(error) => {
299 emit!(HttpBadRequest::new(error.code(), error.message()));
300 Err(warp::reject::custom(error))
301 }
302 }
303}
304
305async fn handle_batch_status(
306 success_response_code: StatusCode,
307 receiver: Option<BatchStatusReceiver>,
308) -> Result<impl warp::Reply, Rejection> {
309 match receiver {
310 None => Ok(success_response_code),
311 Some(receiver) => match receiver.await {
312 BatchStatus::Delivered => Ok(success_response_code),
313 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
314 StatusCode::INTERNAL_SERVER_ERROR,
315 "Error delivering contents to sink".into(),
316 ))),
317 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
318 StatusCode::BAD_REQUEST,
319 "Contents failed to deliver to sink".into(),
320 ))),
321 },
322 }
323}