Skip to main content

vector/sources/aws_s3/
mod.rs

1use std::convert::TryInto;
2
3use async_compression::tokio::bufread;
4use aws_smithy_types::byte_stream::ByteStream;
5use futures::{TryStreamExt, stream, stream::StreamExt};
6use snafu::Snafu;
7use tokio_util::io::StreamReader;
8use vector_common::compression::gzip_multiple_decoder;
9use vector_lib::{
10    codecs::{
11        NewlineDelimitedDecoderConfig,
12        decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
13    },
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    lookup::owned_value_path,
17};
18use vrl::value::{Kind, kind::Collection};
19
20use super::util::MultilineConfig;
21use crate::{
22    aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
23    codecs::DecodingConfig,
24    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
25    config::{
26        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
27    },
28    line_agg,
29    serde::{bool_or_struct, default_decoding},
30    tls::TlsConfig,
31};
32
33pub mod sqs;
34
/// Compression scheme for objects retrieved from S3.
// Variant names are (de)serialized in lowercase (`auto`, `none`, `gzip`, `zstd`) because of
// `rename_all = "lowercase"`. `Auto` is the `Default` variant.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined from its `Content-Encoding` and
    /// `Content-Type` metadata, as well as the key suffix (for example, `.gz`).
    ///
    /// It is set to `none` if the compression scheme cannot be determined.
    // Resolved to a concrete variant inside `s3_object_decoder` before any decoder is built.
    #[default]
    Auto,

    /// Uncompressed.
    // Note: this variant shares its name with `Option::None`; code that glob-imports
    // `Compression::*` must spell out `Option::None` where it means the latter.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}
59
/// Strategies for consuming objects from AWS S3.
// Only a single strategy exists in this file; `SourceConfig::build` matches on it exhaustively,
// so adding a variant here will force that `match` to be updated.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "lowercase")]
enum Strategy {
    /// Consumes objects by processing bucket notification events sent to an [AWS SQS queue][aws_sqs].
    ///
    /// [aws_sqs]: https://aws.amazon.com/sqs/
    #[default]
    Sqs,
}
71
/// Configuration for the `aws_s3` source.
// TODO: The `Default` impl here makes the configuration schema output look pretty weird, especially because all the
// usage of optionals means we're spewing out a ton of `"foo": null` stuff in the default value, and that's not helpful
// when there's required fields.
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    // Flattened by serde, so the region/endpoint settings are read from the top level of this
    // source's configuration rather than from a nested `region` key.
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    // Optional at the type level, but `create_sqs_ingestor` fails with
    // `CreateSqsIngestorError::ConfigMissing` when it is absent.
    sqs: Option<sqs::Config>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    // NOTE(review): marked deprecated and hidden, and not read by any code in this file —
    // presumably superseded by `auth`; confirm before removing.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    // Credentials used for both the S3 client and the SQS client.
    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    // Accepts either a bare boolean or a full struct in the configuration (`bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // Applied to the S3 client only; the SQS client is built with `sqs.tls_options` instead.
    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Defaults to newline-delimited framing (see `default_framing`), both when deserializing
    // and when constructing the struct through `Default`.
    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    // Defaults to `default_decoding()`, both when deserializing and through `Default`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Specifies which addressing style to use.
    ///
    /// This controls whether the bucket name is in the hostname, or part of the URL.
    // Defaults to `true` and is forwarded to `S3ClientBuilder::force_path_style`.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}
