// vector/sources/aws_s3/sqs.rs

1use std::{
2    collections::HashMap,
3    future::ready,
4    num::NonZeroUsize,
5    panic,
6    sync::{Arc, LazyLock},
7    time::{Duration, Instant},
8};
9
10use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
11use aws_sdk_sqs::{
12    Client as SqsClient,
13    operation::{
14        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
15        receive_message::ReceiveMessageError,
16        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
17    },
18    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
19};
20use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
21use aws_types::region::Region;
22use bytes::Bytes;
23use chrono::{DateTime, TimeZone, Utc};
24use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use serde_with::serde_as;
27use smallvec::SmallVec;
28use snafu::{ResultExt, Snafu};
29use tokio::{pin, select};
30use tokio_util::codec::FramedRead;
31use vector_lib::{
32    codecs::decoding::FramingError,
33    config::{LegacyKey, LogNamespace, log_schema},
34    configurable::configurable_component,
35    event::MaybeAsLogMut,
36    internal_event::{
37        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
38    },
39    lookup::{PathPrefix, metadata_path, path},
40    source_sender::SendError,
41};
42
43use crate::{
44    SourceSender,
45    aws::AwsTimeout,
46    codecs::Decoder,
47    common::backoff::ExponentialBackoff,
48    config::{SourceAcknowledgementsConfig, SourceContext},
49    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
50    internal_events::{
51        EventsReceived, S3ObjectProcessingFailed, S3ObjectProcessingSucceeded,
52        SqsMessageDeleteBatchError, SqsMessageDeletePartialError, SqsMessageDeleteSucceeded,
53        SqsMessageProcessingError, SqsMessageProcessingSucceeded, SqsMessageReceiveError,
54        SqsMessageReceiveSucceeded, SqsMessageSendBatchError, SqsMessageSentPartialError,
55        SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored, StreamClosedError,
56    },
57    line_agg::{self, LineAgg},
58    shutdown::ShutdownSignal,
59    sources::aws_s3::AwsS3Config,
60    tls::TlsConfig,
61};
62
63static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
64    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());
65
/// Configuration for deferring events based on their age.
// NOTE(review): no field below uses `#[serde_as(as = ...)]`, so `#[serde_as]` appears to be a
// no-op on this struct.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the queue to forward events to when they are older than `max_age_secs`.
    // The original SQS message body is forwarded as-is to this queue via a send-message batch.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// Event must have been emitted within the last `max_age_secs` seconds to be processed.
    ///
    /// If the event is older, it is forwarded to the `queue_url` for later processing.
    // Compared against `Utc::now() - event_time` truncated to whole seconds; only records strictly
    // older than this many seconds are deferred.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}
86
/// SQS configuration options.
//
// TODO: It seems awfully likely that we could re-use the existing configuration type for the `aws_sqs` source in some
// way, given the near 100% overlap in configurable values.
//
// NOTE(review): no field below uses `#[serde_as(as = ...)]`, so `#[serde_as]` appears to be a
// no-op on this struct.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this should not be changed unless instructed to do so, as if messages are available,
    /// they are always consumed, regardless of the value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE(review): u32 only rules out negative values; anything above `i32::MAX` still does not
    // fit in the `i32` that `State` stores, so the conversion must not be a wrapping cast.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a message is received, and
    /// takes longer than `visibility_timeout_secs` to process and delete the message from the queue, it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE(review): as with `poll_secs`, values above `i32::MAX` do not fit in the `i32` field.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    // Gates whether processed (and deferred) messages are queued for batch deletion.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete non-retryable messages.
    ///
    /// If a message is rejected by the sink and not retryable, it is deleted from the queue.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the objects
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as the SQS message consumption rate affects the S3 object
    /// retrieval rate.
    // When unset, `Ingestor::new` falls back to `crate::num_threads()`.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages to poll from SQS in a batch
    ///
    /// Defaults to 10
    ///
    /// Should be set to a smaller value when the files are large to help prevent the ingestion of
    /// one file from causing the other files to exceed the visibility_timeout. Valid values are 1 - 10
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // `Ingestor::new` rejects values outside 1..=10 with `IngestorNewError::InvalidNumberOfMessages`.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    // NOTE(review): `configurable(derived)` presumably takes this field's description from
    // `TlsConfig` itself — confirm.
    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Client timeout configuration for SQS operations. Take care when configuring these settings
    // to allow enough time for the polling interval configured in `poll_secs`.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    /// Configuration for deferring events to another queue based on their age.
    // When `None`, records are never treated as too old.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}
186
/// Default for `poll_secs`: wait up to 15 seconds while polling the queue.
const fn default_poll_secs() -> u32 {
    15
}

