1use std::{
2 error, fmt,
3 num::{ParseFloatError, ParseIntError},
4 str::Utf8Error,
5 sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11 event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12 sources::{statsd::ConversionUnit, util::extract_tag_key_and_value},
13};
14
/// Matches a run of one or more whitespace characters; `sanitize_key` replaces
/// each run with a single `_`. Compiled lazily on first use.
static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
/// Matches any single character other than ASCII letters, ASCII digits, `_`, `-`
/// and `.`; `sanitize_key` deletes every match. Note that despite the name, `_`,
/// `-` and `.` are also kept. Compiled lazily on first use.
static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
17
18#[derive(Clone)]
19pub struct Parser {
20 sanitize: bool,
21 convert_to: ConversionUnit,
22}
23
24impl Parser {
25 pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
26 Self {
27 sanitize: sanitize_keys,
28 convert_to,
29 }
30 }
31
32 pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
33 let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
35 if key_and_body.len() != 2 {
36 return Err(ParseError::Malformed(
37 "should be key and body with ':' separator",
38 ));
39 }
40 let (key, body) = (key_and_body[0], key_and_body[1]);
41
42 let parts = body.split('|').collect::<Vec<_>>();
43 if parts.len() < 2 {
44 return Err(ParseError::Malformed(
45 "body should have at least two pipe separated components",
46 ));
47 }
48
49 let name = sanitize_key(key, self.sanitize);
50 let metric_type = parts[1];
51
52 let sampling = parts.get(2).filter(|s| s.starts_with('@'));
54 let sample_rate = if let Some(s) = sampling {
55 1.0 / sanitize_sampling(parse_sampling(s)?)
56 } else {
57 1.0
58 };
59
60 let tags = if sampling.is_none() {
62 parts.get(2)
63 } else {
64 parts.get(3)
65 };
66 let tags = tags.filter(|s| s.starts_with('#'));
67 let tags = tags.map(parse_tags).transpose()?;
68
69 let metric = match metric_type {
70 "c" => {
71 let val: f64 = parts[0].parse()?;
72 Metric::new(
73 name,
74 MetricKind::Incremental,
75 MetricValue::Counter {
76 value: val * sample_rate,
77 },
78 )
79 .with_tags(tags)
80 }
81 unit @ "h" | unit @ "ms" | unit @ "d" => {
82 let val: f64 = parts[0].parse()?;
83 let converted_val = match unit {
84 "ms" => match self.convert_to {
85 ConversionUnit::Seconds => val / 1000.0,
86 ConversionUnit::Milliseconds => val,
87 },
88 _ => val,
89 };
90 Metric::new(
91 name,
92 MetricKind::Incremental,
93 MetricValue::Distribution {
94 samples: vector_lib::samples![converted_val => sample_rate as u32],
95 statistic: convert_to_statistic(unit),
96 },
97 )
98 .with_tags(tags)
99 }
100 "g" => {
101 let value = if parts[0]
102 .chars()
103 .next()
104 .map(|c| c.is_ascii_digit())
105 .ok_or(ParseError::Malformed("empty first body component"))?
106 {
107 parts[0].parse()?
108 } else {
109 let mut chars = parts[0].chars();
110 chars.next();
111 chars.as_str().parse()?
112 };
113
114 match parse_direction(parts[0])? {
115 None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
116 .with_tags(tags),
117 Some(sign) => Metric::new(
118 name,
119 MetricKind::Incremental,
120 MetricValue::Gauge {
121 value: value * sign,
122 },
123 )
124 .with_tags(tags),
125 }
126 }
127 "s" => Metric::new(
128 name,
129 MetricKind::Incremental,
130 MetricValue::Set {
131 values: vec![parts[0].into()].into_iter().collect(),
132 },
133 )
134 .with_tags(tags),
135 other => return Err(ParseError::UnknownMetricType(other.into())),
136 };
137 Ok(metric)
138 }
139}
140
141fn parse_sampling(input: &str) -> Result<f64, ParseError> {
142 let rest = input
143 .strip_prefix('@')
144 .filter(|s| !s.is_empty())
145 .ok_or(ParseError::Malformed(
146 "expected non empty '@'-prefixed sampling component",
147 ))?;
148
149 let num: f64 = rest.parse()?;
150 if num.is_sign_positive() {
151 Ok(num)
152 } else {
153 Err(ParseError::Malformed("sample rate can't be negative"))
154 }
155}
156
157fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
159 let rest = input
160 .strip_prefix('#')
161 .filter(|s| !s.is_empty())
162 .ok_or(ParseError::Malformed(
163 "expected non empty '#'-prefixed tags component",
164 ))?;
165
166 Ok(rest.split(',').map(extract_tag_key_and_value).collect())
167}
168
169fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
170 match input
171 .chars()
172 .next()
173 .ok_or(ParseError::Malformed("empty body component"))?
174 {
175 '+' => Ok(Some(1.0)),
176 '-' => Ok(Some(-1.0)),
177 c if c.is_ascii_digit() => Ok(None),
178 _other => Err(ParseError::Malformed("invalid gauge value prefix")),
179 }
180}
181
182fn sanitize_key(key: &str, sanitize: bool) -> String {
183 if !sanitize {
184 key.to_owned()
185 } else {
186 let s = key.replace('/', "'-");
187 let s = WHITESPACE.replace_all(&s, "_");
188 let s = NONALPHANUM.replace_all(&s, "");
189 s.into()
190 }
191}
192
/// Maps a sample rate of zero (either sign) to `1.0` and returns every other
/// value unchanged, including negatives and NaN.
///
/// The parser inverts the result, so a `@0` rate is treated as unsampled instead
/// of dividing by zero.
fn sanitize_sampling(sampling: f64) -> f64 {
    match sampling {
        zero if zero == 0.0 => 1.0,
        rate => rate,
    }
}
196
197fn convert_to_statistic(unit: &str) -> StatisticKind {
198 match unit {
199 "d" => StatisticKind::Summary,
200 _ => StatisticKind::Histogram,
201 }
202}
203
/// Errors produced while parsing a single StatsD line.
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    /// The input was not valid UTF-8. Not constructed in this file; presumably
    /// built by the caller that decodes packets. NOTE(review): confirm.
    InvalidUtf8(Utf8Error),
    /// The line's structure is wrong; the static message names the failing part.
    Malformed(&'static str),
    /// The type component is not one the parser supports; carries the raw type text.
    UnknownMetricType(String),
    /// An integer failed to parse. No integer parsing happens in this file;
    /// presumably kept for callers or compatibility. NOTE(review): confirm.
    InvalidInteger(ParseIntError),
    /// A metric value or sample rate failed to parse as `f64`.
    InvalidFloat(ParseFloatError),
}
212
213impl fmt::Display for ParseError {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "Statsd parse error: {self:?}")
216 }
217}
218
// Presumably implements vector's event-data equality for `ParseError`, which the
// tests need so `assert_event_data_eq!` can compare `Result<Metric, ParseError>`
// values. NOTE(review): confirm against the macro definition.
vector_lib::impl_event_data_eq!(ParseError);

