1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9 codecs::decoding::{DeserializerConfig, FramingConfig},
10 config::{LegacyKey, LogNamespace},
11 configurable::configurable_component,
12 lookup::owned_value_path,
13 sensitive_string::SensitiveString,
14 tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19 codecs::DecodingConfig,
20 config::{
21 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22 SourceOutput,
23 },
24 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25 serde::{bool_or_struct, default_decoding, default_framing_message_based},
26 tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
34#[configurable_component(source(
36 "aws_kinesis_firehose",
37 "Collect logs from AWS Kinesis Firehose."
38))]
39#[derive(Clone, Debug)]
40pub struct AwsKinesisFirehoseConfig {
41 #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
43 #[configurable(metadata(docs::examples = "localhost:443"))]
44 address: SocketAddr,
45
46 #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
51 #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
52 access_key: Option<SensitiveString>,
53
54 #[configurable(metadata(docs::examples = "access_keys_example()"))]
59 access_keys: Option<Vec<SensitiveString>>,
60
61 #[configurable(derived)]
66 store_access_key: bool,
67
68 #[serde(default)]
80 record_compression: Compression,
81
82 #[configurable(derived)]
83 tls: Option<TlsEnableableConfig>,
84
85 #[configurable(derived)]
86 #[configurable(metadata(docs::advanced))]
87 #[serde(default = "default_framing_message_based")]
88 framing: FramingConfig,
89
90 #[configurable(derived)]
91 #[configurable(metadata(docs::advanced))]
92 #[serde(default = "default_decoding")]
93 decoding: DeserializerConfig,
94
95 #[configurable(derived)]
96 #[serde(default, deserialize_with = "bool_or_struct")]
97 acknowledgements: SourceAcknowledgementsConfig,
98
99 #[configurable(metadata(docs::hidden))]
101 #[serde(default)]
102 log_namespace: Option<bool>,
103
104 #[configurable(derived)]
105 #[serde(default)]
106 keepalive: KeepaliveConfig,
107}
108
/// Returns the two sample access keys used as the documentation example for `access_keys`.
const fn access_keys_example() -> [&'static str; 2] {
    const FIRST_KEY: &str = "A94A8FE5CCB19BA61C4C08";
    const SECOND_KEY: &str = "B94B8FE5CCB19BA61C4C12";

    [FIRST_KEY, SECOND_KEY]
}
112
113#[configurable_component]
115#[configurable(metadata(docs::advanced))]
116#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
117#[serde(rename_all = "lowercase")]
118pub enum Compression {
119 #[default]
130 Auto,
131
132 None,
134
135 Gzip,
137}
138
139impl fmt::Display for Compression {
140 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
141 match self {
142 Compression::Auto => write!(fmt, "auto"),
143 Compression::None => write!(fmt, "none"),
144 Compression::Gzip => write!(fmt, "gzip"),
145 }
146 }
147}
148
149#[async_trait::async_trait]
150#[typetag::serde(name = "aws_kinesis_firehose")]
151impl SourceConfig for AwsKinesisFirehoseConfig {
152 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
153 let log_namespace = cx.log_namespace(self.log_namespace);
154 let decoder =
155 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
156 .build()?;
157
158 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
159
160 if self.access_key.is_some() {
161 warn!("DEPRECATION `access_key`, use `access_keys` instead.")
162 }
163
164 let access_keys = self
166 .access_keys
167 .iter()
168 .flatten()
169 .chain(self.access_key.iter());
170
171 let svc = filters::firehose(
172 access_keys.map(|key| key.inner().to_string()).collect(),
173 self.store_access_key,
174 self.record_compression,
175 decoder,
176 acknowledgements,
177 cx.out,
178 log_namespace,
179 );
180
181 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
182 let listener = tls.bind(&self.address).await?;
183
184 let keepalive_settings = self.keepalive.clone();
185 let shutdown = cx.shutdown;
186 Ok(Box::pin(async move {
187 let span = Span::current();
188 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
189 let svc = ServiceBuilder::new()
190 .layer(build_http_trace_layer(span.clone()))
191 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
192 MaxConnectionAgeLayer::new(
193 Duration::from_secs(secs),
194 keepalive_settings.max_connection_age_jitter_factor,
195 conn.peer_addr(),
196 )
197 }))
198 .service(warp::service(svc.clone()));
199 futures_util::future::ok::<_, Infallible>(svc)
200 });
201
202 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
203 .serve(make_svc)
204 .with_graceful_shutdown(shutdown.map(|_| ()))
205 .await
206 .map_err(|err| {
207 error!("An error occurred: {:?}.", err);
208 })?;
209
210 Ok(())
211 }))
212 }
213
214 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
215 let schema_definition = self
216 .decoding
217 .schema_definition(global_log_namespace.merge(self.log_namespace))
218 .with_standard_vector_source_metadata()
219 .with_source_metadata(
220 Self::NAME,
221 Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
222 &owned_value_path!("request_id"),
223 Kind::bytes(),
224 None,
225 )
226 .with_source_metadata(
227 Self::NAME,
228 Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
229 &owned_value_path!("source_arn"),
230 Kind::bytes(),
231 None,
232 );
233
234 vec![SourceOutput::new_maybe_logs(
235 self.decoding.output_type(),
236 schema_definition,
237 )]
238 }
239
240 fn resources(&self) -> Vec<Resource> {
241 vec![Resource::tcp(self.address)]
242 }
243
244 fn can_acknowledge(&self) -> bool {
245 true
246 }
247}
248
249impl GenerateConfig for AwsKinesisFirehoseConfig {
250 fn generate_config() -> toml::Value {
251 toml::Value::try_from(Self {
252 address: "0.0.0.0:443".parse().unwrap(),
253 access_key: None,
254 access_keys: None,
255 store_access_key: false,
256 tls: None,
257 record_compression: Default::default(),
258 framing: default_framing_message_based(),
259 decoding: default_decoding(),
260 acknowledgements: Default::default(),
261 log_namespace: None,
262 keepalive: Default::default(),
263 })
264 .unwrap()
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 #![allow(clippy::print_stdout)] use std::{
273 io::{Cursor, Read},
274 net::SocketAddr,
275 };
276
277 use base64::prelude::{BASE64_STANDARD, Engine as _};
278 use bytes::Bytes;
279 use chrono::{DateTime, SubsecRound, Utc};
280 use flate2::read::GzEncoder;
281 use futures::Stream;
282 use similar_asserts::assert_eq;
283 use vector_lib::{assert_event_data_eq, lookup::path};
284 use vrl::value;
285
286 use super::*;
287 use crate::{
288 SourceSender,
289 event::{Event, EventStatus},
290 log_event,
291 test_util::{
292 addr::{PortGuard, next_addr},
293 collect_n,
294 components::{SOURCE_TAGS, assert_source_compliance},
295 wait_for_tcp,
296 },
297 };
298
    // Source ARN sent by the test client in the `x-amz-firehose-source-arn` header.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    // Request ID sent by the test client both in the request body and in the
    // `x-amz-firehose-request-id` header; responses are expected to echo it back.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
    // Sample record payload used by the tests. It is sent as opaque bytes and compared
    // byte-for-byte against the emitted event.
    // NOTE(review): the shape looks like a CloudWatch Logs subscription message -- confirm.
    const RECORD: &str = r#"
 {
 "messageType": "DATA_MESSAGE",
 "owner": "071959437513",
 "logGroup": "/jesse/test",
 "logStream": "test",
 "subscriptionFilters": ["Destination"],
 "logEvents": [
 {
 "id": "35683658089614582423604394983260738922885519999578275840",
 "timestamp": 1600110569039,
 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
 },
 {
 "id": "35683658089659183914001456229543810359430816722590236673",
 "timestamp": 1600110569041,
 "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
 }
 ]
 }
 "#;
323
324 #[test]
325 fn generate_config() {
326 crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
327 }
328
329 async fn source(
330 access_key: Option<SensitiveString>,
331 access_keys: Option<Vec<SensitiveString>>,
332 store_access_key: bool,
333 record_compression: Compression,
334 delivered: bool,
335 log_namespace: bool,
336 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
337 use EventStatus::*;
338 let status = if delivered { Delivered } else { Rejected };
339 let (sender, recv) = SourceSender::new_test_finalize(status);
340 let (_guard, address) = next_addr();
341 let cx = SourceContext::new_test(sender, None);
342 tokio::spawn(async move {
343 AwsKinesisFirehoseConfig {
344 address,
345 tls: None,
346 access_key,
347 access_keys,
348 store_access_key,
349 record_compression,
350 framing: default_framing_message_based(),
351 decoding: default_decoding(),
352 acknowledgements: true.into(),
353 log_namespace: Some(log_namespace),
354 keepalive: Default::default(),
355 }
356 .build(cx)
357 .await
358 .unwrap()
359 .await
360 .unwrap()
361 });
362 wait_for_tcp(address).await;
364 (recv, address, _guard)
365 }
366
367 async fn send(
371 address: SocketAddr,
372 timestamp: DateTime<Utc>,
373 records: Vec<&[u8]>,
374 key: Option<&str>,
375 gzip: bool,
376 record_compression: Compression,
377 ) -> reqwest::Result<reqwest::Response> {
378 let request = models::FirehoseRequest {
379 access_key: key.map(|s| s.to_string()),
380 request_id: REQUEST_ID.to_string(),
381 timestamp,
382 records: records
383 .into_iter()
384 .map(|record| models::EncodedFirehoseRecord {
385 data: encode_record(record, record_compression).unwrap(),
386 })
387 .collect(),
388 };
389
390 let mut builder = reqwest::Client::new()
391 .post(format!("http://{address}"))
392 .header("host", address.to_string())
393 .header(
394 "x-amzn-trace-id",
395 "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
396 )
397 .header("x-amz-firehose-protocol-version", "1.0")
398 .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
399 .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
400 .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
401 .header("content-type", "application/json");
402
403 if let Some(key) = key {
404 builder = builder.header("x-amz-firehose-access-key", key);
405 }
406
407 if gzip {
408 let mut gz = GzEncoder::new(
409 Cursor::new(serde_json::to_vec(&request).unwrap()),
410 flate2::Compression::fast(),
411 );
412 let mut buffer = Vec::new();
413 gz.read_to_end(&mut buffer).unwrap();
414 builder = builder.header("content-encoding", "gzip").body(buffer);
415 } else {
416 builder = builder.json(&request);
417 }
418
419 builder.send().await
420 }
421
422 async fn spawn_send(
423 address: SocketAddr,
424 timestamp: DateTime<Utc>,
425 records: Vec<&'static [u8]>,
426 key: Option<&'static str>,
427 gzip: bool,
428 record_compression: Compression,
429 ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
430 tokio::spawn(async move {
431 send(address, timestamp, records, key, gzip, record_compression).await
432 })
433 }
434
435 fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
438 let compressed = match compression {
439 Compression::Auto => panic!("cannot encode records as Auto"),
440 Compression::Gzip => {
441 let mut buffer = Vec::new();
442 if !record.is_empty() {
443 let mut gz = GzEncoder::new(record, flate2::Compression::fast());
444 gz.read_to_end(&mut buffer)?;
445 }
446 buffer
447 }
448 Compression::None => record.to_vec(),
449 };
450
451 Ok(BASE64_STANDARD.encode(compressed))
452 }
453
454 #[tokio::test]
455 async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
456 let gzipped_record = {
457 let mut buf = Vec::new();
458 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
459 gz.read_to_end(&mut buf).unwrap();
460 buf
461 };
462
463 for (source_record_compression, record_compression, success, record, expected) in [
464 (
465 Compression::Auto,
466 Compression::Gzip,
467 true,
468 RECORD.as_bytes(),
469 RECORD.as_bytes().to_owned(),
470 ),
471 (
472 Compression::Auto,
473 Compression::None,
474 true,
475 RECORD.as_bytes(),
476 RECORD.as_bytes().to_owned(),
477 ),
478 (
479 Compression::None,
480 Compression::Gzip,
481 true,
482 RECORD.as_bytes(),
483 gzipped_record,
484 ),
485 (
486 Compression::None,
487 Compression::None,
488 true,
489 RECORD.as_bytes(),
490 RECORD.as_bytes().to_owned(),
491 ),
492 (
493 Compression::Gzip,
494 Compression::Gzip,
495 true,
496 RECORD.as_bytes(),
497 RECORD.as_bytes().to_owned(),
498 ),
499 (
500 Compression::Gzip,
501 Compression::None,
502 false,
503 RECORD.as_bytes(),
504 RECORD.as_bytes().to_owned(),
505 ),
506 (
507 Compression::Gzip,
508 Compression::Gzip,
509 true,
510 "".as_bytes(),
511 Vec::new(),
512 ),
513 ] {
514 let (rx, addr, _guard) =
515 source(None, None, false, source_record_compression, true, false).await;
516
517 let timestamp: DateTime<Utc> = Utc::now();
518
519 let res = spawn_send(
520 addr,
521 timestamp,
522 vec![record],
523 None,
524 false,
525 record_compression,
526 )
527 .await;
528
529 if success {
530 let events = collect_n(rx, 1).await;
531
532 let res = res.await.unwrap().unwrap();
533 assert_eq!(200, res.status().as_u16());
534
535 assert_event_data_eq!(
536 events,
537 vec![log_event! {
538 "source_type" => Bytes::from("aws_kinesis_firehose"),
539 "timestamp" => timestamp.trunc_subsecs(3), "message" => Bytes::from(expected),
541 "request_id" => REQUEST_ID,
542 "source_arn" => SOURCE_ARN,
543 },]
544 );
545
546 let response: models::FirehoseResponse = res.json().await.unwrap();
547 assert_eq!(response.request_id, REQUEST_ID);
548 } else {
549 let res = res.await.unwrap().unwrap();
550 assert_eq!(400, res.status().as_u16());
551 }
552 }
553 }
554
555 #[tokio::test]
556 async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
557 let gzipped_record = {
558 let mut buf = Vec::new();
559 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
560 gz.read_to_end(&mut buf).unwrap();
561 buf
562 };
563
564 for (source_record_compression, record_compression, success, record, expected) in [
565 (
566 Compression::Auto,
567 Compression::Gzip,
568 true,
569 RECORD.as_bytes(),
570 RECORD.as_bytes().to_owned(),
571 ),
572 (
573 Compression::Auto,
574 Compression::None,
575 true,
576 RECORD.as_bytes(),
577 RECORD.as_bytes().to_owned(),
578 ),
579 (
580 Compression::None,
581 Compression::Gzip,
582 true,
583 RECORD.as_bytes(),
584 gzipped_record,
585 ),
586 (
587 Compression::None,
588 Compression::None,
589 true,
590 RECORD.as_bytes(),
591 RECORD.as_bytes().to_owned(),
592 ),
593 (
594 Compression::Gzip,
595 Compression::Gzip,
596 true,
597 RECORD.as_bytes(),
598 RECORD.as_bytes().to_owned(),
599 ),
600 (
601 Compression::Gzip,
602 Compression::None,
603 false,
604 RECORD.as_bytes(),
605 RECORD.as_bytes().to_owned(),
606 ),
607 (
608 Compression::Gzip,
609 Compression::Gzip,
610 true,
611 "".as_bytes(),
612 Vec::new(),
613 ),
614 ] {
615 let (rx, addr, _guard) =
616 source(None, None, false, source_record_compression, true, true).await;
617
618 let timestamp: DateTime<Utc> = Utc::now();
619
620 let res = spawn_send(
621 addr,
622 timestamp,
623 vec![record],
624 None,
625 false,
626 record_compression,
627 )
628 .await;
629
630 if success {
631 let events = collect_n(rx, 1).await;
632
633 let res = res.await.unwrap().unwrap();
634 assert_eq!(200, res.status().as_u16());
635
636 for event in events {
637 let log = event.as_log();
638 let meta = log.metadata();
639
640 assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
642
643 assert_eq!(
645 meta.value().get(path!("vector", "source_type")).unwrap(),
646 &value!("aws_kinesis_firehose")
647 );
648 assert!(
649 meta.value()
650 .get(path!("vector", "ingest_timestamp"))
651 .unwrap()
652 .is_timestamp()
653 );
654
655 assert_eq!(
657 meta.value()
658 .get(path!("aws_kinesis_firehose", "request_id"))
659 .unwrap(),
660 &value!(REQUEST_ID)
661 );
662 assert_eq!(
663 meta.value()
664 .get(path!("aws_kinesis_firehose", "source_arn"))
665 .unwrap(),
666 &value!(SOURCE_ARN)
667 );
668 assert_eq!(
669 meta.value()
670 .get(path!("aws_kinesis_firehose", "timestamp"))
671 .unwrap(),
672 &value!(timestamp.trunc_subsecs(3))
673 );
674 }
675
676 let response: models::FirehoseResponse = res.json().await.unwrap();
677 assert_eq!(response.request_id, REQUEST_ID);
678 } else {
679 let res = res.await.unwrap().unwrap();
680 assert_eq!(400, res.status().as_u16());
681 }
682 }
683 }
684
685 #[tokio::test]
686 async fn aws_kinesis_firehose_forwards_events_gzip_request() {
687 assert_source_compliance(&SOURCE_TAGS, async move {
688 let (rx, addr, _guard) =
689 source(None, None, false, Default::default(), true, false).await;
690
691 let timestamp: DateTime<Utc> = Utc::now();
692
693 let res = spawn_send(
694 addr,
695 timestamp,
696 vec![RECORD.as_bytes()],
697 None,
698 true,
699 Compression::None,
700 )
701 .await;
702
703 let events = collect_n(rx, 1).await;
704 let res = res.await.unwrap().unwrap();
705 assert_eq!(200, res.status().as_u16());
706
707 assert_event_data_eq!(
708 events,
709 vec![log_event! {
710 "source_type" => Bytes::from("aws_kinesis_firehose"),
711 "timestamp" => timestamp.trunc_subsecs(3), "message"=> RECORD,
713 "request_id" => REQUEST_ID,
714 "source_arn" => SOURCE_ARN,
715 },]
716 );
717
718 let response: models::FirehoseResponse = res.json().await.unwrap();
719 assert_eq!(response.request_id, REQUEST_ID);
720 })
721 .await;
722 }
723
724 #[tokio::test]
725 async fn aws_kinesis_firehose_rejects_bad_access_key() {
726 let (_rx, addr, _guard) = source(
727 Some("an access key".to_string().into()),
728 Some(vec!["an access key in list".to_string().into()]),
729 Default::default(),
730 Default::default(),
731 true,
732 false,
733 )
734 .await;
735
736 let res = send(
737 addr,
738 Utc::now(),
739 vec![],
740 Some("bad access key"),
741 false,
742 Compression::None,
743 )
744 .await
745 .unwrap();
746 assert_eq!(401, res.status().as_u16());
747
748 let response: models::FirehoseResponse = res.json().await.unwrap();
749 assert_eq!(response.request_id, REQUEST_ID);
750 }
751
752 #[tokio::test]
753 async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
754 let (_rx, addr, _guard) = source(
755 None,
756 Some(vec!["an access key in list".to_string().into()]),
757 Default::default(),
758 Default::default(),
759 true,
760 false,
761 )
762 .await;
763
764 let res = send(
765 addr,
766 Utc::now(),
767 vec![],
768 Some("bad access key"),
769 false,
770 Compression::None,
771 )
772 .await
773 .unwrap();
774 assert_eq!(401, res.status().as_u16());
775
776 let response: models::FirehoseResponse = res.json().await.unwrap();
777 assert_eq!(response.request_id, REQUEST_ID);
778 }
779
780 #[tokio::test]
781 async fn aws_kinesis_firehose_accepts_merged_access_keys() {
782 let valid_access_key = SensitiveString::from(String::from("an access key in list"));
783
784 let (_rx, addr, _guard) = source(
785 Some(valid_access_key.clone()),
786 Some(vec!["valid access key 2".to_string().into()]),
787 Default::default(),
788 Default::default(),
789 true,
790 false,
791 )
792 .await;
793
794 let res = send(
795 addr,
796 Utc::now(),
797 vec![],
798 Some(valid_access_key.clone().inner()),
799 false,
800 Compression::None,
801 )
802 .await
803 .unwrap();
804
805 assert_eq!(200, res.status().as_u16());
806
807 let response: models::FirehoseResponse = res.json().await.unwrap();
808 assert_eq!(response.request_id, REQUEST_ID);
809 }
810
811 #[tokio::test]
812 async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
813 let valid_access_key = "an access key in list".to_string();
814
815 let (_rx, addr, _guard) = source(
816 None,
817 Some(vec![
818 valid_access_key.clone().into(),
819 "valid access key 2".to_string().into(),
820 ]),
821 Default::default(),
822 Default::default(),
823 true,
824 false,
825 )
826 .await;
827
828 let res = send(
829 addr,
830 Utc::now(),
831 vec![],
832 Some(&valid_access_key),
833 false,
834 Compression::None,
835 )
836 .await
837 .unwrap();
838
839 assert_eq!(200, res.status().as_u16());
840
841 let response: models::FirehoseResponse = res.json().await.unwrap();
842 assert_eq!(response.request_id, REQUEST_ID);
843 }
844
845 #[tokio::test]
846 async fn handles_acknowledgement_failure() {
847 let expected = RECORD.as_bytes().to_owned();
848
849 let (rx, addr, _guard) = source(None, None, false, Compression::None, false, false).await;
850
851 let timestamp: DateTime<Utc> = Utc::now();
852
853 let res = spawn_send(
854 addr,
855 timestamp,
856 vec![RECORD.as_bytes()],
857 None,
858 false,
859 Compression::None,
860 )
861 .await;
862
863 let events = collect_n(rx, 1).await;
864
865 let res = res.await.unwrap().unwrap();
866 assert_eq!(406, res.status().as_u16());
867
868 assert_event_data_eq!(
869 events,
870 vec![log_event! {
871 "source_type" => Bytes::from("aws_kinesis_firehose"),
872 "timestamp" => timestamp.trunc_subsecs(3), "message"=> Bytes::from(expected),
874 "request_id" => REQUEST_ID,
875 "source_arn" => SOURCE_ARN,
876 },]
877 );
878
879 let response: models::FirehoseResponse = res.json().await.unwrap();
880 assert_eq!(response.request_id, REQUEST_ID);
881 }
882
883 #[tokio::test]
884 async fn event_access_key_passthrough_enabled() {
885 let (rx, address, _guard) = source(
886 None,
887 Some(vec!["an access key".to_string().into()]),
888 true,
889 Default::default(),
890 true,
891 true,
892 )
893 .await;
894
895 let timestamp: DateTime<Utc> = Utc::now();
896
897 spawn_send(
898 address,
899 timestamp,
900 vec![RECORD.as_bytes()],
901 Some("an access key"),
902 false,
903 Compression::None,
904 )
905 .await;
906
907 let events = collect_n(rx, 1).await;
908 let access_key = events[0]
909 .metadata()
910 .secrets()
911 .get("aws_kinesis_firehose_access_key")
912 .unwrap();
913 assert_eq!(access_key.to_string(), "an access key".to_string());
914 }
915
916 #[tokio::test]
917 async fn no_authorization_access_key_passthrough_enabled() {
918 let (rx, address, _guard) = source(None, None, true, Default::default(), true, true).await;
919
920 let timestamp: DateTime<Utc> = Utc::now();
921
922 spawn_send(
923 address,
924 timestamp,
925 vec![RECORD.as_bytes()],
926 None,
927 false,
928 Compression::None,
929 )
930 .await;
931
932 let events = collect_n(rx, 1).await;
933
934 assert!(
935 events[0]
936 .metadata()
937 .secrets()
938 .get("aws_kinesis_firehose_access_key")
939 .is_none()
940 );
941 }
942}