vector/cpu_time.rs
1//! Per-component CPU-time measurement primitives.
2//!
3//! This module provides the building blocks for attributing CPU time to
4//! individual Vector components at runtime:
5//!
6//! - [`ThreadTime`] — a lightweight snapshot of thread CPU time, backed by
7//! `CLOCK_THREAD_CPUTIME_ID` on Linux/macOS, `GetThreadTimes` on Windows,
8//! and a zero no-op on other platforms.
9//! - [`CpuTimedFuture`] / [`CpuTimedExt`] — a [`Future`] adapter that
10//! brackets every `poll` call with a [`ThreadTime`] sample and accumulates
11//! the delta into a [`metrics::Counter`].
12//! - [`spawn_timed`] — convenience wrapper that spawns a future on the
13//! current tokio runtime with optional CPU-time accounting attached.
14//! - `register_counter` — registers the `component_cpu_usage_ns_total` metrics
15//! counter for a component (available on Linux, macOS, and Windows only).
16
17use std::{
18 future::Future,
19 pin::Pin,
20 task::{Context, Poll},
21 time::Duration,
22};
23
24use metrics::Counter;
25use pin_project::pin_project;
26
27/// An opaque snapshot of thread CPU time.
28///
29/// On Linux and macOS this uses `CLOCK_THREAD_CPUTIME_ID`, which measures
30/// only the time the calling thread was actually scheduled on a CPU (true CPU
31/// time, excluding preemption and context switches to other threads/processes).
32///
33/// On Windows this uses `GetThreadTimes`, which provides the same guarantee
34/// with 100ns granularity.
35///
36/// On other platforms thread CPU time is unavailable; [`ThreadTime`] is a
37/// no-op that always reports zero elapsed time. The per-component CPU metric
38/// is omitted on those platforms (see [`register_counter`]) rather than
39/// emitted with misleading wall-clock or zero values.
40///
41/// # Usage
42///
43/// Call [`ThreadTime::now`] immediately before the work to measure, then call
44/// [`ThreadTime::elapsed`] immediately after:
45///
46/// ```ignore
47/// let t0 = ThreadTime::now();
48/// do_work();
49/// let cpu_time = t0.elapsed();
50/// ```
51///
52/// # Correctness for sync transforms
53///
54/// This measurement is accurate for [`crate::transforms::SyncTransform`]
55/// because `transform_all` is synchronous and non-yielding: between the two
56/// measurement points the worker thread runs only transform code, with no
57/// `.await` points that could interleave other tokio tasks.
58pub struct ThreadTime(Inner);
59
60impl ThreadTime {
61 /// Captures the current thread CPU time.
62 #[inline]
63 pub fn now() -> Self {
64 ThreadTime(Inner::now())
65 }
66
67 /// Returns the CPU time elapsed since this snapshot was taken.
68 #[inline]
69 pub fn elapsed(&self) -> Duration {
70 self.0.elapsed()
71 }
72}
73
74// ── Linux / macOS: CLOCK_THREAD_CPUTIME_ID ────────────────────────────────
75
76#[cfg(any(target_os = "linux", target_os = "macos"))]
77struct Inner(Duration);
78
79#[cfg(any(target_os = "linux", target_os = "macos"))]
80impl Inner {
81 fn now() -> Self {
82 let mut ts = libc::timespec {
83 tv_sec: 0,
84 tv_nsec: 0,
85 };
86 // SAFETY:
87 // - `ts` is a valid, zero-initialised `timespec` on the stack.
88 // - `CLOCK_THREAD_CPUTIME_ID` is a valid clock ID on Linux ≥ 2.6 and
89 // macOS ≥ 10.12 (both are baseline requirements for Vector).
90 // - The return value is intentionally ignored: the only failure modes
91 // are an invalid clock ID (not the case here) or an invalid pointer
92 // (not the case here), neither of which can occur.
93 unsafe {
94 libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts);
95 }
96 Inner(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32))
97 }
98
99 #[inline]
100 fn elapsed(&self) -> Duration {
101 Self::now().0.saturating_sub(self.0)
102 }
103}
104
105// ── Windows: GetThreadTimes ───────────────────────────────────────────────
106
#[cfg(target_os = "windows")]
struct Inner(Duration);