// `Debug` + `Display` are all `std::error::Error` requires. With the default
// `source()`, the wrapped parse errors are not exposed as a cause.
impl error::Error for ParseError {}
222
223impl From<ParseIntError> for ParseError {
224 fn from(e: ParseIntError) -> ParseError {
225 ParseError::InvalidInteger(e)
226 }
227}
228
229impl From<ParseFloatError> for ParseError {
230 fn from(e: ParseFloatError) -> ParseError {
231 ParseError::InvalidFloat(e)
232 }
233}
234
#[cfg(test)]
mod test {
    use vector_lib::{assert_event_data_eq, event::metric::TagValue, metric_tags};

    use super::{ParseError, Parser, sanitize_key, sanitize_sampling};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sources::statsd::ConversionUnit,
    };

    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(
            parse("foo:1|c"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )),
        );
    }

    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo-however_baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo/bar@baz baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            )))),
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0.1"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 20.0 },
            )),
        );
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 2.0 },
            )),
        );
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn unsampled_timer_is_converted_to_seconds() {
        assert_event_data_eq!(
            parse("glork:320|ms"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 1],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Summary
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 333.0 },
            )),
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )),
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: 10.0 },
            )),
        );
    }

    #[test]
    fn tagged_signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g|#env:prod"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )
            .with_tags(Some(metric_tags!(
                "env" => "prod",
            )))),
        );
    }

    #[test]
    fn gauge_multibyte_utf8_prefix_is_error_not_panic() {
        // A multi-byte first character must never be sliced mid-codepoint:
        // on its own, followed by digits, or after a sign prefix.
        for input in ["m:é|g", "m:é5|g", "m:+é|g"] {
            assert!(parse(input).is_err(), "expected an error for {input:?}");
        }
    }

    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            )),
        );
    }

    #[test]
    fn malformed_packets_report_the_failing_component() {
        let cases = [
            ("foo", "should be key and body with ':' separator"),
            ("foo:1", "body should have at least two pipe separated components"),
            ("bar:1|c|@", "expected non empty '@'-prefixed sampling component"),
            ("bar:1|c|@-0.5", "sample rate can't be negative"),
            ("foo:1|c|#", "expected non empty '#'-prefixed tags component"),
            ("gaugor:|g", "empty first body component"),
            ("gaugor:x5|g", "invalid gauge value prefix"),
        ];
        for (input, expected) in cases {
            match parse(input) {
                Err(ParseError::Malformed(message)) => {
                    assert_eq!(message, expected, "wrong message for {input:?}")
                }
                _ => panic!("expected a Malformed error for {input:?}"),
            }
        }
    }

    #[test]
    fn unknown_metric_type_is_reported() {
        assert!(matches!(
            parse("foo:1|x"),
            Err(ParseError::UnknownMetricType(ref t)) if t == "x"
        ));
    }

    #[test]
    fn non_numeric_value_is_invalid_float() {
        assert!(matches!(
            parse("foo:abc|c"),
            Err(ParseError::InvalidFloat(_))
        ));
    }

    #[test]
    fn sanitizing_keys() {
        assert_eq!("foo-bar-baz", sanitize_key("foo/bar/baz", true));
        assert_eq!("foo/bar/baz", sanitize_key("foo/bar/baz", false));
        assert_eq!("foo_bar_baz", sanitize_key("foo bar baz", true));
        assert_eq!("foo.__bar_.baz", sanitize_key("foo. @& bar_$!#.baz", true));
        assert_eq!(
            "foo. @& bar_$!#.baz",
            sanitize_key("foo. @& bar_$!#.baz", false)
        );
    }

    #[test]
    fn sanitizing_sampling() {
        assert_eq!(1.0, sanitize_sampling(0.0));
        assert_eq!(2.5, sanitize_sampling(2.5));
        assert_eq!(-5.0, sanitize_sampling(-5.0));
    }
}