1#![allow(unsafe_op_in_unsafe_fn)] use std::{
4 sync::{Arc, Mutex, MutexGuard},
5 time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11 shallow_copy::CopyValue,
12 {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18 sync::broadcast::{Receiver, Sender},
19 time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23 ByteSizeOf, EstimatedJsonEncodedSizeOf,
24 config::LogNamespace,
25 enrichment::{Case, Condition, Error, IndexHandle, InternalError, Table},
26 event::{Event, EventStatus, Finalizable},
27 internal_event::{
28 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29 },
30 shutdown::ShutdownSignal,
31 sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37 SourceSender,
38 enrichment_tables::memory::{
39 MemoryConfig,
40 internal_events::{
41 MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42 MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43 MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44 },
45 },
46};
47
/// A single value stored in the memory enrichment table.
#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
pub struct MemoryEntry {
    /// The stored value as a JSON string (written with `serde_json::to_string`,
    /// read back with `serde_json::from_str`).
    value: String,
    /// Instant at which this entry was last written.
    update_time: CopyValue<Instant>,
    /// Time-to-live in seconds, counted from `update_time`.
    ttl: u64,
}
55
56impl ByteSizeOf for MemoryEntry {
57 fn allocated_bytes(&self) -> usize {
58 self.value.size_of()
59 }
60}
61
62impl MemoryEntry {
63 pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, Error> {
64 let ttl = self
65 .ttl
66 .saturating_sub(now.duration_since(*self.update_time).as_secs());
67 Ok(ObjectMap::from([
68 (
69 KeyString::from("key"),
70 Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71 ),
72 (
73 KeyString::from("value"),
74 serde_json::from_str::<Value>(&self.value).map_err(|source| Error::Internal {
76 source: InternalError::FailedToDecode {
77 details: source.to_string(),
78 },
79 })?,
80 ),
81 (
82 KeyString::from("ttl"),
83 Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
84 ),
85 ]))
86 }
87
88 fn expired(&self, now: Instant) -> bool {
89 now.duration_since(*self.update_time).as_secs() > self.ttl
90 }
91}
92
/// Bookkeeping kept next to the write handle.
#[derive(Default)]
struct MemoryMetadata {
    /// Running total of key and entry sizes in bytes; incremented on insert and
    /// recomputed from the map contents on every flush.
    byte_size: u64,
}
97
/// A key together with its entry; this is the item type sent over the
/// expired-items channel.
#[derive(Clone)]
pub(super) struct MemoryEntryPair {
    /// Key the entry was stored under.
    pub(super) key: String,
    /// The stored entry itself.
    pub(super) entry: MemoryEntry,
}
106
/// The write side of the table: the evmap write handle plus size bookkeeping.
pub(super) struct MemoryWriter {
    /// evmap write handle; changes become visible to readers after `refresh`.
    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
    /// Size accounting for the entries written through this handle.
    metadata: MemoryMetadata,
}
112
/// An enrichment table held in memory, backed by an evmap with per-thread
/// read handles and a single mutex-guarded writer.
pub struct Memory {
    /// Factory used to create a read handle for each thread on first use.
    read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
    /// Lazily created read handle, one per thread.
    read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
    /// Writer and its metadata; clones of this table share the same writer.
    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
    /// Configuration this table was built with.
    pub(super) config: MemoryConfig,
    // NOTE(review): never read (hence the `allow`); presumably held so the
    // broadcast channel always has a receiver and `send` does not fail when
    // nobody has subscribed - confirm.
    #[allow(dead_code)]
    expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
    /// Sender used by `flush` to publish entries that are about to be removed.
    expired_items_sender: Sender<Vec<MemoryEntryPair>>,
}
123
124impl Memory {
125 pub fn new(config: MemoryConfig) -> Self {
127 let (read_handle, write_handle) = evmap::new();
128 let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
132 Self {
133 config,
134 read_handle_factory: read_handle.factory(),
135 read_handle: ThreadLocal::new(),
136 write_handle: Arc::new(Mutex::new(MemoryWriter {
137 write_handle,
138 metadata: MemoryMetadata::default(),
139 })),
140 expired_items_sender: expired_tx,
141 expired_items_receiver: expired_rx,
142 }
143 }
144
145 pub fn from_previous_state(
147 config: MemoryConfig,
148 prev_state: Box<dyn std::any::Any + Send + Sync>,
149 ) -> Self {
150 if let Ok(prev_memory) = prev_state.downcast::<Memory>() {
151 Self {
152 config,
153 read_handle_factory: prev_memory.read_handle_factory,
154 read_handle: prev_memory.read_handle,
155 write_handle: prev_memory.write_handle,
156 expired_items_sender: prev_memory.expired_items_sender,
157 expired_items_receiver: prev_memory.expired_items_receiver,
158 }
159 } else {
160 Self::new(config)
161 }
162 }
163
164 pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
165 self.read_handle
166 .get_or(|| self.read_handle_factory.handle())
167 }
168
169 pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
170 self.expired_items_sender.subscribe()
171 }
172
173 fn handle_value(&self, value: ObjectMap) {
174 let mut writer = self.write_handle.lock().expect("mutex poisoned");
175 let now = Instant::now();
176
177 for (k, value) in value.into_iter() {
178 let new_entry_key = String::from(k);
179 let Ok(v) = serde_json::to_string(&value) else {
180 emit!(MemoryEnrichmentTableInsertFailed {
181 key: &new_entry_key,
182 include_key_metric_tag: self.config.internal_metrics.include_key_tag
183 });
184 continue;
185 };
186 let new_entry = MemoryEntry {
187 value: v,
188 update_time: now.into(),
189 ttl: self
190 .config
191 .ttl_field
192 .path
193 .as_ref()
194 .and_then(|p| value.get(p))
195 .and_then(|v| v.as_integer())
196 .map(|v| v as u64)
197 .unwrap_or(self.config.ttl),
198 };
199 let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
200 if let Some(max_byte_size) = self.config.max_byte_size
201 && writer
202 .metadata
203 .byte_size
204 .saturating_add(new_entry_size as u64)
205 > max_byte_size
206 {
207 emit!(MemoryEnrichmentTableInsertFailed {
209 key: &new_entry_key,
210 include_key_metric_tag: self.config.internal_metrics.include_key_tag
211 });
212 continue;
213 }
214 writer.metadata.byte_size = writer
215 .metadata
216 .byte_size
217 .saturating_add(new_entry_size as u64);
218 emit!(MemoryEnrichmentTableInserted {
219 key: &new_entry_key,
220 include_key_metric_tag: self.config.internal_metrics.include_key_tag
221 });
222 writer.write_handle.update(new_entry_key, new_entry);
223 }
224
225 if self.config.flush_interval.is_none() {
226 self.flush(writer);
227 }
228 }
229
230 fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
231 let now = Instant::now();
232
233 let mut needs_flush = false;
234 if let Some(reader) = self.get_read_handle().read() {
238 for (k, v) in reader.iter() {
239 if let Some(entry) = v.get_one()
240 && entry.expired(now)
241 {
242 writer.write_handle.empty(k.clone());
245 emit!(MemoryEnrichmentTableTtlExpired {
246 key: k,
247 include_key_metric_tag: self.config.internal_metrics.include_key_tag
248 });
249 needs_flush = true;
250 }
251 }
252 };
253
254 needs_flush
255 }
256
257 fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
258 let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
259 if needs_flush {
260 self.flush(writer);
261 }
262 }
263
264 fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
265 if self
267 .config
268 .source_config
269 .as_ref()
270 .map(|c| c.export_expired_items)
271 .unwrap_or_default()
272 {
273 let pending_removal = writer
274 .write_handle
275 .pending()
276 .iter()
277 .filter_map(|o| match o {
279 evmap::Operation::Empty(k) => Some(k),
280 _ => None,
281 })
282 .filter_map(|key| {
283 writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
284 key: key.to_string(),
285 entry: v.clone(),
286 })
287 })
288 .collect::<Vec<_>>();
289 if let Err(error) = self.expired_items_sender.send(pending_removal) {
290 error!(
291 message = "Error exporting expired items from memory enrichment table.",
292 error = %error,
293 );
294 }
295 }
296
297 writer.write_handle.refresh();
298 if let Some(reader) = self.get_read_handle().read() {
299 let mut byte_size = 0;
300 for (k, v) in reader.iter() {
301 byte_size += k.size_of() + v.get_one().size_of();
302 }
303 writer.metadata.byte_size = byte_size as u64;
304 emit!(MemoryEnrichmentTableFlushed {
305 new_objects_count: reader.len(),
306 new_byte_size: byte_size
307 });
308 }
309 }
310
311 pub(crate) fn as_source(
312 &self,
313 shutdown: ShutdownSignal,
314 out: SourceSender,
315 log_namespace: LogNamespace,
316 ) -> MemorySource {
317 MemorySource {
318 memory: self.clone(),
319 shutdown,
320 out,
321 log_namespace,
322 }
323 }
324}
325
326impl Clone for Memory {
327 fn clone(&self) -> Self {
328 Self {
329 read_handle_factory: self.read_handle_factory.clone(),
330 read_handle: ThreadLocal::new(),
331 write_handle: Arc::clone(&self.write_handle),
332 config: self.config.clone(),
333 expired_items_sender: self.expired_items_sender.clone(),
334 expired_items_receiver: self.expired_items_sender.subscribe(),
335 }
336 }
337}
338
339impl Table for Memory {
340 fn find_table_row<'a>(
341 &self,
342 case: Case,
343 condition: &'a [Condition<'a>],
344 select: Option<&'a [String]>,
345 wildcard: Option<&Value>,
346 index: Option<IndexHandle>,
347 ) -> Result<ObjectMap, Error> {
348 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
349
350 match rows.pop() {
351 Some(row) if rows.is_empty() => Ok(row),
352 Some(_) => Err(Error::MoreThanOneRowFound),
353 None => Err(Error::NoRowsFound),
354 }
355 }
356
357 fn find_table_rows<'a>(
358 &self,
359 _case: Case,
360 condition: &'a [Condition<'a>],
361 _select: Option<&'a [String]>,
362 _wildcard: Option<&Value>,
363 _index: Option<IndexHandle>,
364 ) -> Result<Vec<ObjectMap>, Error> {
365 match condition.first() {
366 Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
367 Some(Condition::Equals { value, .. }) => {
368 let key = value.to_string_lossy();
369 match self.get_read_handle().get_one(key.as_ref()) {
370 Some(row) => {
371 emit!(MemoryEnrichmentTableRead {
372 key: &key,
373 include_key_metric_tag: self.config.internal_metrics.include_key_tag
374 });
375 row.as_object_map(Instant::now(), &key).map(|r| vec![r])
376 }
377 None => {
378 emit!(MemoryEnrichmentTableReadFailed {
379 key: &key,
380 include_key_metric_tag: self.config.internal_metrics.include_key_tag
381 });
382 Ok(Default::default())
383 }
384 }
385 }
386 Some(_) => Err(Error::OnlyEqualityConditionAllowed),
387 None => Err(Error::MissingCondition { kind: "Key" }),
388 }
389 }
390
391 fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
392 match fields.len() {
393 0 => Err(Error::MissingRequiredField { field: "Key" }),
394 1 => Ok(IndexHandle(0)),
395 _ => Err(Error::OnlyOneFieldAllowed),
396 }
397 }
398
399 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
401 Vec::new()
402 }
403
/// Always returns `false`: this table never asks to be reloaded.
fn needs_reload(&self) -> bool {
    false
}
408
409 fn extract_state(&self) -> Option<Box<dyn std::any::Any + Send + Sync>> {
410 let writer = self.write_handle.lock().expect("mutex poisoned");
411 self.flush(writer);
412 Some(Box::new(self.clone()))
413 }
414}
415
416impl std::fmt::Debug for Memory {
417 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418 write!(f, "Memory {} row(s)", self.get_read_handle().len())
419 }
420}
421
422#[async_trait]
423impl StreamSink<Event> for Memory {
424 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
425 let events_sent = register!(EventsSent::from(Output(None)));
426 let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
427 let mut flush_interval = IntervalStream::new(interval(
428 self.config
429 .flush_interval
430 .map(Duration::from_secs)
431 .unwrap_or(Duration::MAX),
432 ));
433 let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
434 self.config.scan_interval.into(),
435 )));
436
437 loop {
438 tokio::select! {
439 event = input.next() => {
440 let mut event = if let Some(event) = event {
441 event
442 } else {
443 break;
444 };
445 let event_byte_size = event.estimated_json_encoded_size_of();
446
447 let finalizers = event.take_finalizers();
448
449 let log = event.into_log();
451
452 if let (Value::Object(map), _) = log.into_parts() {
453 self.handle_value(map)
454 };
455
456 finalizers.update_status(EventStatus::Delivered);
457 events_sent.emit(CountByteSize(1, event_byte_size));
458 bytes_sent.emit(ByteSize(event_byte_size.get()));
459 }
460
461 Some(_) = flush_interval.next() => {
462 let writer = self.write_handle.lock().expect("mutex poisoned");
463 self.flush(writer);
464 }
465
466 Some(_) = scan_interval.next() => {
467 let writer = self.write_handle.lock().expect("mutex poisoned");
468 self.scan(writer);
469 }
470 }
471 }
472 Ok(())
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use std::{num::NonZeroU64, slice::from_ref, time::Duration};
479
480 use futures::{StreamExt, future::ready};
481 use futures_util::stream;
482 use tokio::time;
483
484 use vector_lib::{
485 event::{EventContainer, MetricValue},
486 lookup::lookup_v2::OptionalValuePath,
487 metrics::Controller,
488 sink::VectorSink,
489 };
490
491 use super::*;
492 use crate::{
493 config::EnrichmentTableConfig,
494 enrichment_tables::memory::{
495 config::MemorySourceConfig, internal_events::InternalMetricsConfig,
496 },
497 event::{Event, LogEvent},
498 test_util::components::{
499 SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
500 run_and_assert_source_compliance,
501 },
502 };
503
504 fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
505 let mut config = MemoryConfig::default();
506 modfn(&mut config);
507 config
508 }
509
/// An inserted value can be looked up by key and reports the default TTL.
#[test]
fn finds_row() {
    let table = Memory::new(Default::default());
    table.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };
    let expected_row = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(table.config.ttl)),
        ("value".into(), Value::from(5)),
    ]);

    let found = table.find_table_row(Case::Sensitive, &[lookup], None, None, None);
    assert_eq!(Ok(expected_row), found);
}
529
530 #[tokio::test]
531 async fn extract_state_preserves_data() {
532 let memory = Memory::new(Default::default());
533 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
534
535 let condition = Condition::Equals {
536 field: "key",
537 value: Value::from("test_key"),
538 };
539
540 let expected = ObjectMap::from([
541 ("key".into(), Value::from("test_key")),
542 ("ttl".into(), Value::from(memory.config.ttl)),
543 ("value".into(), Value::from(5)),
544 ]);
545 assert_eq!(
546 Ok(expected.clone()),
547 memory.find_table_row(
548 Case::Sensitive,
549 std::slice::from_ref(&condition),
550 None,
551 None,
552 None
553 )
554 );
555
556 let new_memory = MemoryConfig::default()
558 .build(&Default::default(), memory.extract_state())
559 .await
560 .unwrap();
561 assert_eq!(
562 Ok(expected),
563 new_memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
564 );
565 }
566
/// The reported TTL is the configured TTL minus the entry's age.
#[test]
fn calculates_ttl() {
    let ttl = 100;
    let secs_to_subtract = 10;
    let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));

    // Write an entry that looks `secs_to_subtract` seconds old.
    let mut guard = memory.write_handle.lock().unwrap();
    let backdated = MemoryEntry {
        value: "5".to_string(),
        update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
        ttl,
    };
    guard.write_handle.update("test_key".to_string(), backdated);
    guard.write_handle.refresh();
    drop(guard);

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };
    let expected_row = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(ttl - secs_to_subtract)),
        ("value".into(), Value::from(5)),
    ]);

    let found = memory.find_table_row(Case::Sensitive, &[lookup], None, None, None);
    assert_eq!(Ok(expected_row), found);
}
599
/// A TTL found at the configured `ttl_field` path overrides the table TTL;
/// entries without that field keep the table TTL.
#[test]
fn calculates_ttl_override() {
    let global_ttl = 100;
    let ttl_override = 10;
    let memory = Memory::new(build_memory_config(|c| {
        c.ttl = global_ttl;
        c.ttl_field = OptionalValuePath::new("ttl");
    }));

    let with_override = Value::from(ObjectMap::from([
        ("val".into(), Value::from(5)),
        ("ttl".into(), Value::from(ttl_override)),
    ]));
    let without_override = Value::from(ObjectMap::from([("val".into(), Value::from(5))]));
    memory.handle_value(ObjectMap::from([
        ("ttl_override".into(), with_override),
        ("default_ttl".into(), without_override),
    ]));

    let default_condition = Condition::Equals {
        field: "key",
        value: Value::from("default_ttl"),
    };
    let override_condition = Condition::Equals {
        field: "key",
        value: Value::from("ttl_override"),
    };

    let expected_default = ObjectMap::from([
        ("key".into(), Value::from("default_ttl")),
        ("ttl".into(), Value::from(global_ttl)),
        (
            "value".into(),
            Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
        ),
    ]);
    assert_eq!(
        Ok(expected_default),
        memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
    );

    let expected_override = ObjectMap::from([
        ("key".into(), Value::from("ttl_override")),
        ("ttl".into(), Value::from(ttl_override)),
        (
            "value".into(),
            Value::from(ObjectMap::from([
                ("val".into(), Value::from(5)),
                ("ttl".into(), Value::from(ttl_override)),
            ])),
        ),
    ]);
    assert_eq!(
        Ok(expected_override),
        memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
    );
}
657
/// An expired entry stays readable (with TTL 0) until a scan removes it.
#[test]
fn removes_expired_records_on_scan_interval() {
    let ttl = 100;
    let memory = Memory::new(build_memory_config(|c| {
        c.ttl = ttl;
    }));

    // Write an entry that is already 10 seconds past its TTL.
    let mut guard = memory.write_handle.lock().unwrap();
    let stale = MemoryEntry {
        value: "5".to_string(),
        update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
        ttl,
    };
    guard.write_handle.update("test_key".to_string(), stale);
    guard.write_handle.refresh();
    drop(guard);

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };

    // Before the scan the entry is still returned, with no time left.
    let expected_row = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(0)),
        ("value".into(), Value::from(5)),
    ]);
    assert_eq!(
        Ok(expected_row),
        memory.find_table_row(Case::Sensitive, from_ref(&lookup), None, None, None)
    );

    memory.scan(memory.write_handle.lock().unwrap());

    // After the scan the entry is gone.
    let rows = memory
        .find_table_rows(Case::Sensitive, &[lookup], None, None, None)
        .unwrap();
    assert!(rows.is_empty());
}
704
/// With a flush interval configured, an insert is not visible to readers
/// until a flush happens.
#[test]
fn does_not_show_values_before_flush_interval() {
    let ttl = 100;
    let memory = Memory::new(build_memory_config(|c| {
        c.ttl = ttl;
        c.flush_interval = Some(10);
    }));
    memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };

    let rows = memory
        .find_table_rows(Case::Sensitive, &[lookup], None, None, None)
        .unwrap();
    assert!(rows.is_empty());
}
727
/// Re-inserting a key resets its remaining TTL to the full value.
#[test]
fn updates_ttl_on_value_replacement() {
    let ttl = 100;
    let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));

    // Write an entry that has already used up half of its TTL.
    let mut guard = memory.write_handle.lock().unwrap();
    let half_aged = MemoryEntry {
        value: "5".to_string(),
        update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
        ttl,
    };
    guard.write_handle.update("test_key".to_string(), half_aged);
    guard.write_handle.refresh();
    drop(guard);

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };

    let expected_before = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(ttl / 2)),
        ("value".into(), Value::from(5)),
    ]);
    assert_eq!(
        Ok(expected_before),
        memory.find_table_row(Case::Sensitive, from_ref(&lookup), None, None, None)
    );

    memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

    let expected_after = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(ttl)),
        ("value".into(), Value::from(5)),
    ]);
    assert_eq!(
        Ok(expected_after),
        memory.find_table_row(Case::Sensitive, &[lookup], None, None, None)
    );
}
769
/// With a one-byte size limit no entry fits, so nothing is stored.
#[test]
fn ignores_all_values_over_byte_size_limit() {
    let memory = Memory::new(build_memory_config(|c| {
        c.max_byte_size = Some(1);
    }));
    memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };

    let rows = memory
        .find_table_rows(Case::Sensitive, &[lookup], None, None, None)
        .unwrap();
    assert!(rows.is_empty());
}
790
/// The first entry fits under the limit; the second would exceed it and is
/// rejected.
#[test]
fn ignores_values_when_byte_size_limit_is_reached() {
    let ttl = 100;
    let memory = Memory::new(build_memory_config(|c| {
        c.ttl = ttl;
        c.max_byte_size = Some(150);
    }));
    memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
    memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));

    let accepted_lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };
    let expected_row = ObjectMap::from([
        ("key".into(), Value::from("test_key")),
        ("ttl".into(), Value::from(ttl)),
        ("value".into(), Value::from(5)),
    ]);
    assert_eq!(
        Ok(expected_row),
        memory.find_table_row(Case::Sensitive, &[accepted_lookup], None, None, None)
    );

    let rejected_lookup = Condition::Equals {
        field: "key",
        value: Value::from("rejected_key"),
    };
    let rejected_rows = memory
        .find_table_rows(Case::Sensitive, &[rejected_lookup], None, None, None)
        .unwrap();
    assert!(rejected_rows.is_empty());
}
836
/// Looking up a key in an empty table yields no rows (and no error).
#[test]
fn missing_key() {
    let memory = Memory::new(Default::default());

    let lookup = Condition::Equals {
        field: "key",
        value: Value::from("test_key"),
    };

    let rows = memory
        .find_table_rows(Case::Sensitive, &[lookup], None, None, None)
        .unwrap();
    assert!(rows.is_empty());
}
854
855 #[tokio::test]
856 async fn sink_spec_compliance() {
857 let event = Event::Log(LogEvent::from(ObjectMap::from([(
858 "test_key".into(),
859 Value::from(5),
860 )])));
861
862 let memory = Memory::new(Default::default());
863
864 run_and_assert_sink_compliance(
865 VectorSink::from_event_streamsink(memory),
866 stream::once(ready(event)),
867 &SINK_TAGS,
868 )
869 .await;
870 }
871
872 #[tokio::test]
873 async fn flush_metrics_without_interval() {
874 let event = Event::Log(LogEvent::from(ObjectMap::from([(
875 "test_key".into(),
876 Value::from(5),
877 )])));
878
879 let memory = Memory::new(Default::default());
880
881 run_and_assert_sink_compliance(
882 VectorSink::from_event_streamsink(memory),
883 stream::once(ready(event)),
884 &SINK_TAGS,
885 )
886 .await;
887
888 let metrics = Controller::get().unwrap().capture_metrics();
889 let insertions_counter = metrics
890 .iter()
891 .find(|m| {
892 matches!(m.value(), MetricValue::Counter { .. })
893 && m.name() == "memory_enrichment_table_insertions_total"
894 })
895 .expect("Insertions metric is missing!");
896 let MetricValue::Counter {
897 value: insertions_count,
898 } = insertions_counter.value()
899 else {
900 unreachable!();
901 };
902 let flushes_counter = metrics
903 .iter()
904 .find(|m| {
905 matches!(m.value(), MetricValue::Counter { .. })
906 && m.name() == "memory_enrichment_table_flushes_total"
907 })
908 .expect("Flushes metric is missing!");
909 let MetricValue::Counter {
910 value: flushes_count,
911 } = flushes_counter.value()
912 else {
913 unreachable!();
914 };
915 let object_count_gauge = metrics
916 .iter()
917 .find(|m| {
918 matches!(m.value(), MetricValue::Gauge { .. })
919 && m.name() == "memory_enrichment_table_objects_count"
920 })
921 .expect("Object count metric is missing!");
922 let MetricValue::Gauge {
923 value: object_count,
924 } = object_count_gauge.value()
925 else {
926 unreachable!();
927 };
928 let byte_size_gauge = metrics
929 .iter()
930 .find(|m| {
931 matches!(m.value(), MetricValue::Gauge { .. })
932 && m.name() == "memory_enrichment_table_byte_size"
933 })
934 .expect("Byte size metric is missing!");
935 assert_eq!(*insertions_count, 1.0);
936 assert_eq!(*flushes_count, 1.0);
937 assert_eq!(*object_count, 1.0);
938 assert!(!byte_size_gauge.is_empty());
939 }
940
941 #[tokio::test]
942 async fn flush_metrics_with_interval() {
943 let event = Event::Log(LogEvent::from(ObjectMap::from([(
944 "test_key".into(),
945 Value::from(5),
946 )])));
947
948 let memory = Memory::new(build_memory_config(|c| {
949 c.flush_interval = Some(1);
950 }));
951
952 run_and_assert_sink_compliance(
953 VectorSink::from_event_streamsink(memory),
954 stream::iter(vec![event.clone(), event]).flat_map(|e| {
955 stream::once(async move {
956 tokio::time::sleep(Duration::from_millis(600)).await;
957 e
958 })
959 }),
960 &SINK_TAGS,
961 )
962 .await;
963
964 let metrics = Controller::get().unwrap().capture_metrics();
965 let insertions_counter = metrics
966 .iter()
967 .find(|m| {
968 matches!(m.value(), MetricValue::Counter { .. })
969 && m.name() == "memory_enrichment_table_insertions_total"
970 })
971 .expect("Insertions metric is missing!");
972 let MetricValue::Counter {
973 value: insertions_count,
974 } = insertions_counter.value()
975 else {
976 unreachable!();
977 };
978 let flushes_counter = metrics
979 .iter()
980 .find(|m| {
981 matches!(m.value(), MetricValue::Counter { .. })
982 && m.name() == "memory_enrichment_table_flushes_total"
983 })
984 .expect("Flushes metric is missing!");
985 let MetricValue::Counter {
986 value: flushes_count,
987 } = flushes_counter.value()
988 else {
989 unreachable!();
990 };
991 let object_count_gauge = metrics
992 .iter()
993 .find(|m| {
994 matches!(m.value(), MetricValue::Gauge { .. })
995 && m.name() == "memory_enrichment_table_objects_count"
996 })
997 .expect("Object count metric is missing!");
998 let MetricValue::Gauge {
999 value: object_count,
1000 } = object_count_gauge.value()
1001 else {
1002 unreachable!();
1003 };
1004 let byte_size_gauge = metrics
1005 .iter()
1006 .find(|m| {
1007 matches!(m.value(), MetricValue::Gauge { .. })
1008 && m.name() == "memory_enrichment_table_byte_size"
1009 })
1010 .expect("Byte size metric is missing!");
1011
1012 assert_eq!(*insertions_count, 2.0);
1013 assert_eq!(*flushes_count, 2.0);
1015 assert_eq!(*object_count, 1.0);
1016 assert!(!byte_size_gauge.is_empty());
1017 }
1018
1019 #[tokio::test]
1020 async fn flush_metrics_with_key() {
1021 let event = Event::Log(LogEvent::from(ObjectMap::from([(
1022 "test_key".into(),
1023 Value::from(5),
1024 )])));
1025
1026 let memory = Memory::new(build_memory_config(|c| {
1027 c.internal_metrics = InternalMetricsConfig {
1028 include_key_tag: true,
1029 };
1030 }));
1031
1032 run_and_assert_sink_compliance(
1033 VectorSink::from_event_streamsink(memory),
1034 stream::once(ready(event)),
1035 &SINK_TAGS,
1036 )
1037 .await;
1038
1039 let metrics = Controller::get().unwrap().capture_metrics();
1040 let insertions_counter = metrics
1041 .iter()
1042 .find(|m| {
1043 matches!(m.value(), MetricValue::Counter { .. })
1044 && m.name() == "memory_enrichment_table_insertions_total"
1045 })
1046 .expect("Insertions metric is missing!");
1047
1048 assert!(insertions_counter.tag_matches("key", "test_key"));
1049 }
1050
1051 #[tokio::test]
1052 async fn flush_metrics_without_key() {
1053 let event = Event::Log(LogEvent::from(ObjectMap::from([(
1054 "test_key".into(),
1055 Value::from(5),
1056 )])));
1057
1058 let memory = Memory::new(Default::default());
1059
1060 run_and_assert_sink_compliance(
1061 VectorSink::from_event_streamsink(memory),
1062 stream::once(ready(event)),
1063 &SINK_TAGS,
1064 )
1065 .await;
1066
1067 let metrics = Controller::get().unwrap().capture_metrics();
1068 let insertions_counter = metrics
1069 .iter()
1070 .find(|m| {
1071 matches!(m.value(), MetricValue::Counter { .. })
1072 && m.name() == "memory_enrichment_table_insertions_total"
1073 })
1074 .expect("Insertions metric is missing!");
1075
1076 assert!(insertions_counter.tag_value("key").is_none());
1077 }
1078
1079 #[tokio::test]
1080 async fn source_spec_compliance() {
1081 let mut memory_config = MemoryConfig::default();
1082 memory_config.source_config = Some(MemorySourceConfig {
1083 export_interval: Some(NonZeroU64::try_from(1).unwrap()),
1084 export_batch_size: None,
1085 remove_after_export: false,
1086 export_expired_items: false,
1087 source_key: "test".to_string(),
1088 });
1089 let memory = memory_config.get_or_build_memory(None).await;
1090 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
1091
1092 let mut events: Vec<Event> = run_and_assert_source_compliance(
1093 memory_config,
1094 time::Duration::from_secs(5),
1095 &SOURCE_TAGS,
1096 )
1097 .await;
1098
1099 assert!(!events.is_empty());
1100 let event = events.remove(0);
1101 let log = event.as_log();
1102
1103 assert!(!log.value().is_empty());
1104 }
1105}