1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 panic,
6 sync::{Arc, LazyLock},
7 time::{Duration, Instant},
8};
9
10use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
11use aws_sdk_sqs::{
12 Client as SqsClient,
13 operation::{
14 delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
15 receive_message::ReceiveMessageError,
16 send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
17 },
18 types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
19};
20use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
21use aws_types::region::Region;
22use bytes::Bytes;
23use chrono::{DateTime, TimeZone, Utc};
24use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
25use serde::{Deserialize, Deserializer, Serialize, Serializer};
26use serde_with::serde_as;
27use smallvec::SmallVec;
28use snafu::{ResultExt, Snafu};
29use tokio::{pin, select};
30use tokio_util::codec::FramedRead;
31use vector_lib::{
32 codecs::decoding::FramingError,
33 config::{LegacyKey, LogNamespace, log_schema},
34 configurable::configurable_component,
35 event::MaybeAsLogMut,
36 internal_event::{
37 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
38 },
39 lookup::{PathPrefix, metadata_path, path},
40 source_sender::SendError,
41};
42
43use crate::{
44 SourceSender,
45 aws::AwsTimeout,
46 codecs::Decoder,
47 common::backoff::ExponentialBackoff,
48 config::{SourceAcknowledgementsConfig, SourceContext},
49 event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
50 internal_events::{
51 EventsReceived, S3ObjectProcessingFailed, S3ObjectProcessingSucceeded,
52 SqsMessageDeleteBatchError, SqsMessageDeletePartialError, SqsMessageDeleteSucceeded,
53 SqsMessageProcessingError, SqsMessageProcessingSucceeded, SqsMessageReceiveError,
54 SqsMessageReceiveSucceeded, SqsMessageSendBatchError, SqsMessageSentPartialError,
55 SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored, StreamClosedError,
56 },
57 line_agg::{self, LineAgg},
58 shutdown::ShutdownSignal,
59 sources::aws_s3::AwsS3Config,
60 tls::TlsConfig,
61};
62
63static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
64 LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());
65
#[serde_as]
/// Configuration for forwarding notifications about old S3 objects to a separate
/// "deferred" SQS queue instead of processing them from the main queue.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the SQS queue that notifications for objects older than
    /// `max_age_secs` are forwarded to.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// Maximum age, in seconds, of an S3 event (measured from its event time) for
    /// the object to be processed from the main queue.
    ///
    /// Notifications for older events are forwarded to the deferred queue and the
    /// object is not fetched.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}
86
#[serde_as]
/// Configuration for receiving S3 event notifications from an SQS queue.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for S3 event notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Sent as the wait time of every SQS receive request.
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout, in seconds, requested for each batch of received
    /// messages.
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete a message from the queue once it has been processed
    /// successfully.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete a message whose events were rejected downstream.
    ///
    /// Only relevant when acknowledgements are enabled. When disabled, a rejection
    /// is reported as a processing error and the message is not deleted.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks polling the queue.
    ///
    /// When unset, a default derived from the number of threads is used.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages requested from the queue in a single receive call.
    ///
    /// Must be between 1 and 10, inclusive.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    // TLS settings; not read in this file — presumably applied where the SQS
    // client is built (TODO confirm).
    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Request timeout settings, flattened into this table; not read in this file —
    // presumably applied where the clients are built (TODO confirm).
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    // When set, notifications for objects older than `deferred.max_age_secs` are
    // forwarded to `deferred.queue_url` instead of being processed.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}
186
/// Default for `Config::poll_secs`: wait up to 15 seconds on each receive call.
const fn default_poll_secs() -> u32 {
    const SECONDS: u32 = 15;
    SECONDS
}

/// Default for `Config::visibility_timeout_secs`: five minutes.
const fn default_visibility_timeout_secs() -> u32 {
    const SECONDS: u32 = 5 * 60;
    SECONDS
}

/// Default for `Config::max_number_of_messages`: the largest accepted batch size.
const fn default_max_number_of_messages() -> u32 {
    const MESSAGES: u32 = 10;
    MESSAGES
}

