vector/sinks/aws_kinesis/
sink.rs1use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
2
3use rand::random;
4use vector_lib::lookup::lookup_v2::ConfigValuePath;
5use vrl::path::PathPrefix;
6
7use super::{
8 record::Record,
9 request_builder::{KinesisRequest, KinesisRequestBuilder},
10};
11use crate::{
12 internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
13 sinks::{
14 prelude::*,
15 util::{StreamSink, processed_event::ProcessedEvent},
16 },
17};
18
/// A log event paired with the `KinesisKey` (partition key) that `process_log` selected for it.
pub type KinesisProcessedEvent = ProcessedEvent<LogEvent, KinesisKey>;
20
/// Per-event key attached to each processed log by the Kinesis sink.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct KinesisKey {
    /// Partition key for the event; `process_log` caps it at 256 bytes.
    /// NOTE(review): presumably forwarded to Kinesis as the record's partition key — confirm
    /// in the request builder.
    pub partition_key: String,
}
25
/// Stream sink that turns log events into Kinesis requests, batches them, and sends the
/// batches through `service`.
#[derive(Clone)]
pub struct KinesisSink<S, R> {
    /// Batch limits; `run_inner` applies them as a byte-size batching config.
    pub batch_settings: BatcherSettings,
    /// Service that executes each `BatchKinesisRequest<R>`.
    pub service: S,
    /// Builds a `KinesisRequest<R>` from each processed event.
    pub request_builder: KinesisRequestBuilder<R>,
    /// Event path to read the partition key from; `None` means a random key is generated
    /// for every event.
    pub partition_key_field: Option<ConfigValuePath>,
    /// Type marker tying the sink to the record type `R`.
    pub _phantom: PhantomData<R>,
}
34
35impl<S, R> KinesisSink<S, R>
36where
37 S: Service<BatchKinesisRequest<R>> + Send + 'static,
38 S::Future: Send + 'static,
39 S::Response: DriverResponse + Send + 'static,
40 S::Error: Debug + Into<crate::Error> + Send,
41 R: Record + Send + Sync + Unpin + Clone + 'static,
42{
43 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
44 let batch_settings = self.batch_settings;
45
46 input
47 .filter_map(|event| {
48 let log = event.into_log();
50 let processed = process_log(log, self.partition_key_field.as_ref());
51
52 future::ready(processed)
53 })
54 .request_builder(
55 default_request_builder_concurrency_limit(),
56 self.request_builder,
57 )
58 .filter_map(|request| async move {
59 match request {
60 Err(error) => {
61 emit!(SinkRequestBuildError { error });
62 None
63 }
64 Ok(req) => Some(req),
65 }
66 })
67 .batched(batch_settings.as_byte_size_config())
68 .map(|events| {
69 let metadata = RequestMetadata::from_batch(
70 events.iter().map(|req| req.get_metadata().clone()),
71 );
72 BatchKinesisRequest { events, metadata }
73 })
74 .into_driver(self.service)
75 .run()
76 .await
77 }
78}
79
80#[async_trait]
81impl<S, R> StreamSink<Event> for KinesisSink<S, R>
82where
83 S: Service<BatchKinesisRequest<R>> + Send + 'static,
84 S::Future: Send + 'static,
85 S::Response: DriverResponse + Send + 'static,
86 S::Error: Debug + Into<crate::Error> + Send,
87 R: Record + Send + Sync + Unpin + Clone + 'static,
88{
89 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
90 self.run_inner(input).await
91 }
92}
93
94pub(crate) fn process_log(
101 log: LogEvent,
102 partition_key_field: Option<&ConfigValuePath>,
103) -> Option<KinesisProcessedEvent> {
104 let partition_key = if let Some(partition_key_field) = partition_key_field {
105 if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
106 v.to_string_lossy()
107 } else {
108 emit!(AwsKinesisStreamNoPartitionKeyError {
109 partition_key_field: partition_key_field.0.to_string().as_str()
110 });
111 return None;
112 }
113 } else {
114 Cow::Owned(gen_partition_key())
115 };
116 let partition_key = if partition_key.len() >= 256 {
117 #[expect(
118 clippy::string_slice,
119 reason = "floor_char_boundary guarantees a valid char boundary"
120 )]
121 let s = &partition_key[..partition_key.floor_char_boundary(256)];
122 s.to_string()
123 } else {
124 partition_key.into_owned()
125 };
126
127 Some(KinesisProcessedEvent {
128 event: log,
129 metadata: KinesisKey { partition_key },
130 })
131}
132
133fn gen_partition_key() -> String {
134 random::<[char; 16]>()
135 .iter()
136 .fold(String::new(), |mut s, c| {
137 s.push(*c);
138 s
139 })
140}
141
/// A batch of built Kinesis requests plus metadata covering the whole batch.
pub struct BatchKinesisRequest<R>
where
    R: Record + Clone,
{
    /// The individual requests that make up this batch.
    pub events: Vec<KinesisRequest<R>>,
    /// Batch-level metadata; `run_inner` builds it with `RequestMetadata::from_batch`
    /// over the per-request metadata of `events`.
    pub metadata: RequestMetadata,
}
149
150impl<R> Clone for BatchKinesisRequest<R>
151where
152 R: Record + Clone,
153{
154 fn clone(&self) -> Self {
155 Self {
156 events: self.events.to_vec(),
157 metadata: self.metadata.clone(),
158 }
159 }
160}
161
162impl<R> Finalizable for BatchKinesisRequest<R>
163where
164 R: Record + Clone,
165{
166 fn take_finalizers(&mut self) -> EventFinalizers {
167 self.events.take_finalizers()
168 }
169}
170
171impl<R> MetaDescriptive for BatchKinesisRequest<R>
172where
173 R: Record + Clone,
174{
175 fn get_metadata(&self) -> &RequestMetadata {
176 &self.metadata
177 }
178
179 fn metadata_mut(&mut self) -> &mut RequestMetadata {
180 &mut self.metadata
181 }
182}