/// Default for `visibility_timeout_secs`: five minutes.
const fn default_visibility_timeout_secs() -> u32 {
    5 * 60
}

/// Default for `max_number_of_messages`: 10, the largest batch size that is accepted.
const fn default_max_number_of_messages() -> u32 {
    10
}

/// Serde default for boolean options that are enabled unless explicitly turned off.
const fn default_true() -> bool {
    true
}
202
/// Errors returned by `Ingestor::new` when the supplied configuration is invalid.
#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    /// `max_number_of_messages` was outside the accepted range of 1 to 10 (inclusive).
    #[snafu(display("Invalid value for max_number_of_messages {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}
208
/// Errors that can occur while processing one SQS message and the S3 objects it references.
// `GetObject` embeds a full `SdkError`, which presumably makes it much larger than the other
// variants; the lint is silenced rather than boxing that payload.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    /// The message body (after optional SNS unwrapping) is not a valid S3 notification.
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    /// The `GetObject` request to S3 failed.
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    /// Reading or framing the object body failed part-way through.
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    /// Sending decoded events downstream failed (for example because the output was closed).
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: vector_lib::source_sender::SendError,
        bucket: String,
        key: String,
    },
    /// The record names a region other than the one this source is configured for.
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    /// The record's `eventVersion` does not satisfy `SUPPORTED_S3_EVENT_VERSION`.
    #[snafu(display("Unsupported S3 event version: {}.", version,))]
    UnsupportedS3EventVersion { version: semver::Version },
    /// Downstream acknowledgement reported the object's events as errored.
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    /// The record is older than the deferred `max_age_secs`. This is not a failure:
    /// `run_once` counts it as processed and forwards the message to the deferred queue.
    #[snafu(display(
        "File s3://{}/{} too old.  Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}
275
/// Configuration and clients shared (through an `Arc`) by every polling task.
pub struct State {
    /// Region this source runs in; event records naming another region are rejected.
    region: Region,

    /// Client used to fetch the objects named in notifications.
    s3_client: S3Client,
    /// SQS client; presumably backs the receive/delete/send helpers of `IngestorProcess`.
    sqs_client: SqsClient,

    /// Optional multi-line aggregation applied to framed lines before decoding.
    multiline: Option<line_agg::Config>,
    /// Compression setting passed to the S3 object decoder.
    compression: super::Compression,

    /// URL of the queue that is polled for notifications.
    queue_url: String,
    /// Converted from the `u32` config value in `Ingestor::new`.
    poll_secs: i32,
    /// Validated to 1..=10 in `Ingestor::new`.
    max_number_of_messages: i32,
    /// Number of polling tasks spawned by `Ingestor::run`.
    client_concurrency: usize,
    /// Converted from the `u32` config value in `Ingestor::new`.
    visibility_timeout_secs: i32,
    /// Whether handled messages are queued for deletion.
    delete_message: bool,
    /// NOTE(review): copied from the config; not read in the visible part of this file.
    delete_failed_message: bool,
    /// Provides the framer and deserializer used on object contents.
    decoder: Decoder,