/// Serde/derivative default for boolean options that are enabled unless disabled.
const fn default_true() -> bool {
    !false
}
202
/// Errors returned by [`Ingestor::new`] when the configuration is invalid.
#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    /// `max_number_of_messages` was outside the accepted range of 1 to 10.
    #[snafu(display("Invalid value for max_number_of_messages {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}
208
// `GetObject` embeds a full `SdkError`, which is likely much larger than the other
// variants; that size difference is accepted for this error type.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
/// Reasons why processing an SQS message, or one of its S3 event records, fails.
pub enum ProcessingError {
    /// The message body, after unwrapping any SNS envelope, is not a recognised
    /// S3 notification.
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    /// Fetching the object from S3 failed.
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    /// Reading or framing the object's contents failed part-way through.
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    /// Sending the decoded events downstream failed.
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: vector_lib::source_sender::SendError,
        bucket: String,
        key: String,
    },
    /// The record refers to a region other than the one this source runs in.
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    /// The record's event version does not satisfy `SUPPORTED_S3_EVENT_VERSION`.
    #[snafu(display("Unsupported S3 event version: {}.", version,))]
    UnsupportedS3EventVersion { version: semver::Version },
    /// Acknowledgement reported the events as errored, or as rejected while
    /// `delete_failed_message` is disabled.
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    /// The event is older than the deferred queue's `max_age_secs`. The caller
    /// forwards the message to `deferred_queue` instead of processing it.
    #[snafu(display(
        "File s3://{}/{} too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}
275
/// Clients and settings shared, through an `Arc`, by every [`IngestorProcess`] task.
pub struct State {
    // Region of this source; records reporting a different region are rejected.
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    // Optional multi-line aggregation applied to framed lines before decoding.
    multiline: Option<line_agg::Config>,
    // Compression setting passed to `s3_object_decoder`.
    compression: super::Compression,

