Skip to main content

vector/sources/statsd/
parser.rs

1use std::{
2    error, fmt,
3    num::{ParseFloatError, ParseIntError},
4    str::Utf8Error,
5    sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11    event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12    sources::{statsd::ConversionUnit, util::extract_tag_key_and_value},
13};
14
15static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
16static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
17
18#[derive(Clone)]
19pub struct Parser {
20    sanitize: bool,
21    convert_to: ConversionUnit,
22}
23
24impl Parser {
25    pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
26        Self {
27            sanitize: sanitize_keys,
28            convert_to,
29        }
30    }
31
32    pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
33        // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#datagram-format
34        let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
35        if key_and_body.len() != 2 {
36            return Err(ParseError::Malformed(
37                "should be key and body with ':' separator",
38            ));
39        }
40        let (key, body) = (key_and_body[0], key_and_body[1]);
41
42        let parts = body.split('|').collect::<Vec<_>>();
43        if parts.len() < 2 {
44            return Err(ParseError::Malformed(
45                "body should have at least two pipe separated components",
46            ));
47        }
48
49        let name = sanitize_key(key, self.sanitize);
50        let metric_type = parts[1];
51
52        // sampling part is optional and comes after metric type part
53        let sampling = parts.get(2).filter(|s| s.starts_with('@'));
54        let sample_rate = if let Some(s) = sampling {
55            1.0 / sanitize_sampling(parse_sampling(s)?)
56        } else {
57            1.0
58        };
59
60        // tags are optional and could be found either after sampling of after metric type part
61        let tags = if sampling.is_none() {
62            parts.get(2)
63        } else {
64            parts.get(3)
65        };
66        let tags = tags.filter(|s| s.starts_with('#'));
67        let tags = tags.map(parse_tags).transpose()?;
68
69        let metric = match metric_type {
70            "c" => {
71                let val: f64 = parts[0].parse()?;
72                Metric::new(
73                    name,
74                    MetricKind::Incremental,
75                    MetricValue::Counter {
76                        value: val * sample_rate,
77                    },
78                )
79                .with_tags(tags)
80            }
81            unit @ "h" | unit @ "ms" | unit @ "d" => {
82                let val: f64 = parts[0].parse()?;
83                let converted_val = match unit {
84                    "ms" => match self.convert_to {
85                        ConversionUnit::Seconds => val / 1000.0,
86                        ConversionUnit::Milliseconds => val,
87                    },
88                    _ => val,
89                };
90                Metric::new(
91                    name,
92                    MetricKind::Incremental,
93                    MetricValue::Distribution {
94                        samples: vector_lib::samples![converted_val => sample_rate as u32],
95                        statistic: convert_to_statistic(unit),
96                    },
97                )
98                .with_tags(tags)
99            }
100            "g" => {
101                let value = if parts[0]
102                    .chars()
103                    .next()
104                    .map(|c| c.is_ascii_digit())
105                    .ok_or(ParseError::Malformed("empty first body component"))?
106                {
107                    parts[0].parse()?
108                } else {
109                    let mut chars = parts[0].chars();
110                    chars.next();
111                    chars.as_str().parse()?
112                };
113
114                match parse_direction(parts[0])? {
115                    None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
116                        .with_tags(tags),
117                    Some(sign) => Metric::new(
118                        name,
119                        MetricKind::Incremental,
120                        MetricValue::Gauge {
121                            value: value * sign,
122                        },
123                    )
124                    .with_tags(tags),
125                }
126            }
127            "s" => Metric::new(
128                name,
129                MetricKind::Incremental,
130                MetricValue::Set {
131                    values: vec![parts[0].into()].into_iter().collect(),
132                },
133            )
134            .with_tags(tags),
135            other => return Err(ParseError::UnknownMetricType(other.into())),
136        };
137        Ok(metric)
138    }
139}
140
141fn parse_sampling(input: &str) -> Result<f64, ParseError> {
142    let rest = input
143        .strip_prefix('@')
144        .filter(|s| !s.is_empty())
145        .ok_or(ParseError::Malformed(
146            "expected non empty '@'-prefixed sampling component",
147        ))?;
148
149    let num: f64 = rest.parse()?;
150    if num.is_sign_positive() {
151        Ok(num)
152    } else {
153        Err(ParseError::Malformed("sample rate can't be negative"))
154    }
155}
156
157/// Statsd (and dogstatsd) support bare, single and multi-value tags.
158fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
159    let rest = input
160        .strip_prefix('#')
161        .filter(|s| !s.is_empty())
162        .ok_or(ParseError::Malformed(
163            "expected non empty '#'-prefixed tags component",
164        ))?;
165
166    Ok(rest.split(',').map(extract_tag_key_and_value).collect())
167}
168
169fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
170    match input
171        .chars()
172        .next()
173        .ok_or(ParseError::Malformed("empty body component"))?
174    {
175        '+' => Ok(Some(1.0)),
176        '-' => Ok(Some(-1.0)),
177        c if c.is_ascii_digit() => Ok(None),
178        _other => Err(ParseError::Malformed("invalid gauge value prefix")),
179    }
180}
181
182fn sanitize_key(key: &str, sanitize: bool) -> String {
183    if !sanitize {
184        key.to_owned()
185    } else {
186        let s = key.replace('/', "'-");
187        let s = WHITESPACE.replace_all(&s, "_");
188        let s = NONALPHANUM.replace_all(&s, "");
189        s.into()
190    }
191}
192
/// Maps a sample rate of zero to `1.0` ("not sampled") so the caller can safely
/// invert it; every other value, including negatives, is returned unchanged.
fn sanitize_sampling(sampling: f64) -> f64 {
    match sampling {
        rate if rate == 0.0 => 1.0,
        rate => rate,
    }
}
196
197fn convert_to_statistic(unit: &str) -> StatisticKind {
198    match unit {
199        "d" => StatisticKind::Summary,
200        _ => StatisticKind::Histogram,
201    }
202}
203
/// Errors produced while parsing a StatsD datagram.
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    /// Wraps a [`Utf8Error`] (not constructed within this parser).
    InvalidUtf8(Utf8Error),
    /// The datagram is structurally invalid; the message says which part.
    Malformed(&'static str),
    /// The type component is not one of the supported metric types.
    UnknownMetricType(String),
    /// An integer component failed to parse.
    InvalidInteger(ParseIntError),
    /// A floating-point component failed to parse.
    InvalidFloat(ParseFloatError),
}