#[cfg(target_os = "windows")]
impl Inner {
    /// Samples the current thread's accumulated kernel + user time.
    fn now() -> Self {
        use windows_sys::Win32::Foundation::FILETIME;
        use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes};

        // Every use of a `const` produces a fresh value, so this acts as a
        // zeroed initialiser for each of the four out-parameters.
        const ZEROED: FILETIME = FILETIME {
            dwLowDateTime: 0,
            dwHighDateTime: 0,
        };
        let (mut creation, mut exit, mut kernel, mut user) = (ZEROED, ZEROED, ZEROED, ZEROED);

        // SAFETY:
        // - `GetCurrentThread()` yields a pseudo-handle that is always valid
        //   and never needs closing.
        // - Each `FILETIME` out-pointer refers to a live, aligned stack local.
        // - The BOOL result is deliberately discarded: the call can only fail
        //   for a bad handle, which the pseudo-handle rules out.
        unsafe {
            GetThreadTimes(
                GetCurrentThread(),
                &mut creation,
                &mut exit,
                &mut kernel,
                &mut user,
            );
        }

        // Only kernel and user time represent CPU the thread consumed; the
        // creation/exit timestamps are required by the API but unused here.
        let busy_nanos = filetime_to_nanos(kernel) + filetime_to_nanos(user);
        Inner(Duration::from_nanos(busy_nanos))
    }

    /// CPU time consumed since `self` was captured, floored at zero.
    #[inline]
    fn elapsed(&self) -> Duration {
        let current = Self::now();
        current.0.saturating_sub(self.0)
    }
}
162
#[cfg(target_os = "windows")]
#[inline]
fn filetime_to_nanos(ft: windows_sys::Win32::Foundation::FILETIME) -> u64 {
    // A FILETIME tick is 100 nanoseconds.
    const NANOS_PER_TICK: u64 = 100;
    let high = u64::from(ft.dwHighDateTime);
    let low = u64::from(ft.dwLowDateTime);
    // Reassemble the split 64-bit tick count, then scale ticks to nanoseconds.
    ((high << 32) | low) * NANOS_PER_TICK
}
169
170// ── Other platforms: no-op (metric is not emitted on these platforms) ─────
171
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
struct Inner;

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
impl Inner {
    /// No per-thread CPU clock exists here, so the snapshot carries no data.
    #[inline]
    fn now() -> Self {
        Self
    }

