vector/transforms/throttle/
transform.rs1use std::{hash::Hash, num::NonZeroU32, pin::Pin, time::Duration};
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5use governor::{Quota, clock};
6use metrics::Counter;
7use snafu::Snafu;
8
9use super::{
10 config::{ThrottleConfig, ThrottleInternalMetricsConfig},
11 rate_limiter::RateLimiterRunner,
12};
13use crate::{
14 conditions::Condition,
15 config::TransformContext,
16 event::Event,
17 internal_events::{TemplateRenderingError, ThrottleEventDiscarded},
18 template::Template,
19 transforms::TaskTransform,
20};
21
22#[derive(Clone)]
23pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
24 pub quota: Quota,
25 pub flush_keys_interval: Duration,
26 key_field: Option<Template>,
27 exclude: Option<Condition>,
28 pub clock: C,
29 internal_metrics: ThrottleInternalMetricsConfig,
30 pub cpu_ns: Option<Counter>,
31}
32
33impl<C, I> Throttle<C, I>
34where
35 C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
36 I: clock::Reference,
37{
38 pub fn new(
39 config: &ThrottleConfig,
40 context: &TransformContext,
41 clock: C,
42 ) -> crate::Result<Self> {
43 let flush_keys_interval = config.window_secs;
44
45 let threshold = match NonZeroU32::new(config.threshold) {
46 Some(threshold) => threshold,
47 None => return Err(Box::new(ConfigError::NonZero)),
48 };
49
50 let quota = match Quota::with_period(Duration::from_secs_f64(
51 flush_keys_interval.as_secs_f64() / f64::from(threshold.get()),
52 )) {
53 Some(quota) => quota.allow_burst(threshold),
54 None => return Err(Box::new(ConfigError::NonZero)),
55 };
56 let exclude = config
57 .exclude
58 .as_ref()
59 .map(|condition| condition.build(&context.enrichment_tables, &context.metrics_storage))
60 .transpose()?;
61
62 Ok(Self {
63 quota,
64 clock,
65 flush_keys_interval,
66 key_field: config.key_field.clone(),
67 exclude,
68 internal_metrics: config.internal_metrics.clone(),
69 cpu_ns: context.cpu_ns.clone(),
70 })
71 }
72
73 #[must_use]
74 pub fn start_rate_limiter<K>(&self) -> RateLimiterRunner<K, C>
75 where
76 K: Hash + Eq + Clone + Send + Sync + 'static,
77 {
78 RateLimiterRunner::start(self)
79 }
80
81 pub fn emit_event_discarded(&self, key: String) {
82 emit!(ThrottleEventDiscarded {
83 key,
84 emit_events_discarded_per_key: self.internal_metrics.emit_events_discarded_per_key
85 });
86 }
87}
88
89impl<C, I> TaskTransform<Event> for Throttle<C, I>
90where
91 C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
92 I: clock::Reference + Send + 'static,
93{
94 fn transform(
95 self: Box<Self>,
96 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
97 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
98 where
99 Self: 'static,
100 {
101 let limiter = self.start_rate_limiter();
102
103 Box::pin(stream! {
104 while let Some(event) = input_rx.next().await {
105 let (throttle, event) = match self.exclude.as_ref() {
106 Some(condition) => {
107 let (result, event) = condition.check(event);
108 (!result, event)
109 },
110 _ => (true, event)
111 };
112 let output = if throttle {
113 let key = self.key_field.as_ref().and_then(|t| {
114 t.render_string(&event)
115 .map_err(|error| {
116 emit!(TemplateRenderingError {
117 error,
118 field: Some("key_field"),
119 drop_event: false,
120 })
121 })
122 .ok()
123 });
124
125 if limiter.check_key(&key) {
126 Some(event)
127 } else {
128 self.emit_event_discarded(key.unwrap_or_else(|| "None".to_string()));
129 None
130 }
131 } else {
132 Some(event)
133 };
134 if let Some(event) = output {
135 yield event;
136 }
137 }
138 })
139 }
140}
141
142#[derive(Debug, Snafu)]
143pub enum ConfigError {
144 #[snafu(display("`threshold`, and `window_secs` must be non-zero"))]
145 NonZero,
146}
147
#[cfg(test)]
mod tests {
    //! Behavioral tests for the throttle transform. Most tests drive the
    //! transform with a `FakeRelativeClock`, so time only moves when a test
    //! calls `clock.advance`.

    use std::task::Poll;

    use futures::SinkExt;
    use indoc::indoc;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::components::assert_transform_compliance,
        transforms::{Transform, test::create_topology},
    };

    /// With `threshold: 2` over a 5s window, one cell replenishes every 2.5s
    /// (window / threshold, as computed in `Throttle::new`) and bursts of 2
    /// are allowed.
    #[tokio::test]
    async fn throttle_events() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {"
            threshold: 2
            window_secs: 5
        "})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();

        let throttle = throttle.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        // Nothing has been sent yet, so the output must not be ready.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Two events fit within the burst of 2 and must both pass.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();

        let mut count = 0_u8;
        while count < 2 {
            match out_stream.next().await {
                Some(_event) => {
                    count += 1;
                }
                _ => {
                    panic!("Unexpectedly received None in output stream");
                }
            }
        }
        assert_eq!(2, count);

        // 2s is less than the 2.5s replenish period, so this event is dropped.
        clock.advance(Duration::from_secs(2));

        tx.send(LogEvent::default().into()).await.unwrap();

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // 5s total has elapsed, so capacity is back and the next event passes.
        clock.advance(Duration::from_secs(3));

        tx.send(LogEvent::default().into()).await.unwrap();

        match out_stream.next().await {
            Some(_event) => {}
            _ => {
                panic!("Unexpectedly received None in output stream");
            }
        }

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Closing the input must terminate the output stream.
        tx.disconnect();

        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    /// Same quota as `throttle_events`, but events with a `.special` field
    /// match `exclude` and must pass even while the quota is exhausted.
    #[tokio::test]
    async fn throttle_exclude() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {"
            threshold: 2
            window_secs: 5
            exclude: \"exists(.special)\"
        "})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();

        let throttle = throttle.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Exhaust the burst with two ordinary events.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();

        let mut count = 0_u8;
        while count < 2 {
            match out_stream.next().await {
                Some(_event) => {
                    count += 1;
                }
                _ => {
                    panic!("Unexpectedly received None in output stream");
                }
            }
        }
        assert_eq!(2, count);

        clock.advance(Duration::from_secs(2));

        // Ordinary event: still over quota, so it is dropped.
        tx.send(LogEvent::default().into()).await.unwrap();

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Excluded event: bypasses the limiter despite the exhausted quota.
        let mut special_log = LogEvent::default();
        special_log.insert("special", "true");
        tx.send(special_log.into()).await.unwrap();
        match out_stream.next().await {
            Some(_event) => {}
            _ => {
                panic!("Unexpectedly received None in output stream");
            }
        }

        // After the full window, ordinary events pass again.
        clock.advance(Duration::from_secs(3));

        tx.send(LogEvent::default().into()).await.unwrap();

        match out_stream.next().await {
            Some(_event) => {}
            _ => {
                panic!("Unexpectedly received None in output stream");
            }
        }

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        tx.disconnect();

        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    /// With `threshold: 1` keyed on `bucket`, one event per distinct key
    /// must pass: "a" and "b" are limited independently.
    #[tokio::test]
    async fn throttle_buckets() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {r#"
            threshold: 1
            window_secs: 5
            key_field: "{{ bucket }}"
        "#})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap();

        let throttle = throttle.into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        let mut log_a = LogEvent::default();
        log_a.insert("bucket", "a");
        let mut log_b = LogEvent::default();
        log_b.insert("bucket", "b");
        tx.send(log_a.into()).await.unwrap();
        tx.send(log_b.into()).await.unwrap();

        // Both pass even though threshold is 1, because their keys differ.
        let mut count = 0_u8;
        while count < 2 {
            match out_stream.next().await {
                Some(_event) => {
                    count += 1;
                }
                _ => {
                    panic!("Unexpectedly received None in output stream");
                }
            }
        }
        assert_eq!(2, count);

        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        tx.disconnect();

        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    /// Runs the transform in a real topology under
    /// `assert_transform_compliance`.
    /// NOTE(review): the exact internal events it requires are defined in
    /// `test_util::components`; confirm there.
    #[tokio::test]
    async fn emits_internal_events() {
        assert_transform_compliance(async move {
            let config = ThrottleConfig {
                threshold: 1,
                window_secs: Duration::from_secs_f64(1.0),
                key_field: None,
                exclude: None,
                internal_metrics: Default::default(),
            };
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let log = LogEvent::from("hello world");
            tx.send(log.into()).await.unwrap();

            _ = out.recv().await;

            // Dropping the sender closes the input; after stopping, no more output.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}