// vector/sinks/azure_blob/request_builder.rs

1use bytes::Bytes;
2use chrono::Utc;
3use uuid::Uuid;
4use vector_lib::{codecs::encoding::Framer, request_metadata::RequestMetadata};
5
6use crate::{
7    codecs::{Encoder, Transformer},
8    event::{Event, Finalizable},
9    sinks::{
10        azure_blob::config::{AzureBlobMetadata, AzureBlobRequest},
11        util::{
12            Compression, RequestBuilder, metadata::RequestMetadataBuilder,
13            request_builder::EncodeResult,
14        },
15    },
16};
17
18#[derive(Clone)]
19pub struct AzureBlobRequestOptions {
20    pub container_name: String,
21    pub blob_time_format: String,
22    pub blob_append_uuid: bool,
23    pub encoder: (Transformer, Encoder<Framer>),
24    pub compression: Compression,
25}
26
27impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
28    type Metadata = AzureBlobMetadata;
29    type Events = Vec<Event>;
30    type Encoder = (Transformer, Encoder<Framer>);
31    type Payload = Bytes;
32    type Request = AzureBlobRequest;
33    type Error = std::io::Error;
34
35    fn compression(&self) -> Compression {
36        self.compression
37    }
38
39    fn encoder(&self) -> &Self::Encoder {
40        &self.encoder
41    }
42
43    fn split_input(
44        &self,
45        input: (String, Vec<Event>),
46    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
47        let (partition_key, mut events) = input;
48        let finalizers = events.take_finalizers();
49        let azure_metadata = AzureBlobMetadata {
50            partition_key,
51            count: events.len(),
52            finalizers,
53        };
54
55        let builder = RequestMetadataBuilder::from_events(&events);
56
57        (azure_metadata, builder, events)
58    }
59
60    fn build_request(
61        &self,
62        mut azure_metadata: Self::Metadata,
63        request_metadata: RequestMetadata,
64        payload: EncodeResult<Self::Payload>,
65    ) -> Self::Request {
66        let formatted_ts = Utc::now().format(self.blob_time_format.as_str());
67        let blob_name = if self.blob_append_uuid {
68            format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
69        } else {
70            formatted_ts.to_string()
71        };
72
73        let extension = self.compression.extension();
74        azure_metadata.partition_key = format!(
75            "{}{}.{}",
76            azure_metadata.partition_key, blob_name, extension
77        );
78
79        let blob_data = payload.into_payload();
80
81        debug!(
82            message = "Sending events.",
83            bytes = ?blob_data.len(),
84            events_len = ?azure_metadata.count,
85            blob = ?azure_metadata.partition_key,
86            container = ?self.container_name,
87        );
88
89        AzureBlobRequest {
90            blob_data,
91            content_encoding: self.compression.content_encoding(),
92            content_type: self.encoder.1.content_type(),
93            metadata: azure_metadata,
94            request_metadata,
95        }
96    }
97}