1use std::fs::File;
2use std::io::Read;
3use std::sync::Arc;
4
5use azure_core::{
6 Error,
7 credentials::TokenCredential,
8 error::ErrorKind,
9 http::{StatusCode, Url},
10};
11use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
12
13use bytes::Bytes;
14use futures::FutureExt;
15use snafu::Snafu;
16use tower::ServiceBuilder;
17use vector_lib::{
18 codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
19 configurable::configurable_component,
20 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
21 sensitive_string::SensitiveString,
22 stream::DriverResponse,
23};
24
25use super::request_builder::AzureBlobRequestOptions;
26use crate::{
27 Result,
28 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
29 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
30 event::{EventFinalizers, EventStatus, Finalizable},
31 sinks::{
32 Healthcheck, VectorSink,
33 azure_blob::{service::AzureBlobService, sink::AzureBlobSink},
34 azure_common::{
35 config::AzureAuthentication,
36 config::AzureBlobTlsConfig,
37 connection_string::{Auth, ParsedConnectionString},
38 shared_key_policy::SharedKeyAuthorizationPolicy,
39 },
40 util::{
41 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
42 TowerRequestConfig, partitioner::KeyPartitioner, retries::RetryLogic,
43 service::TowerRequestConfigDefaults,
44 },
45 },
46 template::Template,
47};
48
49#[derive(Clone, Copy, Debug)]
50pub struct AzureBlobTowerRequestConfigDefaults;
51
52impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
53 const RATE_LIMIT_NUM: u64 = 250;
54}
55
56#[configurable_component(sink(
58 "azure_blob",
59 "Store your observability data in Azure Blob Storage."
60))]
61#[derive(Clone, Debug)]
62#[serde(deny_unknown_fields)]
63pub struct AzureBlobSinkConfig {
64 #[configurable(derived)]
65 #[serde(default)]
66 pub auth: Option<AzureAuthentication>,
67
68 #[configurable(metadata(
83 docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
84 resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
85 connection strings secure and not expose them in logs, error messages, or version control systems."
86 ))]
87 #[configurable(metadata(
88 docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
89 ))]
90 #[configurable(metadata(
91 docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
92 ))]
93 #[configurable(metadata(docs::examples = "AccountName=mylogstorage"))]
94 pub connection_string: Option<SensitiveString>,
95
96 #[configurable(metadata(docs::examples = "mylogstorage"))]
101 pub(super) account_name: Option<String>,
102
103 #[configurable(metadata(docs::examples = "https://mylogstorage.blob.core.windows.net/"))]
108 pub(super) blob_endpoint: Option<String>,
109
110 #[configurable(metadata(docs::examples = "my-logs"))]
112 pub(super) container_name: String,
113
114 #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
120 #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
121 #[configurable(metadata(
122 docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
123 ))]
124 #[serde(default = "default_blob_prefix")]
125 pub blob_prefix: Template,
126
127 #[configurable(metadata(docs::syntax_override = "strftime"))]
144 pub blob_time_format: Option<String>,
145
146 pub blob_append_uuid: Option<bool>,
156
157 #[serde(flatten)]
158 pub encoding: EncodingConfigWithFraming,
159
160 #[configurable(derived)]
167 #[serde(default = "Compression::gzip_default")]
168 pub compression: Compression,
169
170 #[configurable(derived)]
171 #[serde(default)]
172 pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
173
174 #[configurable(derived)]
175 #[serde(default)]
176 pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,
177
178 #[configurable(derived)]
179 #[serde(
180 default,
181 deserialize_with = "crate::serde::bool_or_struct",
182 skip_serializing_if = "crate::serde::is_default"
183 )]
184 pub(super) acknowledgements: AcknowledgementsConfig,
185
186 #[configurable(derived)]
187 #[serde(default)]
188 pub tls: Option<AzureBlobTlsConfig>,
189}
190
191pub fn default_blob_prefix() -> Template {
192 Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
193}
194
195impl GenerateConfig for AzureBlobSinkConfig {
196 fn generate_config() -> toml::Value {
197 toml::Value::try_from(Self {
198 auth: None,
199 connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
200 account_name: None,
201 blob_endpoint: None,
202 container_name: String::from("logs"),
203 blob_prefix: default_blob_prefix(),
204 blob_time_format: Some(String::from("%s")),
205 blob_append_uuid: Some(true),
206 encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
207 compression: Compression::gzip_default(),
208 batch: BatchConfig::default(),
209 request: TowerRequestConfig::default(),
210 acknowledgements: Default::default(),
211 tls: None,
212 })
213 .unwrap()
214 }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "azure_blob")]
219impl SinkConfig for AzureBlobSinkConfig {
220 async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
221 let connection_string: String = match (
222 &self.connection_string,
223 &self.account_name,
224 &self.blob_endpoint,
225 ) {
226 (Some(connstr), None, None) => connstr.inner().into(),
227 (None, Some(account_name), None) => {
228 if self.auth.is_none() {
229 return Err(
230 "`auth` configuration must be provided when using `account_name`".into(),
231 );
232 }
233 format!("AccountName={}", account_name)
234 }
235 (None, None, Some(blob_endpoint)) => {
236 if self.auth.is_none() {
237 return Err(
238 "`auth` configuration must be provided when using `blob_endpoint`".into(),
239 );
240 }
241 let blob_endpoint = if blob_endpoint.ends_with('/') {
243 blob_endpoint.clone()
244 } else {
245 format!("{}/", blob_endpoint)
246 };
247 format!("BlobEndpoint={}", blob_endpoint)
248 }
249 (None, None, None) => {
250 return Err("One of `connection_string`, `account_name`, or `blob_endpoint` must be provided".into());
251 }
252 (Some(_), Some(_), _) => {
253 return Err("Cannot provide both `connection_string` and `account_name`".into());
254 }
255 (Some(_), _, Some(_)) => {
256 return Err("Cannot provide both `connection_string` and `blob_endpoint`".into());
257 }
258 (_, Some(_), Some(_)) => {
259 return Err("Cannot provide both `account_name` and `blob_endpoint`".into());
260 }
261 };
262
263 let client = build_client(
264 self.auth.clone(),
265 connection_string.clone(),
266 self.container_name.clone(),
267 cx.proxy(),
268 self.tls.clone(),
269 )
270 .await?;
271
272 let healthcheck = build_healthcheck(self.container_name.clone(), Arc::clone(&client))?;
273 let sink = self.build_processor(client)?;
274 Ok((sink, healthcheck))
275 }
276
277 fn input(&self) -> Input {
278 Input::new(self.encoding.config().1.input_type() & DataType::Log)
279 }
280
281 fn acknowledgements(&self) -> &AcknowledgementsConfig {
282 &self.acknowledgements
283 }
284}
285
/// Source string for the default `blob_prefix` template.
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
/// `blob_time_format` used when the option is not configured.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
/// `blob_append_uuid` used when the option is not configured.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
289
290impl AzureBlobSinkConfig {
291 pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
292 let request_limits = self.request.into_settings();
293 let service = ServiceBuilder::new()
294 .settings(request_limits, AzureBlobRetryLogic)
295 .service(AzureBlobService::new(client));
296
297 let batcher_settings = self.batch.into_batcher_settings()?;
299
300 let blob_time_format = self
301 .blob_time_format
302 .as_ref()
303 .cloned()
304 .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
305 let blob_append_uuid = self
306 .blob_append_uuid
307 .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
308
309 let transformer = self.encoding.transformer();
310 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
311 let encoder = Encoder::<Framer>::new(framer, serializer);
312
313 let request_options = AzureBlobRequestOptions {
314 container_name: self.container_name.clone(),
315 blob_time_format,
316 blob_append_uuid,
317 encoder: (transformer, encoder),
318 compression: self.compression,
319 };
320
321 let sink = AzureBlobSink::new(
322 service,
323 request_options,
324 self.key_partitioner()?,
325 batcher_settings,
326 );
327
328 Ok(VectorSink::from_event_streamsink(sink))
329 }
330
331 pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
332 Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
333 }
334}
335
336#[derive(Debug, Clone)]
337pub struct AzureBlobRequest {
338 pub blob_data: Bytes,
339 pub content_encoding: Option<&'static str>,
340 pub content_type: &'static str,
341 pub metadata: AzureBlobMetadata,
342 pub request_metadata: RequestMetadata,
343}
344
345impl Finalizable for AzureBlobRequest {
346 fn take_finalizers(&mut self) -> EventFinalizers {
347 std::mem::take(&mut self.metadata.finalizers)
348 }
349}
350
351impl MetaDescriptive for AzureBlobRequest {
352 fn get_metadata(&self) -> &RequestMetadata {
353 &self.request_metadata
354 }
355
356 fn metadata_mut(&mut self) -> &mut RequestMetadata {
357 &mut self.request_metadata
358 }
359}
360
361#[derive(Clone, Debug)]
362pub struct AzureBlobMetadata {
363 pub partition_key: String,
364 pub count: usize,
365 pub finalizers: EventFinalizers,
366}
367
368#[derive(Debug, Clone)]
369pub struct AzureBlobRetryLogic;
370
371impl RetryLogic for AzureBlobRetryLogic {
372 type Error = Error;
373 type Request = AzureBlobRequest;
374 type Response = AzureBlobResponse;
375
376 fn is_retriable_error(&self, error: &Self::Error) -> bool {
377 match error.http_status() {
378 Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
379 None => false,
380 }
381 }
382}
383
384#[derive(Debug)]
385pub struct AzureBlobResponse {
386 pub events_byte_size: GroupedCountByteSize,
387 pub byte_size: usize,
388}
389
390impl DriverResponse for AzureBlobResponse {
391 fn event_status(&self) -> EventStatus {
392 EventStatus::Delivered
393 }
394
395 fn events_sent(&self) -> &GroupedCountByteSize {
396 &self.events_byte_size
397 }
398
399 fn bytes_sent(&self) -> Option<usize> {
400 Some(self.byte_size)
401 }
402}
403
404#[derive(Debug, Snafu)]
405pub enum HealthcheckError {
406 #[snafu(display("Invalid connection string specified"))]
407 InvalidCredentials,
408 #[snafu(display("Container: {:?} not found", container))]
409 UnknownContainer { container: String },
410 #[snafu(display("Unknown status code: {}", status))]
411 Unknown { status: StatusCode },
412}
413
414pub fn build_healthcheck(
415 container_name: String,
416 client: Arc<BlobContainerClient>,
417) -> crate::Result<Healthcheck> {
418 let healthcheck = async move {
419 let resp: crate::Result<()> = match client.get_properties(None).await {
420 Ok(_) => Ok(()),
421 Err(error) => {
422 let code = error.http_status();
423 Err(match code {
424 Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
425 Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
426 container: container_name,
427 }),
428 Some(status) => Box::new(HealthcheckError::Unknown { status }),
429 None => "unknown status code".into(),
430 })
431 }
432 };
433 resp
434 };
435
436 Ok(healthcheck.boxed())
437}
438
439pub async fn build_client(
440 auth: Option<AzureAuthentication>,
441 connection_string: String,
442 container_name: String,
443 proxy: &crate::config::ProxyConfig,
444 tls: Option<AzureBlobTlsConfig>,
445) -> crate::Result<Arc<BlobContainerClient>> {
446 let parsed = ParsedConnectionString::parse(&connection_string)
448 .map_err(|e| format!("Invalid connection string: {e}"))?;
449 let container_url = parsed
451 .container_url(&container_name)
452 .map_err(|e| format!("Failed to build container URL: {e}"))?;
453 let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
454
455 let mut credential: Option<Arc<dyn TokenCredential>> = None;
456
457 let mut options = BlobContainerClientOptions::default();
459 match (parsed.auth(), &auth) {
460 (Auth::None, None) => {
461 warn!("No authentication method provided, requests will be anonymous.");
462 }
463 (Auth::Sas { .. }, None) => {
464 info!("Using SAS token authentication.");
465 }
466 (
467 Auth::SharedKey {
468 account_name,
469 account_key,
470 },
471 None,
472 ) => {
473 info!("Using Shared Key authentication.");
474
475 let policy = SharedKeyAuthorizationPolicy::new(
476 account_name,
477 account_key,
478 String::from("2025-11-05"),
480 )
481 .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
482 options
483 .client_options
484 .per_call_policies
485 .push(Arc::new(policy));
486 }
487 (Auth::None, Some(AzureAuthentication::Specific(..))) => {
488 info!("Using Azure Authentication method.");
489 let credential_result: Arc<dyn TokenCredential> =
490 auth.unwrap().credential().await.map_err(|e| {
491 Error::with_message(
492 ErrorKind::Credential,
493 format!("Failed to configure Azure Authentication: {e}"),
494 )
495 })?;
496 credential = Some(credential_result);
497 }
498 (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
499 return Err(Box::new(Error::with_message(
500 ErrorKind::Credential,
501 "Cannot use both SAS token and another Azure Authentication method at the same time",
502 )));
503 }
504 (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
505 return Err(Box::new(Error::with_message(
506 ErrorKind::Credential,
507 "Cannot use both Shared Key and another Azure Authentication method at the same time",
508 )));
509 }
510 #[cfg(test)]
511 (Auth::None, Some(AzureAuthentication::MockCredential)) => {
512 warn!("Using mock token credential authentication.");
513 credential = Some(auth.unwrap().credential().await.unwrap());
514 }
515 #[cfg(test)]
516 (_, Some(AzureAuthentication::MockCredential)) => {
517 return Err(Box::new(Error::with_message(
518 ErrorKind::Credential,
519 "Cannot use both connection string auth and mock credential at the same time",
520 )));
521 }
522 }
523
524 let mut reqwest_builder = reqwest_13::ClientBuilder::new();
526 let bypass_proxy = {
527 let host = url.host_str().unwrap_or("");
528 let port = url.port();
529 proxy.no_proxy.matches(host)
530 || port
531 .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
532 .unwrap_or(false)
533 };
534 if bypass_proxy || !proxy.enabled {
535 reqwest_builder = reqwest_builder.no_proxy();
537 } else {
538 if let Some(http) = &proxy.http {
539 let p = reqwest_13::Proxy::http(http)
540 .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
541 reqwest_builder = reqwest_builder.proxy(p);
543 }
544 if let Some(https) = &proxy.https {
545 let p = reqwest_13::Proxy::https(https)
546 .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
547 reqwest_builder = reqwest_builder.proxy(p);
549 }
550 }
551
552 if let Some(AzureBlobTlsConfig { ca_file }) = &tls
553 && let Some(ca_file) = ca_file
554 {
555 let mut buf = Vec::new();
556 File::open(ca_file)?.read_to_end(&mut buf)?;
557 let cert = reqwest_13::Certificate::from_pem(&buf)?;
558
559 warn!("Adding TLS root certificate from {}", ca_file.display());
560 reqwest_builder = reqwest_builder.add_root_certificate(cert);
561 }
562
563 options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
564 reqwest_builder
565 .build()
566 .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
567 )));
568 let client =
569 BlobContainerClient::new(url, credential, Some(options)).map_err(|e| format!("{e}"))?;
570 Ok(Arc::new(client))
571}