    /// Always zero: there is no clock on this platform to read.
    #[inline]
    fn elapsed(&self) -> Duration {
        Duration::from_nanos(0)
    }
}
187
188// ── CpuTimedFuture: per-poll CPU time accumulator ─────────────────────────
189
190/// A [`Future`] adapter that accumulates thread CPU time across every `poll`.
191///
192/// Each call to [`Future::poll`] is bracketed by a [`ThreadTime`] sample;
193/// the delta is added to `counter`. Tokio's executor cannot migrate a task
194/// to another worker thread or run another task on the current thread between
195/// the entry and exit of a single `poll`, so each delta is a clean per-thread
196/// CPU-time measurement of the wrapped future's work for that poll. Multiple
197/// polls (across `Pending` returns and wake-ups) accumulate into the same
198/// counter, with each poll independently sampling the thread it ran on.
199///
200/// This is the per-task analogue of tokio's unstable
201/// `on_before_task_poll` / `on_after_task_poll` runtime hooks: it hooks the
202/// same boundary, but on a single future rather than the whole runtime, and
203/// it works on stable Rust without `--cfg tokio_unstable`.
204///
205/// # Measurement scope and upstream isolation
206///
207/// Vector components communicate only through `BufferReceiver`/`BufferSender`
208/// channels — never through stream combinators chained across component
209/// boundaries. Each component runs in its own tokio task. When a transform
210/// polls its input channel, it dequeues items that the upstream component
211/// computed earlier, in its own task; it does **not** execute any upstream
212/// code. The upstream's CPU was already charged to the upstream's counter at
213/// the time those items were produced. This holds even when the channel is
214/// always full: the items were produced by upstream CPU that was already
215/// counted upstream; we are only dequeuing them.
216///
217/// As a consequence, this counter for a transform task **includes**:
218///
219/// - Input-channel dequeue (our task's poll of the channel, not upstream work)
220/// - `on_events_received` bookkeeping and metric emit
221/// - `transform_all` (the core CPU cost)
222/// - `send_outputs` / fanout dispatch to downstream channels
223/// - Per-event schema validation and latency recording
224/// - For transforms that spawn helper tasks (e.g. `aws_ec2_metadata` IMDS
225/// refresh, `throttle`'s flush loop): those tasks' polls, via
226/// [`spawn_timed`], feed the **same** counter rather than being silently
227/// excluded.
228///
229/// And **does not** include:
230///
231/// - Other components' CPU — channel isolation guarantees this.
232/// - Time the task is parked (`Poll::Pending`): no polls → no measurement.
233/// Back-pressure and input starvation show up as flat counter growth.
234/// - `Drop` of the inner future after the final `Poll::Ready`. The drop runs
235/// after `CpuTimedFuture::poll` returns, so it is outside the timed window.
236/// This is a one-time cost at task shutdown, not steady-state.
237/// - Tokio scheduler and waker overhead — executor work, not component work.
238///
239/// # Concurrent sync transforms
240///
241/// For transforms that run concurrently (`enable_concurrency() == true`), both
242/// the driver future **and** each per-batch spawned task are wrapped with this
243/// adapter. Because the spawned tasks are separate tokio tasks, the driver's
244/// `CpuTimedFuture` never observes their polls — there is no double-counting.
245/// The driver is measured for: input dequeue, `on_events_received`, and
246/// `send_outputs`. Each spawned task is measured for: `transform_all`.
247///
248/// Construct it via [`CpuTimedExt::cpu_timed`].
249#[pin_project]
250pub struct CpuTimedFuture<F> {
251 #[pin]
252 inner: F,
253 counter: Counter,
254}
255
256impl<F: Future> Future for CpuTimedFuture<F> {
257 type Output = F::Output;
258
259 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
260 let t0 = ThreadTime::now();
261 let this = self.project();
262 let result = this.inner.poll(cx);
263 this.counter.increment(t0.elapsed().as_nanos() as u64);
264 result
265 }
266}
267
268/// Extension trait that wraps a future in [`CpuTimedFuture`] via a chained
269/// call:
270///
271/// ```ignore
272/// async move { /* work */ }.cpu_timed(counter)
273/// ```
274///
275/// Mirrors the style of [`tracing::Instrument::in_current_span`].
276pub trait CpuTimedExt: Future + Sized {
277 /// Wraps this future in a [`CpuTimedFuture`] that increments `counter` by
278 /// the thread CPU time consumed on each `poll`.
279 fn cpu_timed(self, counter: Counter) -> CpuTimedFuture<Self> {
280 CpuTimedFuture {
281 inner: self,
282 counter,
283 }
284 }
285}
286
287impl<F: Future> CpuTimedExt for F {}
288
289/// Spawns `future` on the current tokio runtime, attaching CPU-time
290/// accounting when `counter` is [`Some`]. When [`None`], the future is
291/// spawned as-is with no per-poll overhead.
292///
293/// Equivalent to:
294///
295/// ```ignore
296/// // Some(counter):
297/// crate::spawn_in_current_span(future.cpu_timed(counter))
298/// // None:
299/// crate::spawn_in_current_span(future)
300/// ```
301///
302/// Use this when spawning background tasks (e.g. a transform's housekeeping
303/// loop) whose CPU usage should be attributed back to a component. Wrap the
304/// future with [`tracing::Instrument`] (or similar adapters) before passing
305/// it in if you want those adapters' per-poll work included in the CPU-time
306/// measurement.
307///
308/// The current tracing span is attached to the spawned task.
309pub fn spawn_timed<F>(future: F, counter: Option<Counter>) -> tokio::task::JoinHandle<F::Output>
310where
311 F: Future + Send + 'static,
312 F::Output: Send + 'static,
313{
314 match counter {
315 Some(c) => crate::spawn_in_current_span(future.cpu_timed(c)),
316 None => crate::spawn_in_current_span(future),
317 }
318}
319
320/// Registers the `component_cpu_usage_ns_total` counter for the calling
321/// component on platforms where thread CPU time is available (Linux, macOS,
322/// Windows). On other platforms it returns [`Counter::noop()`] — the metric
323/// is **not** emitted at all, rather than reporting wall-clock or zero values
324/// that would be misleading to compare against supported platforms.
325///
326/// Call this inside a tracing span that carries `component_id`,
327/// `component_kind`, and `component_type` labels so that those labels are
328/// automatically attached to the registered counter by the metrics recorder.
329///
330/// # Using the emitted counter
331///
332/// The counter is cumulative nanoseconds of CPU time. To derive the average
333/// number of CPU cores a component consumed over a window:
334///
335/// ```promql
336/// rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9
337/// ```
338///
339/// This value can exceed 1.0 when a transform genuinely uses multiple cores
340/// (concurrent execution path). Compare against `utilization` to separate
341/// CPU cost from pipeline back-pressure.
342#[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
343pub fn register_counter() -> Counter {
344 vector_lib::counter!(vector_lib::internal_event::CounterName::ComponentCpuUsageNsTotal)
345}
346
347/// Registers the `component_cpu_usage_ns_total` counter for the calling
348/// component on platforms where thread CPU time is available (Linux, macOS,
349/// Windows). On other platforms it returns [`Counter::noop()`] — the metric
350/// is **not** emitted at all, rather than reporting wall-clock or zero values
351/// that would be misleading to compare against supported platforms.
352#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
353pub fn register_counter() -> Counter {
354 Counter::noop()
355}
356
#[cfg(test)]
mod tests {
    use std::hint::black_box;

