1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
7use vector_lib::{
8 TimeZone,
9 codecs::{
10 EncoderKind, TextSerializerConfig,
11 encoding::{Framer, FramingConfig},
12 },
13 configurable::configurable_component,
14 sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19 aws::{AwsAuthentication, RegionOrEndpoint},
20 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22 sinks::{
23 Healthcheck,
24 s3_common::{
25 self,
26 config::{RetryStrategy, S3Options},
27 partitioner::S3KeyPartitioner,
28 service::S3Service,
29 sink::S3Sink,
30 },
31 util::{
32 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33 TowerRequestConfig, timezone_to_offset,
34 },
35 },
36 template::Template,
37 tls::TlsConfig,
38};
39
/// Batch encoding selection for the `aws_s3` sink.
///
/// Deserialized as an internally tagged enum keyed on `codec`, with variant names in
/// `snake_case` (for example `codec: parquet`). Only available when the `codecs-parquet`
/// feature is enabled.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum S3BatchEncoding {
    /// Encodes each batch with the Parquet batch serializer, using the wrapped
    /// [`ParquetSerializerConfig`] for its settings.
    Parquet(ParquetSerializerConfig),
}
54
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// The name of the S3 bucket.
    ///
    /// Used both for the requests built by the sink and for the healthcheck.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    /// A prefix applied to object keys.
    ///
    /// The value is parsed as a template and rendered with the resolved timezone offset
    /// (see `timezone`). Defaults to `date=%F`.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// The time format used for the time component of the object filename.
    ///
    /// Defaults to `%s`.
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether to append a UUID to the object filename.
    ///
    /// Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// The filename extension to use for objects.
    ///
    /// When this is unset and `batch_encoding` is configured, the extension of the batch
    /// codec (`parquet`) is used instead.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    /// Additional S3 request options (such as `content_type` and `ssekms_key_id`), flattened
    /// into the sink configuration.
    #[serde(flatten)]
    pub options: S3Options,

    /// The region or endpoint of the S3 service, flattened into the sink configuration.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    /// Encoding and framing configuration, flattened into the sink configuration.
    ///
    /// The framer and serializer configured here are only used when `batch_encoding` is not
    /// set; the transformer is applied in both cases.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Optional batch encoding.
    ///
    /// When set, events are encoded with the selected batch codec instead of the framer and
    /// serializer from `encoding`, the top-level `compression` setting is ignored, and the
    /// `Content-Type` and filename extension fall back to the codec's values unless they are
    /// set explicitly.
    #[cfg(feature = "codecs-parquet")]
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<S3BatchEncoding>,

    /// Compression applied to written objects.
    ///
    /// Defaults to gzip. Ignored (with a warning when it is not `none`) if `batch_encoding` is
    /// set.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    /// Event batching settings.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// Request settings applied to the S3 service, together with `retry_strategy`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// TLS settings used when creating the S3 client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// AWS authentication settings used when creating the S3 client.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    /// End-to-end acknowledgement settings.
    ///
    /// Accepts either a boolean or a full settings table.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// The timezone used when rendering time-based parts of the key prefix and filename.
    ///
    /// Falls back to the global timezone when unset.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Whether to force path-style addressing when creating the S3 client.
    ///
    /// Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// The retry strategy applied to S3 requests, together with the `request` settings.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
