Skip to main content

vector/sinks/aws_s3/
config.rs

1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
7use vector_lib::{
8    TimeZone,
9    codecs::{
10        EncoderKind, TextSerializerConfig,
11        encoding::{Framer, FramingConfig},
12    },
13    configurable::configurable_component,
14    sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19    aws::{AwsAuthentication, RegionOrEndpoint},
20    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22    sinks::{
23        Healthcheck,
24        s3_common::{
25            self,
26            config::{RetryStrategy, S3Options},
27            partitioner::S3KeyPartitioner,
28            service::S3Service,
29            sink::S3Sink,
30        },
31        util::{
32            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33            TowerRequestConfig, timezone_to_offset,
34        },
35    },
36    template::Template,
37    tls::TlsConfig,
38};
39
// Only compiled with the `codecs-parquet` feature. `#[serde(tag = "codec")]` makes this an
// internally tagged enum: `codec: parquet` selects the variant and the remaining keys of the same
// map are deserialized into `ParquetSerializerConfig` (e.g. `schema_mode`, `compression`). Any
// other `codec` value is rejected when the config is parsed.
//
// The enum currently has a single variant and code in this file destructures it without a
// fallback arm, so adding a variant produces compile errors at every site that must learn how to
// handle the new codec (content type, file extension, input type).
/// Batch encoding configuration for the `aws_s3` sink.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum S3BatchEncoding {
    /// Encodes events in [Apache Parquet][apache_parquet] columnar format.
    ///
    /// [apache_parquet]: https://parquet.apache.org/
    Parquet(ParquetSerializerConfig),
}
54
// NOTE(review): `configurable_component` appears to build the config schema and reference docs
// from the `///` comments and `#[configurable(...)]` attributes below, so maintainer-only notes
// in this struct use plain `//` comments to stay out of the generated docs.
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten` (used for `options`, `region` and `encoding` below); confirm that unknown keys are
// actually rejected as intended.
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    // Required: there is no serde default, so a config without `bucket` fails to deserialize.
    /// The S3 bucket name.
    ///
    /// This must not include a leading `s3://` or a trailing `/`.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    // Defaults to `default_key_prefix()` (`date=%F`). Parsed as a `Template` when the sink is
    // built, so both strftime specifiers and `{{ field }}` interpolation apply.
    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    // Defaults to `default_filename_time_format()` (`%s`).
    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to S3, such that the resulting object key is functionally equivalent to joining the key
    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:46 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    // Defaults to `true`.
    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    // A missing key deserializes to `None`. With `batch_encoding` set and this left `None`,
    // `build_processor` falls back to the format's own extension (`parquet`).
    /// The filename extension to use in the object key.
    ///
    /// This overrides setting the extension based on the configured `compression`.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    // Flattened: S3 API options such as `content_type` and `ssekms_key_id` are written at the
    // top level of the sink config rather than under a nested key.
    #[serde(flatten)]
    pub options: S3Options,

    // Flattened region/endpoint settings.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Flattened, so `encoding:` appears at the top level of the sink config. Its transformer is
    // applied in every mode, including when `batch_encoding` is set; only its serializer and
    // framing are bypassed in batch mode.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    // Only present with the `codecs-parquet` feature. When `Some`, `build_processor` takes the
    // batch path and ignores `compression` (replacing it with `Compression::None`).
    /// Batch encoding configuration for columnar formats.
    ///
    /// When set, events are encoded together as a batch in a columnar format (Parquet)
    /// instead of the standard per-event framing-based encoding. The columnar format handles
    /// its own internal compression, so the top-level `compression` setting is bypassed.
    #[cfg(feature = "codecs-parquet")]
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<S3BatchEncoding>,

    // Defaults to gzip, so users combining this sink with `batch_encoding` must set
    // `compression: none` explicitly to avoid the "ignored" warning.
    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // Accepts either a bare bool or a struct (`bool_or_struct`) and is omitted from serialized
    // output when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // When unset, `build_processor` falls back to the global timezone.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    // Defaults to `true` (bucket name in the URL path rather than the hostname).
    /// Specifies which addressing style to use.
    ///
    /// This controls if the bucket name is in the hostname or part of the URL.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    // Omitted from serialized output when it equals the default.
    /// Specifies retry strategy for failed requests.
    ///
    /// By default, the sink only retries attempts it deems possible to retry.
    /// These settings extend the default behavior.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
