Skip to main content

vector_buffers/topology/channel/
sender.rs

1// Derivative's Debug impl generates 'let _ = field.fmt(f)' which triggers this lint.
2#![allow(clippy::let_underscore_must_use)]
3
4use std::{sync::Arc, time::Instant};
5
6use async_recursion::async_recursion;
7use derivative::Derivative;
8use tokio::sync::Mutex;
9use tracing::Span;
10use vector_common::internal_event::{InternalEventHandle, Registered, register};
11
12use super::limited_queue::LimitedSender;
13use crate::{
14    BufferInstrumentation, Bufferable, WhenFull,
15    buffer_usage_data::BufferUsageHandle,
16    internal_events::BufferSendDuration,
17    variants::disk_v2::{self, ProductionFilesystem},
18};
19
20/// Adapter for papering over various sender backends.
21#[derive(Clone, Debug)]
22pub enum SenderAdapter<T: Bufferable> {
23    /// The in-memory channel buffer.
24    InMemory(LimitedSender<T>),
25
26    /// The disk v2 buffer.
27    DiskV2(Arc<Mutex<disk_v2::BufferWriter<T, ProductionFilesystem>>>),
28}
29
30impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {
31    fn from(v: LimitedSender<T>) -> Self {
32        Self::InMemory(v)
33    }
34}
35
36impl<T: Bufferable> From<disk_v2::BufferWriter<T, ProductionFilesystem>> for SenderAdapter<T> {
37    fn from(v: disk_v2::BufferWriter<T, ProductionFilesystem>) -> Self {
38        Self::DiskV2(Arc::new(Mutex::new(v)))
39    }
40}
41
42impl<T> SenderAdapter<T>
43where
44    T: Bufferable,
45{
46    pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> {
47        match self {
48            Self::InMemory(tx) => tx.send(item).await.map_err(Into::into),
49            Self::DiskV2(writer) => {
50                let mut writer = writer.lock().await;
51
52                writer.write_record(item).await.map(|_| ()).map_err(|e| {
53                    // TODO: Could some errors be handled and not be unrecoverable? Right now,
54                    // encoding should theoretically be recoverable -- encoded value was too big, or
55                    // error during encoding -- but the traits don't allow for recovering the
56                    // original event value because we have to consume it to do the encoding... but
57                    // that might not always be the case.
58                    error!("Disk buffer writer has encountered an unrecoverable error.");
59
60                    e.into()
61                })
62            }
63        }
64    }
65
66    pub(crate) async fn try_send(&mut self, item: T) -> crate::Result<Option<T>> {
67        match self {
68            Self::InMemory(tx) => tx
69                .try_send(item)
70                .map(|()| None)
71                .or_else(|e| Ok(Some(e.into_inner()))),
72            Self::DiskV2(writer) => {
73                let mut writer = writer.lock().await;
74
75                writer.try_write_record(item).await.map_err(|e| {
76                    // TODO: Could some errors be handled and not be unrecoverable? Right now,
77                    // encoding should theoretically be recoverable -- encoded value was too big, or
78                    // error during encoding -- but the traits don't allow for recovering the
79                    // original event value because we have to consume it to do the encoding... but
80                    // that might not always be the case.
81                    error!("Disk buffer writer has encountered an unrecoverable error.");
82
83                    e.into()
84                })
85            }
86        }
87    }
88
89    pub(crate) async fn flush(&mut self) -> crate::Result<()> {
90        match self {
91            Self::InMemory(_) => Ok(()),
92            Self::DiskV2(writer) => {
93                let mut writer = writer.lock().await;
94                writer.flush().await.map_err(|e| {
95                    // Errors on the I/O path, which is all that flushing touches, are never recoverable.
96                    error!("Disk buffer writer has encountered an unrecoverable error.");
97
98                    e.into()
99                })
100            }
101        }
102    }
103
104    pub fn capacity(&self) -> Option<usize> {
105        match self {
106            Self::InMemory(tx) => Some(tx.available_capacity()),
107            Self::DiskV2(_) => None,
108        }
109    }
110}
111
112/// A buffer sender.
113///
114/// The sender handles sending events into the buffer, as well as the behavior around handling
115/// events when the internal channel is full.
116///
117/// When creating a buffer sender/receiver pair, callers can specify the "when full" behavior of the
118/// sender.  This controls how events are handled when the internal channel is full.  Three modes
119/// are possible:
120/// - block
121/// - drop newest
122/// - overflow
123///
124/// In "block" mode, callers are simply forced to wait until the channel has enough capacity to
125/// accept the event.  In "drop newest" mode, any event being sent when the channel is full will be
126/// dropped and proceed no further. In "overflow" mode, events will be sent to another buffer
127/// sender.  Callers can specify the overflow sender to use when constructing their buffers initially.
128///
129/// TODO: We should eventually rework `BufferSender`/`BufferReceiver` so that they contain a vector
130/// of the fields we already have here, but instead of cascading via calling into `overflow`, we'd
131/// linearize the nesting instead, so that `BufferSender` would only ever be calling the underlying
132/// `SenderAdapter` instances instead... which would let us get rid of the boxing and
133/// `#[async_recursion]` stuff.
134#[derive(Clone, Derivative)]
135#[derivative(Debug)]
136pub struct BufferSender<T: Bufferable> {
137    base: SenderAdapter<T>,
138    overflow: Option<Box<BufferSender<T>>>,
139    when_full: WhenFull,
140    usage_instrumentation: Option<BufferUsageHandle>,
141    #[derivative(Debug = "ignore")]
142    send_duration: Option<Registered<BufferSendDuration>>,
143    #[derivative(Debug = "ignore")]
144    custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
145}
146
147impl<T: Bufferable> BufferSender<T> {
148    /// Creates a new [`BufferSender`] wrapping the given channel sender.
149    pub fn new(base: SenderAdapter<T>, when_full: WhenFull) -> Self {
150        Self {
151            base,
152            overflow: None,
153            when_full,
154            usage_instrumentation: None,
155            send_duration: None,
156            custom_instrumentation: None,
157        }
158    }
159
160    /// Creates a new [`BufferSender`] wrapping the given channel sender and overflow sender.
161    pub fn with_overflow(base: SenderAdapter<T>, overflow: BufferSender<T>) -> Self {
162        Self {
163            base,
164            overflow: Some(Box::new(overflow)),
165            when_full: WhenFull::Overflow,
166            usage_instrumentation: None,
167            send_duration: None,
168            custom_instrumentation: None,
169        }
170    }
171
172    /// Converts this sender into an overflowing sender using the given `BufferSender<T>`.
173    ///
174    /// Note: this resets the internal state of this sender, and so this should not be called except
175    /// when initially constructing `BufferSender<T>`.
176    #[cfg(test)]
177    pub fn switch_to_overflow(&mut self, overflow: BufferSender<T>) {
178        self.overflow = Some(Box::new(overflow));
179        self.when_full = WhenFull::Overflow;
180    }
181
182    /// Configures this sender to instrument the items passing through it.
183    pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
184        self.usage_instrumentation = Some(handle);
185    }
186
187    /// Configures this sender to instrument the send duration.
188    pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
189        let _enter = span.enter();
190        self.send_duration = Some(register(BufferSendDuration { stage }));
191    }
192
193    /// Configures this sender to invoke a custom instrumentation hook.
194    pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
195        self.custom_instrumentation = Some(Arc::new(instrumentation));
196    }
197}
198
199impl<T: Bufferable> BufferSender<T> {
200    #[cfg(test)]
201    pub(crate) fn get_base_ref(&self) -> &SenderAdapter<T> {
202        &self.base
203    }
204
205    #[cfg(test)]
206    pub(crate) fn get_overflow_ref(&self) -> Option<&BufferSender<T>> {
207        self.overflow.as_ref().map(AsRef::as_ref)
208    }
209
210    #[async_recursion]
211    pub async fn send(
212        &mut self,
213        mut item: T,
214        send_reference: Option<Instant>,
215    ) -> crate::Result<()> {
216        if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
217            instrumentation.on_send(&mut item);
218        }
219        let item_sizing = self
220            .usage_instrumentation
221            .as_ref()
222            .map(|_| (item.event_count(), item.size_of()));
223
224        let mut was_dropped = false;
225
226        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
227            && let Some((item_count, item_size)) = item_sizing
228        {
229            instrumentation
230                .increment_received_event_count_and_byte_size(item_count as u64, item_size as u64);
231        }
232        match self.when_full {
233            WhenFull::Block => self.base.send(item).await?,
234            WhenFull::DropNewest => {
235                if self.base.try_send(item).await?.is_some() {
236                    was_dropped = true;
237                }
238            }
239            WhenFull::Overflow => {
240                if let Some(item) = self.base.try_send(item).await? {
241                    was_dropped = true;
242                    self.overflow
243                        .as_mut()
244                        .unwrap_or_else(|| unreachable!("overflow must exist"))
245                        .send(item, send_reference)
246                        .await?;
247                }
248            }
249        }
250
251        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
252            && let Some((item_count, item_size)) = item_sizing
253            && was_dropped
254        {
255            instrumentation.increment_dropped_event_count_and_byte_size(
256                item_count as u64,
257                item_size as u64,
258                true,
259            );
260        }
261        if let Some(send_duration) = self.send_duration.as_ref()
262            && let Some(send_reference) = send_reference
263        {
264            send_duration.emit(send_reference.elapsed());
265        }
266
267        Ok(())
268    }
269
270    #[async_recursion]
271    pub async fn flush(&mut self) -> crate::Result<()> {
272        self.base.flush().await?;
273        if let Some(overflow) = self.overflow.as_mut() {
274            overflow.flush().await?;
275        }
276
277        Ok(())
278    }
279}