Skip to main content

enrichment/
tables.rs

1//! The Enrichment `TableRegistry` manages the collection of `Table`s loaded
2//! into Vector. Enrichment Tables go through two stages.
3//!
4//! ## 1. Writing
5//!
//! The tables are loaded. There are two elements that need loading. The first
//! is the actual data, which is loaded at config load time; the loading itself
//! is performed by the implementation of the `EnrichmentTable` trait. Next, the
//! tables are passed through Vector's `Transform` components, particularly the
//! `Remap` transform. These transforms are able to determine which fields we
//! will want to look up whilst Vector is running. They can notify the tables of
//! these fields so that the data can be indexed.
13//!
//! During this phase, the data is loaded within a single thread, so it can be
//! loaded directly into a `HashMap`.
16//!
17//! ## 2. Reading
18//!
19//! Once all the data has been loaded we can move to the next stage. This is
20//! signified by calling the `finish_load` method. At this point all the data is
21//! swapped into the `ArcSwap` of the `tables` field. `ArcSwap` provides
22//! lock-free read-only access to the data. From this point on we have fast,
23//! efficient read-only access and can no longer add indexes or otherwise mutate
24//! the data.
25//!
//! The data within the `ArcSwap` is accessed through the `TableSearch`
//! struct. Any transform that needs access to it can call
//! `TableRegistry::as_readonly`. This returns a cheaply cloneable struct that
//! implements `vrl::EnrichmentTableSearch`, through which the enrichment tables
//! can be searched.
31
32use std::{
33    collections::HashMap,
34    sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, Error, IndexHandle, InternalError, Table};
41use crate::Case;
42
43/// A hashmap of name => implementation of an enrichment table.
44type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
46#[derive(Clone, Default)]
47pub struct TableRegistry {
48    loading: Arc<Mutex<Option<TableMap>>>,
49    tables: Arc<ArcSwap<Option<TableMap>>>,
50}
51
52/// Pessimistic Eq implementation for caching purposes
53impl PartialEq for TableRegistry {
54    fn eq(&self, other: &Self) -> bool {
55        Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56            || self.tables.load().is_none()
57                && other.tables.load().is_none()
58                && self.loading.lock().expect("lock poison").is_none()
59                && other.loading.lock().expect("lock poison").is_none()
60    }
61}
62impl Eq for TableRegistry {}
63
64impl TableRegistry {
65    /// Load the given Enrichment Tables into the registry. This can be new tables
66    /// loaded from the config, or tables that need to be reloaded because the
67    /// underlying data has changed.
68    ///
69    /// If there are no tables currently loaded into the registry, this is a
70    /// simple operation, we simply load the tables into the `loading` field.
71    ///
72    /// If there are tables that have already been loaded things get a bit more
73    /// complicated. This can occur when the config is reloaded. Vector will be
74    /// currently running and transforming events, thus the tables loaded into
75    /// the `tables` field could be in active use. Since there is no lock
76    /// against these tables, we cannot mutate this list. We do need to have a
77    /// full list of tables in the `loading` field since there may be some
78    /// transforms that will need to add indexes to these tables during the
79    /// reload.
80    ///
81    /// Our only option is to clone the data that is in `tables` and move it
82    /// into the `loading` field so it can be mutated. This could be a
83    /// potentially expensive operation. For the period whilst the config is
84    /// reloading we could potentially have double the enrichment data loaded
85    /// into memory.
86    ///
87    /// Once loading is complete, the data is swapped out of `loading` and we
88    /// return to a single copy of the tables.
89    ///
90    /// # Panics
91    ///
92    /// Panics if the Mutex is poisoned.
93    pub fn load(&self, mut tables: TableMap) {
94        let mut loading = self.loading.lock().unwrap();
95        let existing = self.tables.load();
96        if let Some(existing) = &**existing {
97            // We already have some tables
98            let extend = existing
99                .iter()
100                .filter(|(key, _)| !tables.contains_key(*key))
101                .map(|(key, value)| (key.clone(), value.clone()))
102                .collect::<HashMap<_, _>>();
103
104            tables.extend(extend);
105        }
106        match *loading {
107            None => *loading = Some(tables),
108            Some(ref mut loading) => loading.extend(tables),
109        }
110    }
111
112    /// Swap the data out of the `HashTable` into the `ArcSwap`.
113    ///
114    /// From this point we can no longer add indexes to the tables, but are now
115    /// allowed to read the data.
116    ///
117    /// # Panics
118    ///
119    /// Panics if the Mutex is poisoned.
120    pub fn finish_load(&self) {
121        let mut tables_lock = self.loading.lock().unwrap();
122        let tables = tables_lock.take();
123        self.tables.swap(Arc::new(tables));
124    }
125
126    /// Return a list of the available tables that we can write to.
127    ///
128    /// This only works in the writing stage and will acquire a lock to retrieve
129    /// the tables.
130    ///
131    /// # Panics
132    ///
133    /// Panics if the Mutex is poisoned.
134    pub fn table_ids(&self) -> Vec<String> {
135        let locked = self.loading.lock().unwrap();
136        match *locked {
137            Some(ref tables) => tables.keys().cloned().collect(),
138            None => Vec::new(),
139        }
140    }
141
142    /// Adds an index to the given Enrichment Table.
143    ///
144    /// If we are in the reading stage, this function will error.
145    ///
146    /// # Panics
147    ///
148    /// Panics if the Mutex is poisoned.
149    pub fn add_index(
150        &mut self,
151        table: &str,
152        case: Case,
153        fields: &[&str],
154    ) -> Result<IndexHandle, Error> {
155        let mut locked = self.loading.lock().unwrap();
156
157        match *locked {
158            None => Err(Error::Internal {
159                source: InternalError::FinishLoadCalled,
160            }),
161            Some(ref mut tables) => match tables.get_mut(table) {
162                None => Err(Error::TableNotLoaded {
163                    table: table.to_string(),
164                }),
165                Some(table) => table.add_index(case, fields),
166            },
167        }
168    }
169
170    /// Returns a cheaply cloneable struct through that provides lock free read
171    /// access to the enrichment tables.
172    pub fn as_readonly(&self) -> TableSearch {
173        TableSearch(self.tables.clone())
174    }
175
176    /// Returns the indexes that have been applied to the given table.
177    /// If the table is reloaded we need these to reapply them to the new reloaded tables.
178    pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
179        match &**self.tables.load() {
180            Some(tables) => tables
181                .get(table)
182                .map(|table| table.index_fields())
183                .unwrap_or_default(),
184            None => Vec::new(),
185        }
186    }
187
188    /// Checks if the table needs reloading.
189    /// If in doubt (the table isn't in our list) we return true.
190    pub fn needs_reload(&self, table: &str) -> bool {
191        match &**self.tables.load() {
192            Some(tables) => tables
193                .get(table)
194                .map(|table| table.needs_reload())
195                .unwrap_or(true),
196            None => true,
197        }
198    }
199
200    /// Extracts state from the table if available.
201    pub fn extract_state(&self, table: &str) -> Option<Box<dyn std::any::Any + Send + Sync>> {
202        match &**self.tables.load() {
203            Some(tables) => tables.get(table).and_then(|t| t.extract_state()),
204            None => None,
205        }
206    }
207}
208
209impl std::fmt::Debug for TableRegistry {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        fmt_enrichment_table(f, "TableRegistry", &self.tables)
212    }
213}
214
215/// Provides read only access to the enrichment tables via the
216/// `vrl::EnrichmentTableSearch` trait. Cloning this object is designed to be
217/// cheap. The underlying data will be shared by all clones.
218#[derive(Clone, Default)]
219pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
220
221impl TableSearch {
222    /// Search the given table to find the data.
223    ///
224    /// If we are in the writing stage, this function will return an error.
225    pub fn find_table_row<'a>(
226        &self,
227        table: &str,
228        case: Case,
229        condition: &'a [Condition<'a>],
230        select: Option<&[String]>,
231        wildcard: Option<&Value>,
232        index: Option<IndexHandle>,
233    ) -> Result<ObjectMap, Error> {
234        let tables = self.0.load();
235        if let Some(ref tables) = **tables {
236            match tables.get(table) {
237                None => Err(Error::TableNotLoaded {
238                    table: table.to_string(),
239                }),
240                Some(table) => table.find_table_row(case, condition, select, wildcard, index),
241            }
242        } else {
243            Err(Error::Internal {
244                source: InternalError::FinishLoadNotCalled,
245            })
246        }
247    }
248
249    /// Search the enrichment table data with the given condition.
250    /// All conditions must match (AND).
251    /// Can return multiple matched records
252    pub fn find_table_rows<'a>(
253        &self,
254        table: &str,
255        case: Case,
256        condition: &'a [Condition<'a>],
257        select: Option<&[String]>,
258        wildcard: Option<&Value>,
259        index: Option<IndexHandle>,
260    ) -> Result<Vec<ObjectMap>, Error> {
261        let tables = self.0.load();
262        if let Some(ref tables) = **tables {
263            match tables.get(table) {
264                None => Err(Error::TableNotLoaded {
265                    table: table.to_string(),
266                }),
267                Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
268            }
269        } else {
270            Err(Error::Internal {
271                source: InternalError::FinishLoadNotCalled,
272            })
273        }
274    }
275}
276
277impl std::fmt::Debug for TableSearch {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
280    }
281}
282
283/// Provide some fairly rudimentary debug output for enrichment tables.
284fn fmt_enrichment_table(
285    f: &mut std::fmt::Formatter<'_>,
286    name: &'static str,
287    tables: &Arc<ArcSwap<Option<TableMap>>>,
288) -> std::fmt::Result {
289    let tables = tables.load();
290    match **tables {
291        Some(ref tables) => {
292            let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
293                s.push_str(key);
294                s.push_str(", ");
295                s
296            });
297
298            tables.truncate(std::cmp::max(tables.len(), 0));
299            tables.push(')');
300
301            write!(f, "{name} {tables}")
302        }
303        None => write!(f, "{name} loading"),
304    }
305}
306
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;
    use crate::test_util::DummyEnrichmentTable;

    /// Boxes a dummy table so it can be stored in a `TableMap`.
    fn boxed(table: DummyEnrichmentTable) -> Box<dyn Table + Send + Sync> {
        Box::new(table)
    }

    /// Builds a `TableMap` from `(name, table)` pairs.
    fn table_map(entries: Vec<(&str, Box<dyn Table + Send + Sync>)>) -> TableMap {
        entries
            .into_iter()
            .map(|(name, table)| (name.to_string(), table))
            .collect()
    }

    #[test]
    fn tables_loaded() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![
            ("dummy1", boxed(DummyEnrichmentTable::new())),
            ("dummy2", boxed(DummyEnrichmentTable::new())),
        ]));

        let mut ids = registry.table_ids();
        ids.sort();
        assert_eq!(vec!["dummy1", "dummy2"], ids);
    }

    #[test]
    fn can_add_indexes() {
        let recorded = Arc::new(Mutex::new(Vec::new()));
        let dummy = DummyEnrichmentTable::new_with_index(Arc::clone(&recorded));
        let mut registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", boxed(dummy))]));

        assert_eq!(
            Ok(IndexHandle(0)),
            registry.add_index("dummy1", Case::Sensitive, &["erk"])
        );

        let recorded = recorded.lock().unwrap();
        assert_eq!(vec!["erk".to_string()], *recorded[0]);
    }

    #[test]
    fn can_not_find_table_row_before_finish() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", boxed(DummyEnrichmentTable::new()))]));
        let search = registry.as_readonly();

        let outcome = search.find_table_row(
            "dummy1",
            Case::Sensitive,
            &[Condition::Equals {
                field: "thing",
                value: Value::from("thang"),
            }],
            None,
            None,
            None,
        );

        assert_eq!(
            Err(Error::Internal {
                source: InternalError::FinishLoadNotCalled,
            }),
            outcome
        );
    }

    #[test]
    fn can_not_add_indexes_after_finish() {
        let mut registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", boxed(DummyEnrichmentTable::new()))]));
        registry.finish_load();

        let outcome = registry.add_index("dummy1", Case::Sensitive, &["erk"]);
        assert_eq!(
            Err(Error::Internal {
                source: InternalError::FinishLoadCalled,
            }),
            outcome
        );
    }

    #[test]
    fn can_find_table_row_after_finish() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", boxed(DummyEnrichmentTable::new()))]));
        // The read-only handle is taken before publishing and must still see
        // the published tables afterwards.
        let search = registry.as_readonly();
        registry.finish_load();

        let outcome = search.find_table_row(
            "dummy1",
            Case::Sensitive,
            &[Condition::Equals {
                field: "thing",
                value: Value::from("thang"),
            }],
            None,
            None,
            None,
        );

        let expected = ObjectMap::from([("field".into(), Value::from("result"))]);
        assert_eq!(Ok(expected), outcome);
    }

    #[test]
    fn can_reload() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", boxed(DummyEnrichmentTable::new()))]));
        assert_eq!(vec!["dummy1".to_string()], registry.table_ids());

        registry.finish_load();
        // Once published, nothing is left in the writable list.
        assert!(registry.table_ids().is_empty());

        // Loading a new table brings the published one back alongside it.
        registry.load(table_map(vec![("dummy2", boxed(DummyEnrichmentTable::new()))]));
        let mut ids = registry.table_ids();
        ids.sort();
        assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], ids);
    }

    #[test]
    fn reloads_existing_tables() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![
            ("dummy1", boxed(DummyEnrichmentTable::new())),
            ("dummy2", boxed(DummyEnrichmentTable::new())),
        ]));
        registry.finish_load();

        // Once published, nothing is left in the writable list.
        assert!(registry.table_ids().is_empty());

        let mut fresh_data = ObjectMap::new();
        fresh_data.insert("thing".into(), Value::Null);

        // Reload only dummy2; dummy1 has to be carried over from the published set.
        registry.load(table_map(vec![(
            "dummy2",
            boxed(DummyEnrichmentTable::new_with_data(fresh_data)),
        )]));
        let staged = registry.loading.lock().unwrap();
        let staged = staged.clone().unwrap();

        // Looks up an (unconditional) row in a staged table and returns one field.
        let field_of = |table: &str, field: &str| -> Value {
            staged
                .get(table)
                .unwrap()
                .find_table_row(Case::Sensitive, &[], None, None, None)
                .unwrap()
                .get(field)
                .cloned()
                .unwrap()
        };

        // dummy1 keeps its old data.
        assert_eq!(Value::from("result"), field_of("dummy1", "field"));

        // dummy2 serves the newly loaded data.
        assert_eq!(Value::Null, field_of("dummy2", "thing"));
    }
}