vector/sinks/azure_blob/
request_builder.rs1use bytes::Bytes;
2use chrono::Utc;
3use uuid::Uuid;
4use vector_lib::{codecs::encoding::Framer, request_metadata::RequestMetadata};
5
6use crate::{
7 codecs::{Encoder, Transformer},
8 event::{Event, Finalizable},
9 sinks::{
10 azure_blob::config::{AzureBlobMetadata, AzureBlobRequest},
11 util::{
12 Compression, RequestBuilder, metadata::RequestMetadataBuilder,
13 request_builder::EncodeResult,
14 },
15 },
16};
17
18#[derive(Clone)]
19pub struct AzureBlobRequestOptions {
20 pub container_name: String,
21 pub blob_time_format: String,
22 pub blob_append_uuid: bool,
23 pub encoder: (Transformer, Encoder<Framer>),
24 pub compression: Compression,
25}
26
27impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {
28 type Metadata = AzureBlobMetadata;
29 type Events = Vec<Event>;
30 type Encoder = (Transformer, Encoder<Framer>);
31 type Payload = Bytes;
32 type Request = AzureBlobRequest;
33 type Error = std::io::Error;
34
35 fn compression(&self) -> Compression {
36 self.compression
37 }
38
39 fn encoder(&self) -> &Self::Encoder {
40 &self.encoder
41 }
42
43 fn split_input(
44 &self,
45 input: (String, Vec<Event>),
46 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
47 let (partition_key, mut events) = input;
48 let finalizers = events.take_finalizers();
49 let azure_metadata = AzureBlobMetadata {
50 partition_key,
51 count: events.len(),
52 finalizers,
53 };
54
55 let builder = RequestMetadataBuilder::from_events(&events);
56
57 (azure_metadata, builder, events)
58 }
59
60 fn build_request(
61 &self,
62 mut azure_metadata: Self::Metadata,
63 request_metadata: RequestMetadata,
64 payload: EncodeResult<Self::Payload>,
65 ) -> Self::Request {
66 let formatted_ts = Utc::now().format(self.blob_time_format.as_str());
67 let blob_name = if self.blob_append_uuid {
68 format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
69 } else {
70 formatted_ts.to_string()
71 };
72
73 let extension = self.compression.extension();
74 azure_metadata.partition_key = format!(
75 "{}{}.{}",
76 azure_metadata.partition_key, blob_name, extension
77 );
78
79 let blob_data = payload.into_payload();
80
81 debug!(
82 message = "Sending events.",
83 bytes = ?blob_data.len(),
84 events_len = ?azure_metadata.count,
85 blob = ?azure_metadata.partition_key,
86 container = ?self.container_name,
87 );
88
89 AzureBlobRequest {
90 blob_data,
91 content_encoding: self.compression.content_encoding(),
92 content_type: self.encoder.1.content_type(),
93 metadata: azure_metadata,
94 request_metadata,
95 }
96 }
97}