Skip to main content

vector/sinks/azure_blob/
config.rs

1use std::fs::File;
2use std::io::Read;
3use std::sync::Arc;
4
5use azure_core::{
6    Error,
7    credentials::TokenCredential,
8    error::ErrorKind,
9    http::{StatusCode, Url},
10};
11use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
12
13use bytes::Bytes;
14use futures::FutureExt;
15use snafu::Snafu;
16use tower::ServiceBuilder;
17use vector_lib::{
18    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
19    configurable::configurable_component,
20    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
21    sensitive_string::SensitiveString,
22    stream::DriverResponse,
23};
24
25use super::request_builder::AzureBlobRequestOptions;
26use crate::{
27    Result,
28    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
29    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
30    event::{EventFinalizers, EventStatus, Finalizable},
31    sinks::{
32        Healthcheck, VectorSink,
33        azure_blob::{service::AzureBlobService, sink::AzureBlobSink},
34        azure_common::{
35            config::AzureAuthentication,
36            config::AzureBlobTlsConfig,
37            connection_string::{Auth, ParsedConnectionString},
38            shared_key_policy::SharedKeyAuthorizationPolicy,
39        },
40        util::{
41            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
42            TowerRequestConfig, partitioner::KeyPartitioner, retries::RetryLogic,
43            service::TowerRequestConfigDefaults,
44        },
45    },
46    template::Template,
47};
48
/// Marker type that supplies the `azure_blob` sink's defaults for
/// `TowerRequestConfig` (see the `request` field of `AzureBlobSinkConfig`).
#[derive(Clone, Copy, Debug)]
pub struct AzureBlobTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden here; no other associated item is set.
    // NOTE(review): presumably the number of requests allowed per rate-limit window —
    // confirm against `TowerRequestConfigDefaults`.
    const RATE_LIMIT_NUM: u64 = 250;
}
55
/// Configuration for the `azure_blob` sink.
#[configurable_component(sink(
    "azure_blob",
    "Store your observability data in Azure Blob Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureBlobSinkConfig {
    #[configurable(derived)]
    #[serde(default)]
    pub auth: Option<AzureAuthentication>,

    /// The Azure Blob Storage Account connection string.
    ///
    /// Authentication with an access key or a shared access signature (SAS)
    /// is supported. If using a non-account SAS, healthchecks will fail and
    /// will need to be disabled by setting `healthcheck.enabled` to `false`
    /// for this sink.
    ///
    /// Exactly one of `connection_string`, `account_name`, or `blob_endpoint`
    /// must be provided.
    ///
    /// When generating an account SAS, the following are the minimum required option
    /// settings for Vector to access blob storage and pass a health check.
    /// | Option                 | Value              |
    /// | ---------------------- | ------------------ |
    /// | Allowed services       | Blob               |
    /// | Allowed resource types | Container & Object |
    /// | Allowed permissions    | Read & Create      |
    #[configurable(metadata(
        docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
        resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
        connection strings secure and not expose them in logs, error messages, or version control systems."
    ))]
    #[configurable(metadata(
        docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
    ))]
    #[configurable(metadata(
        docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
    ))]
    #[configurable(metadata(docs::examples = "AccountName=mylogstorage"))]
    pub connection_string: Option<SensitiveString>,

    /// The Azure Blob Storage Account name.
    ///
    /// This is an alternative to `connection_string`: it cannot be combined with
    /// `connection_string` or `blob_endpoint`, and it requires `auth` to be configured.
    /// This is useful for authenticating with an Azure credential.
    #[configurable(metadata(docs::examples = "mylogstorage"))]
    pub(super) account_name: Option<String>,

    /// The Azure Blob Storage endpoint.
    ///
    /// This is an alternative to `connection_string`: it cannot be combined with
    /// `connection_string` or `account_name`, and it requires `auth` to be configured.
    /// A trailing `/` is added automatically when it is missing.
    /// This is useful for authenticating with an Azure credential.
    #[configurable(metadata(docs::examples = "https://mylogstorage.blob.core.windows.net/"))]
    pub(super) blob_endpoint: Option<String>,

    /// The Azure Blob Storage Account container name.
    #[configurable(metadata(docs::examples = "my-logs"))]
    pub(super) container_name: String,

    /// A prefix to apply to all blob keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating a blob key that
    /// stores blobs under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
    #[configurable(metadata(
        docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
    ))]
    #[serde(default = "default_blob_prefix")]
    pub blob_prefix: Template,

    /// The timestamp format for the time component of the blob key.
    ///
    /// By default, blob keys are appended with a timestamp that reflects when the blob is sent to
    /// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining
    /// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:44 GMT+0000, with the `blob_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the blob prefix.
    ///
    /// When unset, `%s` is used.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[configurable(metadata(docs::syntax_override = "strftime"))]
    pub blob_time_format: Option<String>,

    /// Whether or not to append a UUID v4 token to the end of the blob key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the blob key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in a blob key that looks like
    /// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// blob keys must be unique.
    ///
    /// When unset, this behaves as if set to `true`.
    pub blob_append_uuid: Option<bool>,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(super) acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<AzureBlobTlsConfig>,
}
190
191pub fn default_blob_prefix() -> Template {
192    Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
193}
194
195impl GenerateConfig for AzureBlobSinkConfig {
196    fn generate_config() -> toml::Value {
197        toml::Value::try_from(Self {
198            auth: None,
199            connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
200            account_name: None,
201            blob_endpoint: None,
202            container_name: String::from("logs"),
203            blob_prefix: default_blob_prefix(),
204            blob_time_format: Some(String::from("%s")),
205            blob_append_uuid: Some(true),
206            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
207            compression: Compression::gzip_default(),
208            batch: BatchConfig::default(),
209            request: TowerRequestConfig::default(),
210            acknowledgements: Default::default(),
211            tls: None,
212        })
213        .unwrap()
214    }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "azure_blob")]
219impl SinkConfig for AzureBlobSinkConfig {
220    async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
221        let connection_string: String = match (
222            &self.connection_string,
223            &self.account_name,
224            &self.blob_endpoint,
225        ) {
226            (Some(connstr), None, None) => connstr.inner().into(),
227            (None, Some(account_name), None) => {
228                if self.auth.is_none() {
229                    return Err(
230                        "`auth` configuration must be provided when using `account_name`".into(),
231                    );
232                }
233                format!("AccountName={}", account_name)
234            }
235            (None, None, Some(blob_endpoint)) => {
236                if self.auth.is_none() {
237                    return Err(
238                        "`auth` configuration must be provided when using `blob_endpoint`".into(),
239                    );
240                }
241                // BlobEndpoint must always end in a trailing slash
242                let blob_endpoint = if blob_endpoint.ends_with('/') {
243                    blob_endpoint.clone()
244                } else {
245                    format!("{}/", blob_endpoint)
246                };
247                format!("BlobEndpoint={}", blob_endpoint)
248            }
249            (None, None, None) => {
250                return Err("One of `connection_string`, `account_name`, or `blob_endpoint` must be provided".into());
251            }
252            (Some(_), Some(_), _) => {
253                return Err("Cannot provide both `connection_string` and `account_name`".into());
254            }
255            (Some(_), _, Some(_)) => {
256                return Err("Cannot provide both `connection_string` and `blob_endpoint`".into());
257            }
258            (_, Some(_), Some(_)) => {
259                return Err("Cannot provide both `account_name` and `blob_endpoint`".into());
260            }
261        };
262
263        let client = build_client(
264            self.auth.clone(),
265            connection_string.clone(),
266            self.container_name.clone(),
267            cx.proxy(),
268            self.tls.clone(),
269        )
270        .await?;
271
272        let healthcheck = build_healthcheck(self.container_name.clone(), Arc::clone(&client))?;
273        let sink = self.build_processor(client)?;
274        Ok((sink, healthcheck))
275    }
276
277    fn input(&self) -> Input {
278        Input::new(self.encoding.config().1.input_type() & DataType::Log)
279    }
280
281    fn acknowledgements(&self) -> &AcknowledgementsConfig {
282        &self.acknowledgements
283    }
284}
285
/// Template string used by `default_blob_prefix` when `blob_prefix` is not configured.
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
/// Fallback used by `build_processor` when `blob_time_format` is not configured.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
/// Fallback used by `build_processor` when `blob_append_uuid` is not configured.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
289
290impl AzureBlobSinkConfig {
291    pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
292        let request_limits = self.request.into_settings();
293        let service = ServiceBuilder::new()
294            .settings(request_limits, AzureBlobRetryLogic)
295            .service(AzureBlobService::new(client));
296
297        // Configure our partitioning/batching.
298        let batcher_settings = self.batch.into_batcher_settings()?;
299
300        let blob_time_format = self
301            .blob_time_format
302            .as_ref()
303            .cloned()
304            .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
305        let blob_append_uuid = self
306            .blob_append_uuid
307            .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
308
309        let transformer = self.encoding.transformer();
310        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
311        let encoder = Encoder::<Framer>::new(framer, serializer);
312
313        let request_options = AzureBlobRequestOptions {
314            container_name: self.container_name.clone(),
315            blob_time_format,
316            blob_append_uuid,
317            encoder: (transformer, encoder),
318            compression: self.compression,
319        };
320
321        let sink = AzureBlobSink::new(
322            service,
323            request_options,
324            self.key_partitioner()?,
325            batcher_settings,
326        );
327
328        Ok(VectorSink::from_event_streamsink(sink))
329    }
330
331    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
332        Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
333    }
334}
335
/// A single request handled by the `azure_blob` sink's service, bundling the payload
/// with the metadata carried alongside it.
#[derive(Debug, Clone)]
pub struct AzureBlobRequest {
    /// Payload bytes of the blob. NOTE(review): presumably already encoded and
    /// compressed by the request builder — confirm there.
    pub blob_data: Bytes,
    /// Optional content encoding label. NOTE(review): presumably derived from the
    /// configured compression — confirm in the request builder.
    pub content_encoding: Option<&'static str>,
    /// Content type label for the payload.
    pub content_type: &'static str,
    /// Partitioning information and event finalizers for the batch.
    pub metadata: AzureBlobMetadata,
    /// Request metadata exposed through the `MetaDescriptive` implementation.
    pub request_metadata: RequestMetadata,
}
344
345impl Finalizable for AzureBlobRequest {
346    fn take_finalizers(&mut self) -> EventFinalizers {
347        std::mem::take(&mut self.metadata.finalizers)
348    }
349}
350
351impl MetaDescriptive for AzureBlobRequest {
352    fn get_metadata(&self) -> &RequestMetadata {
353        &self.request_metadata
354    }
355
356    fn metadata_mut(&mut self) -> &mut RequestMetadata {
357        &mut self.request_metadata
358    }
359}
360
/// Per-request bookkeeping carried alongside the blob payload.
#[derive(Clone, Debug)]
pub struct AzureBlobMetadata {
    /// Partition key of the batch. NOTE(review): presumably the rendered `blob_prefix`
    /// produced by the key partitioner — confirm in the request builder.
    pub partition_key: String,
    /// NOTE(review): presumably the number of events in the batch — confirm at the
    /// construction site.
    pub count: usize,
    /// Finalizers of the events in this request; taken by
    /// `AzureBlobRequest::take_finalizers`.
    pub finalizers: EventFinalizers,
}
367
368#[derive(Debug, Clone)]
369pub struct AzureBlobRetryLogic;
370
371impl RetryLogic for AzureBlobRetryLogic {
372    type Error = Error;
373    type Request = AzureBlobRequest;
374    type Response = AzureBlobResponse;
375
376    fn is_retriable_error(&self, error: &Self::Error) -> bool {
377        match error.http_status() {
378            Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
379            None => false,
380        }
381    }
382}
383
/// Response reported by the blob service for a completed request.
#[derive(Debug)]
pub struct AzureBlobResponse {
    /// Grouped event count and byte size, returned by `events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Byte size returned by `bytes_sent`. NOTE(review): presumably the size of the
    /// uploaded payload — confirm where the response is constructed.
    pub byte_size: usize,
}
389
impl DriverResponse for AzureBlobResponse {
    /// Always reports `Delivered`: this implementation does not inspect the response.
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    /// Returns the grouped event count/byte size stored in the response.
    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.events_byte_size
    }

    /// Returns the stored byte size; this implementation never returns `None`.
    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }
}
403
/// Errors reported by the healthcheck created in `build_healthcheck`.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// Reported when the container properties request returns `Forbidden`.
    #[snafu(display("Invalid connection string specified"))]
    InvalidCredentials,
    /// Reported when the container properties request returns `NotFound`.
    #[snafu(display("Container: {:?} not found", container))]
    UnknownContainer { container: String },
    /// Reported for any other HTTP status attached to the error.
    #[snafu(display("Unknown status code: {}", status))]
    Unknown { status: StatusCode },
}
413
414pub fn build_healthcheck(
415    container_name: String,
416    client: Arc<BlobContainerClient>,
417) -> crate::Result<Healthcheck> {
418    let healthcheck = async move {
419        let resp: crate::Result<()> = match client.get_properties(None).await {
420            Ok(_) => Ok(()),
421            Err(error) => {
422                let code = error.http_status();
423                Err(match code {
424                    Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
425                    Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
426                        container: container_name,
427                    }),
428                    Some(status) => Box::new(HealthcheckError::Unknown { status }),
429                    None => "unknown status code".into(),
430                })
431            }
432        };
433        resp
434    };
435
436    Ok(healthcheck.boxed())
437}
438
439pub async fn build_client(
440    auth: Option<AzureAuthentication>,
441    connection_string: String,
442    container_name: String,
443    proxy: &crate::config::ProxyConfig,
444    tls: Option<AzureBlobTlsConfig>,
445) -> crate::Result<Arc<BlobContainerClient>> {
446    // Parse connection string without legacy SDK
447    let parsed = ParsedConnectionString::parse(&connection_string)
448        .map_err(|e| format!("Invalid connection string: {e}"))?;
449    // Compose container URL (SAS appended if present)
450    let container_url = parsed
451        .container_url(&container_name)
452        .map_err(|e| format!("Failed to build container URL: {e}"))?;
453    let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
454
455    let mut credential: Option<Arc<dyn TokenCredential>> = None;
456
457    // Prepare options; attach Shared Key policy if needed
458    let mut options = BlobContainerClientOptions::default();
459    match (parsed.auth(), &auth) {
460        (Auth::None, None) => {
461            warn!("No authentication method provided, requests will be anonymous.");
462        }
463        (Auth::Sas { .. }, None) => {
464            info!("Using SAS token authentication.");
465        }
466        (
467            Auth::SharedKey {
468                account_name,
469                account_key,
470            },
471            None,
472        ) => {
473            info!("Using Shared Key authentication.");
474
475            let policy = SharedKeyAuthorizationPolicy::new(
476                account_name,
477                account_key,
478                // Use an Azurite-supported storage service version
479                String::from("2025-11-05"),
480            )
481            .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
482            options
483                .client_options
484                .per_call_policies
485                .push(Arc::new(policy));
486        }
487        (Auth::None, Some(AzureAuthentication::Specific(..))) => {
488            info!("Using Azure Authentication method.");
489            let credential_result: Arc<dyn TokenCredential> =
490                auth.unwrap().credential().await.map_err(|e| {
491                    Error::with_message(
492                        ErrorKind::Credential,
493                        format!("Failed to configure Azure Authentication: {e}"),
494                    )
495                })?;
496            credential = Some(credential_result);
497        }
498        (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
499            return Err(Box::new(Error::with_message(
500                ErrorKind::Credential,
501                "Cannot use both SAS token and another Azure Authentication method at the same time",
502            )));
503        }
504        (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
505            return Err(Box::new(Error::with_message(
506                ErrorKind::Credential,
507                "Cannot use both Shared Key and another Azure Authentication method at the same time",
508            )));
509        }
510        #[cfg(test)]
511        (Auth::None, Some(AzureAuthentication::MockCredential)) => {
512            warn!("Using mock token credential authentication.");
513            credential = Some(auth.unwrap().credential().await.unwrap());
514        }
515        #[cfg(test)]
516        (_, Some(AzureAuthentication::MockCredential)) => {
517            return Err(Box::new(Error::with_message(
518                ErrorKind::Credential,
519                "Cannot use both connection string auth and mock credential at the same time",
520            )));
521        }
522    }
523
524    // Use reqwest v0.13 since Azure SDK only implements HttpClient for reqwest::Client v0.13
525    let mut reqwest_builder = reqwest_13::ClientBuilder::new();
526    let bypass_proxy = {
527        let host = url.host_str().unwrap_or("");
528        let port = url.port();
529        proxy.no_proxy.matches(host)
530            || port
531                .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
532                .unwrap_or(false)
533    };
534    if bypass_proxy || !proxy.enabled {
535        // Ensure no proxy (and disable any potential system proxy auto-detection)
536        reqwest_builder = reqwest_builder.no_proxy();
537    } else {
538        if let Some(http) = &proxy.http {
539            let p = reqwest_13::Proxy::http(http)
540                .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
541            // If credentials are embedded in the proxy URL, reqwest will handle them.
542            reqwest_builder = reqwest_builder.proxy(p);
543        }
544        if let Some(https) = &proxy.https {
545            let p = reqwest_13::Proxy::https(https)
546                .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
547            // If credentials are embedded in the proxy URL, reqwest will handle them.
548            reqwest_builder = reqwest_builder.proxy(p);
549        }
550    }
551
552    if let Some(AzureBlobTlsConfig { ca_file }) = &tls
553        && let Some(ca_file) = ca_file
554    {
555        let mut buf = Vec::new();
556        File::open(ca_file)?.read_to_end(&mut buf)?;
557        let cert = reqwest_13::Certificate::from_pem(&buf)?;
558
559        warn!("Adding TLS root certificate from {}", ca_file.display());
560        reqwest_builder = reqwest_builder.add_root_certificate(cert);
561    }
562
563    options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
564        reqwest_builder
565            .build()
566            .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
567    )));
568    let client =
569        BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?;
570    Ok(Arc::new(client))
571}