1use std::convert::TryInto;
2
3use async_compression::tokio::bufread;
4use aws_smithy_types::byte_stream::ByteStream;
5use futures::{TryStreamExt, stream, stream::StreamExt};
6use snafu::Snafu;
7use tokio_util::io::StreamReader;
8use vector_common::compression::gzip_multiple_decoder;
9use vector_lib::{
10 codecs::{
11 NewlineDelimitedDecoderConfig,
12 decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
13 },
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 lookup::owned_value_path,
17};
18use vrl::value::{Kind, kind::Collection};
19
20use super::util::MultilineConfig;
21use crate::{
22 aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
23 codecs::DecodingConfig,
24 common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
25 config::{
26 ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
27 },
28 line_agg,
29 serde::{bool_or_struct, default_decoding},
30 tls::TlsConfig,
31};
32
33pub mod sqs;
34
35#[configurable_component]
37#[configurable(metadata(docs::advanced))]
38#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
39#[serde(rename_all = "lowercase")]
40pub enum Compression {
41 #[default]
48 Auto,
49
50 None,
52
53 Gzip,
55
56 Zstd,
58}
59
60#[configurable_component]
62#[derive(Clone, Copy, Debug, Default)]
63#[serde(rename_all = "lowercase")]
64enum Strategy {
65 #[default]
69 Sqs,
70}
71
72#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
79#[derive(Clone, Debug, Derivative)]
80#[derivative(Default)]
81#[serde(default, deny_unknown_fields)]
82pub struct AwsS3Config {
83 #[serde(flatten)]
84 region: RegionOrEndpoint,
85
86 compression: Compression,
88
89 #[configurable(metadata(docs::hidden))]
91 strategy: Strategy,
92
93 sqs: Option<sqs::Config>,
95
96 #[configurable(deprecated)]
100 #[configurable(metadata(docs::hidden))]
101 assume_role: Option<String>,
102
103 #[configurable(derived)]
104 #[serde(default)]
105 auth: AwsAuthentication,
106
107 #[configurable(derived)]
111 multiline: Option<MultilineConfig>,
112
113 #[configurable(derived)]
114 #[serde(default, deserialize_with = "bool_or_struct")]
115 acknowledgements: SourceAcknowledgementsConfig,
116
117 #[configurable(derived)]
118 tls_options: Option<TlsConfig>,
119
120 #[configurable(metadata(docs::hidden))]
122 #[serde(default)]
123 log_namespace: Option<bool>,
124
125 #[configurable(derived)]
126 #[serde(default = "default_framing")]
127 #[derivative(Default(value = "default_framing()"))]
128 pub framing: FramingConfig,
129
130 #[configurable(derived)]
131 #[serde(default = "default_decoding")]
132 #[derivative(Default(value = "default_decoding()"))]
133 pub decoding: DeserializerConfig,
134
135 #[serde(default = "default_true")]
139 #[derivative(Default(value = "default_true()"))]
140 pub force_path_style: bool,
141}
142
143const fn default_framing() -> FramingConfig {
144 FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
146 newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
147 })
148}
149
150const fn default_true() -> bool {
151 true
152}
153
154impl_generate_config_from_default!(AwsS3Config);
155
156#[async_trait::async_trait]
157#[typetag::serde(name = "aws_s3")]
158impl SourceConfig for AwsS3Config {
159 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
160 let log_namespace = cx.log_namespace(self.log_namespace);
161
162 let multiline_config: Option<line_agg::Config> = self
163 .multiline
164 .as_ref()
165 .map(|config| config.try_into())
166 .transpose()?;
167
168 match self.strategy {
169 Strategy::Sqs => Ok(Box::pin(
170 self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
171 .await?
172 .run(cx, self.acknowledgements, log_namespace),
173 )),
174 }
175 }
176
177 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
178 let log_namespace = global_log_namespace.merge(self.log_namespace);
179 let mut schema_definition = self
180 .decoding
181 .schema_definition(log_namespace)
182 .with_source_metadata(
183 Self::NAME,
184 Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
185 &owned_value_path!("bucket"),
186 Kind::bytes(),
187 None,
188 )
189 .with_source_metadata(
190 Self::NAME,
191 Some(LegacyKey::Overwrite(owned_value_path!("object"))),
192 &owned_value_path!("object"),
193 Kind::bytes(),
194 None,
195 )
196 .with_source_metadata(
197 Self::NAME,
198 Some(LegacyKey::Overwrite(owned_value_path!("region"))),
199 &owned_value_path!("region"),
200 Kind::bytes(),
201 None,
202 )
203 .with_source_metadata(
204 Self::NAME,
205 None,
206 &owned_value_path!("timestamp"),
207 Kind::timestamp(),
208 Some("timestamp"),
209 )
210 .with_standard_vector_source_metadata()
211 .with_source_metadata(
213 Self::NAME,
214 None,
215 &owned_value_path!("metadata"),
216 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
217 None,
218 );
219
220 if log_namespace == LogNamespace::Legacy {
222 schema_definition = schema_definition.unknown_fields(Kind::bytes());
223 }
224
225 vec![SourceOutput::new_maybe_logs(
226 self.decoding.output_type(),
227 schema_definition,
228 )]
229 }
230
231 fn can_acknowledge(&self) -> bool {
232 true
233 }
234}
235
236impl AwsS3Config {
237 async fn create_sqs_ingestor(
238 &self,
239 multiline: Option<line_agg::Config>,
240 proxy: &ProxyConfig,
241 log_namespace: LogNamespace,
242 ) -> crate::Result<sqs::Ingestor> {
243 let region = self.region.region();
244 let endpoint = self.region.endpoint();
245
246 let s3_client = create_client::<S3ClientBuilder>(
247 &S3ClientBuilder {
248 force_path_style: Some(self.force_path_style),
249 },
250 &self.auth,
251 region.clone(),
252 endpoint.clone(),
253 proxy,
254 self.tls_options.as_ref(),
255 None,
256 )
257 .await?;
258
259 let decoder =
260 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
261 .build()?;
262
263 match self.sqs {
264 Some(ref sqs) => {
265 let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
266 &SqsClientBuilder {},
267 &self.auth,
268 region.clone(),
269 endpoint,
270 proxy,
271 sqs.tls_options.as_ref(),
272 sqs.timeout.as_ref(),
273 )
274 .await?;
275
276 let ingestor = sqs::Ingestor::new(
277 region,
278 sqs_client,
279 s3_client,
280 sqs.clone(),
281 self.compression,
282 multiline,
283 decoder,
284 )
285 .await?;
286
287 Ok(ingestor)
288 }
289 None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
290 }
291 }
292}
293
294#[derive(Debug, Snafu)]
295enum CreateSqsIngestorError {
296 #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
297 ConfigMissing,
298}
299
300async fn s3_object_decoder(
302 compression: Compression,
303 key: &str,
304 content_encoding: Option<&str>,
305 content_type: Option<&str>,
306 mut body: ByteStream,
307) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
308 let first = match body.next().await {
309 Some(first) => first,
310 _ => {
311 return Box::new(tokio::io::empty());
312 }
313 };
314
315 let r = tokio::io::BufReader::new(StreamReader::new(
316 stream::iter(Some(first))
317 .chain(Box::pin(async_stream::stream! {
318 while let Some(next) = body.next().await {
319 yield next;
320 }
321 }))
322 .map_err(std::io::Error::other),
323 ));
324
325 let compression = match compression {
326 Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
327 _ => compression,
328 };
329
330 use Compression::*;
331 match compression {
332 Auto => unreachable!(), None => Box::new(r),
334 Gzip => Box::new(gzip_multiple_decoder(r)),
335 Zstd => Box::new({
336 let mut decoder = bufread::ZstdDecoder::new(r);
337 decoder.multiple_members(true);
338 decoder
339 }),
340 }
341}
342
343fn determine_compression(
350 content_encoding: Option<&str>,
351 content_type: Option<&str>,
352 key: &str,
353) -> Option<Compression> {
354 content_encoding
355 .and_then(content_encoding_to_compression)
356 .or_else(|| content_type.and_then(content_type_to_compression))
357 .or_else(|| object_key_to_compression(key))
358}
359
360fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
361 match content_encoding {
362 "gzip" => Some(Compression::Gzip),
363 "zstd" => Some(Compression::Zstd),
364 _ => None,
365 }
366}
367
368fn content_type_to_compression(content_type: &str) -> Option<Compression> {
369 match content_type {
370 "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
371 "application/zstd" => Some(Compression::Zstd),
372 _ => None,
373 }
374}
375
376fn object_key_to_compression(key: &str) -> Option<Compression> {
377 let extension = std::path::Path::new(key)
378 .extension()
379 .and_then(std::ffi::OsStr::to_str);
380
381 use Compression::*;
382 extension.and_then(|extension| match extension {
383 "gz" => Some(Gzip),
384 "zst" => Some(Zstd),
385 _ => Option::None,
386 })
387}
388
389#[cfg(test)]
390mod test {
391 use tokio::io::AsyncReadExt;
392
393 use super::*;
394
395 #[test]
396 fn determine_compression() {
397 use super::Compression;
398
399 let cases = vec![
400 ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
401 (
402 "out.log",
403 None,
404 Some("application/gzip"),
405 Some(Compression::Gzip),
406 ),
407 ("out.log.gz", None, None, Some(Compression::Gzip)),
408 ("out.txt", None, None, None),
409 ];
410 for case in cases {
411 let (key, content_encoding, content_type, expected) = case;
412 assert_eq!(
413 super::determine_compression(content_encoding, content_type, key),
414 expected,
415 "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
416 );
417 }
418 }
419
420 #[tokio::test]
421 async fn decode_empty_message_gzip() {
422 let key = uuid::Uuid::new_v4().to_string();
423
424 let mut data = Vec::new();
425 s3_object_decoder(
426 Compression::Auto,
427 &key,
428 Some("gzip"),
429 None,
430 ByteStream::default(),
431 )
432 .await
433 .read_to_end(&mut data)
434 .await
435 .unwrap();
436
437 assert!(data.is_empty());
438 }
439}
440
441#[cfg(feature = "aws-s3-integration-tests")]
442#[cfg(test)]
443mod integration_tests {
444 use std::{
445 any::Any,
446 collections::HashMap,
447 fs::File,
448 io::{self, BufRead},
449 path::Path,
450 time::Duration,
451 };
452
453 use aws_sdk_s3::Client as S3Client;
454 use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
455 use similar_asserts::assert_eq;
456 use vector_lib::{
457 codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
458 lookup::path,
459 };
460 use vrl::value::Value;
461
462 use super::*;
463 use crate::{
464 SourceSender,
465 aws::{AwsAuthentication, RegionOrEndpoint, create_client},
466 common::sqs::SqsClientBuilder,
467 config::{ProxyConfig, SourceConfig, SourceContext},
468 event::EventStatus::{self, *},
469 line_agg,
470 sources::{
471 aws_s3::{S3ClientBuilder, sqs::S3Event},
472 util::MultilineConfig,
473 },
474 test_util::{
475 collect_n,
476 components::{SOURCE_TAGS, assert_source_compliance},
477 lines_from_gzip_file, random_lines, trace_init,
478 },
479 };
480
481 fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
482 let file = io::BufReader::new(File::open(path).unwrap());
483 file.lines().map(|x| x.unwrap()).collect()
484 }
485
486 #[tokio::test]
487 async fn s3_process_message() {
488 trace_init();
489
490 let logs: Vec<String> = random_lines(100).take(10).collect();
491
492 test_event(
493 None,
494 None,
495 None,
496 None,
497 logs.join("\n").into_bytes(),
498 logs,
499 Delivered,
500 false,
501 DeserializerConfig::Bytes,
502 None,
503 )
504 .await;
505 }
506
507 #[tokio::test]
508 async fn s3_process_json_message() {
509 trace_init();
510
511 let logs: Vec<String> = random_lines(100).take(10).collect();
512
513 let json_logs: Vec<String> = logs
514 .iter()
515 .map(|msg| {
516 format!(r#"{{"message": "{msg}"}}"#)
518 })
519 .collect();
520
521 test_event(
522 None,
523 None,
524 None,
525 None,
526 json_logs.join("\n").into_bytes(),
527 logs,
528 Delivered,
529 false,
530 DeserializerConfig::Json(JsonDeserializerConfig::default()),
531 None,
532 )
533 .await;
534 }
535
536 #[tokio::test]
537 async fn s3_process_message_with_log_namespace() {
538 trace_init();
539
540 let logs: Vec<String> = random_lines(100).take(10).collect();
541
542 test_event(
543 None,
544 None,
545 None,
546 None,
547 logs.join("\n").into_bytes(),
548 logs,
549 Delivered,
550 true,
551 DeserializerConfig::Bytes,
552 None,
553 )
554 .await;
555 }
556
557 #[tokio::test]
558 async fn s3_process_message_spaces() {
559 trace_init();
560
561 let key = "key with spaces".to_string();
562 let logs: Vec<String> = random_lines(100).take(10).collect();
563
564 test_event(
565 Some(key),
566 None,
567 None,
568 None,
569 logs.join("\n").into_bytes(),
570 logs,
571 Delivered,
572 false,
573 DeserializerConfig::Bytes,
574 None,
575 )
576 .await;
577 }
578
579 #[tokio::test]
580 async fn s3_process_message_special_characters() {
581 trace_init();
582
583 let key = format!("special:{}", uuid::Uuid::new_v4());
584 let logs: Vec<String> = random_lines(100).take(10).collect();
585
586 test_event(
587 Some(key),
588 None,
589 None,
590 None,
591 logs.join("\n").into_bytes(),
592 logs,
593 Delivered,
594 false,
595 DeserializerConfig::Bytes,
596 None,
597 )
598 .await;
599 }
600
601 #[tokio::test]
602 async fn s3_process_message_gzip() {
603 use std::io::Read;
604
605 trace_init();
606
607 let logs: Vec<String> = random_lines(100).take(10).collect();
608
609 let mut gz = flate2::read::GzEncoder::new(
610 io::Cursor::new(logs.join("\n").into_bytes()),
611 flate2::Compression::fast(),
612 );
613 let mut buffer = Vec::new();
614 gz.read_to_end(&mut buffer).unwrap();
615
616 test_event(
617 None,
618 Some("gzip"),
619 None,
620 None,
621 buffer,
622 logs,
623 Delivered,
624 false,
625 DeserializerConfig::Bytes,
626 None,
627 )
628 .await;
629 }
630
631 #[tokio::test]
632 async fn s3_process_message_multipart_gzip() {
633 use std::io::Read;
634
635 trace_init();
636
637 let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");
638
639 let buffer = {
640 let mut file =
641 File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
642 let mut data = Vec::new();
643 file.read_to_end(&mut data).expect("file can be read");
644 data
645 };
646
647 test_event(
648 None,
649 Some("gzip"),
650 None,
651 None,
652 buffer,
653 logs,
654 Delivered,
655 false,
656 DeserializerConfig::Bytes,
657 None,
658 )
659 .await;
660 }
661
662 #[tokio::test]
663 async fn s3_process_message_multipart_zstd() {
664 use std::io::Read;
665
666 trace_init();
667
668 let logs = lines_from_plaintext("tests/data/multipart-zst.log");
669
670 let buffer = {
671 let mut file =
672 File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
673 let mut data = Vec::new();
674 file.read_to_end(&mut data).expect("file can be read");
675 data
676 };
677
678 test_event(
679 None,
680 Some("zstd"),
681 None,
682 None,
683 buffer,
684 logs,
685 Delivered,
686 false,
687 DeserializerConfig::Bytes,
688 None,
689 )
690 .await;
691 }
692
693 #[tokio::test]
694 async fn s3_process_message_multiline() {
695 trace_init();
696
697 let logs: Vec<String> = vec!["abc", "def", "geh"]
698 .into_iter()
699 .map(ToOwned::to_owned)
700 .collect();
701
702 test_event(
703 None,
704 None,
705 None,
706 Some(MultilineConfig {
707 start_pattern: "abc".to_owned(),
708 mode: line_agg::Mode::HaltWith,
709 condition_pattern: "geh".to_owned(),
710 timeout_ms: Duration::from_millis(1000),
711 }),
712 logs.join("\n").into_bytes(),
713 vec!["abc\ndef\ngeh".to_owned()],
714 Delivered,
715 false,
716 DeserializerConfig::Bytes,
717 None,
718 )
719 .await;
720 }
721
722 #[ignore]
725 #[tokio::test]
726 async fn handles_errored_status() {
727 trace_init();
728
729 let logs: Vec<String> = random_lines(100).take(10).collect();
730
731 test_event(
732 None,
733 None,
734 None,
735 None,
736 logs.join("\n").into_bytes(),
737 logs,
738 Errored,
739 false,
740 DeserializerConfig::Bytes,
741 None,
742 )
743 .await;
744 }
745
746 #[tokio::test]
747 async fn handles_failed_status() {
748 trace_init();
749
750 let logs: Vec<String> = random_lines(100).take(10).collect();
751
752 test_event(
753 None,
754 None,
755 None,
756 None,
757 logs.join("\n").into_bytes(),
758 logs,
759 Rejected,
760 false,
761 DeserializerConfig::Bytes,
762 None,
763 )
764 .await;
765 }
766
767 #[tokio::test]
768 async fn handles_failed_status_without_deletion() {
769 trace_init();
770
771 let logs: Vec<String> = random_lines(100).take(10).collect();
772
773 let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
774 custom_options.insert("delete_failed_message".to_string(), Box::new(false));
775
776 test_event(
777 None,
778 None,
779 None,
780 None,
781 logs.join("\n").into_bytes(),
782 logs,
783 Rejected,
784 false,
785 DeserializerConfig::Bytes,
786 Some(custom_options),
787 )
788 .await;
789 }
790
791 fn s3_address() -> String {
792 std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
793 }
794
795 fn config(
796 queue_url: &str,
797 multiline: Option<MultilineConfig>,
798 log_namespace: bool,
799 decoding: DeserializerConfig,
800 ) -> AwsS3Config {
801 AwsS3Config {
802 region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
803 strategy: Strategy::Sqs,
804 compression: Compression::Auto,
805 multiline,
806 sqs: Some(sqs::Config {
807 queue_url: queue_url.to_string(),
808 poll_secs: 1,
809 max_number_of_messages: 10,
810 visibility_timeout_secs: 0,
811 client_concurrency: None,
812 ..Default::default()
813 }),
814 acknowledgements: true.into(),
815 log_namespace: Some(log_namespace),
816 decoding,
817 ..Default::default()
818 }
819 }
820
821 #[allow(clippy::too_many_arguments)]
823 async fn test_event(
824 key: Option<String>,
825 content_encoding: Option<&str>,
826 content_type: Option<&str>,
827 multiline: Option<MultilineConfig>,
828 payload: Vec<u8>,
829 expected_lines: Vec<String>,
830 status: EventStatus,
831 log_namespace: bool,
832 decoding: DeserializerConfig,
833 custom_options: Option<HashMap<String, Box<dyn Any>>>,
834 ) {
835 assert_source_compliance(&SOURCE_TAGS, async move {
836 let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
837
838 let s3 = s3_client().await;
839 let sqs = sqs_client().await;
840
841 let queue = create_queue(&sqs).await;
842 let bucket = create_bucket(&s3).await;
843
844 tokio::time::sleep(Duration::from_secs(1)).await;
845
846 let mut config = config(&queue, multiline, log_namespace, decoding);
847
848 if let Some(false) = custom_options
849 .as_ref()
850 .and_then(|opts| opts.get("delete_failed_message"))
851 .and_then(|val| val.downcast_ref::<bool>())
852 .copied()
853 {
854 config.sqs.as_mut().unwrap().delete_failed_message = false;
855 }
856
857 s3.put_object()
858 .bucket(bucket.clone())
859 .key(key.clone())
860 .body(ByteStream::from(payload))
861 .set_content_type(content_type.map(|t| t.to_owned()))
862 .set_content_encoding(content_encoding.map(|t| t.to_owned()))
863 .send()
864 .await
865 .expect("Could not put object");
866
867 let sqs_client = sqs_client().await;
868
869 let mut s3_event: S3Event = serde_json::from_str(
870 r#"
871{
872 "Records":[
873 {
874 "eventVersion":"2.1",
875 "eventSource":"aws:s3",
876 "awsRegion":"us-east-1",
877 "eventTime":"2022-03-24T19:43:00.548Z",
878 "eventName":"ObjectCreated:Put",
879 "userIdentity":{
880 "principalId":"AWS:ARNOTAREALIDD4:user.name"
881 },
882 "requestParameters":{
883 "sourceIPAddress":"136.56.73.213"
884 },
885 "responseElements":{
886 "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
887 "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
888 },
889 "s3":{
890 "s3SchemaVersion":"1.0",
891 "configurationId":"asdfasdf",
892 "bucket":{
893 "name":"bucket-name",
894 "ownerIdentity":{
895 "principalId":"A3PEG170DF9VNQ"
896 },
897 "arn":"arn:aws:s3:::nfox-testing-vector"
898 },
899 "object":{
900 "key":"test-log.txt",
901 "size":33,
902 "eTag":"c981ce6672c4251048b0b834e334007f",
903 "sequencer":"00623CC9C47AB5634C"
904 }
905 }
906 }
907 ]
908}
909 "#,
910 )
911 .unwrap();
912
913 s3_event.records[0].s3.bucket.name.clone_from(&bucket);
914 s3_event.records[0].s3.object.key.clone_from(&key);
915
916 let _send_message_output = sqs_client
919 .send_message()
920 .queue_url(queue.clone())
921 .message_body(serde_json::to_string(&s3_event).unwrap())
922 .send()
923 .await
924 .unwrap();
925
926 let (tx, rx) = SourceSender::new_test_finalize(status);
927 let cx = SourceContext::new_test(tx, None);
928 let namespace = cx.log_namespace(Some(log_namespace));
929 let source = config.build(cx).await.unwrap();
930 tokio::spawn(async move { source.await.unwrap() });
931
932 let events = collect_n(rx, expected_lines.len()).await;
933
934 assert_eq!(expected_lines.len(), events.len());
935 for (i, event) in events.iter().enumerate() {
936
937 if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
938 schema_definition.is_valid_for_event(event).unwrap();
939 }
940
941 let message = expected_lines[i].as_str();
942
943 let log = event.as_log();
944 if log_namespace {
945 assert_eq!(log.value(), &Value::from(message));
946 } else {
947 assert_eq!(log["message"], message.into());
948 }
949 assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
950 assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
951 assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
952 }
953
954 tokio::time::sleep(Duration::from_secs(10)).await;
958 match status {
960 Errored => {
961 assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
963 }
964 Rejected if !config.sqs.unwrap().delete_failed_message => {
965 assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
966 }
967 _ => {
968 assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
969 }
970 };
971 }).await;
972 }
973
974 async fn create_queue(client: &SqsClient) -> String {
978 let queue_name = uuid::Uuid::new_v4().to_string();
979
980 let res = client
981 .create_queue()
982 .queue_name(queue_name.clone())
983 .attributes(QueueAttributeName::VisibilityTimeout, "2")
984 .send()
985 .await
986 .expect("Could not create queue");
987
988 res.queue_url.expect("no queue url")
989 }
990
991 async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
993 let sqs_result = client
994 .receive_message()
995 .queue_url(queue)
996 .visibility_timeout(0)
997 .wait_time_seconds(wait_time_seconds)
998 .send()
999 .await
1000 .unwrap();
1001
1002 sqs_result
1003 .messages
1004 .map(|messages| messages.len())
1005 .unwrap_or(0)
1006 }
1007
1008 async fn create_bucket(client: &S3Client) -> String {
1012 let bucket_name = uuid::Uuid::new_v4().to_string();
1013
1014 client
1015 .create_bucket()
1016 .bucket(bucket_name.clone())
1017 .send()
1018 .await
1019 .expect("Could not create bucket");
1020
1021 bucket_name
1022 }
1023
1024 async fn s3_client() -> S3Client {
1025 let auth = AwsAuthentication::test_auth();
1026 let region_endpoint = RegionOrEndpoint {
1027 region: Some("us-east-1".to_owned()),
1028 endpoint: Some(s3_address()),
1029 };
1030 let proxy_config = ProxyConfig::default();
1031 let force_path_style_value: bool = true;
1032 create_client::<S3ClientBuilder>(
1033 &S3ClientBuilder {
1034 force_path_style: Some(force_path_style_value),
1035 },
1036 &auth,
1037 region_endpoint.region(),
1038 region_endpoint.endpoint(),
1039 &proxy_config,
1040 None,
1041 None,
1042 )
1043 .await
1044 .unwrap()
1045 }
1046
1047 async fn sqs_client() -> SqsClient {
1048 let auth = AwsAuthentication::test_auth();
1049 let region_endpoint = RegionOrEndpoint {
1050 region: Some("us-east-1".to_owned()),
1051 endpoint: Some(s3_address()),
1052 };
1053 let proxy_config = ProxyConfig::default();
1054 create_client::<SqsClientBuilder>(
1055 &SqsClientBuilder {},
1056 &auth,
1057 region_endpoint.region(),
1058 region_endpoint.endpoint(),
1059 &proxy_config,
1060 None,
1061 None,
1062 )
1063 .await
1064 .unwrap()
1065 }
1066}