Skip to main content

vector/enrichment_tables/
file.rs

1//! Handles enrichment tables for `type = file`.
2use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime};
3
4use bytes::Bytes;
5use tracing::trace;
6use vector_lib::{
7    TimeZone,
8    configurable::configurable_component,
9    conversion::Conversion,
10    enrichment::{Case, Condition, Error, IndexHandle, Table},
11};
12use vrl::value::{ObjectMap, Value};
13
14use crate::config::EnrichmentTableConfig;
15
/// File encoding configuration.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "File encoding type."))]
// Internally tagged on `type` with snake_case variant names, so a config selects CSV with
// `type = "csv"`. `FileConfig::load_file` destructures this enum irrefutably because CSV is the
// only variant; adding a variant requires updating that destructuring too.
pub enum Encoding {
    /// Decodes the file as a [CSV][csv] (comma-separated values) file.
    ///
    /// [csv]: https://wikipedia.org/wiki/Comma-separated_values
    Csv {
        /// Whether or not the file contains column headers.
        ///
        /// When set to `true`, the first row of the CSV file will be read as the header row, and
        /// the values will be used for the names of each column. This is the default behavior.
        ///
        /// When set to `false`, columns are referred to by their numerical index.
        // The serde default (`true`) agrees with `Encoding::default()`.
        #[serde(default = "crate::serde::default_true")]
        include_headers: bool,

        /// The delimiter used to separate fields in each row of the CSV file.
        // The CSV reader works on bytes, so this is converted to a single byte when the file is
        // loaded.
        #[serde(default = "default_delimiter")]
        delimiter: char,
    },
}
40
41impl Default for Encoding {
42    fn default() -> Self {
43        Self::Csv {
44            include_headers: true,
45            delimiter: default_delimiter(),
46        }
47    }
48}
49
/// File-specific settings.
#[configurable_component]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
// The derived `Default` (empty `path`, CSV with headers) is what `FileConfig::default()` uses.
pub struct FileSettings {
    /// The path of the enrichment table file.
    ///
    /// Currently, only [CSV][csv] files are supported.
    ///
    /// [csv]: https://en.wikipedia.org/wiki/Comma-separated_values
    // Opened by `FileConfig::load_file` and re-checked by `File::needs_reload`.
    pub path: PathBuf,