142
143const fn default_framing() -> FramingConfig {
144    // This is used for backwards compatibility. It used to be the only (hardcoded) option.
145    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
146        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
147    })
148}
149
/// Default-value helper for boolean settings that are enabled unless configured otherwise
/// (used for `force_path_style`).
const fn default_true() -> bool {
    true
}
153
// Invokes the project macro `impl_generate_config_from_default!` for `AwsS3Config`.
// NOTE(review): presumably this derives the generated example configuration from the type's
// `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(AwsS3Config);
155
156#[async_trait::async_trait]
157#[typetag::serde(name = "aws_s3")]
158impl SourceConfig for AwsS3Config {
159    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
160        let log_namespace = cx.log_namespace(self.log_namespace);
161
162        let multiline_config: Option<line_agg::Config> = self
163            .multiline
164            .as_ref()
165            .map(|config| config.try_into())
166            .transpose()?;
167
168        match self.strategy {
169            Strategy::Sqs => Ok(Box::pin(
170                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
171                    .await?
172                    .run(cx, self.acknowledgements, log_namespace),
173            )),
174        }
175    }
176
177    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
178        let log_namespace = global_log_namespace.merge(self.log_namespace);
179        let mut schema_definition = self
180            .decoding
181            .schema_definition(log_namespace)
182            .with_source_metadata(
183                Self::NAME,
184                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
185                &owned_value_path!("bucket"),
186                Kind::bytes(),
187                None,
188            )
189            .with_source_metadata(
190                Self::NAME,
191                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
192                &owned_value_path!("object"),
193                Kind::bytes(),
194                None,
195            )
196            .with_source_metadata(
197                Self::NAME,
198                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
199                &owned_value_path!("region"),
200                Kind::bytes(),
201                None,
202            )
203            .with_source_metadata(
204                Self::NAME,
205                None,
206                &owned_value_path!("timestamp"),
207                Kind::timestamp(),
208                Some("timestamp"),
209            )
210            .with_standard_vector_source_metadata()
211            // for metadata that is added to the events dynamically from the metadata
212            .with_source_metadata(
213                Self::NAME,
214                None,
215                &owned_value_path!("metadata"),
216                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
217                None,
218            );
219
220        // for metadata that is added to the events dynamically from the metadata
221        if log_namespace == LogNamespace::Legacy {
222            schema_definition = schema_definition.unknown_fields(Kind::bytes());
223        }
224
225        vec![SourceOutput::new_maybe_logs(
226            self.decoding.output_type(),
227            schema_definition,
228        )]
229    }
230
231    fn can_acknowledge(&self) -> bool {
232        true
233    }
234}
235
236impl AwsS3Config {
237    async fn create_sqs_ingestor(
238        &self,
239        multiline: Option<line_agg::Config>,
240        proxy: &ProxyConfig,
241        log_namespace: LogNamespace,
242    ) -> crate::Result<sqs::Ingestor> {
243        let region = self.region.region();
244        let endpoint = self.region.endpoint();
245
246        let s3_client = create_client::<S3ClientBuilder>(
247            &S3ClientBuilder {
248                force_path_style: Some(self.force_path_style),
249            },
250            &self.auth,
251            region.clone(),
252            endpoint.clone(),
253            proxy,
254            self.tls_options.as_ref(),
255            None,
256        )
257        .await?;
258
259        let decoder =
260            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
261                .build()?;
262
263        match self.sqs {
264            Some(ref sqs) => {
265                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
266                    &SqsClientBuilder {},
267                    &self.auth,
268                    region.clone(),
269                    endpoint,
270                    proxy,
271                    sqs.tls_options.as_ref(),
272                    sqs.timeout.as_ref(),
273                )
274                .await?;
275
276                let ingestor = sqs::Ingestor::new(
277                    region,
278                    sqs_client,
279                    s3_client,
280                    sqs.clone(),
281                    self.compression,
282                    multiline,
283                    decoder,
284                )
285                .await?;
286
287                Ok(ingestor)
288            }
289            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
290        }
291    }
292}
293
/// Errors raised by `AwsS3Config::create_sqs_ingestor` itself (as opposed to errors bubbling up
/// from client, decoder, or ingestor construction).
#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    /// The `sqs` section is absent from the source configuration.
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}
299
300/// None if body is empty
301async fn s3_object_decoder(
302    compression: Compression,
303    key: &str,
304    content_encoding: Option<&str>,
305    content_type: Option<&str>,
306    mut body: ByteStream,
307) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
308    let first = match body.next().await {
309        Some(first) => first,
310        _ => {
311            return Box::new(tokio::io::empty());
312        }
313    };
314
315    let r = tokio::io::BufReader::new(StreamReader::new(
316        stream::iter(Some(first))
317            .chain(Box::pin(async_stream::stream! {
318                while let Some(next) = body.next().await {
319                    yield next;
320                }
321            }))
322            .map_err(std::io::Error::other),
323    ));
324
325    let compression = match compression {
326        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
327        _ => compression,
328    };
329
330    use Compression::*;
331    match compression {
332        Auto => unreachable!(), // is mapped above
333        None => Box::new(r),
334        Gzip => Box::new(gzip_multiple_decoder(r)),
335        Zstd => Box::new({
336            let mut decoder = bufread::ZstdDecoder::new(r);
337            decoder.multiple_members(true);
338            decoder
339        }),
340    }
341}
342
343// try to determine the compression given the:
344// * content-encoding
345// * content-type
346// * key name (for file extension)
347//
348// It will use this information in this order
349fn determine_compression(
350    content_encoding: Option<&str>,
351    content_type: Option<&str>,
352    key: &str,
353) -> Option<Compression> {
354    content_encoding
355        .and_then(content_encoding_to_compression)
356        .or_else(|| content_type.and_then(content_type_to_compression))
357        .or_else(|| object_key_to_compression(key))
358}
359
360fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
361    match content_encoding {
362        "gzip" => Some(Compression::Gzip),
363        "zstd" => Some(Compression::Zstd),
364        _ => None,
365    }
366}
367
368fn content_type_to_compression(content_type: &str) -> Option<Compression> {
369    match content_type {
370        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
371        "application/zstd" => Some(Compression::Zstd),
372        _ => None,
373    }
374}
375
376fn object_key_to_compression(key: &str) -> Option<Compression> {
377    let extension = std::path::Path::new(key)
378        .extension()
379        .and_then(std::ffi::OsStr::to_str);
380
381    use Compression::*;
382    extension.and_then(|extension| match extension {
383        "gz" => Some(Gzip),
384        "zst" => Some(Zstd),
385        _ => Option::None,
386    })
387}
388
#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    /// Each metadata source on its own is enough to detect gzip, and an unknown key extension
    /// without any metadata yields no scheme.
    #[test]
    fn determine_compression() {
        use super::Compression;

        // (key, content_encoding, content_type, expected)
        let cases = [
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];

        for (key, content_encoding, content_type, expected) in cases {
            let actual = super::determine_compression(content_encoding, content_type, key);
            assert_eq!(
                actual, expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }

    /// An empty body must decode to no data even though gzip is detected from the encoding.
    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut reader = s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await;

        let mut data = Vec::new();
        reader.read_to_end(&mut data).await.unwrap();

        assert!(data.is_empty());
    }
}
440
441#[cfg(feature = "aws-s3-integration-tests")]
442#[cfg(test)]
443mod integration_tests {
444    use std::{
445        any::Any,
446        collections::HashMap,
447        fs::File,
448        io::{self, BufRead},
449        path::Path,
450        time::Duration,
451    };
452
453    use aws_sdk_s3::Client as S3Client;
454    use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
455    use similar_asserts::assert_eq;
456    use vector_lib::{
457        codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
458        lookup::path,
459    };
460    use vrl::value::Value;
461
462    use super::*;
463    use crate::{
464        SourceSender,
465        aws::{AwsAuthentication, RegionOrEndpoint, create_client},
466        common::sqs::SqsClientBuilder,
467        config::{ProxyConfig, SourceConfig, SourceContext},
468        event::EventStatus::{self, *},
469        line_agg,
470        sources::{
471            aws_s3::{S3ClientBuilder, sqs::S3Event},
472            util::MultilineConfig,
473        },
474        test_util::{
475            collect_n,
476            components::{SOURCE_TAGS, assert_source_compliance},
477            lines_from_gzip_file, random_lines, trace_init,
478        },
479    };
480
481    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
482        let file = io::BufReader::new(File::open(path).unwrap());
483        file.lines().map(|x| x.unwrap()).collect()
484    }
485
486    #[tokio::test]
487    async fn s3_process_message() {
488        trace_init();
489
490        let logs: Vec<String> = random_lines(100).take(10).collect();
491
492        test_event(
493            None,
494            None,
495            None,
496            None,
497            logs.join("\n").into_bytes(),
498            logs,
499            Delivered,
500            false,
501            DeserializerConfig::Bytes,
502            None,
503        )
504        .await;
505    }
506
507    #[tokio::test]
508    async fn s3_process_json_message() {
509        trace_init();
510
511        let logs: Vec<String> = random_lines(100).take(10).collect();
512
513        let json_logs: Vec<String> = logs
514            .iter()
515            .map(|msg| {
516                // convert to JSON object
517                format!(r#"{{"message": "{msg}"}}"#)
518            })
519            .collect();
520
521        test_event(
522            None,
523            None,
524            None,
525            None,
526            json_logs.join("\n").into_bytes(),
527            logs,
528            Delivered,
529            false,
530            DeserializerConfig::Json(JsonDeserializerConfig::default()),
531            None,
532        )
533        .await;
534    }
535
536    #[tokio::test]
537    async fn s3_process_message_with_log_namespace() {
538        trace_init();
539
540        let logs: Vec<String> = random_lines(100).take(10).collect();
541
542        test_event(
543            None,
544            None,
545            None,
546            None,
547            logs.join("\n").into_bytes(),
548            logs,
549            Delivered,
550            true,
551            DeserializerConfig::Bytes,
552            None,
553        )
554        .await;
555    }
556
557    #[tokio::test]
558    async fn s3_process_message_spaces() {
559        trace_init();
560
561        let key = "key with spaces".to_string();
562        let logs: Vec<String> = random_lines(100).take(10).collect();
563
564        test_event(
565            Some(key),
566            None,
567            None,
568            None,
569            logs.join("\n").into_bytes(),
570            logs,
571            Delivered,
572            false,
573            DeserializerConfig::Bytes,
574            None,
575        )
576        .await;
577    }
578
579    #[tokio::test]
580    async fn s3_process_message_special_characters() {
581        trace_init();
582
583        let key = format!("special:{}", uuid::Uuid::new_v4());
584        let logs: Vec<String> = random_lines(100).take(10).collect();
585
586        test_event(
587            Some(key),
588            None,
589            None,
590            None,
591            logs.join("\n").into_bytes(),
592            logs,
593            Delivered,
594            false,
595            DeserializerConfig::Bytes,
596            None,
597        )
598        .await;
599    }
600
601    #[tokio::test]
602    async fn s3_process_message_gzip() {
603        use std::io::Read;
604
605        trace_init();
606
607        let logs: Vec<String> = random_lines(100).take(10).collect();
608
609        let mut gz = flate2::read::GzEncoder::new(
610            io::Cursor::new(logs.join("\n").into_bytes()),
611            flate2::Compression::fast(),
612        );
613        let mut buffer = Vec::new();
614        gz.read_to_end(&mut buffer).unwrap();
615
616        test_event(
617            None,
618            Some("gzip"),
619            None,
620            None,
621            buffer,
622            logs,
623            Delivered,
624            false,
625            DeserializerConfig::Bytes,
626            None,
627        )
628        .await;
629    }
630
631    #[tokio::test]
632    async fn s3_process_message_multipart_gzip() {
633        use std::io::Read;
634
635        trace_init();
636
637        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");
638
639        let buffer = {
640            let mut file =
641                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
642            let mut data = Vec::new();
643            file.read_to_end(&mut data).expect("file can be read");
644            data
645        };
646
647        test_event(
648            None,
649            Some("gzip"),
650            None,
651            None,
652            buffer,
653            logs,
654            Delivered,
655            false,
656            DeserializerConfig::Bytes,
657            None,
658        )
659        .await;
660    }
661
662    #[tokio::test]
663    async fn s3_process_message_multipart_zstd() {
664        use std::io::Read;
665
666        trace_init();
667
668        let logs = lines_from_plaintext("tests/data/multipart-zst.log");
669
670        let buffer = {
671            let mut file =
672                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
673            let mut data = Vec::new();
674            file.read_to_end(&mut data).expect("file can be read");
675            data
676        };
677
678        test_event(
679            None,
680            Some("zstd"),
681            None,
682            None,
683            buffer,
684            logs,
685            Delivered,
686            false,
687            DeserializerConfig::Bytes,
688            None,
689        )
690        .await;
691    }
692
693    #[tokio::test]
694    async fn s3_process_message_multiline() {
695        trace_init();
696
697        let logs: Vec<String> = vec!["abc", "def", "geh"]
698            .into_iter()
699            .map(ToOwned::to_owned)
700            .collect();
701
702        test_event(
703            None,
704            None,
705            None,
706            Some(MultilineConfig {
707                start_pattern: "abc".to_owned(),
708                mode: line_agg::Mode::HaltWith,
709                condition_pattern: "geh".to_owned(),
710                timeout_ms: Duration::from_millis(1000),
711            }),
712            logs.join("\n").into_bytes(),
713            vec!["abc\ndef\ngeh".to_owned()],
714            Delivered,
715            false,
716            DeserializerConfig::Bytes,
717            None,
718        )
719        .await;
720    }
721
722    // TODO: re-enable this after figuring out why it is so flakey in CI
723    //       https://github.com/vectordotdev/vector/issues/17456
724    #[ignore]
725    #[tokio::test]
726    async fn handles_errored_status() {
727        trace_init();
728
729        let logs: Vec<String> = random_lines(100).take(10).collect();
730
731        test_event(
732            None,
733            None,
734            None,
735            None,
736            logs.join("\n").into_bytes(),
737            logs,
738            Errored,
739            false,
740            DeserializerConfig::Bytes,
741            None,
742        )
743        .await;
744    }
745
746    #[tokio::test]
747    async fn handles_failed_status() {
748        trace_init();
749
750        let logs: Vec<String> = random_lines(100).take(10).collect();
751
752        test_event(
753            None,
754            None,
755            None,
756            None,
757            logs.join("\n").into_bytes(),
758            logs,
759            Rejected,
760            false,
761            DeserializerConfig::Bytes,
762            None,
763        )
764        .await;
765    }
766
767    #[tokio::test]
768    async fn handles_failed_status_without_deletion() {
769        trace_init();
770
771        let logs: Vec<String> = random_lines(100).take(10).collect();
772
773        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
774        custom_options.insert("delete_failed_message".to_string(), Box::new(false));
775
776        test_event(
777            None,
778            None,
779            None,
780            None,
781            logs.join("\n").into_bytes(),
782            logs,
783            Rejected,
784            false,
785            DeserializerConfig::Bytes,
786            Some(custom_options),
787        )
788        .await;
789    }
790
791    fn s3_address() -> String {
792        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
793    }
794
795    fn config(
796        queue_url: &str,
797        multiline: Option<MultilineConfig>,
798        log_namespace: bool,
799        decoding: DeserializerConfig,
800    ) -> AwsS3Config {
801        AwsS3Config {
802            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
803            strategy: Strategy::Sqs,
804            compression: Compression::Auto,
805            multiline,
806            sqs: Some(sqs::Config {
807                queue_url: queue_url.to_string(),
808                poll_secs: 1,
809                max_number_of_messages: 10,
810                visibility_timeout_secs: 0,
811                client_concurrency: None,
812                ..Default::default()
813            }),
814            acknowledgements: true.into(),
815            log_namespace: Some(log_namespace),
816            decoding,
817            ..Default::default()
818        }
819    }
820
821    // puts an object and asserts that the logs it gets back match
822    #[allow(clippy::too_many_arguments)]
823    async fn test_event(
824        key: Option<String>,
825        content_encoding: Option<&str>,
826        content_type: Option<&str>,
827        multiline: Option<MultilineConfig>,
828        payload: Vec<u8>,
829        expected_lines: Vec<String>,
830        status: EventStatus,
831        log_namespace: bool,
832        decoding: DeserializerConfig,
833        custom_options: Option<HashMap<String, Box<dyn Any>>>,
834    ) {
835        assert_source_compliance(&SOURCE_TAGS, async move {
836            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
837
838            let s3 = s3_client().await;
839            let sqs = sqs_client().await;
840
841            let queue = create_queue(&sqs).await;
842            let bucket = create_bucket(&s3).await;
843
844            tokio::time::sleep(Duration::from_secs(1)).await;
845
846            let mut config = config(&queue, multiline, log_namespace, decoding);
847
848            if let Some(false) = custom_options
849                .as_ref()
850                .and_then(|opts| opts.get("delete_failed_message"))
851                .and_then(|val| val.downcast_ref::<bool>())
852                .copied()
853            {
854                config.sqs.as_mut().unwrap().delete_failed_message = false;
855            }
856
857            s3.put_object()
858                .bucket(bucket.clone())
859                .key(key.clone())
860                .body(ByteStream::from(payload))
861                .set_content_type(content_type.map(|t| t.to_owned()))
862                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
863                .send()
864                .await
865                .expect("Could not put object");
866
867            let sqs_client = sqs_client().await;
868
869            let mut s3_event: S3Event = serde_json::from_str(
870            r#"
871{
872   "Records":[
873      {
874         "eventVersion":"2.1",
875         "eventSource":"aws:s3",
876         "awsRegion":"us-east-1",
877         "eventTime":"2022-03-24T19:43:00.548Z",
878         "eventName":"ObjectCreated:Put",
879         "userIdentity":{
880            "principalId":"AWS:ARNOTAREALIDD4:user.name"
881         },
882         "requestParameters":{
883            "sourceIPAddress":"136.56.73.213"
884         },
885         "responseElements":{
886            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
887            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
888         },
889         "s3":{
890            "s3SchemaVersion":"1.0",
891            "configurationId":"asdfasdf",
892            "bucket":{
893               "name":"bucket-name",
894               "ownerIdentity":{
895                  "principalId":"A3PEG170DF9VNQ"
896               },
897               "arn":"arn:aws:s3:::nfox-testing-vector"
898            },
899            "object":{
900               "key":"test-log.txt",
901               "size":33,
902               "eTag":"c981ce6672c4251048b0b834e334007f",
903               "sequencer":"00623CC9C47AB5634C"
904            }
905         }
906      }
907   ]
908}
909        "#,
910            )
911            .unwrap();
912
913            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
914            s3_event.records[0].s3.object.key.clone_from(&key);
915
916            // send SQS message (this is usually sent by S3 itself when an object is uploaded)
917            // This does not automatically work with localstack and the AWS SDK, so this is done manually
918            let _send_message_output = sqs_client
919                .send_message()
920                .queue_url(queue.clone())
921                .message_body(serde_json::to_string(&s3_event).unwrap())
922                .send()
923                .await
924                .unwrap();
925
926            let (tx, rx) = SourceSender::new_test_finalize(status);
927            let cx = SourceContext::new_test(tx, None);
928            let namespace = cx.log_namespace(Some(log_namespace));
929            let source = config.build(cx).await.unwrap();
930            tokio::spawn(async move { source.await.unwrap() });
931
932            let events = collect_n(rx, expected_lines.len()).await;
933
934            assert_eq!(expected_lines.len(), events.len());
935            for (i, event) in events.iter().enumerate() {
936
937                if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
938                    schema_definition.is_valid_for_event(event).unwrap();
939                }
940
941                let message = expected_lines[i].as_str();
942
943                let log = event.as_log();
944                if log_namespace {
945                    assert_eq!(log.value(), &Value::from(message));
946                } else {
947                    assert_eq!(log["message"], message.into());
948                }
949                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
950                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
951                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
952            }
953
954            // Unfortunately we need a fairly large sleep here to ensure that the source has actually managed to delete the SQS message.
955            // The deletion of this message occurs after the Event has been sent out by the source and there is no way of knowing when this
956            // process has finished other than waiting around for a while.
957            tokio::time::sleep(Duration::from_secs(10)).await;
958            // Make sure the SQS message is deleted
959            match status {
960                Errored => {
961                    // need to wait up to the visibility timeout before it will be counted again
962                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
963                }
964                Rejected if !config.sqs.unwrap().delete_failed_message => {
965                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
966                }
967                _ => {
968                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
969                }
970            };
971        }).await;
972    }
973
974    /// creates a new SQS queue
975    ///
976    /// returns the queue name
977    async fn create_queue(client: &SqsClient) -> String {
978        let queue_name = uuid::Uuid::new_v4().to_string();
979
980        let res = client
981            .create_queue()
982            .queue_name(queue_name.clone())
983            .attributes(QueueAttributeName::VisibilityTimeout, "2")
984            .send()
985            .await
986            .expect("Could not create queue");
987
988        res.queue_url.expect("no queue url")
989    }
990
991    /// count the number of messages in a SQS queue
992    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
993        let sqs_result = client
994            .receive_message()
995            .queue_url(queue)
996            .visibility_timeout(0)
997            .wait_time_seconds(wait_time_seconds)
998            .send()
999            .await
1000            .unwrap();
1001
1002        sqs_result
1003            .messages
1004            .map(|messages| messages.len())
1005            .unwrap_or(0)
1006    }
1007
1008    /// creates a new S3 bucket
1009    ///
1010    /// returns the bucket name
1011    async fn create_bucket(client: &S3Client) -> String {
1012        let bucket_name = uuid::Uuid::new_v4().to_string();
1013
1014        client
1015            .create_bucket()
1016            .bucket(bucket_name.clone())
1017            .send()
1018            .await
1019            .expect("Could not create bucket");
1020
1021        bucket_name
1022    }
1023
1024    async fn s3_client() -> S3Client {
1025        let auth = AwsAuthentication::test_auth();
1026        let region_endpoint = RegionOrEndpoint {
1027            region: Some("us-east-1".to_owned()),
1028            endpoint: Some(s3_address()),
1029        };
1030        let proxy_config = ProxyConfig::default();
1031        let force_path_style_value: bool = true;
1032        create_client::<S3ClientBuilder>(
1033            &S3ClientBuilder {
1034                force_path_style: Some(force_path_style_value),
1035            },
1036            &auth,
1037            region_endpoint.region(),
1038            region_endpoint.endpoint(),
1039            &proxy_config,
1040            None,
1041            None,
1042        )
1043        .await
1044        .unwrap()
1045    }
1046
1047    async fn sqs_client() -> SqsClient {
1048        let auth = AwsAuthentication::test_auth();
1049        let region_endpoint = RegionOrEndpoint {
1050            region: Some("us-east-1".to_owned()),
1051            endpoint: Some(s3_address()),
1052        };
1053        let proxy_config = ProxyConfig::default();
1054        create_client::<SqsClientBuilder>(
1055            &SqsClientBuilder {},
1056            &auth,
1057            region_endpoint.region(),
1058            region_endpoint.endpoint(),
1059            &proxy_config,
1060            None,
1061            None,
1062        )
1063        .await
1064        .unwrap()
1065    }
1066}