impl fmt::Display for ParseError {
    /// Renders `Statsd parse error: ` followed by the error's `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Statsd parse error: {:?}", self)
    }
}
218
219vector_lib::impl_event_data_eq!(ParseError);
220
221impl error::Error for ParseError {}
222
223impl From<ParseIntError> for ParseError {
224    fn from(e: ParseIntError) -> ParseError {
225        ParseError::InvalidInteger(e)
226    }
227}
228
229impl From<ParseFloatError> for ParseError {
230    fn from(e: ParseFloatError) -> ParseError {
231        ParseError::InvalidFloat(e)
232    }
233}
234
// Unit tests for the StatsD datagram parser: happy paths for every metric type,
// sanitization, sampling, tags, and the parser's error paths.
#[cfg(test)]
mod test {
    use vector_lib::{assert_event_data_eq, event::metric::TagValue, metric_tags};

    use super::{ParseError, Parser, sanitize_key, sanitize_sampling};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sources::statsd::ConversionUnit,
    };

    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    /// Parses `packet` with the sanitizing parser and returns the error it must produce.
    fn parse_err(packet: &str) -> ParseError {
        parse(packet).expect_err("packet should fail to parse")
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(
            parse("foo:1|c"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )),
        );
    }

    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo-however_baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo/bar@baz baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            )))),
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0.1"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 20.0 },
            )),
        );
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 2.0 },
            )),
        );
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Summary
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 333.0 },
            )),
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )),
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: 10.0 },
            )),
        );
    }

    #[test]
    fn gauge_multibyte_utf8_prefix_is_error_not_panic() {
        // A multi-byte UTF-8 character as the value prefix must return a parse
        // error, not panic with "byte index 1 is not a char boundary".
        let input = std::str::from_utf8(b"m:\xc3\xa9|g").unwrap();
        assert!(parse(input).is_err());
        assert!(parse("m:é|g").is_err());
    }

    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            )),
        );
    }

    #[test]
    fn missing_key_separator_is_malformed() {
        assert_eq!(
            parse_err("foo"),
            ParseError::Malformed("should be key and body with ':' separator")
        );
    }

    #[test]
    fn missing_type_component_is_malformed() {
        assert_eq!(
            parse_err("foo:1"),
            ParseError::Malformed("body should have at least two pipe separated components")
        );
    }

    #[test]
    fn unknown_metric_type() {
        assert_eq!(
            parse_err("foo:1|x"),
            ParseError::UnknownMetricType("x".to_owned())
        );
    }

    #[test]
    fn non_numeric_value_is_invalid_float() {
        assert!(matches!(parse_err("foo:abc|c"), ParseError::InvalidFloat(_)));
    }

    #[test]
    fn empty_and_negative_sampling_are_malformed() {
        assert_eq!(
            parse_err("bar:1|c|@"),
            ParseError::Malformed("expected non empty '@'-prefixed sampling component")
        );
        assert_eq!(
            parse_err("bar:1|c|@-1"),
            ParseError::Malformed("sample rate can't be negative")
        );
    }

    #[test]
    fn empty_tags_component_is_malformed() {
        assert_eq!(
            parse_err("foo:1|c|#"),
            ParseError::Malformed("expected non empty '#'-prefixed tags component")
        );
    }

    #[test]
    fn invalid_gauge_values_are_malformed() {
        assert_eq!(
            parse_err("gaugor:|g"),
            ParseError::Malformed("empty first body component")
        );
        assert_eq!(
            parse_err("gaugor:x5|g"),
            ParseError::Malformed("invalid gauge value prefix")
        );
    }

    #[test]
    fn sanitizing_keys() {
        assert_eq!("foo-bar-baz", sanitize_key("foo/bar/baz", true));
        assert_eq!("foo/bar/baz", sanitize_key("foo/bar/baz", false));
        assert_eq!("foo_bar_baz", sanitize_key("foo bar  baz", true));
        assert_eq!("foo.__bar_.baz", sanitize_key("foo. @& bar_$!#.baz", true));
        assert_eq!(
            "foo. @& bar_$!#.baz",
            sanitize_key("foo. @& bar_$!#.baz", false)
        );
    }

    #[test]
    fn sanitizing_sampling() {
        assert_eq!(1.0, sanitize_sampling(0.0));
        assert_eq!(2.5, sanitize_sampling(2.5));
        assert_eq!(-5.0, sanitize_sampling(-5.0));
    }
}