    /// Age-based deferral settings; `None` disables deferral.
    deferred: Option<DeferredConfig>,
}
296
/// Owns the shared `State` and, in `Ingestor::run`, spawns `client_concurrency` polling tasks.
pub(super) struct Ingestor {
    state: Arc<State>,
}
300
301impl Ingestor {
302    pub(super) async fn new(
303        region: Region,
304        sqs_client: SqsClient,
305        s3_client: S3Client,
306        config: Config,
307        compression: super::Compression,
308        multiline: Option<line_agg::Config>,
309        decoder: Decoder,
310    ) -> Result<Ingestor, IngestorNewError> {
311        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
312            return Err(IngestorNewError::InvalidNumberOfMessages {
313                messages: config.max_number_of_messages,
314            });
315        }
316        let state = Arc::new(State {
317            region,
318
319            s3_client,
320            sqs_client,
321
322            compression,
323            multiline,
324
325            queue_url: config.queue_url,
326            poll_secs: config.poll_secs as i32,
327            max_number_of_messages: config.max_number_of_messages as i32,
328            client_concurrency: config
329                .client_concurrency
330                .map(|n| n.get())
331                .unwrap_or_else(crate::num_threads),
332            visibility_timeout_secs: config.visibility_timeout_secs as i32,
333            delete_message: config.delete_message,
334            delete_failed_message: config.delete_failed_message,
335            decoder,
336
337            deferred: config.deferred,
338        });
339
340        Ok(Ingestor { state })
341    }
342
343    pub(super) async fn run(
344        self,
345        cx: SourceContext,
346        acknowledgements: SourceAcknowledgementsConfig,
347        log_namespace: LogNamespace,
348    ) -> Result<(), ()> {
349        let acknowledgements = cx.do_acknowledgements(acknowledgements);
350        let mut handles = Vec::new();
351        for _ in 0..self.state.client_concurrency {
352            let process = IngestorProcess::new(
353                Arc::clone(&self.state),
354                cx.out.clone(),
355                cx.shutdown.clone(),
356                log_namespace,
357                acknowledgements,
358            );
359            let fut = process.run();
360            let handle = crate::spawn_in_current_span(fut);
361            handles.push(handle);
362        }
363
364        // Wait for all of the processes to finish.  If any one of them panics, we resume
365        // that panic here to properly shutdown Vector.
366        for handle in handles.drain(..) {
367            if let Err(e) = handle.await
368                && e.is_panic()
369            {
370                panic::resume_unwind(e.into_panic());
371            }
372        }
373
374        Ok(())
375    }
376}
377
/// One polling worker; `Ingestor::run` spawns `client_concurrency` of these.
pub struct IngestorProcess {
    /// Configuration and clients shared with the other workers.
    state: Arc<State>,
    /// Destination for decoded events.
    out: SourceSender,
    /// Signal that ends the polling loop in `run`.
    shutdown: ShutdownSignal,
    /// Whether to wait for the downstream batch status before treating an object as handled.
    acknowledgements: bool,
    /// Log namespace applied to decoded log events.
    log_namespace: LogNamespace,
    /// Counter for raw bytes read from object bodies.
    bytes_received: Registered<BytesReceived>,
    /// Counter for decoded events.
    events_received: Registered<EventsReceived>,
    /// Delay applied after a failed `run_once`; reset after a success.
    backoff: ExponentialBackoff,
}
388
389impl IngestorProcess {
390    pub fn new(
391        state: Arc<State>,
392        out: SourceSender,
393        shutdown: ShutdownSignal,
394        log_namespace: LogNamespace,
395        acknowledgements: bool,
396    ) -> Self {
397        Self {
398            state,
399            out,
400            shutdown,
401            acknowledgements,
402            log_namespace,
403            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
404            events_received: register!(EventsReceived),
405            backoff: ExponentialBackoff::default().max_delay(Duration::from_secs(30)),
406        }
407    }
408
409    async fn run(mut self) {
410        let shutdown = self.shutdown.clone().fuse();
411        pin!(shutdown);
412
413        loop {
414            select! {
415                _ = &mut shutdown => break,
416                result = self.run_once() => {
417                    match result {
418                        Ok(()) => {
419                            // Reset backoff on successful receive
420                            self.backoff.reset();
421                        }
422                        Err(_) => {
423                            let delay = self.backoff.next().expect("backoff never ends");
424                            trace!(
425                                delay_ms = delay.as_millis(),
426                                "`run_once` failed, will retry after delay.",
427                            );
428                            tokio::time::sleep(delay).await;
429                        }
430                    }
431                },
432            }
433        }
434    }
435
436    async fn run_once(&mut self) -> Result<(), ()> {
437        let messages = match self.receive_messages().await {
438            Ok(messages) => {
439                emit!(SqsMessageReceiveSucceeded {
440                    count: messages.len(),
441                });
442                messages
443            }
444            Err(err) => {
445                emit!(SqsMessageReceiveError { error: &err });
446                return Err(());
447            }
448        };
449
450        let mut delete_entries = Vec::new();
451        let mut deferred_entries = Vec::new();
452        for message in messages {
453            let receipt_handle = match message.receipt_handle {
454                None => {
455                    // I don't think this will ever actually happen, but is just an artifact of the
456                    // AWS's API predilection for returning nullable values for all response
457                    // attributes
458                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
459                    continue;
460                }
461                Some(ref handle) => handle.to_owned(),
462            };
463
464            let message_id = message
465                .message_id
466                .clone()
467                .unwrap_or_else(|| "<unknown>".to_owned());
468            match self.handle_sqs_message(message.clone()).await {
469                Ok(()) => {
470                    emit!(SqsMessageProcessingSucceeded {
471                        message_id: &message_id
472                    });
473                    if self.state.delete_message {
474                        trace!(
475                            message = "Queued SQS message for deletion.",
476                            id = message_id,
477                            receipt_handle = receipt_handle,
478                        );
479                        delete_entries.push(
480                            DeleteMessageBatchRequestEntry::builder()
481                                .id(message_id.clone())
482                                .receipt_handle(receipt_handle)
483                                .build()
484                                .expect("all required builder params specified"),
485                        );
486                    }
487                }
488                Err(err) => {
489                    match err {
490                        ProcessingError::FileTooOld { .. } => {
491                            emit!(SqsMessageProcessingSucceeded {
492                                message_id: &message_id
493                            });
494                            if let Some(deferred) = &self.state.deferred {
495                                trace!(
496                                    message = "Forwarding message to deferred queue.",
497                                    id = message_id,
498                                    receipt_handle = receipt_handle,
499                                    deferred_queue = deferred.queue_url,
500                                );
501
502                                deferred_entries.push(
503                                    SendMessageBatchRequestEntry::builder()
504                                        .id(message_id.clone())
505                                        .message_body(message.body.unwrap_or_default())
506                                        .build()
507                                        .expect("all required builder params specified"),
508                                );
509                            }
510                            //  maybe delete the message from current queue since we have processed it
511                            if self.state.delete_message {
512                                trace!(
513                                    message = "Queued SQS message for deletion.",
514                                    id = message_id,
515                                    receipt_handle = receipt_handle,
516                                );
517                                delete_entries.push(
518                                    DeleteMessageBatchRequestEntry::builder()
519                                        .id(message_id)
520                                        .receipt_handle(receipt_handle)
521                                        .build()
522                                        .expect("all required builder params specified"),
523                                );
524                            }
525                        }
526                        _ => {
527                            emit!(SqsMessageProcessingError {
528                                message_id: &message_id,
529                                error: &err,
530                            });
531                        }
532                    }
533                }
534            }
535        }
536
537        // Should consider removing failed deferrals from the delete_entries
538        if !deferred_entries.is_empty() {
539            let Some(deferred) = &self.state.deferred else {
540                warn!("Deferred queue not configured, but received deferred entries.");
541                return Ok(());
542            };
543            let cloned_entries = deferred_entries.clone();
544            match self
545                .send_messages(deferred_entries, deferred.queue_url.clone())
546                .await
547            {
548                Ok(result) => {
549                    if !result.successful.is_empty() {
550                        emit!(SqsMessageSentSucceeded {
551                            message_ids: result.successful,
552                        })
553                    }
554
555                    if !result.failed.is_empty() {
556                        emit!(SqsMessageSentPartialError {
557                            entries: result.failed
558                        })
559                    }
560                }
561                Err(err) => {
562                    emit!(SqsMessageSendBatchError {
563                        entries: cloned_entries,
564                        error: err,
565                    });
566                }
567            }
568        }
569        if !delete_entries.is_empty() {
570            // We need these for a correct error message if the batch fails overall.
571            let cloned_entries = delete_entries.clone();
572            match self.delete_messages(delete_entries).await {
573                Ok(result) => {
574                    // Batch deletes can have partial successes/failures, so we have to check
575                    // for both cases and emit accordingly.
576                    if !result.successful.is_empty() {
577                        emit!(SqsMessageDeleteSucceeded {
578                            message_ids: result.successful,
579                        });
580                    }
581
582                    if !result.failed.is_empty() {
583                        emit!(SqsMessageDeletePartialError {
584                            entries: result.failed
585                        });
586                    }
587                }
588                Err(err) => {
589                    emit!(SqsMessageDeleteBatchError {
590                        entries: cloned_entries,
591                        error: err,
592                    });
593                }
594            }
595        }
596        Ok(())
597    }
598
599    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
600        let sqs_body = message.body.unwrap_or_default();
601        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
602            .map(|notification| notification.message)
603            .unwrap_or(sqs_body);
604        let s3_event: SqsEvent =
605            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
606                message_id: message
607                    .message_id
608                    .clone()
609                    .unwrap_or_else(|| "<empty>".to_owned()),
610            })?;
611
612        match s3_event {
613            SqsEvent::TestEvent(_s3_test_event) => {
614                debug!(?message.message_id, message = "Found S3 Test Event.");
615                Ok(())
616            }
617            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
618        }
619    }
620
621    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
622        for record in s3_event.records {
623            self.handle_s3_event_record(record, self.log_namespace)
624                .await?
625        }
626        Ok(())
627    }
628
629    async fn handle_s3_event_record(
630        &mut self,
631        s3_event: S3EventRecord,
632        log_namespace: LogNamespace,
633    ) -> Result<(), ProcessingError> {
634        let event_version: semver::Version = s3_event.event_version.clone().into();
635        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
636            return Err(ProcessingError::UnsupportedS3EventVersion {
637                version: event_version.clone(),
638            });
639        }
640
641        if s3_event.event_name.kind != "ObjectCreated" {
642            emit!(SqsS3EventRecordInvalidEventIgnored {
643                bucket: &s3_event.s3.bucket.name,
644                key: &s3_event.s3.object.key,
645                kind: &s3_event.event_name.kind,
646                name: &s3_event.event_name.name,
647            });
648            return Ok(());
649        }
650
651        // S3 has to send notifications to a queue in the same region so I don't think this will
652        // actually ever be hit unless messages are being forwarded from one queue to another
653        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
654            return Err(ProcessingError::WrongRegion {
655                bucket: s3_event.s3.bucket.name.clone(),
656                key: s3_event.s3.object.key.clone(),
657                region: s3_event.aws_region,
658            });
659        }
660
661        if let Some(deferred) = &self.state.deferred {
662            let delta = Utc::now() - s3_event.event_time;
663            if delta.num_seconds() > deferred.max_age_secs as i64 {
664                return Err(ProcessingError::FileTooOld {
665                    bucket: s3_event.s3.bucket.name.clone(),
666                    key: s3_event.s3.object.key.clone(),
667                    deferred_queue: deferred.queue_url.clone(),
668                });
669            }
670        }
671
672        let download_start = Instant::now();
673
674        let object_result = self
675            .state
676            .s3_client
677            .get_object()
678            .bucket(s3_event.s3.bucket.name.clone())
679            .key(s3_event.s3.object.key.clone())
680            .send()
681            .await
682            .context(GetObjectSnafu {
683                bucket: s3_event.s3.bucket.name.clone(),
684                key: s3_event.s3.object.key.clone(),
685            });
686
687        let object = object_result?;
688
689        debug!(
690            message = "Got S3 object from SQS notification.",
691            bucket = s3_event.s3.bucket.name,
692            key = s3_event.s3.object.key,
693        );
694
695        let metadata = object.metadata;
696
697        let timestamp = object.last_modified.map(|ts| {
698            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
699                .single()
700                .expect("invalid timestamp")
701        });
702
703        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
704        let object_reader = super::s3_object_decoder(
705            self.state.compression,
706            &s3_event.s3.object.key,
707            object.content_encoding.as_deref(),
708            object.content_type.as_deref(),
709            object.body,
710        )
711        .await;
712
713        // Record the read error seen to propagate up later so we avoid ack'ing the SQS
714        // message
715        //
716        // String is used as we cannot clone std::io::Error to take ownership in closure
717        //
718        // FramedRead likely stops when it gets an i/o error but I found it more clear to
719        // show that we `take_while` there hasn't been an error
720        //
721        // This can result in objects being partially processed before an error, but we
722        // prefer duplicate lines over message loss. Future work could include recording
723        // the offset of the object that has been read, but this would only be relevant in
724        // the case that the same vector instance processes the same message.
725        let mut read_error = None;
726        let bytes_received = self.bytes_received.clone();
727        let events_received = self.events_received.clone();
728        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
729            FramedRead::new(object_reader, self.state.decoder.framer.clone())
730                .map(|res| {
731                    res.inspect(|bytes| {
732                        bytes_received.emit(ByteSize(bytes.len()));
733                    })
734                    .map_err(|err| {
735                        read_error = Some(err);
736                    })
737                    .ok()
738                })
739                .take_while(|res| ready(res.is_some()))
740                .map(|r| r.expect("validated by take_while")),
741        );
742
743        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
744            Some(config) => Box::new(
745                LineAgg::new(
746                    lines.map(|line| ((), line, ())),
747                    line_agg::Logic::new(config.clone()),
748                )
749                .map(|(_src, line, _context, _lastline_context)| line),
750            ),
751            None => lines,
752        };
753
754        let mut stream = lines.flat_map(|line| {
755            let events = match self.state.decoder.deserializer_parse(line) {
756                Ok((events, _events_size)) => events,
757                Err(_error) => {
758                    // Error is handled by `codecs::Decoder`, no further handling
759                    // is needed here.
760                    SmallVec::new()
761                }
762            };
763
764            let events = events
765                .into_iter()
766                .map(|mut event: Event| {
767                    event = event.with_batch_notifier_option(&batch);
768                    if let Some(log_event) = event.maybe_as_log_mut() {
769                        handle_single_log(
770                            log_event,
771                            log_namespace,
772                            &s3_event,
773                            &metadata,
774                            timestamp,
775                        );
776                    }
777                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
778                    event
779                })
780                .collect::<Vec<Event>>();
781            futures::stream::iter(events)
782        });
783
784        let send_error = match self.out.send_event_stream(&mut stream).await {
785            Ok(_) => None,
786            Err(SendError::Closed) => {
787                let (count, _) = stream.size_hint();
788                emit!(StreamClosedError { count });
789                Some(SendError::Closed)
790            }
791            Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
792        };
793
794        // Up above, `lines` captures `read_error`, and eventually is captured by `stream`,
795        // so we explicitly drop it so that we can again utilize `read_error` below.
796        drop(stream);
797
798        let bucket = &s3_event.s3.bucket.name;
799        let duration = download_start.elapsed();
800
801        if read_error.is_some() {
802            emit!(S3ObjectProcessingFailed { bucket, duration });
803        } else {
804            emit!(S3ObjectProcessingSucceeded { bucket, duration });
805        }
806
807        // The BatchNotifier is cloned for each LogEvent in the batch stream, but the last
808        // reference must be dropped before the status of the batch is sent to the channel.
809        drop(batch);
810
811        if let Some(error) = read_error {
812            Err(ProcessingError::ReadObject {
813                source: error,
814                bucket: s3_event.s3.bucket.name.clone(),
815                key: s3_event.s3.object.key.clone(),
816            })
817        } else if let Some(error) = send_error {
818            Err(ProcessingError::PipelineSend {
819                source: error,
820                bucket: s3_event.s3.bucket.name.clone(),
821                key: s3_event.s3.object.key.clone(),
822            })
823        } else {
824            match receiver {
825                None => Ok(()),
826                Some(receiver) => {
827                    let result = receiver.await;
828                    match result {
829                        BatchStatus::Delivered => {
830                            debug!(
831                                message = "S3 object from SQS delivered.",
832                                bucket = s3_event.s3.bucket.name,
833                                key = s3_event.s3.object.key,
834                            );
835                            Ok(())
836                        }
837                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
838                            bucket: s3_event.s3.bucket.name,
839                            key: s3_event.s3.object.key,
840                            region: s3_event.aws_region,
841                        }),
842                        BatchStatus::Rejected => {
843                            if self.state.delete_failed_message {
844                                warn!(
845                                    message =
846                                        "S3 object from SQS was rejected. Deleting failed message.",
847                                    bucket = s3_event.s3.bucket.name,
848                                    key = s3_event.s3.object.key,
849                                );
850                                Ok(())
851                            } else {
852                                Err(ProcessingError::ErrorAcknowledgement {
853                                    bucket: s3_event.s3.bucket.name,
854                                    key: s3_event.s3.object.key,
855                                    region: s3_event.aws_region,
856                                })
857                            }
858                        }
859                    }
860                }
861            }
862        }
863    }
864
865    async fn receive_messages(
866        &mut self,
867    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
868        self.state
869            .sqs_client
870            .receive_message()
871            .queue_url(self.state.queue_url.clone())
872            .max_number_of_messages(self.state.max_number_of_messages)
873            .visibility_timeout(self.state.visibility_timeout_secs)
874            .wait_time_seconds(self.state.poll_secs)
875            .send()
876            .map_ok(|res| res.messages.unwrap_or_default())
877            .await
878    }
879
880    async fn delete_messages(
881        &mut self,
882        entries: Vec<DeleteMessageBatchRequestEntry>,
883    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
884        self.state
885            .sqs_client
886            .delete_message_batch()
887            .queue_url(self.state.queue_url.clone())
888            .set_entries(Some(entries))
889            .send()
890            .await
891    }
892
893    async fn send_messages(
894        &mut self,
895        entries: Vec<SendMessageBatchRequestEntry>,
896        queue_url: String,
897    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
898        self.state
899            .sqs_client
900            .send_message_batch()
901            .queue_url(queue_url.clone())
902            .set_entries(Some(entries))
903            .send()
904            .await
905    }
906}
907
/// Enriches a decoded S3 log event with source metadata and an event timestamp.
///
/// Writes, in this order:
/// - the bucket name, object key and AWS region taken from `s3_event`;
/// - one field per entry of the object's user `metadata` (when present);
/// - the `source_type`;
/// - finally the timestamp(s).
///
/// Each `insert_source_metadata` call is given both a legacy key and a Vector-namespace
/// metadata path, and `log_namespace` decides which of the two is used.
///
/// `timestamp` is supplied by the caller. Per the comment near the end of this function, it
/// historically came from the object's `last_modified` time.
fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    // User metadata goes under `metadata.<key>` in the Vector namespace, but in the legacy
    // namespace it is written at the top level as `<key>` with `LegacyKey::Overwrite`.
    // NOTE(review): because these writes come after the ones above, a metadata key such as
    // `bucket` (or one matching the event's message field) appears to replace that field;
    // confirm this is intended.
    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    // Record which source produced the event.
    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // This handles the transition from the original timestamp logic. Originally the
    // `timestamp_key` was populated by the `last_modified` time on the object, falling
    // back to calling `now()`.
    match log_namespace {
        LogNamespace::Vector => {
            // The object timestamp is optional source metadata.
            // The ingest time is always recorded.
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            // `try_insert` only writes when the field is not already set; presumably this lets
            // a timestamp produced by the decoder win. Otherwise the object timestamp is used,
            // falling back to the current time.
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}
978
// https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
/// The subset of an SNS notification envelope this source reads when S3 events reach SQS
/// through an SNS topic.
///
/// Other envelope fields (`Type`, `MessageId`, `Signature`, ...) are ignored during
/// deserialization.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    /// The wrapped payload (`Message`): the S3 event encoded as a JSON string.
    pub message: String,
    /// The time SNS published the notification (`Timestamp`).
    pub timestamp: DateTime<Utc>,
}
986
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html
/// The body of an SQS message produced by S3 event notifications.
///
/// The enum is `untagged`, so serde tries the variants in declaration order. A regular
/// `Event` is attempted first, and `TestEvent` is the fallback. Keep this order.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    /// A regular notification carrying one or more event records.
    Event(S3Event),
    /// The `s3:TestEvent` message S3 sends when a notification configuration is set up.
    TestEvent(S3TestEvent),
}
994
/// The `s3:TestEvent` message S3 sends to validate a newly configured notification.
///
/// Only the fields below are read; `Time`, `RequestId` and `HostId` are ignored.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    /// Originating service, e.g. `Amazon S3`.
    pub service: String,
    /// Event name, expected to be `s3:TestEvent`.
    pub event: S3EventName,
    /// Name of the bucket the notification was configured on.
    pub bucket: String,
}
1002
// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
/// An S3 event notification. A single notification may carry several records.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    /// The event records (`Records` on the wire).
    pub records: Vec<S3EventRecord>,
}
1009
/// One record of an S3 event notification, describing a single object event.
///
/// Fields are (de)serialized in camelCase (`eventVersion`, `awsRegion`, ...). Fields of the
/// notification record that are not listed here are ignored.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    /// Record format version, `<major>.<minor>`.
    pub event_version: S3EventVersion,
    /// Event origin as reported by AWS (e.g. `aws:s3`; not validated here).
    pub event_source: String,
    /// AWS region of the event; copied onto each log event by `handle_single_log`.
    pub aws_region: String,
    /// Event name, `<kind>:<name>`.
    pub event_name: S3EventName,
    /// When the event occurred.
    pub event_time: DateTime<Utc>,

    /// The bucket and object the event refers to.
    pub s3: S3Message,
}
1021
/// The `eventVersion` of an S3 event record, written on the wire as `<major>.<minor>`.
#[derive(Clone, Debug)]
pub struct S3EventVersion {
    /// Major version component.
    pub major: u64,
    /// Minor version component.
    pub minor: u64,
}
1027
1028impl From<S3EventVersion> for semver::Version {
1029    fn from(v: S3EventVersion) -> semver::Version {
1030        semver::Version::new(v.major, v.minor, 0)
1031    }
1032}
1033
1034// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
1035// <major>.<minor>
1036impl<'de> Deserialize<'de> for S3EventVersion {
1037    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1038    where
1039        D: Deserializer<'de>,
1040    {
1041        use serde::de::Error;
1042
1043        let s = String::deserialize(deserializer)?;
1044
1045        let mut parts = s.splitn(2, '.');
1046
1047        let major = parts
1048            .next()
1049            .ok_or_else(|| D::Error::custom("Missing major version number"))?
1050            .parse::<u64>()
1051            .map_err(D::Error::custom)?;
1052
1053        let minor = parts
1054            .next()
1055            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
1056            .parse::<u64>()
1057            .map_err(D::Error::custom)?;
1058
1059        Ok(S3EventVersion { major, minor })
1060    }
1061}
1062
1063impl Serialize for S3EventVersion {
1064    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1065    where
1066        S: Serializer,
1067    {
1068        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
1069    }
1070}
1071
/// An S3 event name, written on the wire as `<kind>:<name>` (e.g. `s3:TestEvent`).
#[derive(Clone, Debug)]
pub struct S3EventName {
    /// The part before the first `:`.
    pub kind: String,
    /// Everything after the first `:`.
    pub name: String,
}
1077
1078// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
1079//
1080// we could use enums here, but that seems overly brittle as deserialization would break if they
1081// add new event types or names
1082impl<'de> Deserialize<'de> for S3EventName {
1083    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1084    where
1085        D: Deserializer<'de>,
1086    {
1087        use serde::de::Error;
1088
1089        let s = String::deserialize(deserializer)?;
1090
1091        let mut parts = s.splitn(2, ':');
1092
1093        let kind = parts
1094            .next()
1095            .ok_or_else(|| D::Error::custom("Missing event kind"))?
1096            .parse::<String>()
1097            .map_err(D::Error::custom)?;
1098
1099        let name = parts
1100            .next()
1101            .ok_or_else(|| D::Error::custom("Missing event name"))?
1102            .parse::<String>()
1103            .map_err(D::Error::custom)?;
1104
1105        Ok(S3EventName { kind, name })
1106    }
1107}
1108
1109impl Serialize for S3EventName {
1110    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1111    where
1112        S: Serializer,
1113    {
1114        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
1115    }
1116}
1117
/// The `s3` section of an event record: the bucket and object the event concerns.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    /// The bucket holding the object.
    pub bucket: S3Bucket,
    /// The object the event refers to.
    pub object: S3Object,
}
1124
/// The bucket an S3 event refers to.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    /// Bucket name.
    pub name: String,
}
1130
/// The object an S3 event refers to.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // S3ObjectKeys are URL encoded
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    /// The object key, URL-decoded when deserialized.
    ///
    /// Serialization percent-encodes every non-alphanumeric byte via `urlencoded_string`. The
    /// output therefore need not match AWS's original encoding byte for byte.
    #[serde(with = "urlencoded_string")]
    pub key: String,
}
1139
1140mod urlencoded_string {
1141    use percent_encoding::{percent_decode, utf8_percent_encode};
1142
1143    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
1144    where
1145        D: serde::de::Deserializer<'de>,
1146    {
1147        use serde::de::Error;
1148
1149        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
1150            let decoded = if s.contains(&b'+') {
1151                // AWS encodes spaces as `+` rather than `%20`, so we first need to handle this.
1152                let s = s
1153                    .iter()
1154                    .map(|c| if *c == b'+' { b' ' } else { *c })
1155                    .collect::<Vec<_>>();
1156                percent_decode(&s).decode_utf8().map(Into::into)
1157            } else {
1158                percent_decode(s).decode_utf8().map(Into::into)
1159            };
1160
1161            decoded
1162                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
1163        })
1164    }
1165
1166    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
1167    where
1168        S: serde::ser::Serializer,
1169    {
1170        serializer.serialize_str(
1171            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
1172        )
1173    }
1174}
1175
#[test]
fn test_key_deserialize() {
    // A literal `+` is how AWS encodes a space, while `%2b` is an escaped plus sign.
    let cases = [
        (r#"{"key": "noog+nork"}"#, "noog nork"),
        (r#"{"key": "noog%2bnork"}"#, "noog+nork"),
    ];

    for (json, expected_key) in cases {
        let value: S3Object = serde_json::from_str(json).unwrap();
        assert_eq!(
            S3Object {
                key: expected_key.to_string(),
            },
            value
        );
    }
}
1194
#[test]
fn test_s3_testevent() {
    // The test message S3 sends directly to SQS; unknown fields must be ignored.
    let json = r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#;
    let event: S3TestEvent = serde_json::from_str(json).unwrap();

    assert_eq!(event.service, "Amazon S3");
    assert_eq!(event.bucket, "bucketname");
    assert_eq!(
        (event.event.kind.as_str(), event.event.name.as_str()),
        ("s3", "TestEvent")
    );
}
1214
#[test]
fn test_s3_sns_testevent() {
    // An S3 test event wrapped in an SNS envelope. `Message` holds the inner event as an
    // escaped JSON string.
    let envelope = r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#;
    let sns_value: SnsNotification = serde_json::from_str(envelope).unwrap();

    let expected_timestamp = DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
        .unwrap()
        .to_utc();
    assert_eq!(sns_value.timestamp, expected_timestamp);

    // The unwrapped payload parses like a test event delivered directly by S3.
    let inner: S3TestEvent = serde_json::from_str(&sns_value.message).unwrap();
    assert_eq!(inner.service, "Amazon S3");
    assert_eq!(inner.bucket, "bucketname");
    assert_eq!(
        (inner.event.kind.as_str(), inner.event.name.as_str()),
        ("s3", "TestEvent")
    );
}
1246
#[test]
fn parse_sqs_config() {
    const QUEUE_URL: &str = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue";
    const DEFERRED_QUEUE_URL: &str =
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue";

    // Only `queue_url` is required; `deferred` stays unset when omitted.
    let minimal: Config = serde_yaml::from_str(
        r#"queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
"#,
    )
    .unwrap();
    assert_eq!(minimal.queue_url, QUEUE_URL);
    assert!(minimal.deferred.is_none());

    // A fully specified `deferred` section is accepted.
    let with_deferred: Config = serde_yaml::from_str(indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
          max_age_secs: 3600
    "#})
    .unwrap();
    assert_eq!(with_deferred.queue_url, QUEUE_URL);
    let deferred = with_deferred
        .deferred
        .expect("Expected deferred config");
    assert_eq!(deferred.queue_url, DEFERRED_QUEUE_URL);
    assert_eq!(deferred.max_age_secs, 3600);

    // `deferred` requires both `queue_url` and `max_age_secs`.
    let incomplete_deferred = [
        indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          max_age_secs: 3600
    "#},
        indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    "#},
    ];
    for yaml in incomplete_deferred {
        let parsed: Result<Config, serde_yaml::Error> = serde_yaml::from_str(yaml);
        assert!(parsed.is_err());
    }
}