188
189pub(super) fn default_key_prefix() -> String {
190 "date=%F".to_string()
191}
192
193pub(super) fn default_filename_time_format() -> String {
194 "%s".to_string()
195}
196
197impl GenerateConfig for S3SinkConfig {
198 fn generate_config() -> toml::Value {
199 toml::Value::try_from(Self {
200 bucket: "".to_owned(),
201 key_prefix: default_key_prefix(),
202 filename_time_format: default_filename_time_format(),
203 filename_append_uuid: true,
204 filename_extension: None,
205 options: S3Options::default(),
206 region: RegionOrEndpoint::default(),
207 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
208 #[cfg(feature = "codecs-parquet")]
209 batch_encoding: None,
210 compression: Compression::gzip_default(),
211 batch: BatchConfig::default(),
212 request: TowerRequestConfig::default(),
213 tls: Some(TlsConfig::default()),
214 auth: AwsAuthentication::default(),
215 acknowledgements: Default::default(),
216 timezone: Default::default(),
217 force_path_style: Default::default(),
218 retry_strategy: Default::default(),
219 })
220 .unwrap()
221 }
222}
223
224#[async_trait::async_trait]
225#[typetag::serde(name = "aws_s3")]
226impl SinkConfig for S3SinkConfig {
227 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
228 let service = self.create_service(&cx.proxy).await?;
229 let healthcheck = self.build_healthcheck(service.client())?;
230 let sink = self.build_processor(service, cx)?;
231 Ok((sink, healthcheck))
232 }
233
234 fn input(&self) -> Input {
235 #[cfg(feature = "codecs-parquet")]
236 if let Some(batch_encoding) = &self.batch_encoding {
237 let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
238 let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
239 return Input::new(resolved.input_type());
240 }
241 Input::new(self.encoding.config().1.input_type())
242 }
243
244 fn acknowledgements(&self) -> &AcknowledgementsConfig {
245 &self.acknowledgements
246 }
247}
248
249impl S3SinkConfig {
250 pub fn build_processor(
251 &self,
252 service: S3Service,
253 cx: SinkContext,
254 ) -> crate::Result<VectorSink> {
255 let request_limits = self.request.into_settings();
260 let retry_strategy = self.retry_strategy.clone();
261 let service = ServiceBuilder::new()
262 .settings(request_limits, retry_strategy)
263 .service(service);
264
265 let offset = self
266 .timezone
267 .or(cx.globals.timezone)
268 .and_then(timezone_to_offset);
269
270 let batch_settings = self.batch.into_batcher_settings()?;
272
273 let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
274
275 let ssekms_key_id = self
276 .options
277 .ssekms_key_id
278 .as_ref()
279 .cloned()
280 .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
281 .transpose()?;
282
283 let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
284
285 let transformer = self.encoding.transformer();
286
287 #[cfg(feature = "codecs-parquet")]
290 if let Some(batch_encoding) = &self.batch_encoding {
291 let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
292 let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());
293
294 let batch_serializer = resolved_batch_config.build_batch_serializer()?;
295 let batch_encoder = BatchEncoder::new(batch_serializer);
296
297 let mut api_options = self.options.clone();
300 if api_options.content_type.is_none() {
301 api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
302 }
303
304 let encoder = EncoderKind::Batch(batch_encoder);
305
306 let filename_extension = self.filename_extension.clone().or_else(|| {
307 Some(
308 match batch_encoding {
309 S3BatchEncoding::Parquet(_) => "parquet",
310 }
311 .to_string(),
312 )
313 });
314
315 if self.compression != Compression::None {
316 warn!("Top level compression setting ignored when batch_encoding set to parquet.")
317 }
318
319 let request_options = S3RequestOptions {
320 bucket: self.bucket.clone(),
321 api_options,
322 filename_extension,
323 filename_time_format: self.filename_time_format.clone(),
324 filename_append_uuid: self.filename_append_uuid,
325 encoder: (transformer, encoder),
326 compression: Compression::None,
328 filename_tz_offset: offset,
329 };
330
331 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
332 return Ok(VectorSink::from_event_streamsink(sink));
333 }
334
335 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
336 let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
337
338 let request_options = S3RequestOptions {
339 bucket: self.bucket.clone(),
340 api_options: self.options.clone(),
341 filename_extension: self.filename_extension.clone(),
342 filename_time_format: self.filename_time_format.clone(),
343 filename_append_uuid: self.filename_append_uuid,
344 encoder: (transformer, encoder),
345 compression: self.compression,
346 filename_tz_offset: offset,
347 };
348
349 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
350
351 Ok(VectorSink::from_event_streamsink(sink))
352 }
353
354 pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
355 s3_common::config::build_healthcheck(self.bucket.clone(), client)
356 }
357
358 pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
359 s3_common::config::create_service(
360 &self.region,
361 &self.auth,
362 proxy,
363 self.tls.as_ref(),
364 self.force_path_style,
365 )
366 .await
367 }
368}
369
// Unit tests for `S3SinkConfig`: example-config generation and, with the `codecs-parquet`
// feature, parsing and resolution of the `batch_encoding` settings.
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    // The generated example configuration must be accepted by the shared config check.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    // `batch_encoding` is tagged by `codec`; the tag may appear after other keys, and the
    // nested `compression` table belongs to the Parquet codec, not to the top-level setting.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              schema_mode: auto_infer
              codec: parquet
              compression:
                algorithm: snappy
        "#})
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
        use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
        assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
        assert_eq!(p.compression, ParquetCompression::Snappy);
    }

    // With no `content_type` configured, the batch encoder's content type is used. This
    // mirrors the fallback performed while building the sink processor rather than calling it.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::{BatchSerializerConfig, FramingConfig};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
        let batch_config = BatchSerializerConfig::Parquet(p.clone());
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // Same fallback rule as the sink: only fill in the content type when none is set.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    // An explicitly configured `content_type` must survive the codec fallback.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            content_type: "application/octet-stream"
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: auto_infer
              compression:
                algorithm: gzip
                level: 9
        "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
        let batch_config = vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p.clone());
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // Same fallback rule as the sink: only fill in the content type when none is set.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    // A codec that is not a variant of `S3BatchEncoding` must be rejected at parse time, and
    // the error should name the offending value.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_rejects_unsupported_codec() {
        let err = serde_yaml::from_str::<S3SinkConfig>(
            r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: arrow_stream
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("arrow_stream"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    // A user-provided `filename_extension` is kept as parsed when `batch_encoding` is set.
    // NOTE(review): this only checks the parsed value, not the extension the sink ends up using.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            filename_extension: pq
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: auto_infer
        "#})
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    // Omitting `schema_mode` yields the `Relaxed` schema mode.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
        "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
    }

    // `schema_mode: strict` together with a `schema_file` parses into the `Strict` mode.
    // Only parsing is exercised here; the schema file is never opened by this test.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: strict
              schema_file: tmp/something.schema
        "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
    }
}