1use std::{
33 collections::HashMap,
34 sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, Error, IndexHandle, InternalError, Table};
41use crate::Case;
42
/// Maps a table name to the boxed [`Table`] implementation registered under it.
type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
/// Registry of enrichment tables with a two-phase lifecycle.
///
/// Tables are first staged in `loading` (via `load`), where indexes can still
/// be added, and are then moved into `tables` by `finish_load`, after which
/// they are available to readers obtained from `as_readonly`.
///
/// Cloning a registry clones the two `Arc`s, so clones share the same state.
#[derive(Clone, Default)]
pub struct TableRegistry {
    /// Tables staged for the next `finish_load`; `None` when nothing is staged.
    loading: Arc<Mutex<Option<TableMap>>>,
    /// Tables published by the last `finish_load`; `None` until one happens
    /// with staged tables present.
    tables: Arc<ArcSwap<Option<TableMap>>>,
}
51
52impl PartialEq for TableRegistry {
54 fn eq(&self, other: &Self) -> bool {
55 Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56 || self.tables.load().is_none()
57 && other.tables.load().is_none()
58 && self.loading.lock().expect("lock poison").is_none()
59 && other.loading.lock().expect("lock poison").is_none()
60 }
61}
// Marker impl: the hand-written `PartialEq` above is used as a full equivalence.
impl Eq for TableRegistry {}
63
64impl TableRegistry {
65 pub fn load(&self, mut tables: TableMap) {
94 let mut loading = self.loading.lock().unwrap();
95 let existing = self.tables.load();
96 if let Some(existing) = &**existing {
97 let extend = existing
99 .iter()
100 .filter(|(key, _)| !tables.contains_key(*key))
101 .map(|(key, value)| (key.clone(), value.clone()))
102 .collect::<HashMap<_, _>>();
103
104 tables.extend(extend);
105 }
106 match *loading {
107 None => *loading = Some(tables),
108 Some(ref mut loading) => loading.extend(tables),
109 }
110 }
111
112 pub fn finish_load(&self) {
121 let mut tables_lock = self.loading.lock().unwrap();
122 let tables = tables_lock.take();
123 self.tables.swap(Arc::new(tables));
124 }
125
126 pub fn table_ids(&self) -> Vec<String> {
135 let locked = self.loading.lock().unwrap();
136 match *locked {
137 Some(ref tables) => tables.keys().cloned().collect(),
138 None => Vec::new(),
139 }
140 }
141
142 pub fn add_index(
150 &mut self,
151 table: &str,
152 case: Case,
153 fields: &[&str],
154 ) -> Result<IndexHandle, Error> {
155 let mut locked = self.loading.lock().unwrap();
156
157 match *locked {
158 None => Err(Error::Internal {
159 source: InternalError::FinishLoadCalled,
160 }),
161 Some(ref mut tables) => match tables.get_mut(table) {
162 None => Err(Error::TableNotLoaded {
163 table: table.to_string(),
164 }),
165 Some(table) => table.add_index(case, fields),
166 },
167 }
168 }
169
170 pub fn as_readonly(&self) -> TableSearch {
173 TableSearch(self.tables.clone())
174 }
175
176 pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
179 match &**self.tables.load() {
180 Some(tables) => tables
181 .get(table)
182 .map(|table| table.index_fields())
183 .unwrap_or_default(),
184 None => Vec::new(),
185 }
186 }
187
188 pub fn needs_reload(&self, table: &str) -> bool {
191 match &**self.tables.load() {
192 Some(tables) => tables
193 .get(table)
194 .map(|table| table.needs_reload())
195 .unwrap_or(true),
196 None => true,
197 }
198 }
199
200 pub fn extract_state(&self, table: &str) -> Option<Box<dyn std::any::Any + Send + Sync>> {
202 match &**self.tables.load() {
203 Some(tables) => tables.get(table).and_then(|t| t.extract_state()),
204 None => None,
205 }
206 }
207}
208
209impl std::fmt::Debug for TableRegistry {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 fmt_enrichment_table(f, "TableRegistry", &self.tables)
212 }
213}
214
/// Read-only handle onto the published tables of a [`TableRegistry`].
///
/// Wraps the same shared `ArcSwap` the registry publishes into, so a handle
/// obtained before `finish_load` sees the tables once they are published.
#[derive(Clone, Default)]
pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
220
221impl TableSearch {
222 pub fn find_table_row<'a>(
226 &self,
227 table: &str,
228 case: Case,
229 condition: &'a [Condition<'a>],
230 select: Option<&[String]>,
231 wildcard: Option<&Value>,
232 index: Option<IndexHandle>,
233 ) -> Result<ObjectMap, Error> {
234 let tables = self.0.load();
235 if let Some(ref tables) = **tables {
236 match tables.get(table) {
237 None => Err(Error::TableNotLoaded {
238 table: table.to_string(),
239 }),
240 Some(table) => table.find_table_row(case, condition, select, wildcard, index),
241 }
242 } else {
243 Err(Error::Internal {
244 source: InternalError::FinishLoadNotCalled,
245 })
246 }
247 }
248
249 pub fn find_table_rows<'a>(
253 &self,
254 table: &str,
255 case: Case,
256 condition: &'a [Condition<'a>],
257 select: Option<&[String]>,
258 wildcard: Option<&Value>,
259 index: Option<IndexHandle>,
260 ) -> Result<Vec<ObjectMap>, Error> {
261 let tables = self.0.load();
262 if let Some(ref tables) = **tables {
263 match tables.get(table) {
264 None => Err(Error::TableNotLoaded {
265 table: table.to_string(),
266 }),
267 Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
268 }
269 } else {
270 Err(Error::Internal {
271 source: InternalError::FinishLoadNotCalled,
272 })
273 }
274 }
275}
276
277impl std::fmt::Debug for TableSearch {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
280 }
281}
282
283fn fmt_enrichment_table(
285 f: &mut std::fmt::Formatter<'_>,
286 name: &'static str,
287 tables: &Arc<ArcSwap<Option<TableMap>>>,
288) -> std::fmt::Result {
289 let tables = tables.load();
290 match **tables {
291 Some(ref tables) => {
292 let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
293 s.push_str(key);
294 s.push_str(", ");
295 s
296 });
297
298 tables.truncate(std::cmp::max(tables.len(), 0));
299 tables.push(')');
300
301 write!(f, "{name} {tables}")
302 }
303 None => write!(f, "{name} loading"),
304 }
305}
306
// Unit tests for the staging/publishing lifecycle of `TableRegistry` and the
// read-only access offered by `TableSearch`.
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;
    use crate::test_util::DummyEnrichmentTable;

    // Staged tables are reported by `table_ids`.
    #[test]
    fn tables_loaded() {
        let mut tables: TableMap = HashMap::new();
        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));

        let registry = super::TableRegistry::default();
        registry.load(tables);
        let mut result = registry.table_ids();
        // `table_ids` comes from a `HashMap`, so sort before comparing.
        result.sort();
        assert_eq!(vec!["dummy1", "dummy2"], result);
    }

    // An index can be added to a staged table, and the request reaches the table.
    #[test]
    fn can_add_indexes() {
        let mut tables: TableMap = HashMap::new();
        let indexes = Arc::new(Mutex::new(Vec::new()));
        let dummy = DummyEnrichmentTable::new_with_index(indexes.clone());
        tables.insert("dummy1".to_string(), Box::new(dummy));
        let mut registry = super::TableRegistry::default();
        registry.load(tables);
        assert_eq!(
            Ok(IndexHandle(0)),
            registry.add_index("dummy1", Case::Sensitive, &["erk"])
        );

        // The shared `indexes` vector should now hold the requested fields.
        let indexes = indexes.lock().unwrap();
        assert_eq!(vec!["erk".to_string()], *indexes[0]);
    }

    // Lookups through a read-only handle fail until `finish_load` is called.
    #[test]
    fn can_not_find_table_row_before_finish() {
        let mut tables: TableMap = HashMap::new();
        let dummy = DummyEnrichmentTable::new();
        tables.insert("dummy1".to_string(), Box::new(dummy));
        let registry = super::TableRegistry::default();
        registry.load(tables);
        let tables = registry.as_readonly();

        assert_eq!(
            Err(Error::Internal {
                source: InternalError::FinishLoadNotCalled,
            }),
            tables.find_table_row(
                "dummy1",
                Case::Sensitive,
                &[Condition::Equals {
                    field: "thing",
                    value: Value::from("thang"),
                }],
                None,
                None,
                None
            )
        );
    }

    // Once `finish_load` has emptied the staging area, indexes can no longer be added.
    #[test]
    fn can_not_add_indexes_after_finish() {
        let mut tables: TableMap = HashMap::new();
        let dummy = DummyEnrichmentTable::new();
        tables.insert("dummy1".to_string(), Box::new(dummy));
        let mut registry = super::TableRegistry::default();
        registry.load(tables);
        registry.finish_load();
        assert_eq!(
            Err(Error::Internal {
                source: InternalError::FinishLoadCalled,
            }),
            registry.add_index("dummy1", Case::Sensitive, &["erk"])
        );
    }

    // A read-only handle created before `finish_load` sees the tables afterwards.
    #[test]
    fn can_find_table_row_after_finish() {
        let mut tables: TableMap = HashMap::new();
        let dummy = DummyEnrichmentTable::new();
        tables.insert("dummy1".to_string(), Box::new(dummy));

        let registry = super::TableRegistry::default();
        registry.load(tables);
        let tables_search = registry.as_readonly();

        registry.finish_load();

        assert_eq!(
            Ok(ObjectMap::from([("field".into(), Value::from("result"))])),
            tables_search.find_table_row(
                "dummy1",
                Case::Sensitive,
                &[Condition::Equals {
                    field: "thing",
                    value: Value::from("thang"),
                }],
                None,
                None,
                None
            )
        );
    }

    // Loading again after a publish stages the new table alongside the published one.
    #[test]
    fn can_reload() {
        let mut tables: TableMap = HashMap::new();
        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));

        let registry = super::TableRegistry::default();
        registry.load(tables);

        assert_eq!(vec!["dummy1".to_string()], registry.table_ids());

        registry.finish_load();

        // After `finish_load` the staging area is empty.
        assert!(registry.table_ids().is_empty());

        let mut tables: TableMap = HashMap::new();
        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));

        registry.load(tables);
        // The already published "dummy1" should have been carried over into staging.
        let mut table_ids = registry.table_ids();
        table_ids.sort();

        assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], table_ids,);
    }

    // Reloading one table replaces its data and leaves the other published table as it was.
    #[test]
    fn reloads_existing_tables() {
        let mut tables: TableMap = HashMap::new();
        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));

        let registry = super::TableRegistry::default();
        registry.load(tables);
        registry.finish_load();

        // After `finish_load` the staging area is empty.
        assert!(registry.table_ids().is_empty());

        let mut new_data = ObjectMap::new();
        new_data.insert("thing".into(), Value::Null);

        let mut tables: TableMap = HashMap::new();
        tables.insert(
            "dummy2".to_string(),
            Box::new(DummyEnrichmentTable::new_with_data(new_data)),
        );

        registry.load(tables);
        // Inspect the staged tables directly through a clone of the staging map.
        let tables = registry.loading.lock().unwrap();
        let tables = tables.clone().unwrap();

        // "dummy1" was not reloaded, so it should still return the old data.
        assert_eq!(
            Value::from("result"),
            tables
                .get("dummy1")
                .unwrap()
                .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
                .unwrap()
                .get("field")
                .cloned()
                .unwrap()
        );

        // "dummy2" was reloaded, so it should return the new data.
        assert_eq!(
            Value::Null,
            tables
                .get("dummy2")
                .unwrap()
                .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
                .unwrap()
                .get("thing")
                .cloned()
                .unwrap()
        );
    }
}