vector/enrichment_tables/
mmdb.rs1use std::{fs, net::IpAddr, path::PathBuf, sync::Arc, time::SystemTime};
6
7use maxminddb::Reader;
8use vector_lib::{
9 configurable::configurable_component,
10 enrichment::{Case, Condition, Error, IndexHandle, Table},
11};
12use vrl::value::{ObjectMap, Value};
13
14use crate::config::{EnrichmentTableConfig, GenerateConfig};
15
16#[derive(Clone, Debug, Eq, PartialEq)]
18#[configurable_component(enrichment_table("mmdb"))]
19pub struct MmdbConfig {
20 pub path: PathBuf,
24}
25
26impl GenerateConfig for MmdbConfig {
27 fn generate_config() -> toml::Value {
28 toml::Value::try_from(Self {
29 path: "/path/to/GeoLite2-City.mmdb".into(),
30 })
31 .unwrap()
32 }
33}
34
35impl EnrichmentTableConfig for MmdbConfig {
36 async fn build(
37 &self,
38 _: &crate::config::GlobalOptions,
39 _: Option<Box<dyn std::any::Any + Send + Sync>>,
40 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
41 Ok(Box::new(Mmdb::new(self.clone())?))
42 }
43}
44
45#[derive(Clone)]
46pub struct Mmdb {
48 config: MmdbConfig,
49 dbreader: Arc<maxminddb::Reader<Vec<u8>>>,
50 last_modified: SystemTime,
51}
52
53impl Mmdb {
54 pub fn new(config: MmdbConfig) -> crate::Result<Self> {
56 let dbreader = Arc::new(Reader::open_readfile(&config.path)?);
57
58 let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
60 let result = dbreader.lookup(ip)?.decode::<ObjectMap>().map(|_| ());
61
62 match result {
63 Ok(_) => Ok(Mmdb {
64 last_modified: fs::metadata(&config.path)?.modified()?,
65 dbreader,
66 config,
67 }),
68 Err(error) => Err(error.into()),
69 }
70 }
71
72 fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
73 let data = self.dbreader.lookup(ip).ok()?.decode().ok()??;
74
75 if let Some(fields) = select {
76 let mut filtered = Value::from(ObjectMap::new());
77 let mut data_value = Value::from(data);
78 for field in fields {
79 filtered.insert(
80 field.as_str(),
81 data_value
82 .remove(field.as_str(), false)
83 .unwrap_or(Value::Null),
84 );
85 }
86 filtered.into_object()
87 } else {
88 Some(data)
89 }
90 }
91}
92
93impl Table for Mmdb {
94 fn find_table_row<'a>(
100 &self,
101 case: Case,
102 condition: &'a [Condition<'a>],
103 select: Option<&[String]>,
104 wildcard: Option<&Value>,
105 index: Option<IndexHandle>,
106 ) -> Result<ObjectMap, Error> {
107 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
108
109 match rows.pop() {
110 Some(row) if rows.is_empty() => Ok(row),
111 Some(_) => Err(Error::MoreThanOneRowFound),
112 None => Err(Error::NoRowsFound),
113 }
114 }
115
116 fn find_table_rows<'a>(
120 &self,
121 _: Case,
122 condition: &'a [Condition<'a>],
123 select: Option<&[String]>,
124 _wildcard: Option<&Value>,
125 _: Option<IndexHandle>,
126 ) -> Result<Vec<ObjectMap>, Error> {
127 match condition.first() {
128 Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
129 Some(Condition::Equals { value, .. }) => {
130 let ip = value
131 .to_string_lossy()
132 .parse::<IpAddr>()
133 .map_err(|source| Error::InvalidAddress { source })?;
134 Ok(self
135 .lookup(ip, select)
136 .map(|values| vec![values])
137 .unwrap_or_default())
138 }
139 Some(_) => Err(Error::OnlyEqualityConditionAllowed),
140 None => Err(Error::MissingCondition { kind: "IP" }),
141 }
142 }
143
144 fn add_index(&mut self, _: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
150 match fields.len() {
151 0 => Err(Error::MissingRequiredField { field: "IP" }),
152 1 => Ok(IndexHandle(0)),
153 _ => Err(Error::OnlyOneFieldAllowed),
154 }
155 }
156
157 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
159 Vec::new()
160 }
161
162 fn needs_reload(&self) -> bool {
164 matches!(fs::metadata(&self.config.path)
165 .and_then(|metadata| metadata.modified()),
166 Ok(modified) if modified > self.last_modified)
167 }
168}
169
170impl std::fmt::Debug for Mmdb {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 write!(f, "Maxmind database {})", self.config.path.display())
173 }
174}
175
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;

    /// Selecting two nested paths returns only those paths.
    #[test]
    fn city_partial_lookup() {
        let fields = [
            "location.latitude".to_string(),
            "location.longitude".to_string(),
        ];
        let values = find_select(
            "2.125.160.216",
            "tests/data/GeoIP2-City-Test.mmdb",
            Some(fields.as_slice()),
        )
        .unwrap();

        let location = ObjectMap::from([
            ("latitude".into(), Value::from(51.75)),
            ("longitude".into(), Value::from(-1.25)),
        ]);
        let expected = ObjectMap::from([("location".into(), Value::from(location))]);

        assert_eq!(values, expected);
    }

    /// A full lookup against the ISP test database returns every field.
    #[test]
    fn isp_lookup() {
        let values = find("208.192.1.2", "tests/data/GeoIP2-ISP-Test.mmdb").unwrap();

        let expected = ObjectMap::from([
            ("autonomous_system_number".into(), Value::from(701i64)),
            (
                "autonomous_system_organization".into(),
                Value::from("MCI Communications Services, Inc. d/b/a Verizon Business"),
            ),
            ("isp".into(), Value::from("Verizon Business")),
            ("organization".into(), Value::from("Verizon Business")),
        ]);

        assert_eq!(values, expected);
    }

    /// A lookup against the connection-type test database succeeds.
    #[test]
    fn connection_type_lookup_success() {
        let values = find(
            "201.243.200.1",
            "tests/data/GeoIP2-Connection-Type-Test.mmdb",
        )
        .unwrap();

        let expected = ObjectMap::from([("connection_type".into(), Value::from("Corporate"))]);

        assert_eq!(values, expected);
    }

    /// An address with no record yields no row.
    #[test]
    fn lookup_missing() {
        let values = find("10.1.12.1", "tests/data/custom-type.mmdb");

        assert_eq!(values, None);
    }

    /// A database with a custom record layout is returned as-is, including
    /// nested objects.
    #[test]
    fn custom_mmdb_type() {
        let values = find("208.192.1.2", "tests/data/custom-type.mmdb").unwrap();

        let nested = ObjectMap::from([
            ("hostname".into(), Value::from("custom")),
            ("original_cidr".into(), Value::from("208.192.1.2/24")),
        ]);
        let expected = ObjectMap::from([
            ("hostname".into(), Value::from("custom")),
            ("nested".into(), Value::from(nested)),
        ]);

        assert_eq!(values, expected);
    }

    /// Looks up `ip` in `database` without narrowing the returned fields.
    fn find(ip: &str, database: &str) -> Option<ObjectMap> {
        find_select(ip, database, None)
    }

    /// Loads `database`, queries it for `ip` through `find_table_rows`, and
    /// returns the last row, if any.
    ///
    /// Panics if the database cannot be loaded or the query itself errors.
    fn find_select(ip: &str, database: &str, select: Option<&[String]>) -> Option<ObjectMap> {
        let table = Mmdb::new(MmdbConfig {
            path: database.into(),
        })
        .unwrap();

        let condition = [Condition::Equals {
            field: "ip",
            value: ip.into(),
        }];

        let mut rows = table
            .find_table_rows(Case::Insensitive, &condition, select, None, None)
            .unwrap();

        rows.pop()
    }
}