vector_buffers/topology/channel/
sender.rs1#![allow(clippy::let_underscore_must_use)]
3
4use std::{sync::Arc, time::Instant};
5
6use async_recursion::async_recursion;
7use derivative::Derivative;
8use tokio::sync::Mutex;
9use tracing::Span;
10use vector_common::internal_event::{InternalEventHandle, Registered, register};
11
12use super::limited_queue::LimitedSender;
13use crate::{
14 BufferInstrumentation, Bufferable, WhenFull,
15 buffer_usage_data::BufferUsageHandle,
16 internal_events::BufferSendDuration,
17 variants::disk_v2::{self, ProductionFilesystem},
18};
19
20#[derive(Clone, Debug)]
22pub enum SenderAdapter<T: Bufferable> {
23 InMemory(LimitedSender<T>),
25
26 DiskV2(Arc<Mutex<disk_v2::BufferWriter<T, ProductionFilesystem>>>),
28}
29
30impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {
31 fn from(v: LimitedSender<T>) -> Self {
32 Self::InMemory(v)
33 }
34}
35
36impl<T: Bufferable> From<disk_v2::BufferWriter<T, ProductionFilesystem>> for SenderAdapter<T> {
37 fn from(v: disk_v2::BufferWriter<T, ProductionFilesystem>) -> Self {
38 Self::DiskV2(Arc::new(Mutex::new(v)))
39 }
40}
41
42impl<T> SenderAdapter<T>
43where
44 T: Bufferable,
45{
46 pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> {
47 match self {
48 Self::InMemory(tx) => tx.send(item).await.map_err(Into::into),
49 Self::DiskV2(writer) => {
50 let mut writer = writer.lock().await;
51
52 writer.write_record(item).await.map(|_| ()).map_err(|e| {
53 error!("Disk buffer writer has encountered an unrecoverable error.");
59
60 e.into()
61 })
62 }
63 }
64 }
65
66 pub(crate) async fn try_send(&mut self, item: T) -> crate::Result<Option<T>> {
67 match self {
68 Self::InMemory(tx) => tx
69 .try_send(item)
70 .map(|()| None)
71 .or_else(|e| Ok(Some(e.into_inner()))),
72 Self::DiskV2(writer) => {
73 let mut writer = writer.lock().await;
74
75 writer.try_write_record(item).await.map_err(|e| {
76 error!("Disk buffer writer has encountered an unrecoverable error.");
82
83 e.into()
84 })
85 }
86 }
87 }
88
89 pub(crate) async fn flush(&mut self) -> crate::Result<()> {
90 match self {
91 Self::InMemory(_) => Ok(()),
92 Self::DiskV2(writer) => {
93 let mut writer = writer.lock().await;
94 writer.flush().await.map_err(|e| {
95 error!("Disk buffer writer has encountered an unrecoverable error.");
97
98 e.into()
99 })
100 }
101 }
102 }
103
104 pub fn capacity(&self) -> Option<usize> {
105 match self {
106 Self::InMemory(tx) => Some(tx.available_capacity()),
107 Self::DiskV2(_) => None,
108 }
109 }
110}
111
112#[derive(Clone, Derivative)]
135#[derivative(Debug)]
136pub struct BufferSender<T: Bufferable> {
137 base: SenderAdapter<T>,
138 overflow: Option<Box<BufferSender<T>>>,
139 when_full: WhenFull,
140 usage_instrumentation: Option<BufferUsageHandle>,
141 #[derivative(Debug = "ignore")]
142 send_duration: Option<Registered<BufferSendDuration>>,
143 #[derivative(Debug = "ignore")]
144 custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
145}
146
147impl<T: Bufferable> BufferSender<T> {
148 pub fn new(base: SenderAdapter<T>, when_full: WhenFull) -> Self {
150 Self {
151 base,
152 overflow: None,
153 when_full,
154 usage_instrumentation: None,
155 send_duration: None,
156 custom_instrumentation: None,
157 }
158 }
159
160 pub fn with_overflow(base: SenderAdapter<T>, overflow: BufferSender<T>) -> Self {
162 Self {
163 base,
164 overflow: Some(Box::new(overflow)),
165 when_full: WhenFull::Overflow,
166 usage_instrumentation: None,
167 send_duration: None,
168 custom_instrumentation: None,
169 }
170 }
171
172 #[cfg(test)]
177 pub fn switch_to_overflow(&mut self, overflow: BufferSender<T>) {
178 self.overflow = Some(Box::new(overflow));
179 self.when_full = WhenFull::Overflow;
180 }
181
182 pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
184 self.usage_instrumentation = Some(handle);
185 }
186
187 pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
189 let _enter = span.enter();
190 self.send_duration = Some(register(BufferSendDuration { stage }));
191 }
192
193 pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
195 self.custom_instrumentation = Some(Arc::new(instrumentation));
196 }
197}
198
199impl<T: Bufferable> BufferSender<T> {
200 #[cfg(test)]
201 pub(crate) fn get_base_ref(&self) -> &SenderAdapter<T> {
202 &self.base
203 }
204
205 #[cfg(test)]
206 pub(crate) fn get_overflow_ref(&self) -> Option<&BufferSender<T>> {
207 self.overflow.as_ref().map(AsRef::as_ref)
208 }
209
210 #[async_recursion]
211 pub async fn send(
212 &mut self,
213 mut item: T,
214 send_reference: Option<Instant>,
215 ) -> crate::Result<()> {
216 if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
217 instrumentation.on_send(&mut item);
218 }
219 let item_sizing = self
220 .usage_instrumentation
221 .as_ref()
222 .map(|_| (item.event_count(), item.size_of()));
223
224 let mut was_dropped = false;
225
226 if let Some(instrumentation) = self.usage_instrumentation.as_ref()
227 && let Some((item_count, item_size)) = item_sizing
228 {
229 instrumentation
230 .increment_received_event_count_and_byte_size(item_count as u64, item_size as u64);
231 }
232 match self.when_full {
233 WhenFull::Block => self.base.send(item).await?,
234 WhenFull::DropNewest => {
235 if self.base.try_send(item).await?.is_some() {
236 was_dropped = true;
237 }
238 }
239 WhenFull::Overflow => {
240 if let Some(item) = self.base.try_send(item).await? {
241 was_dropped = true;
242 self.overflow
243 .as_mut()
244 .unwrap_or_else(|| unreachable!("overflow must exist"))
245 .send(item, send_reference)
246 .await?;
247 }
248 }
249 }
250
251 if let Some(instrumentation) = self.usage_instrumentation.as_ref()
252 && let Some((item_count, item_size)) = item_sizing
253 && was_dropped
254 {
255 instrumentation.increment_dropped_event_count_and_byte_size(
256 item_count as u64,
257 item_size as u64,
258 true,
259 );
260 }
261 if let Some(send_duration) = self.send_duration.as_ref()
262 && let Some(send_reference) = send_reference
263 {
264 send_duration.emit(send_reference.elapsed());
265 }
266
267 Ok(())
268 }
269
270 #[async_recursion]
271 pub async fn flush(&mut self) -> crate::Result<()> {
272 self.base.flush().await?;
273 if let Some(overflow) = self.overflow.as_mut() {
274 overflow.flush().await?;
275 }
276
277 Ok(())
278 }
279}