Skip to main content

vector/enrichment_tables/memory/
table.rs

1#![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly.
2
3use std::{
4    sync::{Arc, Mutex, MutexGuard},
5    time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11    shallow_copy::CopyValue,
12    {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18    sync::broadcast::{Receiver, Sender},
19    time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23    ByteSizeOf, EstimatedJsonEncodedSizeOf,
24    config::LogNamespace,
25    enrichment::{Case, Condition, Error, IndexHandle, InternalError, Table},
26    event::{Event, EventStatus, Finalizable},
27    internal_event::{
28        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29    },
30    shutdown::ShutdownSignal,
31    sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37    SourceSender,
38    enrichment_tables::memory::{
39        MemoryConfig,
40        internal_events::{
41            MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42            MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43            MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44        },
45    },
46};
47
48/// Single memory entry containing the value and TTL
49#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
50pub struct MemoryEntry {
51    value: String,
52    update_time: CopyValue<Instant>,
53    ttl: u64,
54}
55
56impl ByteSizeOf for MemoryEntry {
57    fn allocated_bytes(&self) -> usize {
58        self.value.size_of()
59    }
60}
61
62impl MemoryEntry {
63    pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, Error> {
64        let ttl = self
65            .ttl
66            .saturating_sub(now.duration_since(*self.update_time).as_secs());
67        Ok(ObjectMap::from([
68            (
69                KeyString::from("key"),
70                Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71            ),
72            (
73                KeyString::from("value"),
74                // Unreachable in normal operation: `value` was serialized by `handle_value`.
75                serde_json::from_str::<Value>(&self.value).map_err(|source| Error::Internal {
76                    source: InternalError::FailedToDecode {
77                        details: source.to_string(),
78                    },
79                })?,
80            ),
81            (
82                KeyString::from("ttl"),
83                Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
84            ),
85        ]))
86    }
87
88    fn expired(&self, now: Instant) -> bool {
89        now.duration_since(*self.update_time).as_secs() > self.ttl
90    }
91}
92
/// Bookkeeping stored next to the write handle.
#[derive(Default)]
struct MemoryMetadata {
    /// Tracked size of the table in bytes; checked against the configured `max_byte_size`.
    byte_size: u64,
}
97
98/// [`MemoryEntry`] combined with its key
99#[derive(Clone)]
100pub(super) struct MemoryEntryPair {
101    /// Key of this entry
102    pub(super) key: String,
103    /// The value of this entry
104    pub(super) entry: MemoryEntry,
105}
106
107// Used to ensure that these 2 are locked together
108pub(super) struct MemoryWriter {
109    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
110    metadata: MemoryMetadata,
111}
112
113/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
114pub struct Memory {
115    read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
116    read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
117    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
118    pub(super) config: MemoryConfig,
119    #[allow(dead_code)]
120    expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
121    expired_items_sender: Sender<Vec<MemoryEntryPair>>,
122}
123
124impl Memory {
125    /// Creates a new [Memory] based on the provided config.
126    pub fn new(config: MemoryConfig) -> Self {
127        let (read_handle, write_handle) = evmap::new();
128        // Buffer could only be used if source is stuck exporting available items, but in that case,
129        // publishing will not happen either, because the lock would be held, so this buffer is not
130        // that important
131        let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
132        Self {
133            config,
134            read_handle_factory: read_handle.factory(),
135            read_handle: ThreadLocal::new(),
136            write_handle: Arc::new(Mutex::new(MemoryWriter {
137                write_handle,
138                metadata: MemoryMetadata::default(),
139            })),
140            expired_items_sender: expired_tx,
141            expired_items_receiver: expired_rx,
142        }
143    }
144
145    /// Creates a new [Memory] based on the provided config and previous state.
146    pub fn from_previous_state(
147        config: MemoryConfig,
148        prev_state: Box<dyn std::any::Any + Send + Sync>,
149    ) -> Self {
150        if let Ok(prev_memory) = prev_state.downcast::<Memory>() {
151            Self {
152                config,
153                read_handle_factory: prev_memory.read_handle_factory,
154                read_handle: prev_memory.read_handle,
155                write_handle: prev_memory.write_handle,
156                expired_items_sender: prev_memory.expired_items_sender,
157                expired_items_receiver: prev_memory.expired_items_receiver,
158            }
159        } else {
160            Self::new(config)
161        }
162    }
163
164    pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
165        self.read_handle
166            .get_or(|| self.read_handle_factory.handle())
167    }
168
169    pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
170        self.expired_items_sender.subscribe()
171    }
172
173    fn handle_value(&self, value: ObjectMap) {
174        let mut writer = self.write_handle.lock().expect("mutex poisoned");
175        let now = Instant::now();
176
177        for (k, value) in value.into_iter() {
178            let new_entry_key = String::from(k);
179            let Ok(v) = serde_json::to_string(&value) else {
180                emit!(MemoryEnrichmentTableInsertFailed {
181                    key: &new_entry_key,
182                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
183                });
184                continue;
185            };
186            let new_entry = MemoryEntry {
187                value: v,
188                update_time: now.into(),
189                ttl: self
190                    .config
191                    .ttl_field
192                    .path
193                    .as_ref()
194                    .and_then(|p| value.get(p))
195                    .and_then(|v| v.as_integer())
196                    .map(|v| v as u64)
197                    .unwrap_or(self.config.ttl),
198            };
199            let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
200            if let Some(max_byte_size) = self.config.max_byte_size
201                && writer
202                    .metadata
203                    .byte_size
204                    .saturating_add(new_entry_size as u64)
205                    > max_byte_size
206            {
207                // Reject new entries
208                emit!(MemoryEnrichmentTableInsertFailed {
209                    key: &new_entry_key,
210                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
211                });
212                continue;
213            }
214            writer.metadata.byte_size = writer
215                .metadata
216                .byte_size
217                .saturating_add(new_entry_size as u64);
218            emit!(MemoryEnrichmentTableInserted {
219                key: &new_entry_key,
220                include_key_metric_tag: self.config.internal_metrics.include_key_tag
221            });
222            writer.write_handle.update(new_entry_key, new_entry);
223        }
224
225        if self.config.flush_interval.is_none() {
226            self.flush(writer);
227        }
228    }
229
230    fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
231        let now = Instant::now();
232
233        let mut needs_flush = false;
234        // Since evmap holds 2 separate maps for the data, we are free to directly remove
235        // elements via the writer, while we are iterating the reader
236        // Refresh will happen only after we manually invoke it after iteration
237        if let Some(reader) = self.get_read_handle().read() {
238            for (k, v) in reader.iter() {
239                if let Some(entry) = v.get_one()
240                    && entry.expired(now)
241                {
242                    // Byte size is not reduced at this point, because the actual deletion
243                    // will only happen at refresh time
244                    writer.write_handle.empty(k.clone());
245                    emit!(MemoryEnrichmentTableTtlExpired {
246                        key: k,
247                        include_key_metric_tag: self.config.internal_metrics.include_key_tag
248                    });
249                    needs_flush = true;
250                }
251            }
252        };
253
254        needs_flush
255    }
256
257    fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
258        let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
259        if needs_flush {
260            self.flush(writer);
261        }
262    }
263
264    fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
265        // First publish items to be removed, if needed
266        if self
267            .config
268            .source_config
269            .as_ref()
270            .map(|c| c.export_expired_items)
271            .unwrap_or_default()
272        {
273            let pending_removal = writer
274                .write_handle
275                .pending()
276                .iter()
277                // We only use empty operation to remove keys
278                .filter_map(|o| match o {
279                    evmap::Operation::Empty(k) => Some(k),
280                    _ => None,
281                })
282                .filter_map(|key| {
283                    writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
284                        key: key.to_string(),
285                        entry: v.clone(),
286                    })
287                })
288                .collect::<Vec<_>>();
289            if let Err(error) = self.expired_items_sender.send(pending_removal) {
290                error!(
291                    message = "Error exporting expired items from memory enrichment table.",
292                    error = %error,
293                );
294            }
295        }
296
297        writer.write_handle.refresh();
298        if let Some(reader) = self.get_read_handle().read() {
299            let mut byte_size = 0;
300            for (k, v) in reader.iter() {
301                byte_size += k.size_of() + v.get_one().size_of();
302            }
303            writer.metadata.byte_size = byte_size as u64;
304            emit!(MemoryEnrichmentTableFlushed {
305                new_objects_count: reader.len(),
306                new_byte_size: byte_size
307            });
308        }
309    }
310
311    pub(crate) fn as_source(
312        &self,
313        shutdown: ShutdownSignal,
314        out: SourceSender,
315        log_namespace: LogNamespace,
316    ) -> MemorySource {
317        MemorySource {
318            memory: self.clone(),
319            shutdown,
320            out,
321            log_namespace,
322        }
323    }
324}
325
326impl Clone for Memory {
327    fn clone(&self) -> Self {
328        Self {
329            read_handle_factory: self.read_handle_factory.clone(),
330            read_handle: ThreadLocal::new(),
331            write_handle: Arc::clone(&self.write_handle),
332            config: self.config.clone(),
333            expired_items_sender: self.expired_items_sender.clone(),
334            expired_items_receiver: self.expired_items_sender.subscribe(),
335        }
336    }
337}
338
339impl Table for Memory {
340    fn find_table_row<'a>(
341        &self,
342        case: Case,
343        condition: &'a [Condition<'a>],
344        select: Option<&'a [String]>,
345        wildcard: Option<&Value>,
346        index: Option<IndexHandle>,
347    ) -> Result<ObjectMap, Error> {
348        let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
349
350        match rows.pop() {
351            Some(row) if rows.is_empty() => Ok(row),
352            Some(_) => Err(Error::MoreThanOneRowFound),
353            None => Err(Error::NoRowsFound),
354        }
355    }
356
357    fn find_table_rows<'a>(
358        &self,
359        _case: Case,
360        condition: &'a [Condition<'a>],
361        _select: Option<&'a [String]>,
362        _wildcard: Option<&Value>,
363        _index: Option<IndexHandle>,
364    ) -> Result<Vec<ObjectMap>, Error> {
365        match condition.first() {
366            Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
367            Some(Condition::Equals { value, .. }) => {
368                let key = value.to_string_lossy();
369                match self.get_read_handle().get_one(key.as_ref()) {
370                    Some(row) => {
371                        emit!(MemoryEnrichmentTableRead {
372                            key: &key,
373                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
374                        });
375                        row.as_object_map(Instant::now(), &key).map(|r| vec![r])
376                    }
377                    None => {
378                        emit!(MemoryEnrichmentTableReadFailed {
379                            key: &key,
380                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
381                        });
382                        Ok(Default::default())
383                    }
384                }
385            }
386            Some(_) => Err(Error::OnlyEqualityConditionAllowed),
387            None => Err(Error::MissingCondition { kind: "Key" }),
388        }
389    }
390
391    fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
392        match fields.len() {
393            0 => Err(Error::MissingRequiredField { field: "Key" }),
394            1 => Ok(IndexHandle(0)),
395            _ => Err(Error::OnlyOneFieldAllowed),
396        }
397    }
398
399    /// Returns a list of the field names that are in each index
400    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
401        Vec::new()
402    }
403
404    /// Doesn't need reload, data is written directly
405    fn needs_reload(&self) -> bool {
406        false
407    }
408
409    fn extract_state(&self) -> Option<Box<dyn std::any::Any + Send + Sync>> {
410        let writer = self.write_handle.lock().expect("mutex poisoned");
411        self.flush(writer);
412        Some(Box::new(self.clone()))
413    }
414}
415
416impl std::fmt::Debug for Memory {
417    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418        write!(f, "Memory {} row(s)", self.get_read_handle().len())
419    }
420}
421
422#[async_trait]
423impl StreamSink<Event> for Memory {
424    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
425        let events_sent = register!(EventsSent::from(Output(None)));
426        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
427        let mut flush_interval = IntervalStream::new(interval(
428            self.config
429                .flush_interval
430                .map(Duration::from_secs)
431                .unwrap_or(Duration::MAX),
432        ));
433        let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
434            self.config.scan_interval.into(),
435        )));
436
437        loop {
438            tokio::select! {
439                event = input.next() => {
440                    let mut event = if let Some(event) = event {
441                        event
442                    } else {
443                        break;
444                    };
445                    let event_byte_size = event.estimated_json_encoded_size_of();
446
447                    let finalizers = event.take_finalizers();
448
449                    // Panic: This sink only accepts Logs, so this should never panic
450                    let log = event.into_log();
451
452                    if let (Value::Object(map), _) = log.into_parts() {
453                        self.handle_value(map)
454                    };
455
456                    finalizers.update_status(EventStatus::Delivered);
457                    events_sent.emit(CountByteSize(1, event_byte_size));
458                    bytes_sent.emit(ByteSize(event_byte_size.get()));
459                }
460
461                Some(_) = flush_interval.next() => {
462                    let writer = self.write_handle.lock().expect("mutex poisoned");
463                    self.flush(writer);
464                }
465
466                Some(_) = scan_interval.next() => {
467                    let writer = self.write_handle.lock().expect("mutex poisoned");
468                    self.scan(writer);
469                }
470            }
471        }
472        Ok(())
473    }
474}
475
476#[cfg(test)]
477mod tests {
478    use std::{num::NonZeroU64, slice::from_ref, time::Duration};
479
480    use futures::{StreamExt, future::ready};
481    use futures_util::stream;
482    use tokio::time;
483
484    use vector_lib::{
485        event::{EventContainer, MetricValue},
486        lookup::lookup_v2::OptionalValuePath,
487        metrics::Controller,
488        sink::VectorSink,
489    };
490
491    use super::*;
492    use crate::{
493        config::EnrichmentTableConfig,
494        enrichment_tables::memory::{
495            config::MemorySourceConfig, internal_events::InternalMetricsConfig,
496        },
497        event::{Event, LogEvent},
498        test_util::components::{
499            SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
500            run_and_assert_source_compliance,
501        },
502    };
503
504    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
505        let mut config = MemoryConfig::default();
506        modfn(&mut config);
507        config
508    }
509
510    #[test]
511    fn finds_row() {
512        let memory = Memory::new(Default::default());
513        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
514
515        let condition = Condition::Equals {
516            field: "key",
517            value: Value::from("test_key"),
518        };
519
520        assert_eq!(
521            Ok(ObjectMap::from([
522                ("key".into(), Value::from("test_key")),
523                ("ttl".into(), Value::from(memory.config.ttl)),
524                ("value".into(), Value::from(5)),
525            ])),
526            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
527        );
528    }
529
530    #[tokio::test]
531    async fn extract_state_preserves_data() {
532        let memory = Memory::new(Default::default());
533        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
534
535        let condition = Condition::Equals {
536            field: "key",
537            value: Value::from("test_key"),
538        };
539
540        let expected = ObjectMap::from([
541            ("key".into(), Value::from("test_key")),
542            ("ttl".into(), Value::from(memory.config.ttl)),
543            ("value".into(), Value::from(5)),
544        ]);
545        assert_eq!(
546            Ok(expected.clone()),
547            memory.find_table_row(
548                Case::Sensitive,
549                std::slice::from_ref(&condition),
550                None,
551                None,
552                None
553            )
554        );
555
556        // Now build a new table using old state
557        let new_memory = MemoryConfig::default()
558            .build(&Default::default(), memory.extract_state())
559            .await
560            .unwrap();
561        assert_eq!(
562            Ok(expected),
563            new_memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
564        );
565    }
566
567    #[test]
568    fn calculates_ttl() {
569        let ttl = 100;
570        let secs_to_subtract = 10;
571        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
572        {
573            let mut handle = memory.write_handle.lock().unwrap();
574            handle.write_handle.update(
575                "test_key".to_string(),
576                MemoryEntry {
577                    value: "5".to_string(),
578                    update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
579                    ttl,
580                },
581            );
582            handle.write_handle.refresh();
583        }
584
585        let condition = Condition::Equals {
586            field: "key",
587            value: Value::from("test_key"),
588        };
589
590        assert_eq!(
591            Ok(ObjectMap::from([
592                ("key".into(), Value::from("test_key")),
593                ("ttl".into(), Value::from(ttl - secs_to_subtract)),
594                ("value".into(), Value::from(5)),
595            ])),
596            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
597        );
598    }
599
600    #[test]
601    fn calculates_ttl_override() {
602        let global_ttl = 100;
603        let ttl_override = 10;
604        let memory = Memory::new(build_memory_config(|c| {
605            c.ttl = global_ttl;
606            c.ttl_field = OptionalValuePath::new("ttl");
607        }));
608        memory.handle_value(ObjectMap::from([
609            (
610                "ttl_override".into(),
611                Value::from(ObjectMap::from([
612                    ("val".into(), Value::from(5)),
613                    ("ttl".into(), Value::from(ttl_override)),
614                ])),
615            ),
616            (
617                "default_ttl".into(),
618                Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
619            ),
620        ]));
621
622        let default_condition = Condition::Equals {
623            field: "key",
624            value: Value::from("default_ttl"),
625        };
626        let override_condition = Condition::Equals {
627            field: "key",
628            value: Value::from("ttl_override"),
629        };
630
631        assert_eq!(
632            Ok(ObjectMap::from([
633                ("key".into(), Value::from("default_ttl")),
634                ("ttl".into(), Value::from(global_ttl)),
635                (
636                    "value".into(),
637                    Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
638                ),
639            ])),
640            memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
641        );
642        assert_eq!(
643            Ok(ObjectMap::from([
644                ("key".into(), Value::from("ttl_override")),
645                ("ttl".into(), Value::from(ttl_override)),
646                (
647                    "value".into(),
648                    Value::from(ObjectMap::from([
649                        ("val".into(), Value::from(5)),
650                        ("ttl".into(), Value::from(ttl_override))
651                    ]))
652                ),
653            ])),
654            memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
655        );
656    }
657
658    #[test]
659    fn removes_expired_records_on_scan_interval() {
660        let ttl = 100;
661        let memory = Memory::new(build_memory_config(|c| {
662            c.ttl = ttl;
663        }));
664        {
665            let mut handle = memory.write_handle.lock().unwrap();
666            handle.write_handle.update(
667                "test_key".to_string(),
668                MemoryEntry {
669                    value: "5".to_string(),
670                    update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
671                    ttl,
672                },
673            );
674            handle.write_handle.refresh();
675        }
676
677        // Finds the value before scan
678        let condition = Condition::Equals {
679            field: "key",
680            value: Value::from("test_key"),
681        };
682        assert_eq!(
683            Ok(ObjectMap::from([
684                ("key".into(), Value::from("test_key")),
685                ("ttl".into(), Value::from(0)),
686                ("value".into(), Value::from(5)),
687            ])),
688            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
689        );
690
691        // Force scan
692        let writer = memory.write_handle.lock().unwrap();
693        memory.scan(writer);
694
695        // The value is not present anymore
696        assert!(
697            memory
698                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
699                .unwrap()
700                .pop()
701                .is_none()
702        );
703    }
704
705    #[test]
706    fn does_not_show_values_before_flush_interval() {
707        let ttl = 100;
708        let memory = Memory::new(build_memory_config(|c| {
709            c.ttl = ttl;
710            c.flush_interval = Some(10);
711        }));
712        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
713
714        let condition = Condition::Equals {
715            field: "key",
716            value: Value::from("test_key"),
717        };
718
719        assert!(
720            memory
721                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
722                .unwrap()
723                .pop()
724                .is_none()
725        );
726    }
727
728    #[test]
729    fn updates_ttl_on_value_replacement() {
730        let ttl = 100;
731        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
732        {
733            let mut handle = memory.write_handle.lock().unwrap();
734            handle.write_handle.update(
735                "test_key".to_string(),
736                MemoryEntry {
737                    value: "5".to_string(),
738                    update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
739                    ttl,
740                },
741            );
742            handle.write_handle.refresh();
743        }
744        let condition = Condition::Equals {
745            field: "key",
746            value: Value::from("test_key"),
747        };
748
749        assert_eq!(
750            Ok(ObjectMap::from([
751                ("key".into(), Value::from("test_key")),
752                ("ttl".into(), Value::from(ttl / 2)),
753                ("value".into(), Value::from(5)),
754            ])),
755            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
756        );
757
758        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
759
760        assert_eq!(
761            Ok(ObjectMap::from([
762                ("key".into(), Value::from("test_key")),
763                ("ttl".into(), Value::from(ttl)),
764                ("value".into(), Value::from(5)),
765            ])),
766            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
767        );
768    }
769
770    #[test]
771    fn ignores_all_values_over_byte_size_limit() {
772        let memory = Memory::new(build_memory_config(|c| {
773            c.max_byte_size = Some(1);
774        }));
775        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
776
777        let condition = Condition::Equals {
778            field: "key",
779            value: Value::from("test_key"),
780        };
781
782        assert!(
783            memory
784                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
785                .unwrap()
786                .pop()
787                .is_none()
788        );
789    }
790
791    #[test]
792    fn ignores_values_when_byte_size_limit_is_reached() {
793        let ttl = 100;
794        let memory = Memory::new(build_memory_config(|c| {
795            c.ttl = ttl;
796            c.max_byte_size = Some(150);
797        }));
798        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
799        memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
800
801        assert_eq!(
802            Ok(ObjectMap::from([
803                ("key".into(), Value::from("test_key")),
804                ("ttl".into(), Value::from(ttl)),
805                ("value".into(), Value::from(5)),
806            ])),
807            memory.find_table_row(
808                Case::Sensitive,
809                &[Condition::Equals {
810                    field: "key",
811                    value: Value::from("test_key")
812                }],
813                None,
814                None,
815                None
816            )
817        );
818
819        assert!(
820            memory
821                .find_table_rows(
822                    Case::Sensitive,
823                    &[Condition::Equals {
824                        field: "key",
825                        value: Value::from("rejected_key")
826                    }],
827                    None,
828                    None,
829                    None
830                )
831                .unwrap()
832                .pop()
833                .is_none()
834        );
835    }
836
837    #[test]
838    fn missing_key() {
839        let memory = Memory::new(Default::default());
840
841        let condition = Condition::Equals {
842            field: "key",
843            value: Value::from("test_key"),
844        };
845
846        assert!(
847            memory
848                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
849                .unwrap()
850                .pop()
851                .is_none()
852        );
853    }
854
855    #[tokio::test]
856    async fn sink_spec_compliance() {
857        let event = Event::Log(LogEvent::from(ObjectMap::from([(
858            "test_key".into(),
859            Value::from(5),
860        )])));
861
862        let memory = Memory::new(Default::default());
863
864        run_and_assert_sink_compliance(
865            VectorSink::from_event_streamsink(memory),
866            stream::once(ready(event)),
867            &SINK_TAGS,
868        )
869        .await;
870    }
871
872    #[tokio::test]
873    async fn flush_metrics_without_interval() {
874        let event = Event::Log(LogEvent::from(ObjectMap::from([(
875            "test_key".into(),
876            Value::from(5),
877        )])));
878
879        let memory = Memory::new(Default::default());
880
881        run_and_assert_sink_compliance(
882            VectorSink::from_event_streamsink(memory),
883            stream::once(ready(event)),
884            &SINK_TAGS,
885        )
886        .await;
887
888        let metrics = Controller::get().unwrap().capture_metrics();
889        let insertions_counter = metrics
890            .iter()
891            .find(|m| {
892                matches!(m.value(), MetricValue::Counter { .. })
893                    && m.name() == "memory_enrichment_table_insertions_total"
894            })
895            .expect("Insertions metric is missing!");
896        let MetricValue::Counter {
897            value: insertions_count,
898        } = insertions_counter.value()
899        else {
900            unreachable!();
901        };
902        let flushes_counter = metrics
903            .iter()
904            .find(|m| {
905                matches!(m.value(), MetricValue::Counter { .. })
906                    && m.name() == "memory_enrichment_table_flushes_total"
907            })
908            .expect("Flushes metric is missing!");
909        let MetricValue::Counter {
910            value: flushes_count,
911        } = flushes_counter.value()
912        else {
913            unreachable!();
914        };
915        let object_count_gauge = metrics
916            .iter()
917            .find(|m| {
918                matches!(m.value(), MetricValue::Gauge { .. })
919                    && m.name() == "memory_enrichment_table_objects_count"
920            })
921            .expect("Object count metric is missing!");
922        let MetricValue::Gauge {
923            value: object_count,
924        } = object_count_gauge.value()
925        else {
926            unreachable!();
927        };
928        let byte_size_gauge = metrics
929            .iter()
930            .find(|m| {
931                matches!(m.value(), MetricValue::Gauge { .. })
932                    && m.name() == "memory_enrichment_table_byte_size"
933            })
934            .expect("Byte size metric is missing!");
935        assert_eq!(*insertions_count, 1.0);
936        assert_eq!(*flushes_count, 1.0);
937        assert_eq!(*object_count, 1.0);
938        assert!(!byte_size_gauge.is_empty());
939    }
940
941    #[tokio::test]
942    async fn flush_metrics_with_interval() {
943        let event = Event::Log(LogEvent::from(ObjectMap::from([(
944            "test_key".into(),
945            Value::from(5),
946        )])));
947
948        let memory = Memory::new(build_memory_config(|c| {
949            c.flush_interval = Some(1);
950        }));
951
952        run_and_assert_sink_compliance(
953            VectorSink::from_event_streamsink(memory),
954            stream::iter(vec![event.clone(), event]).flat_map(|e| {
955                stream::once(async move {
956                    tokio::time::sleep(Duration::from_millis(600)).await;
957                    e
958                })
959            }),
960            &SINK_TAGS,
961        )
962        .await;
963
964        let metrics = Controller::get().unwrap().capture_metrics();
965        let insertions_counter = metrics
966            .iter()
967            .find(|m| {
968                matches!(m.value(), MetricValue::Counter { .. })
969                    && m.name() == "memory_enrichment_table_insertions_total"
970            })
971            .expect("Insertions metric is missing!");
972        let MetricValue::Counter {
973            value: insertions_count,
974        } = insertions_counter.value()
975        else {
976            unreachable!();
977        };
978        let flushes_counter = metrics
979            .iter()
980            .find(|m| {
981                matches!(m.value(), MetricValue::Counter { .. })
982                    && m.name() == "memory_enrichment_table_flushes_total"
983            })
984            .expect("Flushes metric is missing!");
985        let MetricValue::Counter {
986            value: flushes_count,
987        } = flushes_counter.value()
988        else {
989            unreachable!();
990        };
991        let object_count_gauge = metrics
992            .iter()
993            .find(|m| {
994                matches!(m.value(), MetricValue::Gauge { .. })
995                    && m.name() == "memory_enrichment_table_objects_count"
996            })
997            .expect("Object count metric is missing!");
998        let MetricValue::Gauge {
999            value: object_count,
1000        } = object_count_gauge.value()
1001        else {
1002            unreachable!();
1003        };
1004        let byte_size_gauge = metrics
1005            .iter()
1006            .find(|m| {
1007                matches!(m.value(), MetricValue::Gauge { .. })
1008                    && m.name() == "memory_enrichment_table_byte_size"
1009            })
1010            .expect("Byte size metric is missing!");
1011
1012        assert_eq!(*insertions_count, 2.0);
1013        // One is done right away and the next one after the interval
1014        assert_eq!(*flushes_count, 2.0);
1015        assert_eq!(*object_count, 1.0);
1016        assert!(!byte_size_gauge.is_empty());
1017    }
1018
1019    #[tokio::test]
1020    async fn flush_metrics_with_key() {
1021        let event = Event::Log(LogEvent::from(ObjectMap::from([(
1022            "test_key".into(),
1023            Value::from(5),
1024        )])));
1025
1026        let memory = Memory::new(build_memory_config(|c| {
1027            c.internal_metrics = InternalMetricsConfig {
1028                include_key_tag: true,
1029            };
1030        }));
1031
1032        run_and_assert_sink_compliance(
1033            VectorSink::from_event_streamsink(memory),
1034            stream::once(ready(event)),
1035            &SINK_TAGS,
1036        )
1037        .await;
1038
1039        let metrics = Controller::get().unwrap().capture_metrics();
1040        let insertions_counter = metrics
1041            .iter()
1042            .find(|m| {
1043                matches!(m.value(), MetricValue::Counter { .. })
1044                    && m.name() == "memory_enrichment_table_insertions_total"
1045            })
1046            .expect("Insertions metric is missing!");
1047
1048        assert!(insertions_counter.tag_matches("key", "test_key"));
1049    }
1050
1051    #[tokio::test]
1052    async fn flush_metrics_without_key() {
1053        let event = Event::Log(LogEvent::from(ObjectMap::from([(
1054            "test_key".into(),
1055            Value::from(5),
1056        )])));
1057
1058        let memory = Memory::new(Default::default());
1059
1060        run_and_assert_sink_compliance(
1061            VectorSink::from_event_streamsink(memory),
1062            stream::once(ready(event)),
1063            &SINK_TAGS,
1064        )
1065        .await;
1066
1067        let metrics = Controller::get().unwrap().capture_metrics();
1068        let insertions_counter = metrics
1069            .iter()
1070            .find(|m| {
1071                matches!(m.value(), MetricValue::Counter { .. })
1072                    && m.name() == "memory_enrichment_table_insertions_total"
1073            })
1074            .expect("Insertions metric is missing!");
1075
1076        assert!(insertions_counter.tag_value("key").is_none());
1077    }
1078
1079    #[tokio::test]
1080    async fn source_spec_compliance() {
1081        let mut memory_config = MemoryConfig::default();
1082        memory_config.source_config = Some(MemorySourceConfig {
1083            export_interval: Some(NonZeroU64::try_from(1).unwrap()),
1084            export_batch_size: None,
1085            remove_after_export: false,
1086            export_expired_items: false,
1087            source_key: "test".to_string(),
1088        });
1089        let memory = memory_config.get_or_build_memory(None).await;
1090        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
1091
1092        let mut events: Vec<Event> = run_and_assert_source_compliance(
1093            memory_config,
1094            time::Duration::from_secs(5),
1095            &SOURCE_TAGS,
1096        )
1097        .await;
1098
1099        assert!(!events.is_empty());
1100        let event = events.remove(0);
1101        let log = event.as_log();
1102
1103        assert!(!log.value().is_empty());
1104    }
1105}