// vector/transforms/throttle/transform.rs

1use std::{hash::Hash, num::NonZeroU32, pin::Pin, time::Duration};
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5use governor::{Quota, clock};
6use metrics::Counter;
7use snafu::Snafu;
8
9use super::{
10    config::{ThrottleConfig, ThrottleInternalMetricsConfig},
11    rate_limiter::RateLimiterRunner,
12};
13use crate::{
14    conditions::Condition,
15    config::TransformContext,
16    event::Event,
17    internal_events::{TemplateRenderingError, ThrottleEventDiscarded},
18    template::Template,
19    transforms::TaskTransform,
20};
21
22#[derive(Clone)]
23pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
24    pub quota: Quota,
25    pub flush_keys_interval: Duration,
26    key_field: Option<Template>,
27    exclude: Option<Condition>,
28    pub clock: C,
29    internal_metrics: ThrottleInternalMetricsConfig,
30    pub cpu_ns: Option<Counter>,
31}
32
33impl<C, I> Throttle<C, I>
34where
35    C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
36    I: clock::Reference,
37{
38    pub fn new(
39        config: &ThrottleConfig,
40        context: &TransformContext,
41        clock: C,
42    ) -> crate::Result<Self> {
43        let flush_keys_interval = config.window_secs;
44
45        let threshold = match NonZeroU32::new(config.threshold) {
46            Some(threshold) => threshold,
47            None => return Err(Box::new(ConfigError::NonZero)),
48        };
49
50        let quota = match Quota::with_period(Duration::from_secs_f64(
51            flush_keys_interval.as_secs_f64() / f64::from(threshold.get()),
52        )) {
53            Some(quota) => quota.allow_burst(threshold),
54            None => return Err(Box::new(ConfigError::NonZero)),
55        };
56        let exclude = config
57            .exclude
58            .as_ref()
59            .map(|condition| condition.build(&context.enrichment_tables, &context.metrics_storage))
60            .transpose()?;
61
62        Ok(Self {
63            quota,
64            clock,
65            flush_keys_interval,
66            key_field: config.key_field.clone(),
67            exclude,
68            internal_metrics: config.internal_metrics.clone(),
69            cpu_ns: context.cpu_ns.clone(),
70        })
71    }
72
73    #[must_use]
74    pub fn start_rate_limiter<K>(&self) -> RateLimiterRunner<K, C>
75    where
76        K: Hash + Eq + Clone + Send + Sync + 'static,
77    {
78        RateLimiterRunner::start(self)
79    }
80
81    pub fn emit_event_discarded(&self, key: String) {
82        emit!(ThrottleEventDiscarded {
83            key,
84            emit_events_discarded_per_key: self.internal_metrics.emit_events_discarded_per_key
85        });
86    }
87}
88
89impl<C, I> TaskTransform<Event> for Throttle<C, I>
90where
91    C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
92    I: clock::Reference + Send + 'static,
93{
94    fn transform(
95        self: Box<Self>,
96        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
97    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
98    where
99        Self: 'static,
100    {
101        let limiter = self.start_rate_limiter();
102
103        Box::pin(stream! {
104            while let Some(event) = input_rx.next().await {
105                let (throttle, event) = match self.exclude.as_ref() {
106                    Some(condition) => {
107                        let (result, event) = condition.check(event);
108                        (!result, event)
109                    },
110                    _ => (true, event)
111                };
112                let output = if throttle {
113                    let key = self.key_field.as_ref().and_then(|t| {
114                        t.render_string(&event)
115                            .map_err(|error| {
116                                emit!(TemplateRenderingError {
117                                    error,
118                                    field: Some("key_field"),
119                                    drop_event: false,
120                                })
121                            })
122                            .ok()
123                    });
124
125                    if limiter.check_key(&key) {
126                        Some(event)
127                    } else {
128                        self.emit_event_discarded(key.unwrap_or_else(|| "None".to_string()));
129                        None
130                    }
131                } else {
132                    Some(event)
133                };
134                if let Some(event) = output {
135                    yield event;
136                }
137            }
138        })
139    }
140}
141
142#[derive(Debug, Snafu)]
143pub enum ConfigError {
144    #[snafu(display("`threshold`, and `window_secs` must be non-zero"))]
145    NonZero,
146}
147
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use futures::SinkExt;
    use indoc::indoc;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::components::assert_transform_compliance,
        transforms::{Transform, test::create_topology},
    };

    /// Awaits the next output event, panicking if the stream ended instead.
    async fn expect_event<S>(out_stream: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        assert!(
            out_stream.next().await.is_some(),
            "Unexpectedly received None in output stream"
        );
    }

    #[tokio::test]
    async fn throttle_events() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {"
            threshold: 2
            window_secs: 5
        "})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap()
            .into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        // tokio interval is always immediately ready, so we poll once to make sure
        // we trip it/set the interval in the future
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // The full burst of `threshold` (2) events passes at t=0.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        // At t=2s no slot has been replenished yet (one slot every 5s / 2 = 2.5s), so this
        // third event is dropped and the stream stays pending.
        clock.advance(Duration::from_secs(2));
        tx.send(LogEvent::default().into()).await.unwrap();
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // By t=5s the limiter has replenished, so the next event is let through.
        clock.advance(Duration::from_secs(3));
        tx.send(LogEvent::default().into()).await.unwrap();
        expect_event(&mut out_stream).await;

        // Nothing else is waiting for us.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Closing the input ends the output stream.
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn throttle_exclude() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {"
            threshold: 2
            window_secs: 5
            exclude: \"exists(.special)\"
        "})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap()
            .into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        // tokio interval is always immediately ready, so we poll once to make sure
        // we trip it/set the interval in the future
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // The full burst of `threshold` (2) events passes at t=0.
        tx.send(LogEvent::default().into()).await.unwrap();
        tx.send(LogEvent::default().into()).await.unwrap();
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        // At t=2s the limiter is still exhausted, so this third, ordinary event is dropped.
        clock.advance(Duration::from_secs(2));
        tx.send(LogEvent::default().into()).await.unwrap();
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // An event matching `exclude` bypasses the limiter even while it is exhausted.
        let mut special_log = LogEvent::default();
        special_log.insert("special", "true");
        tx.send(special_log.into()).await.unwrap();
        expect_event(&mut out_stream).await;

        // By t=5s the limiter has replenished, so the next ordinary event is let through.
        clock.advance(Duration::from_secs(3));
        tx.send(LogEvent::default().into()).await.unwrap();
        expect_event(&mut out_stream).await;

        // Nothing else is waiting for us.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Closing the input ends the output stream.
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn throttle_buckets() {
        let clock = clock::FakeRelativeClock::default();
        let config = serde_yaml::from_str::<ThrottleConfig>(indoc! {r#"
            threshold: 1
            window_secs: 5
            key_field: "{{ bucket }}"
        "#})
        .unwrap();

        let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
            .map(Transform::event_task)
            .unwrap()
            .into_task();

        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = throttle.transform_events(Box::pin(rx));

        // tokio interval is always immediately ready, so we poll once to make sure
        // we trip it/set the interval in the future
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Each bucket has its own limit, so one event per bucket passes.
        let mut log_a = LogEvent::default();
        log_a.insert("bucket", "a");
        let mut log_b = LogEvent::default();
        log_b.insert("bucket", "b");
        tx.send(log_a.into()).await.unwrap();
        tx.send(log_b.into()).await.unwrap();
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        // Nothing else is waiting for us.
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // A second event for bucket `a` exceeds that bucket's threshold of 1 and is dropped.
        let mut log_a_again = LogEvent::default();
        log_a_again.insert("bucket", "a");
        tx.send(log_a_again.into()).await.unwrap();
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));

        // Closing the input ends the output stream.
        tx.disconnect();
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn emits_internal_events() {
        assert_transform_compliance(async move {
            let config = ThrottleConfig {
                threshold: 1,
                window_secs: Duration::from_secs(1),
                key_field: None,
                exclude: None,
                internal_metrics: Default::default(),
            };
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let log = LogEvent::from("hello world");
            tx.send(log.into()).await.unwrap();

            _ = out.recv().await;

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}