vector_buffers/lib.rs
1//! The Vector Core buffer
2//!
//! This library implements channel-like functionality in two variants: one that is solely
//! in-memory and one that is on-disk. Both variants are bounded.
5
6#![deny(warnings)]
7#![deny(clippy::all)]
8#![deny(clippy::pedantic)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::type_complexity)] // long-types happen, especially in async code
11#![allow(clippy::must_use_candidate)]
12#![allow(async_fn_in_trait)]
13
14#[macro_use]
15extern crate tracing;
16
17mod buffer_usage_data;
18
19pub mod config;
20pub use config::{BufferConfig, BufferType, MemoryBufferSize};
21use encoding::Encodable;
22pub(crate) use vector_common::Result;
23use vector_config::configurable_component;
24
25pub mod encoding;
26
27mod internal_events;
28
29#[cfg(test)]
30pub mod test;
31pub mod topology;
32
33pub(crate) mod variants;
34
/// `disk_v2`'s write-buffer size, re-exported when the `test` feature is enabled so the test
/// harness can size payloads against the real value instead of hardcoding it.
37#[cfg(feature = "test")]
38pub use variants::disk_v2::common::DEFAULT_WRITE_BUFFER_SIZE as WRITE_BUFFER_SIZE_V2;
39
40use std::fmt::Debug;
41
42#[cfg(test)]
43use quickcheck::{Arbitrary, Gen};
44use vector_common::{byte_size_of::ByteSizeOf, finalization::AddBatchNotifier};
45
46/// Event handling behavior when a buffer is full.
47#[configurable_component]
48#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
49#[serde(rename_all = "snake_case")]
50pub enum WhenFull {
51 /// Wait for free space in the buffer.
52 ///
53 /// This applies backpressure up the topology, signalling that sources should slow down
54 /// the acceptance/consumption of events. This means that while no data is lost, data will pile
55 /// up at the edge.
56 #[default]
57 Block,
58
59 /// Drops the event instead of waiting for free space in buffer.
60 ///
61 /// The event will be intentionally dropped. This mode is typically used when performance is the
62 /// highest priority, and it is preferable to temporarily lose events rather than cause a
63 /// slowdown in the acceptance/consumption of events.
64 DropNewest,
65
66 /// Overflows to the next stage in the buffer topology.
67 ///
68 /// If the current buffer stage is full, attempt to send this event to the next buffer stage.
69 /// That stage may also be configured overflow, and so on, but ultimately the last stage in a
70 /// buffer topology must use one of the other handling behaviors. This means that next stage may
71 /// potentially be able to buffer the event, but it may also block or drop the event.
72 ///
73 /// This mode can only be used when two or more buffer stages are configured.
74 #[configurable(metadata(docs::hidden))]
75 Overflow,
76}
77
#[cfg(test)]
impl Arbitrary for WhenFull {
    fn arbitrary(g: &mut Gen) -> Self {
        // Only `Block` and `DropNewest` are generated. `Overflow` is left out on purpose: nothing
        // supports handling it yet, and anything that encounters it falls back to `Block`, so
        // generating it would add nothing over `Block` itself.
        // TODO: start generating `Overflow` once it is actually supported.
        let choose_block = bool::arbitrary(g);
        if choose_block {
            Self::Block
        } else {
            Self::DropNewest
        }
    }
}
91
92/// An item that can be buffered in memory.
93///
94/// This supertrait serves as the base trait for any item that can be pushed into a memory buffer.
95/// It is a relaxed version of `Bufferable` that allows for items that are not `Encodable` (e.g., `Instant`),
96/// which is an unnecessary constraint for memory buffers.
97pub trait InMemoryBufferable:
98 AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
99{
100}
101
102// Blanket implementation for anything that is already in-memory bufferable.
103impl<T> InMemoryBufferable for T where
104 T: AddBatchNotifier + ByteSizeOf + EventCount + Debug + Send + Sync + Unpin + Sized + 'static
105{
106}
107
108/// An item that can be buffered.
109///
110/// This supertrait serves as the base trait for any item that can be pushed into a buffer.
111pub trait Bufferable: InMemoryBufferable + Encodable {}
112
113// Blanket implementation for anything that is already bufferable.
114impl<T> Bufferable for T where T: InMemoryBufferable + Encodable {}
115
116/// Hook for observing items as they are sent into a `BufferSender`.
117pub trait BufferInstrumentation<T: Bufferable>: Send + Sync + 'static {
118 /// Called immediately before the item is emitted to the underlying buffer.
119 /// The underlying type is stored in an `Arc`, so we cannot have `&mut self`.
120 fn on_send(&self, item: &mut T);
121}
122
123pub trait EventCount {
124 fn event_count(&self) -> usize;
125}
126
127impl<T> EventCount for Vec<T>
128where
129 T: EventCount,
130{
131 fn event_count(&self) -> usize {
132 self.iter().map(EventCount::event_count).sum()
133 }
134}
135
136impl<T> EventCount for &T
137where
138 T: EventCount,
139{
140 fn event_count(&self) -> usize {
141 (*self).event_count()
142 }
143}
144
145#[track_caller]
146pub(crate) fn spawn_named<T>(
147 task: impl std::future::Future<Output = T> + Send + 'static,
148 _name: &str,
149) -> tokio::task::JoinHandle<T>
150where
151 T: Send + 'static,
152{
153 #[cfg(tokio_unstable)]
154 return tokio::task::Builder::new()
155 .name(_name)
156 .spawn(task)
157 .expect("tokio task should spawn");
158
159 #[cfg(not(tokio_unstable))]
160 tokio::spawn(task)
161}