    use super::*;

    /// Burns a small, fixed amount of CPU. `black_box` stops the optimizer
    /// from constant-folding the loop away, which would otherwise leave
    /// nothing to measure in optimized test builds.
    fn burn_cpu() -> u64 {
        (0u64..10_000).map(black_box).sum()
    }

    #[test]
    fn elapsed_is_monotone() {
        // Two consecutive elapsed() calls on the same snapshot must be
        // non-decreasing (the clock never goes backwards).
        let t0 = ThreadTime::now();
        black_box(burn_cpu());
        let first = t0.elapsed();
        black_box(burn_cpu());
        let second = t0.elapsed();
        assert!(
            second >= first,
            "clock went backwards: {second:?} < {first:?}"
        );
    }

    /// On platforms with a real thread CPU clock, sustained CPU work must
    /// eventually register. The loop is bounded by a wall-clock deadline
    /// because some clocks (e.g. Windows `GetThreadTimes`) only advance at
    /// timer-tick granularity.
    #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))]
    #[test]
    fn elapsed_advances_under_cpu_load() {
        let t0 = ThreadTime::now();
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while t0.elapsed() == Duration::ZERO && std::time::Instant::now() < deadline {
            black_box(burn_cpu());
        }
        assert!(
            t0.elapsed() > Duration::ZERO,
            "thread CPU clock never advanced"
        );
    }

    /// Without a thread CPU clock, `ThreadTime` is a no-op that reports zero.
    #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
    #[test]
    fn elapsed_is_zero_on_unsupported_platforms() {
        let t0 = ThreadTime::now();
        black_box(burn_cpu());
        assert_eq!(t0.elapsed(), Duration::ZERO);
    }

    /// Wrapping a future must not alter the value it resolves to.
    #[tokio::test]
    async fn cpu_timed_passes_output_through() {
        let output = async { 7 }.cpu_timed(Counter::noop()).await;
        assert_eq!(output, 7);
    }
}