    queue_url: String,
    // The numeric settings below are stored as `i32` because that is what the SQS
    // request builders accept; they are converted from the `u32` config values.
    poll_secs: i32,
    max_number_of_messages: i32,
    // Number of `IngestorProcess` tasks spawned by `Ingestor::run`.
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}
296
/// Entry point of the SQS strategy: holds the validated shared state and spawns
/// the polling tasks in [`Ingestor::run`].
pub(super) struct Ingestor {
    state: Arc<State>,
}
300
301impl Ingestor {
302 pub(super) async fn new(
303 region: Region,
304 sqs_client: SqsClient,
305 s3_client: S3Client,
306 config: Config,
307 compression: super::Compression,
308 multiline: Option<line_agg::Config>,
309 decoder: Decoder,
310 ) -> Result<Ingestor, IngestorNewError> {
311 if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
312 return Err(IngestorNewError::InvalidNumberOfMessages {
313 messages: config.max_number_of_messages,
314 });
315 }
316 let state = Arc::new(State {
317 region,
318
319 s3_client,
320 sqs_client,
321
322 compression,
323 multiline,
324
325 queue_url: config.queue_url,
326 poll_secs: config.poll_secs as i32,
327 max_number_of_messages: config.max_number_of_messages as i32,
328 client_concurrency: config
329 .client_concurrency
330 .map(|n| n.get())
331 .unwrap_or_else(crate::num_threads),
332 visibility_timeout_secs: config.visibility_timeout_secs as i32,
333 delete_message: config.delete_message,
334 delete_failed_message: config.delete_failed_message,
335 decoder,
336
337 deferred: config.deferred,
338 });
339
340 Ok(Ingestor { state })
341 }
342
343 pub(super) async fn run(
344 self,
345 cx: SourceContext,
346 acknowledgements: SourceAcknowledgementsConfig,
347 log_namespace: LogNamespace,
348 ) -> Result<(), ()> {
349 let acknowledgements = cx.do_acknowledgements(acknowledgements);
350 let mut handles = Vec::new();
351 for _ in 0..self.state.client_concurrency {
352 let process = IngestorProcess::new(
353 Arc::clone(&self.state),
354 cx.out.clone(),
355 cx.shutdown.clone(),
356 log_namespace,
357 acknowledgements,
358 );
359 let fut = process.run();
360 let handle = crate::spawn_in_current_span(fut);
361 handles.push(handle);
362 }
363
364 for handle in handles.drain(..) {
367 if let Err(e) = handle.await
368 && e.is_panic()
369 {
370 panic::resume_unwind(e.into_panic());
371 }
372 }
373
374 Ok(())
375 }
376}
377
/// One polling task: repeatedly receives messages from the queue, processes the
/// S3 objects they reference and deletes or forwards the messages afterwards.
pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    // Whether events are attached to a batch notifier and acknowledgements awaited.
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
    // Delay applied between retries after a failed `run_once`.
    backoff: ExponentialBackoff,
}
389impl IngestorProcess {
390 pub fn new(
391 state: Arc<State>,
392 out: SourceSender,
393 shutdown: ShutdownSignal,
394 log_namespace: LogNamespace,
395 acknowledgements: bool,
396 ) -> Self {
397 Self {
398 state,
399 out,
400 shutdown,
401 acknowledgements,
402 log_namespace,
403 bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
404 events_received: register!(EventsReceived),
405 backoff: ExponentialBackoff::default().max_delay(Duration::from_secs(30)),
406 }
407 }
408
409 async fn run(mut self) {
410 let shutdown = self.shutdown.clone().fuse();
411 pin!(shutdown);
412
413 loop {
414 select! {
415 _ = &mut shutdown => break,
416 result = self.run_once() => {
417 match result {
418 Ok(()) => {
419 self.backoff.reset();
421 }
422 Err(_) => {
423 let delay = self.backoff.next().expect("backoff never ends");
424 trace!(
425 delay_ms = delay.as_millis(),
426 "`run_once` failed, will retry after delay.",
427 );
428 tokio::time::sleep(delay).await;
429 }
430 }
431 },
432 }
433 }
434 }
435
436 async fn run_once(&mut self) -> Result<(), ()> {
437 let messages = match self.receive_messages().await {
438 Ok(messages) => {
439 emit!(SqsMessageReceiveSucceeded {
440 count: messages.len(),
441 });
442 messages
443 }
444 Err(err) => {
445 emit!(SqsMessageReceiveError { error: &err });
446 return Err(());
447 }
448 };
449
450 let mut delete_entries = Vec::new();
451 let mut deferred_entries = Vec::new();
452 for message in messages {
453 let receipt_handle = match message.receipt_handle {
454 None => {
455 warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
459 continue;
460 }
461 Some(ref handle) => handle.to_owned(),
462 };
463
464 let message_id = message
465 .message_id
466 .clone()
467 .unwrap_or_else(|| "<unknown>".to_owned());
468 match self.handle_sqs_message(message.clone()).await {
469 Ok(()) => {
470 emit!(SqsMessageProcessingSucceeded {
471 message_id: &message_id
472 });
473 if self.state.delete_message {
474 trace!(
475 message = "Queued SQS message for deletion.",
476 id = message_id,
477 receipt_handle = receipt_handle,
478 );
479 delete_entries.push(
480 DeleteMessageBatchRequestEntry::builder()
481 .id(message_id.clone())
482 .receipt_handle(receipt_handle)
483 .build()
484 .expect("all required builder params specified"),
485 );
486 }
487 }
488 Err(err) => {
489 match err {
490 ProcessingError::FileTooOld { .. } => {
491 emit!(SqsMessageProcessingSucceeded {
492 message_id: &message_id
493 });
494 if let Some(deferred) = &self.state.deferred {
495 trace!(
496 message = "Forwarding message to deferred queue.",
497 id = message_id,
498 receipt_handle = receipt_handle,
499 deferred_queue = deferred.queue_url,
500 );
501
502 deferred_entries.push(
503 SendMessageBatchRequestEntry::builder()
504 .id(message_id.clone())
505 .message_body(message.body.unwrap_or_default())
506 .build()
507 .expect("all required builder params specified"),
508 );
509 }
510 if self.state.delete_message {
512 trace!(
513 message = "Queued SQS message for deletion.",
514 id = message_id,
515 receipt_handle = receipt_handle,
516 );
517 delete_entries.push(
518 DeleteMessageBatchRequestEntry::builder()
519 .id(message_id)
520 .receipt_handle(receipt_handle)
521 .build()
522 .expect("all required builder params specified"),
523 );
524 }
525 }
526 _ => {
527 emit!(SqsMessageProcessingError {
528 message_id: &message_id,
529 error: &err,
530 });
531 }
532 }
533 }
534 }
535 }
536
537 if !deferred_entries.is_empty() {
539 let Some(deferred) = &self.state.deferred else {
540 warn!("Deferred queue not configured, but received deferred entries.");
541 return Ok(());
542 };
543 let cloned_entries = deferred_entries.clone();
544 match self
545 .send_messages(deferred_entries, deferred.queue_url.clone())
546 .await
547 {
548 Ok(result) => {
549 if !result.successful.is_empty() {
550 emit!(SqsMessageSentSucceeded {
551 message_ids: result.successful,
552 })
553 }
554
555 if !result.failed.is_empty() {
556 emit!(SqsMessageSentPartialError {
557 entries: result.failed
558 })
559 }
560 }
561 Err(err) => {
562 emit!(SqsMessageSendBatchError {
563 entries: cloned_entries,
564 error: err,
565 });
566 }
567 }
568 }
569 if !delete_entries.is_empty() {
570 let cloned_entries = delete_entries.clone();
572 match self.delete_messages(delete_entries).await {
573 Ok(result) => {
574 if !result.successful.is_empty() {
577 emit!(SqsMessageDeleteSucceeded {
578 message_ids: result.successful,
579 });
580 }
581
582 if !result.failed.is_empty() {
583 emit!(SqsMessageDeletePartialError {
584 entries: result.failed
585 });
586 }
587 }
588 Err(err) => {
589 emit!(SqsMessageDeleteBatchError {
590 entries: cloned_entries,
591 error: err,
592 });
593 }
594 }
595 }
596 Ok(())
597 }
598
599 async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
600 let sqs_body = message.body.unwrap_or_default();
601 let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
602 .map(|notification| notification.message)
603 .unwrap_or(sqs_body);
604 let s3_event: SqsEvent =
605 serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
606 message_id: message
607 .message_id
608 .clone()
609 .unwrap_or_else(|| "<empty>".to_owned()),
610 })?;
611
612 match s3_event {
613 SqsEvent::TestEvent(_s3_test_event) => {
614 debug!(?message.message_id, message = "Found S3 Test Event.");
615 Ok(())
616 }
617 SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
618 }
619 }
620
621 async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
622 for record in s3_event.records {
623 self.handle_s3_event_record(record, self.log_namespace)
624 .await?
625 }
626 Ok(())
627 }
628
629 async fn handle_s3_event_record(
630 &mut self,
631 s3_event: S3EventRecord,
632 log_namespace: LogNamespace,
633 ) -> Result<(), ProcessingError> {
634 let event_version: semver::Version = s3_event.event_version.clone().into();
635 if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
636 return Err(ProcessingError::UnsupportedS3EventVersion {
637 version: event_version.clone(),
638 });
639 }
640
641 if s3_event.event_name.kind != "ObjectCreated" {
642 emit!(SqsS3EventRecordInvalidEventIgnored {
643 bucket: &s3_event.s3.bucket.name,
644 key: &s3_event.s3.object.key,
645 kind: &s3_event.event_name.kind,
646 name: &s3_event.event_name.name,
647 });
648 return Ok(());
649 }
650
651 if self.state.region.as_ref() != s3_event.aws_region.as_str() {
654 return Err(ProcessingError::WrongRegion {
655 bucket: s3_event.s3.bucket.name.clone(),
656 key: s3_event.s3.object.key.clone(),
657 region: s3_event.aws_region,
658 });
659 }
660
661 if let Some(deferred) = &self.state.deferred {
662 let delta = Utc::now() - s3_event.event_time;
663 if delta.num_seconds() > deferred.max_age_secs as i64 {
664 return Err(ProcessingError::FileTooOld {
665 bucket: s3_event.s3.bucket.name.clone(),
666 key: s3_event.s3.object.key.clone(),
667 deferred_queue: deferred.queue_url.clone(),
668 });
669 }
670 }
671
672 let download_start = Instant::now();
673
674 let object_result = self
675 .state
676 .s3_client
677 .get_object()
678 .bucket(s3_event.s3.bucket.name.clone())
679 .key(s3_event.s3.object.key.clone())
680 .send()
681 .await
682 .context(GetObjectSnafu {
683 bucket: s3_event.s3.bucket.name.clone(),
684 key: s3_event.s3.object.key.clone(),
685 });
686
687 let object = object_result?;
688
689 debug!(
690 message = "Got S3 object from SQS notification.",
691 bucket = s3_event.s3.bucket.name,
692 key = s3_event.s3.object.key,
693 );
694
695 let metadata = object.metadata;
696
697 let timestamp = object.last_modified.map(|ts| {
698 Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
699 .single()
700 .expect("invalid timestamp")
701 });
702
703 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
704 let object_reader = super::s3_object_decoder(
705 self.state.compression,
706 &s3_event.s3.object.key,
707 object.content_encoding.as_deref(),
708 object.content_type.as_deref(),
709 object.body,
710 )
711 .await;
712
713 let mut read_error = None;
726 let bytes_received = self.bytes_received.clone();
727 let events_received = self.events_received.clone();
728 let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
729 FramedRead::new(object_reader, self.state.decoder.framer.clone())
730 .map(|res| {
731 res.inspect(|bytes| {
732 bytes_received.emit(ByteSize(bytes.len()));
733 })
734 .map_err(|err| {
735 read_error = Some(err);
736 })
737 .ok()
738 })
739 .take_while(|res| ready(res.is_some()))
740 .map(|r| r.expect("validated by take_while")),
741 );
742
743 let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
744 Some(config) => Box::new(
745 LineAgg::new(
746 lines.map(|line| ((), line, ())),
747 line_agg::Logic::new(config.clone()),
748 )
749 .map(|(_src, line, _context, _lastline_context)| line),
750 ),
751 None => lines,
752 };
753
754 let mut stream = lines.flat_map(|line| {
755 let events = match self.state.decoder.deserializer_parse(line) {
756 Ok((events, _events_size)) => events,
757 Err(_error) => {
758 SmallVec::new()
761 }
762 };
763
764 let events = events
765 .into_iter()
766 .map(|mut event: Event| {
767 event = event.with_batch_notifier_option(&batch);
768 if let Some(log_event) = event.maybe_as_log_mut() {
769 handle_single_log(
770 log_event,
771 log_namespace,
772 &s3_event,
773 &metadata,
774 timestamp,
775 );
776 }
777 events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
778 event
779 })
780 .collect::<Vec<Event>>();
781 futures::stream::iter(events)
782 });
783
784 let send_error = match self.out.send_event_stream(&mut stream).await {
785 Ok(_) => None,
786 Err(SendError::Closed) => {
787 let (count, _) = stream.size_hint();
788 emit!(StreamClosedError { count });
789 Some(SendError::Closed)
790 }
791 Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
792 };
793
794 drop(stream);
797
798 let bucket = &s3_event.s3.bucket.name;
799 let duration = download_start.elapsed();
800
801 if read_error.is_some() {
802 emit!(S3ObjectProcessingFailed { bucket, duration });
803 } else {
804 emit!(S3ObjectProcessingSucceeded { bucket, duration });
805 }
806
807 drop(batch);
810
811 if let Some(error) = read_error {
812 Err(ProcessingError::ReadObject {
813 source: error,
814 bucket: s3_event.s3.bucket.name.clone(),
815 key: s3_event.s3.object.key.clone(),
816 })
817 } else if let Some(error) = send_error {
818 Err(ProcessingError::PipelineSend {
819 source: error,
820 bucket: s3_event.s3.bucket.name.clone(),
821 key: s3_event.s3.object.key.clone(),
822 })
823 } else {
824 match receiver {
825 None => Ok(()),
826 Some(receiver) => {
827 let result = receiver.await;
828 match result {
829 BatchStatus::Delivered => {
830 debug!(
831 message = "S3 object from SQS delivered.",
832 bucket = s3_event.s3.bucket.name,
833 key = s3_event.s3.object.key,
834 );
835 Ok(())
836 }
837 BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
838 bucket: s3_event.s3.bucket.name,
839 key: s3_event.s3.object.key,
840 region: s3_event.aws_region,
841 }),
842 BatchStatus::Rejected => {
843 if self.state.delete_failed_message {
844 warn!(
845 message =
846 "S3 object from SQS was rejected. Deleting failed message.",
847 bucket = s3_event.s3.bucket.name,
848 key = s3_event.s3.object.key,
849 );
850 Ok(())
851 } else {
852 Err(ProcessingError::ErrorAcknowledgement {
853 bucket: s3_event.s3.bucket.name,
854 key: s3_event.s3.object.key,
855 region: s3_event.aws_region,
856 })
857 }
858 }
859 }
860 }
861 }
862 }
863 }
864
865 async fn receive_messages(
866 &mut self,
867 ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
868 self.state
869 .sqs_client
870 .receive_message()
871 .queue_url(self.state.queue_url.clone())
872 .max_number_of_messages(self.state.max_number_of_messages)
873 .visibility_timeout(self.state.visibility_timeout_secs)
874 .wait_time_seconds(self.state.poll_secs)
875 .send()
876 .map_ok(|res| res.messages.unwrap_or_default())
877 .await
878 }
879
880 async fn delete_messages(
881 &mut self,
882 entries: Vec<DeleteMessageBatchRequestEntry>,
883 ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
884 self.state
885 .sqs_client
886 .delete_message_batch()
887 .queue_url(self.state.queue_url.clone())
888 .set_entries(Some(entries))
889 .send()
890 .await
891 }
892
893 async fn send_messages(
894 &mut self,
895 entries: Vec<SendMessageBatchRequestEntry>,
896 queue_url: String,
897 ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
898 self.state
899 .sqs_client
900 .send_message_batch()
901 .queue_url(queue_url.clone())
902 .set_entries(Some(entries))
903 .send()
904 .await
905 }
906}
907
908fn handle_single_log(
909 log: &mut LogEvent,
910 log_namespace: LogNamespace,
911 s3_event: &S3EventRecord,
912 metadata: &Option<HashMap<String, String>>,
913 timestamp: Option<DateTime<Utc>>,
914) {
915 log_namespace.insert_source_metadata(
916 AwsS3Config::NAME,
917 log,
918 Some(LegacyKey::Overwrite(path!("bucket"))),
919 path!("bucket"),
920 Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
921 );
922
923 log_namespace.insert_source_metadata(
924 AwsS3Config::NAME,
925 log,
926 Some(LegacyKey::Overwrite(path!("object"))),
927 path!("object"),
928 Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
929 );
930 log_namespace.insert_source_metadata(
931 AwsS3Config::NAME,
932 log,
933 Some(LegacyKey::Overwrite(path!("region"))),
934 path!("region"),
935 Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
936 );
937
938 if let Some(metadata) = metadata {
939 for (key, value) in metadata {
940 log_namespace.insert_source_metadata(
941 AwsS3Config::NAME,
942 log,
943 Some(LegacyKey::Overwrite(path!(key))),
944 path!("metadata", key.as_str()),
945 value.clone(),
946 );
947 }
948 }
949
950 log_namespace.insert_vector_metadata(
951 log,
952 log_schema().source_type_key(),
953 path!("source_type"),
954 Bytes::from_static(AwsS3Config::NAME.as_bytes()),
955 );
956
957 match log_namespace {
961 LogNamespace::Vector => {
962 if let Some(timestamp) = timestamp {
963 log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
964 }
965
966 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
967 }
968 LogNamespace::Legacy => {
969 if let Some(timestamp_key) = log_schema().timestamp_key() {
970 log.try_insert(
971 (PathPrefix::Event, timestamp_key),
972 timestamp.unwrap_or_else(Utc::now),
973 );
974 }
975 }
976 };
977}
978
/// SNS notification envelope. When S3 notifications are relayed through SNS, the
/// S3 event JSON arrives as a string in `Message`.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}
986
/// Body of an SQS message: either a real S3 event notification or an S3 test event.
#[derive(Clone, Debug, Deserialize)]
// Untagged: variants are tried in declaration order, so `Event` is attempted first.
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}
994
/// The test notification S3 sends (for example `"Event":"s3:TestEvent"`). It is
/// logged and otherwise ignored.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}
1002
/// An S3 event notification, carrying one or more records (JSON key `Records`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}
1009
/// A single record of an S3 event notification (camelCase JSON keys).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    // Checked against `SUPPORTED_S3_EVENT_VERSION` before processing.
    pub event_version: S3EventVersion,
    pub event_source: String,
    // Compared with the source's configured region.
    pub aws_region: String,
    // Only records whose `kind` is `ObjectCreated` are processed.
    pub event_name: S3EventName,
    // Used to compute the event's age for the deferred-queue check.
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}
1021
/// `major.minor` version of the S3 event record format, (de)serialized as the
/// string `"<major>.<minor>"`.
#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}
1027
1028impl From<S3EventVersion> for semver::Version {
1029 fn from(v: S3EventVersion) -> semver::Version {
1030 semver::Version::new(v.major, v.minor, 0)
1031 }
1032}
1033
1034impl<'de> Deserialize<'de> for S3EventVersion {
1037 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1038 where
1039 D: Deserializer<'de>,
1040 {
1041 use serde::de::Error;
1042
1043 let s = String::deserialize(deserializer)?;
1044
1045 let mut parts = s.splitn(2, '.');
1046
1047 let major = parts
1048 .next()
1049 .ok_or_else(|| D::Error::custom("Missing major version number"))?
1050 .parse::<u64>()
1051 .map_err(D::Error::custom)?;
1052
1053 let minor = parts
1054 .next()
1055 .ok_or_else(|| D::Error::custom("Missing minor version number"))?
1056 .parse::<u64>()
1057 .map_err(D::Error::custom)?;
1058
1059 Ok(S3EventVersion { major, minor })
1060 }
1061}
1062
1063impl Serialize for S3EventVersion {
1064 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1065 where
1066 S: Serializer,
1067 {
1068 serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
1069 }
1070}
1071
/// An event name split at its first `:` into `kind` and `name`
/// (for example `s3:TestEvent` becomes kind `s3`, name `TestEvent`).
#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}
1077
1078impl<'de> Deserialize<'de> for S3EventName {
1083 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1084 where
1085 D: Deserializer<'de>,
1086 {
1087 use serde::de::Error;
1088
1089 let s = String::deserialize(deserializer)?;
1090
1091 let mut parts = s.splitn(2, ':');
1092
1093 let kind = parts
1094 .next()
1095 .ok_or_else(|| D::Error::custom("Missing event kind"))?
1096 .parse::<String>()
1097 .map_err(D::Error::custom)?;
1098
1099 let name = parts
1100 .next()
1101 .ok_or_else(|| D::Error::custom("Missing event name"))?
1102 .parse::<String>()
1103 .map_err(D::Error::custom)?;
1104
1105 Ok(S3EventName { kind, name })
1106 }
1107}
1108
1109impl Serialize for S3EventName {
1110 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1111 where
1112 S: Serializer,
1113 {
1114 serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
1115 }
1116}
1117
/// The `s3` section of an event record: which bucket and object it refers to.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}
1124
/// Bucket referenced by an event record.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}
1130
/// Object referenced by an event record.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // The key arrives URL-encoded (`+` for space, `%XX` escapes) and is decoded
    // by `urlencoded_string`.
    #[serde(with = "urlencoded_string")]
    pub key: String,
}
1139
1140mod urlencoded_string {
1141 use percent_encoding::{percent_decode, utf8_percent_encode};
1142
1143 pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
1144 where
1145 D: serde::de::Deserializer<'de>,
1146 {
1147 use serde::de::Error;
1148
1149 serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
1150 let decoded = if s.contains(&b'+') {
1151 let s = s
1153 .iter()
1154 .map(|c| if *c == b'+' { b' ' } else { *c })
1155 .collect::<Vec<_>>();
1156 percent_decode(&s).decode_utf8().map(Into::into)
1157 } else {
1158 percent_decode(s).decode_utf8().map(Into::into)
1159 };
1160
1161 decoded
1162 .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
1163 })
1164 }
1165
1166 pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
1167 where
1168 S: serde::ser::Serializer,
1169 {
1170 serializer.serialize_str(
1171 &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
1172 )
1173 }
1174}
1175
#[test]
fn test_key_deserialize() {
    // `+` decodes to a space, while an encoded `%2b` decodes to a literal plus sign.
    let cases = [
        (r#"{"key": "noog+nork"}"#, "noog nork"),
        (r#"{"key": "noog%2bnork"}"#, "noog+nork"),
    ];

    for (json, expected_key) in cases {
        let parsed: S3Object = serde_json::from_str(json).unwrap();
        assert_eq!(
            S3Object {
                key: expected_key.to_string(),
            },
            parsed
        );
    }
}
1194
#[test]
fn test_s3_testevent() {
    // Extra fields such as `Time`, `RequestId` and `HostId` are ignored.
    let json = r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
    }"#;
    let event: S3TestEvent = serde_json::from_str(json).unwrap();

    assert_eq!(event.service, "Amazon S3");
    assert_eq!(event.bucket, "bucketname");
    assert_eq!(event.event.kind, "s3");
    assert_eq!(event.event.name, "TestEvent");
}
1214
#[test]
fn test_s3_sns_testevent() {
    // An S3 test event relayed through SNS: the S3 JSON is embedded as a string.
    let envelope_json = r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
    }"#;
    let envelope: SnsNotification = serde_json::from_str(envelope_json).unwrap();

    let expected_timestamp = DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
        .unwrap()
        .with_timezone(&Utc);
    assert_eq!(envelope.timestamp, expected_timestamp);

    let event: S3TestEvent = serde_json::from_str(&envelope.message).unwrap();
    assert_eq!(event.service, "Amazon S3");
    assert_eq!(event.bucket, "bucketname");
    assert_eq!(event.event.kind, "s3");
    assert_eq!(event.event.name, "TestEvent");
}
1246
#[test]
fn parse_sqs_config() {
    const QUEUE_URL: &str = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue";
    const DEFERRED_QUEUE_URL: &str =
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue";

    // Minimal configuration: no deferred queue.
    let minimal: Config = serde_yaml::from_str(
        r#"queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
"#,
    )
    .unwrap();
    assert_eq!(minimal.queue_url, QUEUE_URL);
    assert!(minimal.deferred.is_none());

    // Fully specified deferred queue.
    let with_deferred: Config = serde_yaml::from_str(indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
          max_age_secs: 3600
    "#})
    .unwrap();
    assert_eq!(with_deferred.queue_url, QUEUE_URL);
    let deferred = with_deferred.deferred.expect("Expected deferred config");
    assert_eq!(deferred.queue_url, DEFERRED_QUEUE_URL);
    assert_eq!(deferred.max_age_secs, 3600);

    // Both deferred fields are required.
    let missing_queue_url = serde_yaml::from_str::<Config>(indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          max_age_secs: 3600
    "#});
    assert!(missing_queue_url.is_err());

    let missing_max_age = serde_yaml::from_str::<Config>(indoc::indoc! {r#"
        queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        deferred:
          queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    "#});
    assert!(missing_max_age.is_err());
}