vector/transforms/throttle/
rate_limiter.rs1use std::{hash::Hash, sync::Arc};
2
3use governor::{RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore};
4use tokio;
5
6use super::transform::Throttle;
7use crate::cpu_time::spawn_timed;
8
9pub struct RateLimiterRunner<K, C>
12where
13 K: Hash + Eq + Clone,
14 C: clock::Clock,
15{
16 pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
17 flush_handle: tokio::task::JoinHandle<()>,
18}
19
20impl<K, C> RateLimiterRunner<K, C>
21where
22 K: Hash + Eq + Clone + Send + Sync + 'static,
23 C: clock::Clock + Clone + Send + Sync + 'static,
24{
25 pub fn start(throttle: &Throttle<C, C::Instant>) -> Self {
26 let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(
27 throttle.quota,
28 throttle.clock.clone(),
29 ));
30
31 let flush_keys_interval = throttle.flush_keys_interval;
32 let cpu_ns = throttle.cpu_ns.clone();
33
34 let rate_limiter_clone = Arc::clone(&rate_limiter);
35 let flush_handle = spawn_timed(
38 async move {
39 let mut interval = tokio::time::interval(flush_keys_interval);
40 loop {
41 interval.tick().await;
42 rate_limiter_clone.retain_recent();
43 }
44 },
45 cpu_ns,
46 );
47
48 Self {
49 rate_limiter,
50 flush_handle,
51 }
52 }
53
54 pub fn check_key(&self, key: &K) -> bool {
55 self.rate_limiter.check_key(key).is_ok()
56 }
57}
58
59impl<K, C> Drop for RateLimiterRunner<K, C>
60where
61 K: Hash + Eq + Clone,
62 C: clock::Clock,
63{
64 fn drop(&mut self) {
65 self.flush_handle.abort();
66 }
67}