    /// File encoding configuration.
    #[configurable(derived)]
    pub encoding: Encoding,
}
65
/// Configuration for the `file` enrichment table.
#[configurable_component(enrichment_table("file"))]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileConfig {
    /// File-specific settings.
    #[configurable(derived)]
    pub file: FileSettings,

    /// Key/value pairs representing mapped log field names and types.
    ///
    /// This is used to coerce log fields from strings into their proper types. The available types are listed in the `Types` list below.
    ///
    /// Timestamp coercions need to be prefaced with `timestamp|`, for example `"timestamp|%F"`. Timestamp specifiers can use either of the following:
    ///
    /// 1. One of the built-in formats listed in the `Timestamp Formats` table below.
    /// 2. The [time format specifiers][chrono_fmt] from Rust’s `chrono` library.
    ///
    /// Date coercions can optionally be given a format the same way, for example `"date|%m/%d/%Y"`; without one, dates are parsed as `%Y-%m-%d`.
    ///
    /// Types
    ///
    /// - **`bool`**
    /// - **`string`**
    /// - **`float`**
    /// - **`integer`**
    /// - **`date`**
    /// - **`timestamp`** (see the table below for formats)
    ///
    /// Timestamp Formats
    ///
    /// | Format               | Description                                                                      | Example                          |
    /// |----------------------|----------------------------------------------------------------------------------|----------------------------------|
    /// | `%F %T`              | `YYYY-MM-DD HH:MM:SS`                                                            | `2020-12-01 02:37:54`            |
    /// | `%v %T`              | `DD-Mmm-YYYY HH:MM:SS`                                                           | `01-Dec-2020 02:37:54`           |
    /// | `%FT%T`              | [ISO 8601][iso8601]/[RFC 3339][rfc3339], without time zone                       | `2020-12-01T02:37:54`            |
    /// | `%FT%TZ`             | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC                                     | `2020-12-01T09:37:54Z`           |
    /// | `%+`                 | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC, with time zone                     | `2020-12-01T02:37:54-07:00`      |
    /// | `%a, %d %b %Y %T`    | [RFC 822][rfc822]/[RFC 2822][rfc2822], without time zone                         | `Tue, 01 Dec 2020 02:37:54`      |
    /// | `%a %b %e %T %Y`     | [ctime][ctime] format                                                            | `Tue Dec 1 02:37:54 2020`        |
    /// | `%s`                 | [UNIX timestamp][unix_ts]                                                        | `1606790274`                     |
    /// | `%a %d %b %T %Y`     | [date][date] command, without time zone                                          | `Tue 01 Dec 02:37:54 2020`       |
    /// | `%a %d %b %T %Z %Y`  | [date][date] command, with time zone                                             | `Tue 01 Dec 02:37:54 PST 2020`   |
    /// | `%a %d %b %T %z %Y`  | [date][date] command, with numeric time zone                                     | `Tue 01 Dec 02:37:54 -0700 2020` |
    /// | `%a %d %b %T %#z %Y` | [date][date] command, with numeric time zone (minutes can be missing or present) | `Tue 01 Dec 02:37:54 -07 2020`   |
    ///
    /// [date]: https://man7.org/linux/man-pages/man1/date.1.html
    /// [ctime]: https://www.cplusplus.com/reference/ctime
    /// [unix_ts]: https://en.wikipedia.org/wiki/Unix_time
    /// [rfc822]: https://tools.ietf.org/html/rfc822#section-5
    /// [rfc2822]: https://tools.ietf.org/html/rfc2822#section-3.3
    /// [iso8601]: https://en.wikipedia.org/wiki/ISO_8601
    /// [rfc3339]: https://tools.ietf.org/html/rfc3339
    /// [chrono_fmt]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    // Keys are column names: the header names, or "0", "1", ... when the file has no header row.
    // Columns without an entry are loaded as plain strings (see `FileConfig::parse_column`).
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "Represents mapped log field names and types."
    ))]
    pub schema: HashMap<String, String>,
}
123
/// Default CSV field delimiter (a comma).
///
/// Used as the serde default for the CSV `delimiter` setting and by `Encoding::default()`.
const fn default_delimiter() -> char {
    ','
}
127
128impl FileConfig {
129    fn parse_column(
130        &self,
131        timezone: TimeZone,
132        column: &str,
133        row: usize,
134        value: &str,
135    ) -> Result<Value, String> {
136        use chrono::TimeZone;
137
138        Ok(match self.schema.get(column) {
139            Some(format) => {
140                let mut split = format.splitn(2, '|').map(|segment| segment.trim());
141
142                match (split.next(), split.next()) {
143                    (Some("date"), None) => Value::Timestamp(
144                        chrono::FixedOffset::east_opt(0)
145                            .expect("invalid timestamp")
146                            .from_utc_datetime(
147                                &chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d")
148                                    .map_err(|_| {
149                                        format!("unable to parse date {value} found in row {row}")
150                                    })?
151                                    .and_hms_opt(0, 0, 0)
152                                    .expect("invalid timestamp"),
153                            )
154                            .into(),
155                    ),
156                    (Some("date"), Some(format)) => Value::Timestamp(
157                        chrono::FixedOffset::east_opt(0)
158                            .expect("invalid timestamp")
159                            .from_utc_datetime(
160                                &chrono::NaiveDate::parse_from_str(value, format)
161                                    .map_err(|_| {
162                                        format!("unable to parse date {value} found in row {row}")
163                                    })?
164                                    .and_hms_opt(0, 0, 0)
165                                    .expect("invalid timestamp"),
166                            )
167                            .into(),
168                    ),
169                    _ => {
170                        let conversion =
171                            Conversion::parse(format, timezone).map_err(|err| err.to_string())?;
172                        conversion
173                            .convert(Bytes::copy_from_slice(value.as_bytes()))
174                            .map_err(|_| format!("unable to parse {value} found in row {row}"))?
175                    }
176                }
177            }
178            None => value.into(),
179        })
180    }
181
182    /// Load the configured file into memory. Required to create a new file enrichment table.
183    pub fn load_file(&self, timezone: TimeZone) -> crate::Result<FileData> {
184        let Encoding::Csv {
185            include_headers,
186            delimiter,
187        } = self.file.encoding;
188
189        let mut reader = csv::ReaderBuilder::new()
190            .has_headers(include_headers)
191            .delimiter(delimiter as u8)
192            .from_path(&self.file.path)?;
193
194        let first_row = reader.records().next();
195        let headers = if include_headers {
196            reader
197                .headers()?
198                .iter()
199                .map(|col| col.to_string())
200                .collect::<Vec<_>>()
201        } else {
202            // If there are no headers in the datafile we make headers as the numerical index of
203            // the column.
204            match first_row {
205                Some(Ok(ref row)) => (0..row.len()).map(|idx| idx.to_string()).collect(),
206                _ => Vec::new(),
207            }
208        };
209
210        let data = first_row
211            .into_iter()
212            .chain(reader.records())
213            .map(|row| {
214                Ok(row?
215                    .iter()
216                    .enumerate()
217                    .map(|(idx, col)| self.parse_column(timezone, &headers[idx], idx, col))
218                    .collect::<Result<Vec<_>, String>>()?)
219            })
220            .collect::<crate::Result<Vec<_>>>()?;
221
222        trace!(
223            "Loaded enrichment file {} with headers {:?}.",
224            self.file.path.to_str().unwrap_or("path with invalid utf"),
225            headers
226        );
227
228        let file = reader.into_inner();
229
230        Ok(FileData {
231            headers,
232            data,
233            modified: file.metadata()?.modified()?,
234        })
235    }
236}
237
238impl EnrichmentTableConfig for FileConfig {
239    async fn build(
240        &self,
241        globals: &crate::config::GlobalOptions,
242        _prev_state: Option<Box<dyn std::any::Any + Send + Sync>>,
243    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
244        Ok(Box::new(File::new(
245            self.clone(),
246            self.load_file(globals.timezone())?,
247        )))
248    }
249}
250
251impl_generate_config_from_default!(FileConfig);
252
/// The data resulting from loading a configured file.
// Produced by `FileConfig::load_file` and consumed by `File::new`.
pub struct FileData {
    /// The ordered set of headers of the data columns.
    // Header-row names, or "0", "1", ... when the file has no header row.
    pub headers: Vec<String>,
    /// The data contained in the file.
    // Rows in file order; each row's values line up positionally with `headers`.
    pub data: Vec<Vec<Value>>,
    /// The last modified time of the file.
    // Compared against the file's current mtime by `File::needs_reload`.
    pub modified: SystemTime,
}
262
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a CSV file.
#[derive(Clone)]
pub struct File {
    // Settings the table was built from; `needs_reload` re-checks `config.file.path`.
    config: FileConfig,
    // Modification time of the file as recorded when the data was loaded.
    last_modified: SystemTime,
    // Parsed rows; each row's values line up positionally with `headers`.
    data: Vec<Vec<Value>>,
    // Column names, in file order.
    headers: Vec<String>,
    // One entry per index registered via `add_index`; an `IndexHandle` is a position in this
    // list. Each entry is: (case sensitivity, indexed column positions in header order,
    // seahash of those columns' values -> positions of the rows in `data` with that hash).
    indexes: Vec<(
        Case,
        Vec<usize>,
        HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>,
    )>,
}
276
277impl File {
278    /// Creates a new [File] based on the provided config.
279    pub fn new(config: FileConfig, data: FileData) -> Self {
280        Self {
281            config,
282            last_modified: data.modified,
283            data: data.data,
284            headers: data.headers,
285            indexes: Vec::new(),
286        }
287    }
288
289    fn column_index(&self, col: &str) -> Option<usize> {
290        self.headers.iter().position(|header| header == col)
291    }
292
293    /// Does the given row match all the conditions specified?
294    fn row_equals(
295        &self,
296        case: Case,
297        condition: &[Condition],
298        row: &[Value],
299        wildcard: Option<&Value>,
300    ) -> bool {
301        condition.iter().all(|condition| match condition {
302            Condition::Equals { field, value } => match self.column_index(field) {
303                None => false,
304                Some(idx) => {
305                    let current_row_value = &row[idx];
306
307                    // Helper closure for comparing current_row_value with another value,
308                    // respecting the specified case for Value::Bytes.
309                    let compare_values = |val_to_compare: &Value| -> bool {
310                        match (case, current_row_value, val_to_compare) {
311                            (
312                                Case::Insensitive,
313                                Value::Bytes(bytes_row),
314                                Value::Bytes(bytes_cmp),
315                            ) => {
316                                // Perform case-insensitive comparison for byte strings.
317                                // If both are valid UTF-8, compare their lowercase versions.
318                                // If both are non-UTF-8 bytes, compare them directly.
319                                // If one is UTF-8 and the other is not, they are considered not equal.
320                                match (
321                                    std::str::from_utf8(bytes_row),
322                                    std::str::from_utf8(bytes_cmp),
323                                ) {
324                                    (Ok(s_row), Ok(s_cmp)) => {
325                                        s_row.to_lowercase() == s_cmp.to_lowercase()
326                                    }
327                                    (Err(_), Err(_)) => bytes_row == bytes_cmp,
328                                    _ => false,
329                                }
330                            }
331                            // For Case::Sensitive, or for Case::Insensitive with non-Bytes types,
332                            // perform a direct equality check.
333                            _ => current_row_value == val_to_compare,
334                        }
335                    };
336
337                    // First, check if the row value matches the condition's value.
338                    if compare_values(value) {
339                        true
340                    } else if let Some(wc_val) = wildcard {
341                        // If not, and a wildcard is provided, check if the row value matches the wildcard.
342                        compare_values(wc_val)
343                    } else {
344                        // Otherwise, no match.
345                        false
346                    }
347                }
348            },
349            Condition::BetweenDates { field, from, to } => match self.column_index(field) {
350                None => false,
351                Some(idx) => match row[idx] {
352                    Value::Timestamp(date) => from <= &date && &date <= to,
353                    _ => false,
354                },
355            },
356            Condition::FromDate { field, from } => match self.column_index(field) {
357                None => false,
358                Some(idx) => match row[idx] {
359                    Value::Timestamp(date) => from <= &date,
360                    _ => false,
361                },
362            },
363            Condition::ToDate { field, to } => match self.column_index(field) {
364                None => false,
365                Some(idx) => match row[idx] {
366                    Value::Timestamp(date) => &date <= to,
367                    _ => false,
368                },
369            },
370        })
371    }
372
373    fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap {
374        self.headers
375            .iter()
376            .zip(row)
377            .filter(|(header, _)| {
378                select
379                    .map(|select| select.contains(header))
380                    // If no select is passed, we assume all columns are included
381                    .unwrap_or(true)
382            })
383            .map(|(header, col)| (header.as_str().into(), col.clone()))
384            .collect()
385    }
386
387    /// Order the fields in the index according to the position they are found in the header.
388    fn normalize_index_fields(&self, index: &[&str]) -> Result<Vec<usize>, Error> {
389        // Get the positions of the fields we are indexing
390        let normalized = self
391            .headers
392            .iter()
393            .enumerate()
394            .filter_map(|(idx, col)| {
395                if index.contains(&col.as_ref()) {
396                    Some(idx)
397                } else {
398                    None
399                }
400            })
401            .collect::<Vec<_>>();
402
403        if normalized.len() != index.len() {
404            let fields = index
405                .iter()
406                .filter_map(|col| {
407                    if self.headers.iter().any(|header| header == *col) {
408                        None
409                    } else {
410                        Some(col.to_string())
411                    }
412                })
413                .collect();
414            Err(Error::MissingDatasetFields { fields })
415        } else {
416            Ok(normalized)
417        }
418    }
419
420    /// Creates an index with the given fields.
421    /// Uses seahash to create a hash of the data that is used as the key in a hashmap lookup to
422    /// the index of the row in the data.
423    ///
424    /// Ensure fields that are searched via a comparison are not included in the index!
425    fn index_data(
426        &self,
427        fieldidx: &[usize],
428        case: Case,
429    ) -> Result<HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>, Error> {
430        let mut index = HashMap::with_capacity_and_hasher(
431            self.data.len(),
432            hash_hasher::HashBuildHasher::default(),
433        );
434
435        for (idx, row) in self.data.iter().enumerate() {
436            let mut hash = seahash::SeaHasher::default();
437
438            for idx in fieldidx {
439                hash_value(&mut hash, case, &row[*idx])?;
440            }
441
442            let key = hash.finish();
443
444            let entry = index.entry(key).or_insert_with(Vec::new);
445            entry.push(idx);
446        }
447
448        index.shrink_to_fit();
449
450        Ok(index)
451    }
452
453    /// Sequentially searches through the iterator for the given condition.
454    fn sequential<'a, I>(
455        &'a self,
456        data: I,
457        case: Case,
458        condition: &'a [Condition<'a>],
459        select: Option<&'a [String]>,
460        wildcard: Option<&'a Value>,
461    ) -> impl Iterator<Item = ObjectMap> + 'a
462    where
463        I: Iterator<Item = &'a Vec<Value>> + 'a,
464    {
465        data.filter_map(move |row| {
466            if self.row_equals(case, condition, row, wildcard) {
467                Some(self.add_columns(select, row))
468            } else {
469                None
470            }
471        })
472    }
473
474    fn indexed<'a>(
475        &'a self,
476        case: Case,
477        condition: &'a [Condition<'a>],
478        handle: IndexHandle,
479    ) -> Result<Option<&'a Vec<usize>>, Error> {
480        // The index to use has been passed, we can use this to search the data.
481        // We are assuming that the caller has passed an index that represents the fields
482        // being passed in the condition.
483        let mut hash = seahash::SeaHasher::default();
484
485        for header in self.headers.iter() {
486            if let Some(Condition::Equals { value, .. }) = condition.iter().find(
487                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
488            ) {
489                hash_value(&mut hash, case, value)?;
490            }
491        }
492
493        let key = hash.finish();
494
495        let IndexHandle(handle) = handle;
496        Ok(self.indexes[handle].2.get(&key))
497    }
498
499    fn indexed_with_wildcard<'a>(
500        &'a self,
501        case: Case,
502        wildcard: &'a Value,
503        condition: &'a [Condition<'a>],
504        handle: IndexHandle,
505    ) -> Result<Option<&'a Vec<usize>>, Error> {
506        if let Some(result) = self.indexed(case, condition, handle)? {
507            return Ok(Some(result));
508        }
509
510        // If lookup fails and a wildcard is provided, compute hash for the wildcard
511        let mut wildcard_hash = seahash::SeaHasher::default();
512        for header in self.headers.iter() {
513            if condition.iter().any(
514                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
515            ) {
516                hash_value(&mut wildcard_hash, case, wildcard)?;
517            }
518        }
519
520        let wildcard_key = wildcard_hash.finish();
521        let IndexHandle(handle) = handle;
522        Ok(self.indexes[handle].2.get(&wildcard_key))
523    }
524}
525
526/// Adds the bytes from the given value to the hash.
527/// Each field is terminated by a `0` value to separate the fields
528fn hash_value(hasher: &mut seahash::SeaHasher, case: Case, value: &Value) -> Result<(), Error> {
529    match value {
530        Value::Bytes(bytes) => match case {
531            Case::Sensitive => hasher.write(bytes),
532            Case::Insensitive => hasher.write(
533                std::str::from_utf8(bytes)
534                    .map_err(|source| Error::InvalidUtfInColumn { source })?
535                    .to_lowercase()
536                    .as_bytes(),
537            ),
538        },
539        value => {
540            let bytes: bytes::Bytes = value
541                .encode_as_bytes()
542                .map_err(|details| Error::FailedToEncodeValue { details })?;
543            hasher.write(&bytes);
544        }
545    }
546
547    hasher.write_u8(0);
548
549    Ok(())
550}
551
552/// Returns an error if the iterator doesn't yield exactly one result.
553fn single_or_err<I, T>(mut iter: T) -> Result<I, Error>
554where
555    T: Iterator<Item = I>,
556{
557    let result = iter.next();
558
559    if iter.next().is_some() {
560        // More than one row has been found.
561        Err(Error::MoreThanOneRowFound)
562    } else {
563        result.ok_or(Error::NoRowsFound)
564    }
565}
566
567impl Table for File {
568    fn find_table_row<'a>(
569        &self,
570        case: Case,
571        condition: &'a [Condition<'a>],
572        select: Option<&'a [String]>,
573        wildcard: Option<&Value>,
574        index: Option<IndexHandle>,
575    ) -> Result<ObjectMap, Error> {
576        match index {
577            None => {
578                // No index has been passed so we need to do a Sequential Scan.
579                single_or_err(self.sequential(self.data.iter(), case, condition, select, wildcard))
580            }
581            Some(handle) => {
582                let result = if let Some(wildcard) = wildcard {
583                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
584                } else {
585                    self.indexed(case, condition, handle)?
586                }
587                .ok_or(Error::NoRowsFound)?
588                .iter()
589                .map(|idx| &self.data[*idx]);
590
591                // Perform a sequential scan over the indexed result.
592                single_or_err(self.sequential(result, case, condition, select, wildcard))
593            }
594        }
595    }
596
597    fn find_table_rows<'a>(
598        &self,
599        case: Case,
600        condition: &'a [Condition<'a>],
601        select: Option<&'a [String]>,
602        wildcard: Option<&Value>,
603        index: Option<IndexHandle>,
604    ) -> Result<Vec<ObjectMap>, Error> {
605        match index {
606            None => {
607                // No index has been passed so we need to do a Sequential Scan.
608                Ok(self
609                    .sequential(self.data.iter(), case, condition, select, wildcard)
610                    .collect())
611            }
612            Some(handle) => {
613                // Perform a sequential scan over the indexed result.
614                let indexed_result = if let Some(wildcard) = wildcard {
615                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
616                } else {
617                    self.indexed(case, condition, handle)?
618                };
619
620                Ok(self
621                    .sequential(
622                        indexed_result
623                            .iter()
624                            .flat_map(|results| results.iter().map(|idx| &self.data[*idx])),
625                        case,
626                        condition,
627                        select,
628                        wildcard,
629                    )
630                    .collect())
631            }
632        }
633    }
634
635    fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
636        let normalized = self.normalize_index_fields(fields)?;
637        match self
638            .indexes
639            .iter()
640            .position(|index| index.0 == case && index.1 == normalized)
641        {
642            Some(pos) => {
643                // This index already exists
644                Ok(IndexHandle(pos))
645            }
646            None => {
647                let index = self.index_data(&normalized, case)?;
648                self.indexes.push((case, normalized, index));
649                // The returned index handle is the position of the index in our list of indexes.
650                Ok(IndexHandle(self.indexes.len() - 1))
651            }
652        }
653    }
654
655    /// Returns a list of the field names that are in each index
656    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
657        self.indexes
658            .iter()
659            .map(|index| {
660                let (case, fields, _) = index;
661                (
662                    *case,
663                    fields
664                        .iter()
665                        .map(|idx| self.headers[*idx].clone())
666                        .collect::<Vec<_>>(),
667                )
668            })
669            .collect::<Vec<_>>()
670    }
671
672    /// Checks the modified timestamp of the data file to see if data has changed.
673    fn needs_reload(&self) -> bool {
674        matches!(fs::metadata(&self.config.file.path)
675            .and_then(|metadata| metadata.modified()),
676            Ok(modified) if modified > self.last_modified)
677    }
678}
679
680impl std::fmt::Debug for File {
681    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682        write!(
683            f,
684            "File {} row(s) {} index(es)",
685            self.data.len(),
686            self.indexes.len()
687        )
688    }
689}
690
691#[cfg(test)]
692mod tests {
693    use chrono::{TimeZone, Timelike};
694
695    use super::*;
696
697    #[test]
698    fn parse_file_with_headers() {
699        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
700        let path = dir.path().join("table.csv");
701        fs::write(path.clone(), "foo,bar\na,1\nb,2").expect("Failed to write enrichment table");
702
703        let config = FileConfig {
704            file: FileSettings {
705                path,
706                encoding: Encoding::Csv {
707                    include_headers: true,
708                    delimiter: default_delimiter(),
709                },
710            },
711            schema: HashMap::new(),
712        };
713        let data = config
714            .load_file(Default::default())
715            .expect("Failed to parse csv");
716        assert_eq!(vec!["foo".to_string(), "bar".to_string()], data.headers);
717        assert_eq!(
718            vec![
719                vec![Value::from("a"), Value::from("1")],
720                vec![Value::from("b"), Value::from("2")],
721            ],
722            data.data
723        );
724    }
725
726    #[test]
727    fn parse_file_no_headers() {
728        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
729        let path = dir.path().join("table.csv");
730        fs::write(path.clone(), "a,1\nb,2").expect("Failed to write enrichment table");
731
732        let config = FileConfig {
733            file: FileSettings {
734                path,
735                encoding: Encoding::Csv {
736                    include_headers: false,
737                    delimiter: default_delimiter(),
738                },
739            },
740            schema: HashMap::new(),
741        };
742        let data = config
743            .load_file(Default::default())
744            .expect("Failed to parse csv");
745        assert_eq!(vec!["0".to_string(), "1".to_string()], data.headers);
746        assert_eq!(
747            vec![
748                vec![Value::from("a"), Value::from("1")],
749                vec![Value::from("b"), Value::from("2")],
750            ],
751            data.data
752        );
753    }
754
755    #[test]
756    fn parse_column() {
757        let mut schema = HashMap::new();
758        schema.insert("col1".to_string(), " string ".to_string());
759        schema.insert("col2".to_string(), " date ".to_string());
760        schema.insert("col3".to_string(), "date|%m/%d/%Y".to_string());
761        schema.insert("col3-spaces".to_string(), "date | %m %d %Y".to_string());
762        schema.insert("col4".to_string(), "timestamp|%+".to_string());
763        schema.insert("col4-spaces".to_string(), "timestamp | %+".to_string());
764        schema.insert("col5".to_string(), "int".to_string());
765        let config = FileConfig {
766            file: Default::default(),
767            schema,
768        };
769
770        assert_eq!(
771            Ok(Value::from("zork")),
772            config.parse_column(Default::default(), "col1", 1, "zork")
773        );
774
775        assert_eq!(
776            Ok(Value::from(
777                chrono::Utc
778                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
779                    .single()
780                    .expect("invalid timestamp")
781            )),
782            config.parse_column(Default::default(), "col2", 1, "2020-03-05")
783        );
784
785        assert_eq!(
786            Ok(Value::from(
787                chrono::Utc
788                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
789                    .single()
790                    .expect("invalid timestamp")
791            )),
792            config.parse_column(Default::default(), "col3", 1, "03/05/2020")
793        );
794
795        assert_eq!(
796            Ok(Value::from(
797                chrono::Utc
798                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
799                    .single()
800                    .expect("invalid timestamp")
801            )),
802            config.parse_column(Default::default(), "col3-spaces", 1, "03 05 2020")
803        );
804
805        assert_eq!(
806            Ok(Value::from(
807                chrono::Utc
808                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
809                    .single()
810                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
811                    .expect("invalid timestamp")
812            )),
813            config.parse_column(
814                Default::default(),
815                "col4",
816                1,
817                "2001-07-08T00:34:00.026490+09:30"
818            )
819        );
820
821        assert_eq!(
822            Ok(Value::from(
823                chrono::Utc
824                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
825                    .single()
826                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
827                    .expect("invalid timestamp")
828            )),
829            config.parse_column(
830                Default::default(),
831                "col4-spaces",
832                1,
833                "2001-07-08T00:34:00.026490+09:30"
834            )
835        );
836
837        assert_eq!(
838            Ok(Value::from(42)),
839            config.parse_column(Default::default(), "col5", 1, "42")
840        );
841    }
842
843    #[test]
844    fn seahash() {
845        // Ensure we can separate fields to create a distinct hash.
846        let mut one = seahash::SeaHasher::default();
847        one.write(b"norknoog");
848        one.write_u8(0);
849        one.write(b"donk");
850
851        let mut two = seahash::SeaHasher::default();
852        two.write(b"nork");
853        one.write_u8(0);
854        two.write(b"noogdonk");
855
856        assert_ne!(one.finish(), two.finish());
857    }
858
859    #[test]
860    fn finds_row() {
861        let file = File::new(
862            Default::default(),
863            FileData {
864                modified: SystemTime::now(),
865                data: vec![
866                    vec!["zip".into(), "zup".into()],
867                    vec!["zirp".into(), "zurp".into()],
868                ],
869                headers: vec!["field1".to_string(), "field2".to_string()],
870            },
871        );
872
873        let condition = Condition::Equals {
874            field: "field1",
875            value: Value::from("zirp"),
876        };
877
878        assert_eq!(
879            Ok(ObjectMap::from([
880                ("field1".into(), Value::from("zirp")),
881                ("field2".into(), Value::from("zurp")),
882            ])),
883            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
884        );
885    }
886
887    #[test]
888    fn finds_row_with_wildcard() {
889        let file = File::new(
890            Default::default(),
891            FileData {
892                modified: SystemTime::now(),
893                data: vec![
894                    vec!["zip".into(), "zup".into()],
895                    vec!["zirp".into(), "zurp".into()],
896                ],
897                headers: vec!["field1".to_string(), "field2".to_string()],
898            },
899        );
900
901        let wildcard = Value::from("zirp");
902
903        let condition = Condition::Equals {
904            field: "field1",
905            value: Value::from("nonexistent"),
906        };
907
908        assert_eq!(
909            Ok(ObjectMap::from([
910                ("field1".into(), Value::from("zirp")),
911                ("field2".into(), Value::from("zurp")),
912            ])),
913            file.find_table_row(Case::Sensitive, &[condition], None, Some(&wildcard), None)
914        );
915    }
916
917    #[test]
918    fn duplicate_indexes() {
919        let mut file = File::new(
920            Default::default(),
921            FileData {
922                modified: SystemTime::now(),
923                data: Vec::new(),
924                headers: vec![
925                    "field1".to_string(),
926                    "field2".to_string(),
927                    "field3".to_string(),
928                ],
929            },
930        );
931
932        let handle1 = file.add_index(Case::Sensitive, &["field2", "field3"]);
933        let handle2 = file.add_index(Case::Sensitive, &["field3", "field2"]);
934
935        assert_eq!(handle1, handle2);
936        assert_eq!(1, file.indexes.len());
937    }
938
939    #[test]
940    fn errors_on_missing_columns() {
941        let mut file = File::new(
942            Default::default(),
943            FileData {
944                modified: SystemTime::now(),
945                data: Vec::new(),
946                headers: vec![
947                    "field1".to_string(),
948                    "field2".to_string(),
949                    "field3".to_string(),
950                ],
951            },
952        );
953
954        let error = file.add_index(Case::Sensitive, &["apples", "field2", "bananas"]);
955        assert_eq!(
956            Err(Error::MissingDatasetFields {
957                fields: vec!["apples".into(), "bananas".into()],
958            }),
959            error
960        )
961    }
962
963    #[test]
964    fn finds_row_with_index() {
965        let mut file = File::new(
966            Default::default(),
967            FileData {
968                modified: SystemTime::now(),
969                data: vec![
970                    vec!["zip".into(), "zup".into()],
971                    vec!["zirp".into(), "zurp".into()],
972                ],
973                headers: vec!["field1".to_string(), "field2".to_string()],
974            },
975        );
976
977        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
978
979        let condition = Condition::Equals {
980            field: "field1",
981            value: Value::from("zirp"),
982        };
983
984        assert_eq!(
985            Ok(ObjectMap::from([
986                ("field1".into(), Value::from("zirp")),
987                ("field2".into(), Value::from("zurp")),
988            ])),
989            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
990        );
991    }
992
993    #[test]
994    fn finds_row_with_index_case_sensitive_and_wildcard() {
995        let mut file = File::new(
996            Default::default(),
997            FileData {
998                modified: SystemTime::now(),
999                data: vec![
1000                    vec!["zip".into(), "zup".into()],
1001                    vec!["zirp".into(), "zurp".into()],
1002                ],
1003                headers: vec!["field1".to_string(), "field2".to_string()],
1004            },
1005        );
1006
1007        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1008        let wildcard = Value::from("zirp");
1009
1010        let condition = Condition::Equals {
1011            field: "field1",
1012            value: Value::from("nonexistent"),
1013        };
1014
1015        assert_eq!(
1016            Ok(ObjectMap::from([
1017                ("field1".into(), Value::from("zirp")),
1018                ("field2".into(), Value::from("zurp")),
1019            ])),
1020            file.find_table_row(
1021                Case::Sensitive,
1022                &[condition],
1023                None,
1024                Some(&wildcard),
1025                Some(handle)
1026            )
1027        );
1028    }
1029
1030    #[test]
1031    fn finds_rows_with_index_case_sensitive() {
1032        let mut file = File::new(
1033            Default::default(),
1034            FileData {
1035                modified: SystemTime::now(),
1036                data: vec![
1037                    vec!["zip".into(), "zup".into()],
1038                    vec!["zirp".into(), "zurp".into()],
1039                    vec!["zip".into(), "zoop".into()],
1040                ],
1041                headers: vec!["field1".to_string(), "field2".to_string()],
1042            },
1043        );
1044
1045        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1046
1047        assert_eq!(
1048            Ok(vec![
1049                ObjectMap::from([
1050                    ("field1".into(), Value::from("zip")),
1051                    ("field2".into(), Value::from("zup")),
1052                ]),
1053                ObjectMap::from([
1054                    ("field1".into(), Value::from("zip")),
1055                    ("field2".into(), Value::from("zoop")),
1056                ]),
1057            ]),
1058            file.find_table_rows(
1059                Case::Sensitive,
1060                &[Condition::Equals {
1061                    field: "field1",
1062                    value: Value::from("zip"),
1063                }],
1064                None,
1065                None,
1066                Some(handle)
1067            )
1068        );
1069
1070        assert_eq!(
1071            Ok(vec![]),
1072            file.find_table_rows(
1073                Case::Sensitive,
1074                &[Condition::Equals {
1075                    field: "field1",
1076                    value: Value::from("ZiP"),
1077                }],
1078                None,
1079                None,
1080                Some(handle)
1081            )
1082        );
1083    }
1084
1085    #[test]
1086    fn selects_columns() {
1087        let mut file = File::new(
1088            Default::default(),
1089            FileData {
1090                modified: SystemTime::now(),
1091                data: vec![
1092                    vec!["zip".into(), "zup".into(), "zoop".into()],
1093                    vec!["zirp".into(), "zurp".into(), "zork".into()],
1094                    vec!["zip".into(), "zoop".into(), "zibble".into()],
1095                ],
1096                headers: vec![
1097                    "field1".to_string(),
1098                    "field2".to_string(),
1099                    "field3".to_string(),
1100                ],
1101            },
1102        );
1103
1104        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1105
1106        let condition = Condition::Equals {
1107            field: "field1",
1108            value: Value::from("zip"),
1109        };
1110
1111        assert_eq!(
1112            Ok(vec![
1113                ObjectMap::from([
1114                    ("field1".into(), Value::from("zip")),
1115                    ("field3".into(), Value::from("zoop")),
1116                ]),
1117                ObjectMap::from([
1118                    ("field1".into(), Value::from("zip")),
1119                    ("field3".into(), Value::from("zibble")),
1120                ]),
1121            ]),
1122            file.find_table_rows(
1123                Case::Sensitive,
1124                &[condition],
1125                Some(&["field1".to_string(), "field3".to_string()]),
1126                None,
1127                Some(handle)
1128            )
1129        );
1130    }
1131
1132    #[test]
1133    fn finds_rows_with_index_case_insensitive() {
1134        let mut file = File::new(
1135            Default::default(),
1136            FileData {
1137                modified: SystemTime::now(),
1138                data: vec![
1139                    vec!["zip".into(), "zup".into()],
1140                    vec!["zirp".into(), "zurp".into()],
1141                    vec!["zip".into(), "zoop".into()],
1142                ],
1143                headers: vec!["field1".to_string(), "field2".to_string()],
1144            },
1145        );
1146
1147        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1148
1149        assert_eq!(
1150            Ok(vec![
1151                ObjectMap::from([
1152                    ("field1".into(), Value::from("zip")),
1153                    ("field2".into(), Value::from("zup")),
1154                ]),
1155                ObjectMap::from([
1156                    ("field1".into(), Value::from("zip")),
1157                    ("field2".into(), Value::from("zoop")),
1158                ]),
1159            ]),
1160            file.find_table_rows(
1161                Case::Insensitive,
1162                &[Condition::Equals {
1163                    field: "field1",
1164                    value: Value::from("zip"),
1165                }],
1166                None,
1167                None,
1168                Some(handle)
1169            )
1170        );
1171
1172        assert_eq!(
1173            Ok(vec![
1174                ObjectMap::from([
1175                    ("field1".into(), Value::from("zip")),
1176                    ("field2".into(), Value::from("zup")),
1177                ]),
1178                ObjectMap::from([
1179                    ("field1".into(), Value::from("zip")),
1180                    ("field2".into(), Value::from("zoop")),
1181                ]),
1182            ]),
1183            file.find_table_rows(
1184                Case::Insensitive,
1185                &[Condition::Equals {
1186                    field: "field1",
1187                    value: Value::from("ZiP"),
1188                }],
1189                None,
1190                None,
1191                Some(handle)
1192            )
1193        );
1194    }
1195
1196    #[test]
1197    fn finds_rows_with_index_case_insensitive_and_wildcard() {
1198        let mut file = File::new(
1199            Default::default(),
1200            FileData {
1201                modified: SystemTime::now(),
1202                data: vec![
1203                    vec!["zip".into(), "zup".into()],
1204                    vec!["zirp".into(), "zurp".into()],
1205                    vec!["zip".into(), "zoop".into()],
1206                ],
1207                headers: vec!["field1".to_string(), "field2".to_string()],
1208            },
1209        );
1210
1211        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1212
1213        assert_eq!(
1214            Ok(vec![
1215                ObjectMap::from([
1216                    ("field1".into(), Value::from("zip")),
1217                    ("field2".into(), Value::from("zup")),
1218                ]),
1219                ObjectMap::from([
1220                    ("field1".into(), Value::from("zip")),
1221                    ("field2".into(), Value::from("zoop")),
1222                ]),
1223            ]),
1224            file.find_table_rows(
1225                Case::Insensitive,
1226                &[Condition::Equals {
1227                    field: "field1",
1228                    value: Value::from("nonexistent"),
1229                }],
1230                None,
1231                Some(&Value::from("zip")),
1232                Some(handle)
1233            )
1234        );
1235
1236        assert_eq!(
1237            Ok(vec![
1238                ObjectMap::from([
1239                    ("field1".into(), Value::from("zip")),
1240                    ("field2".into(), Value::from("zup")),
1241                ]),
1242                ObjectMap::from([
1243                    ("field1".into(), Value::from("zip")),
1244                    ("field2".into(), Value::from("zoop")),
1245                ]),
1246            ]),
1247            file.find_table_rows(
1248                Case::Insensitive,
1249                &[Condition::Equals {
1250                    field: "field1",
1251                    value: Value::from("ZiP"),
1252                }],
1253                None,
1254                Some(&Value::from("ZiP")),
1255                Some(handle)
1256            )
1257        );
1258    }
1259
1260    #[test]
1261    fn finds_row_between_dates() {
1262        let mut file = File::new(
1263            Default::default(),
1264            FileData {
1265                modified: SystemTime::now(),
1266                data: vec![
1267                    vec![
1268                        "zip".into(),
1269                        Value::Timestamp(
1270                            chrono::Utc
1271                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1272                                .single()
1273                                .expect("invalid timestamp"),
1274                        ),
1275                    ],
1276                    vec![
1277                        "zip".into(),
1278                        Value::Timestamp(
1279                            chrono::Utc
1280                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1281                                .single()
1282                                .expect("invalid timestamp"),
1283                        ),
1284                    ],
1285                ],
1286                headers: vec!["field1".to_string(), "field2".to_string()],
1287            },
1288        );
1289
1290        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1291
1292        let conditions = [
1293            Condition::Equals {
1294                field: "field1",
1295                value: "zip".into(),
1296            },
1297            Condition::BetweenDates {
1298                field: "field2",
1299                from: chrono::Utc
1300                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1301                    .single()
1302                    .expect("invalid timestamp"),
1303                to: chrono::Utc
1304                    .with_ymd_and_hms(2017, 1, 1, 0, 0, 0)
1305                    .single()
1306                    .expect("invalid timestamp"),
1307            },
1308        ];
1309
1310        assert_eq!(
1311            Ok(ObjectMap::from([
1312                ("field1".into(), Value::from("zip")),
1313                (
1314                    "field2".into(),
1315                    Value::Timestamp(
1316                        chrono::Utc
1317                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1318                            .single()
1319                            .expect("invalid timestamp")
1320                    )
1321                )
1322            ])),
1323            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1324        );
1325    }
1326
1327    #[test]
1328    fn finds_row_from_date() {
1329        let mut file = File::new(
1330            Default::default(),
1331            FileData {
1332                modified: SystemTime::now(),
1333                data: vec![
1334                    vec![
1335                        "zip".into(),
1336                        Value::Timestamp(
1337                            chrono::Utc
1338                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1339                                .single()
1340                                .expect("invalid timestamp"),
1341                        ),
1342                    ],
1343                    vec![
1344                        "zip".into(),
1345                        Value::Timestamp(
1346                            chrono::Utc
1347                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1348                                .single()
1349                                .expect("invalid timestamp"),
1350                        ),
1351                    ],
1352                ],
1353                headers: vec!["field1".to_string(), "field2".to_string()],
1354            },
1355        );
1356
1357        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1358
1359        let conditions = [
1360            Condition::Equals {
1361                field: "field1",
1362                value: "zip".into(),
1363            },
1364            Condition::FromDate {
1365                field: "field2",
1366                from: chrono::Utc
1367                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1368                    .single()
1369                    .expect("invalid timestamp"),
1370            },
1371        ];
1372
1373        assert_eq!(
1374            Ok(ObjectMap::from([
1375                ("field1".into(), Value::from("zip")),
1376                (
1377                    "field2".into(),
1378                    Value::Timestamp(
1379                        chrono::Utc
1380                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1381                            .single()
1382                            .expect("invalid timestamp")
1383                    )
1384                )
1385            ])),
1386            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1387        );
1388    }
1389
1390    #[test]
1391    fn finds_row_to_date() {
1392        let mut file = File::new(
1393            Default::default(),
1394            FileData {
1395                modified: SystemTime::now(),
1396                data: vec![
1397                    vec![
1398                        "zip".into(),
1399                        Value::Timestamp(
1400                            chrono::Utc
1401                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1402                                .single()
1403                                .expect("invalid timestamp"),
1404                        ),
1405                    ],
1406                    vec![
1407                        "zip".into(),
1408                        Value::Timestamp(
1409                            chrono::Utc
1410                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1411                                .single()
1412                                .expect("invalid timestamp"),
1413                        ),
1414                    ],
1415                ],
1416                headers: vec!["field1".to_string(), "field2".to_string()],
1417            },
1418        );
1419
1420        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1421
1422        let conditions = [
1423            Condition::Equals {
1424                field: "field1",
1425                value: "zip".into(),
1426            },
1427            Condition::ToDate {
1428                field: "field2",
1429                to: chrono::Utc
1430                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1431                    .single()
1432                    .expect("invalid timestamp"),
1433            },
1434        ];
1435
1436        assert_eq!(
1437            Ok(ObjectMap::from([
1438                ("field1".into(), Value::from("zip")),
1439                (
1440                    "field2".into(),
1441                    Value::Timestamp(
1442                        chrono::Utc
1443                            .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1444                            .single()
1445                            .expect("invalid timestamp")
1446                    )
1447                )
1448            ])),
1449            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1450        );
1451    }
1452
1453    #[test]
1454    fn doesnt_find_row() {
1455        let file = File::new(
1456            Default::default(),
1457            FileData {
1458                modified: SystemTime::now(),
1459                data: vec![
1460                    vec!["zip".into(), "zup".into()],
1461                    vec!["zirp".into(), "zurp".into()],
1462                ],
1463                headers: vec!["field1".to_string(), "field2".to_string()],
1464            },
1465        );
1466
1467        let condition = Condition::Equals {
1468            field: "field1",
1469            value: Value::from("zorp"),
1470        };
1471
1472        assert_eq!(
1473            Err(Error::NoRowsFound),
1474            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
1475        );
1476    }
1477
1478    #[test]
1479    fn doesnt_find_row_with_index() {
1480        let mut file = File::new(
1481            Default::default(),
1482            FileData {
1483                modified: SystemTime::now(),
1484                data: vec![
1485                    vec!["zip".into(), "zup".into()],
1486                    vec!["zirp".into(), "zurp".into()],
1487                ],
1488                headers: vec!["field1".to_string(), "field2".to_string()],
1489            },
1490        );
1491
1492        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1493
1494        let condition = Condition::Equals {
1495            field: "field1",
1496            value: Value::from("zorp"),
1497        };
1498
1499        assert_eq!(
1500            Err(Error::NoRowsFound),
1501            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
1502        );
1503    }
1504
1505    #[test]
1506    fn doesnt_find_row_with_index_and_wildcard() {
1507        let mut file = File::new(
1508            Default::default(),
1509            FileData {
1510                modified: SystemTime::now(),
1511                data: vec![
1512                    vec!["zip".into(), "zup".into()],
1513                    vec!["zirp".into(), "zurp".into()],
1514                ],
1515                headers: vec!["field1".to_string(), "field2".to_string()],
1516            },
1517        );
1518
1519        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1520        let wildcard = Value::from("nonexistent");
1521
1522        let condition = Condition::Equals {
1523            field: "field1",
1524            value: Value::from("zorp"),
1525        };
1526
1527        assert_eq!(
1528            Err(Error::NoRowsFound),
1529            file.find_table_row(
1530                Case::Sensitive,
1531                &[condition],
1532                None,
1533                Some(&wildcard),
1534                Some(handle)
1535            )
1536        );
1537    }
1538}