Skip to main content

vector/transforms/throttle/
rate_limiter.rs

1use std::{hash::Hash, sync::Arc};
2
3use governor::{RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore};
4use tokio;
5
6use super::transform::Throttle;
7use crate::cpu_time::spawn_timed;
8
9/// Re-usable wrapper around the structs/type from the governor crate.
10/// Spawns a background task that periodically flushes keys that haven't been accessed recently.
11pub struct RateLimiterRunner<K, C>
12where
13    K: Hash + Eq + Clone,
14    C: clock::Clock,
15{
16    pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
17    flush_handle: tokio::task::JoinHandle<()>,
18}
19
20impl<K, C> RateLimiterRunner<K, C>
21where
22    K: Hash + Eq + Clone + Send + Sync + 'static,
23    C: clock::Clock + Clone + Send + Sync + 'static,
24{
25    pub fn start(throttle: &Throttle<C, C::Instant>) -> Self {
26        let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(
27            throttle.quota,
28            throttle.clock.clone(),
29        ));
30
31        let flush_keys_interval = throttle.flush_keys_interval;
32        let cpu_ns = throttle.cpu_ns.clone();
33
34        let rate_limiter_clone = Arc::clone(&rate_limiter);
35        // Hook the periodic key-flush task onto the component's CPU counter so
36        // its housekeeping work is attributed to this throttle transform.
37        let flush_handle = spawn_timed(
38            async move {
39                let mut interval = tokio::time::interval(flush_keys_interval);
40                loop {
41                    interval.tick().await;
42                    rate_limiter_clone.retain_recent();
43                }
44            },
45            cpu_ns,
46        );
47
48        Self {
49            rate_limiter,
50            flush_handle,
51        }
52    }
53
54    pub fn check_key(&self, key: &K) -> bool {
55        self.rate_limiter.check_key(key).is_ok()
56    }
57}
58
59impl<K, C> Drop for RateLimiterRunner<K, C>
60where
61    K: Hash + Eq + Clone,
62    C: clock::Clock,
63{
64    fn drop(&mut self) {
65        self.flush_handle.abort();
66    }
67}