188
189pub(super) fn default_key_prefix() -> String {
190    "date=%F".to_string()
191}
192
193pub(super) fn default_filename_time_format() -> String {
194    "%s".to_string()
195}
196
197impl GenerateConfig for S3SinkConfig {
198    fn generate_config() -> toml::Value {
199        toml::Value::try_from(Self {
200            bucket: "".to_owned(),
201            key_prefix: default_key_prefix(),
202            filename_time_format: default_filename_time_format(),
203            filename_append_uuid: true,
204            filename_extension: None,
205            options: S3Options::default(),
206            region: RegionOrEndpoint::default(),
207            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
208            #[cfg(feature = "codecs-parquet")]
209            batch_encoding: None,
210            compression: Compression::gzip_default(),
211            batch: BatchConfig::default(),
212            request: TowerRequestConfig::default(),
213            tls: Some(TlsConfig::default()),
214            auth: AwsAuthentication::default(),
215            acknowledgements: Default::default(),
216            timezone: Default::default(),
217            force_path_style: Default::default(),
218            retry_strategy: Default::default(),
219        })
220        .unwrap()
221    }
222}
223
224#[async_trait::async_trait]
225#[typetag::serde(name = "aws_s3")]
226impl SinkConfig for S3SinkConfig {
227    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
228        let service = self.create_service(&cx.proxy).await?;
229        let healthcheck = self.build_healthcheck(service.client())?;
230        let sink = self.build_processor(service, cx)?;
231        Ok((sink, healthcheck))
232    }
233
234    fn input(&self) -> Input {
235        #[cfg(feature = "codecs-parquet")]
236        if let Some(batch_encoding) = &self.batch_encoding {
237            let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
238            let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
239            return Input::new(resolved.input_type());
240        }
241        Input::new(self.encoding.config().1.input_type())
242    }
243
244    fn acknowledgements(&self) -> &AcknowledgementsConfig {
245        &self.acknowledgements
246    }
247}
248
249impl S3SinkConfig {
250    pub fn build_processor(
251        &self,
252        service: S3Service,
253        cx: SinkContext,
254    ) -> crate::Result<VectorSink> {
255        // Build our S3 client/service, which is what we'll ultimately feed
256        // requests into in order to ship files to S3.  We build this here in
257        // order to configure the client/service with retries, concurrency
258        // limits, rate limits, and whatever else the client should have.
259        let request_limits = self.request.into_settings();
260        let retry_strategy = self.retry_strategy.clone();
261        let service = ServiceBuilder::new()
262            .settings(request_limits, retry_strategy)
263            .service(service);
264
265        let offset = self
266            .timezone
267            .or(cx.globals.timezone)
268            .and_then(timezone_to_offset);
269
270        // Configure our partitioning/batching.
271        let batch_settings = self.batch.into_batcher_settings()?;
272
273        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
274
275        let ssekms_key_id = self
276            .options
277            .ssekms_key_id
278            .as_ref()
279            .cloned()
280            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
281            .transpose()?;
282
283        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
284
285        let transformer = self.encoding.transformer();
286
287        // When batch_encoding is configured (e.g., Parquet), use batch mode
288        // with internal compression and appropriate file extension.
289        #[cfg(feature = "codecs-parquet")]
290        if let Some(batch_encoding) = &self.batch_encoding {
291            let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
292            let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());
293
294            let batch_serializer = resolved_batch_config.build_batch_serializer()?;
295            let batch_encoder = BatchEncoder::new(batch_serializer);
296
297            // Auto-detect Content-Type from batch format. Users can still
298            // override via `options.content_type`; we only set it when unset.
299            let mut api_options = self.options.clone();
300            if api_options.content_type.is_none() {
301                api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
302            }
303
304            let encoder = EncoderKind::Batch(batch_encoder);
305
306            let filename_extension = self.filename_extension.clone().or_else(|| {
307                Some(
308                    match batch_encoding {
309                        S3BatchEncoding::Parquet(_) => "parquet",
310                    }
311                    .to_string(),
312                )
313            });
314
315            if self.compression != Compression::None {
316                warn!("Top level compression setting ignored when batch_encoding set to parquet.")
317            }
318
319            let request_options = S3RequestOptions {
320                bucket: self.bucket.clone(),
321                api_options,
322                filename_extension,
323                filename_time_format: self.filename_time_format.clone(),
324                filename_append_uuid: self.filename_append_uuid,
325                encoder: (transformer, encoder),
326                // Batch formats handle their own compression internally
327                compression: Compression::None,
328                filename_tz_offset: offset,
329            };
330
331            let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
332            return Ok(VectorSink::from_event_streamsink(sink));
333        }
334
335        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
336        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
337
338        let request_options = S3RequestOptions {
339            bucket: self.bucket.clone(),
340            api_options: self.options.clone(),
341            filename_extension: self.filename_extension.clone(),
342            filename_time_format: self.filename_time_format.clone(),
343            filename_append_uuid: self.filename_append_uuid,
344            encoder: (transformer, encoder),
345            compression: self.compression,
346            filename_tz_offset: offset,
347        };
348
349        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
350
351        Ok(VectorSink::from_event_streamsink(sink))
352    }
353
354    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
355        s3_common::config::build_healthcheck(self.bucket.clone(), client)
356    }
357
358    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
359        s3_common::config::create_service(
360            &self.region,
361            &self.auth,
362            proxy,
363            self.tls.as_ref(),
364            self.force_path_style,
365        )
366        .await
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    /// Correct config shape, parsed here from YAML (TOML equivalent:
    /// `batch_encoding.codec = "parquet"` with `batch_encoding.schema_mode = "auto_infer"`).
    // `schema_mode` appears before `codec` in the YAML below, so this also checks that the tag is
    // found regardless of key order.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              schema_mode: auto_infer
              codec: parquet
              compression:
                algorithm: snappy
            "#})
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
        use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
        assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
        assert_eq!(p.compression, ParquetCompression::Snappy);
    }

    /// Content-Type must be auto-detected as `application/vnd.apache.parquet`
    /// when `batch_encoding` is set and `content_type` is not explicitly provided.
    // NOTE(review): this re-implements the content-type logic of `build_processor` rather than
    // calling it, so it verifies the encoder's reported content type but will not catch a
    // regression inside `build_processor` itself.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::{BatchSerializerConfig, FramingConfig};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        // Mirror of the batch-mode Content-Type selection in `build_processor`.
        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
        let batch_config = BatchSerializerConfig::Parquet(p.clone());
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    /// When user explicitly sets `content_type`, the auto-detection must not override it.
    // NOTE(review): like the test above, this mirrors `build_processor`'s logic instead of
    // invoking it. The YAML also shows that `content_type` is accepted at the top level (via the
    // flattened `options`).
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            content_type: "application/octet-stream"
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: auto_infer
              compression:
                algorithm: gzip
                level: 9
            "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.as_ref().unwrap();
        let batch_config = vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p.clone());
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    /// Codecs other than `parquet` must be rejected at parse time, since
    /// `S3BatchEncoding` only exposes the `parquet` variant.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_rejects_unsupported_codec() {
        let err = serde_yaml::from_str::<S3SinkConfig>(
            r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: arrow_stream
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("arrow_stream"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    /// Explicit filename_extension overrides the `.parquet` default.
    // NOTE(review): only checks that the field parses; the `.parquet` fallback and the override
    // precedence live in `build_processor` and are not exercised here.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            filename_extension: pq
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: auto_infer
            "#})
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    /// `schema_mode` defaults to `relaxed` when not specified.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
            "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
    }

    /// Explicit `schema_mode = "strict"` is correctly parsed.
    // Parse-only: no serializer is built, so the `schema_file` path below is never opened.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = serde_yaml::from_str(indoc::indoc! {r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: parquet
              schema_mode: strict
              schema_file: tmp/something.schema
            "#})
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
    }
}