1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{SinkExt, future::BoxFuture, stream};
5use tower::Service;
6use vector_lib::{
7 ByteSizeOf, EstimatedJsonEncodedSizeOf,
8 configurable::configurable_component,
9 event::metric::{MetricSketch, MetricTags, Quantile},
10};
11
12use crate::{
13 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
14 event::{
15 Event, KeyString,
16 metric::{Metric, MetricValue, Sample, StatisticKind},
17 },
18 http::HttpClient,
19 internal_events::InfluxdbEncodingError,
20 sinks::{
21 Healthcheck, VectorSink,
22 influxdb::{
23 Field, InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, encode_timestamp,
24 healthcheck, influx_line_protocol, influxdb_settings,
25 },
26 util::{
27 BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
28 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
29 encode_namespace,
30 http::{HttpBatchService, HttpRetryLogic},
31 statistic::{DistributionStatistic, validate_quantiles},
32 },
33 },
34 tls::{TlsConfig, TlsSettings},
35};
36
/// Service that encodes batches of metrics as InfluxDB line protocol and hands
/// the encoded body to the wrapped HTTP batch service.
#[derive(Clone)]
struct InfluxDbSvc {
    // Sink configuration; `call` reads the default namespace, extra tags and
    // quantiles from it when encoding.
    config: InfluxDbConfig,
    // Protocol version forwarded to the line-protocol encoder.
    protocol_version: ProtocolVersion,
    // Underlying HTTP batch service; it receives the encoded request body.
    inner: HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>>,
}
43
/// Marker type carrying the default batch settings for this sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbDefaultBatchSettings;

// Defaults: at most 20 events per batch, no byte limit, and a 1 second timeout.
impl SinkBatchSettings for InfluxDbDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(20);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}
52
/// Configuration for the `influxdb_metrics` sink.
#[configurable_component(sink("influxdb_metrics", "Deliver metric event data to InfluxDB."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbConfig {
    /// Sets the default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: Option<String>,

    /// The endpoint to send data to.
    ///
    /// The write URI is derived from this value.
    #[configurable(metadata(docs::examples = "http://localhost:8086/"))]
    pub endpoint: String,

    // InfluxDB v1 specific settings, flattened into this configuration.
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,

    // InfluxDB v2 specific settings, flattened into this configuration.
    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// A map of additional tags, in the key/value pair format, to add to each measurement.
    #[configurable(metadata(docs::additional_props_description = "A tag key/value pair."))]
    #[configurable(metadata(docs::examples = "example_tags()"))]
    pub tags: Option<HashMap<String, String>>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The list of quantiles to calculate when sending distribution metrics.
    #[serde(default = "default_summary_quantiles")]
    pub quantiles: Vec<f64>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
106
/// Returns the quantiles used when the `quantiles` option is not configured:
/// 0.5, 0.75, 0.9, 0.95 and 0.99.
pub fn default_summary_quantiles() -> Vec<f64> {
    const DEFAULTS: [f64; 5] = [0.5, 0.75, 0.9, 0.95, 0.99];
    DEFAULTS.to_vec()
}
110
/// Returns the example tag map referenced by the `tags` option's documentation
/// metadata: a single `region` => `us-west-1` pair.
pub fn example_tags() -> HashMap<String, String> {
    let mut example = HashMap::new();
    example.insert(String::from("region"), String::from("us-west-1"));
    example
}
114
// NOTE(review): presumably implements config generation for `InfluxDbConfig`
// from its `Default` value; the macro is defined outside this file — confirm there.
impl_generate_config_from_default!(InfluxDbConfig);
116
117#[async_trait::async_trait]
118#[typetag::serde(name = "influxdb_metrics")]
119impl SinkConfig for InfluxDbConfig {
120 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
121 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
122 let client = HttpClient::new(tls_settings, cx.proxy())?;
123 let healthcheck = healthcheck(
124 self.clone().endpoint,
125 self.clone().influxdb1_settings,
126 self.clone().influxdb2_settings,
127 client.clone(),
128 )?;
129 validate_quantiles(&self.quantiles)?;
130 let sink = InfluxDbSvc::new(self.clone(), client)?;
131 Ok((sink, healthcheck))
132 }
133
134 fn input(&self) -> Input {
135 Input::metric()
136 }
137
138 fn acknowledgements(&self) -> &AcknowledgementsConfig {
139 &self.acknowledgements
140 }
141}
142
143impl InfluxDbSvc {
144 pub fn new(config: InfluxDbConfig, client: HttpClient) -> crate::Result<VectorSink> {
145 let settings = influxdb_settings(
146 config.influxdb1_settings.clone(),
147 config.influxdb2_settings.clone(),
148 )?;
149
150 let endpoint = config.endpoint.clone();
151 let token = settings.token();
152 let protocol_version = settings.protocol_version();
153
154 let batch = config.batch.into_batch_settings()?;
155 let request = config.request.into_settings();
156
157 let uri = settings.write_uri(endpoint)?;
158
159 let http_service = HttpBatchService::new(client, create_build_request(uri, token.inner()));
160
161 let influxdb_http_service = InfluxDbSvc {
162 config,
163 protocol_version,
164 inner: http_service,
165 };
166 let mut normalizer = MetricNormalizer::<InfluxMetricNormalize>::default();
167
168 let sink = request
169 .batch_sink(
170 HttpRetryLogic::default(),
171 influxdb_http_service,
172 MetricsBuffer::new(batch.size),
173 batch.timeout,
174 )
175 .with_flat_map(move |event: Event| {
176 stream::iter({
177 let byte_size = event.size_of();
178 let json_size = event.estimated_json_encoded_size_of();
179
180 normalizer
181 .normalize(event.into_metric())
182 .map(|metric| Ok(EncodedEvent::new(metric, byte_size, json_size)))
183 })
184 })
185 .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error, internal_log_rate_limit = false));
186
187 #[allow(deprecated)]
188 Ok(VectorSink::from_event_sink(sink))
189 }
190}
191
192impl Service<Vec<Metric>> for InfluxDbSvc {
193 type Response = http::Response<Bytes>;
194 type Error = crate::Error;
195 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
196
197 fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
199 self.inner.poll_ready(cx)
200 }
201
202 fn call(&mut self, items: Vec<Metric>) -> Self::Future {
204 let input = encode_events(
205 self.protocol_version,
206 items,
207 self.config.default_namespace.as_deref(),
208 self.config.tags.as_ref(),
209 &self.config.quantiles,
210 );
211 let body = input.freeze();
212
213 self.inner.call(body)
214 }
215}
216
217fn create_build_request(
218 uri: http::Uri,
219 token: &str,
220) -> impl Fn(Bytes) -> BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>
221+ Sync
222+ Send
223+ 'static
224+ use<> {
225 let auth = format!("Token {token}");
226 move |body| {
227 Box::pin(ready(
228 hyper::Request::post(uri.clone())
229 .header("Content-Type", "text/plain")
230 .header("Authorization", auth.clone())
231 .body(body)
232 .map_err(Into::into),
233 ))
234 }
235}
236
237fn merge_tags(event: &Metric, tags: Option<&HashMap<String, String>>) -> Option<MetricTags> {
238 match (event.tags().cloned(), tags) {
239 (Some(mut event_tags), Some(config_tags)) => {
240 event_tags.extend(config_tags.iter().map(|(k, v)| (k.clone(), v.clone())));
241 Some(event_tags)
242 }
243 (Some(event_tags), None) => Some(event_tags),
244 (None, Some(config_tags)) => Some(
245 config_tags
246 .iter()
247 .map(|(k, v)| (k.clone(), v.clone()))
248 .collect(),
249 ),
250 (None, None) => None,
251 }
252}
253
254#[derive(Default)]
255pub struct InfluxMetricNormalize;
256
257impl MetricNormalize for InfluxMetricNormalize {
258 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
259 match (metric.kind(), &metric.value()) {
260 (_, MetricValue::Counter { .. }) => state.make_incremental(metric),
263 (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
265 _ => Some(metric),
267 }
268 }
269}
270
271fn encode_events(
272 protocol_version: ProtocolVersion,
273 events: Vec<Metric>,
274 default_namespace: Option<&str>,
275 tags: Option<&HashMap<String, String>>,
276 quantiles: &[f64],
277) -> BytesMut {
278 let mut output = BytesMut::new();
279 let count = events.len();
280
281 for event in events.into_iter() {
282 let fullname = encode_namespace(event.namespace().or(default_namespace), '.', event.name());
283 let ts = encode_timestamp(event.timestamp());
284 let tags = merge_tags(&event, tags);
285 let (metric_type, fields) = get_type_and_fields(event.value(), quantiles);
286
287 let mut unwrapped_tags = tags.unwrap_or_default();
288 unwrapped_tags.replace("metric_type".to_owned(), metric_type.to_owned());
289
290 if let Err(error_message) = influx_line_protocol(
291 protocol_version,
292 &fullname,
293 Some(unwrapped_tags),
294 fields,
295 ts,
296 &mut output,
297 ) {
298 emit!(InfluxdbEncodingError {
299 error_message,
300 count,
301 });
302 };
303 }
304
305 if !output.is_empty() {
307 output.truncate(output.len() - 1);
308 }
309 output
310}
311
312fn get_type_and_fields(
313 value: &MetricValue,
314 quantiles: &[f64],
315) -> (&'static str, Option<HashMap<KeyString, Field>>) {
316 match value {
317 MetricValue::Counter { value } => ("counter", Some(to_fields(*value))),
318 MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))),
319 MetricValue::Set { values } => ("set", Some(to_fields(values.len() as f64))),
320 MetricValue::AggregatedHistogram {
321 buckets,
322 count,
323 sum,
324 } => {
325 let mut fields: HashMap<KeyString, Field> = buckets
326 .iter()
327 .map(|sample| {
328 (
329 format!("bucket_{}", sample.upper_limit).into(),
330 Field::UnsignedInt(sample.count),
331 )
332 })
333 .collect();
334 fields.insert("count".into(), Field::UnsignedInt(*count));
335 fields.insert("sum".into(), Field::Float(*sum));
336
337 ("histogram", Some(fields))
338 }
339 MetricValue::AggregatedSummary {
340 quantiles,
341 count,
342 sum,
343 } => {
344 let mut fields: HashMap<KeyString, Field> = quantiles
345 .iter()
346 .map(|quantile| {
347 (
348 format!("quantile_{}", quantile.quantile).into(),
349 Field::Float(quantile.value),
350 )
351 })
352 .collect();
353 fields.insert("count".into(), Field::UnsignedInt(*count));
354 fields.insert("sum".into(), Field::Float(*sum));
355
356 ("summary", Some(fields))
357 }
358 MetricValue::Distribution { samples, statistic } => {
359 let quantiles = match statistic {
360 StatisticKind::Histogram => &[0.95] as &[_],
361 StatisticKind::Summary => quantiles,
362 };
363 let fields = encode_distribution(samples, quantiles);
364 ("distribution", fields)
365 }
366 MetricValue::Sketch { sketch } => match sketch {
367 MetricSketch::AgentDDSketch(ddsketch) => {
368 let mut fields = [0.5, 0.75, 0.9, 0.99]
371 .iter()
372 .map(|q| {
373 let quantile = Quantile {
374 quantile: *q,
375 value: ddsketch.quantile(*q).unwrap_or(0.0),
376 };
377 (
378 quantile.to_percentile_string().into(),
379 Field::Float(quantile.value),
380 )
381 })
382 .collect::<HashMap<KeyString, _>>();
383 fields.insert(
384 "count".into(),
385 Field::UnsignedInt(u64::from(ddsketch.count())),
386 );
387 fields.insert(
388 "min".into(),
389 Field::Float(ddsketch.min().unwrap_or(f64::MAX)),
390 );
391 fields.insert(
392 "max".into(),
393 Field::Float(ddsketch.max().unwrap_or(f64::MIN)),
394 );
395 fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0)));
396 fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0)));
397
398 ("sketch", Some(fields))
399 }
400 },
401 }
402}
403
404fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option<HashMap<KeyString, Field>> {
405 let statistic = DistributionStatistic::from_samples(samples, quantiles)?;
406
407 Some(
408 [
409 ("min".into(), Field::Float(statistic.min)),
410 ("max".into(), Field::Float(statistic.max)),
411 ("median".into(), Field::Float(statistic.median)),
412 ("avg".into(), Field::Float(statistic.avg)),
413 ("sum".into(), Field::Float(statistic.sum)),
414 ("count".into(), Field::Float(statistic.count as f64)),
415 ]
416 .into_iter()
417 .chain(
418 statistic
419 .quantiles
420 .iter()
421 .map(|&(p, val)| (format!("quantile_{p:.2}").into(), Field::Float(val))),
422 )
423 .collect(),
424 )
425}
426
427fn to_fields(value: f64) -> HashMap<KeyString, Field> {
428 [("value".into(), Field::Float(value))]
429 .into_iter()
430 .collect()
431}
432
// Unit tests for configuration parsing and line-protocol encoding.
#[cfg(test)]
mod tests {
    use indoc::indoc;
    use similar_asserts::assert_eq;

    use super::*;
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sinks::influxdb::test_util::{assert_fields, split_line_protocol, tags, ts},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbConfig>();
    }

    // The `namespace` alias and a `tags` map must deserialize successfully.
    #[test]
    fn test_config_with_tags() {
        let config = indoc! {r#"
            namespace: "vector"
            endpoint: "http://localhost:9999"
            tags:
              region: "us-west-1"
        "#};

        serde_yaml::from_str::<InfluxDbConfig>(config).unwrap();
    }

    // Counters: the metric's own namespace wins over the default namespace, and
    // lines are separated by a newline with no trailing newline.
    #[test]
    fn test_encode_counter() {
        let events = vec![
            Metric::new(
                "total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.5 },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "check",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, Some("vector"), None, &[]);
        assert_eq!(
            line_protocols,
            "ns.total,metric_type=counter value=1.5 1542182950000000011\n\
            ns.check,metric_type=counter,normal_tag=value,true_tag=true value=1 1542182950000000011"
        );
    }

    // Gauges are encoded as a single `value` field.
    #[test]
    fn test_encode_gauge() {
        let events = vec![
            Metric::new(
                "meter",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -1.5 },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(
            line_protocols,
            "ns.meter,metric_type=gauge,normal_tag=value,true_tag=true value=-1.5 1542182950000000011"
        );
    }

    // Sets are encoded as the number of members.
    #[test]
    fn test_encode_set() {
        let events = vec![
            Metric::new(
                "users",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["alice".into(), "bob".into()].into_iter().collect(),
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(
            line_protocols,
            "ns.users,metric_type=set,normal_tag=value,true_tag=true value=2 1542182950000000011"
        );
    }

    // Protocol V1: unsigned integer fields carry the `i` suffix.
    #[test]
    fn test_encode_histogram_v1() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
                    count: 6,
                    sum: 12.5,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);

        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=histogram,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "bucket_1=1i",
                "bucket_2.1=2i",
                "bucket_3=3i",
                "count=6i",
                "sum=12.5",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }

    // Protocol V2: unsigned integer fields carry the `u` suffix.
    #[test]
    fn test_encode_histogram() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
                    count: 6,
                    sum: 12.5,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);

        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=histogram,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "bucket_1=1u",
                "bucket_2.1=2u",
                "bucket_3=3u",
                "count=6u",
                "sum=12.5",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }

    // Aggregated summary under protocol V1 (`count` uses the `i` suffix).
    #[test]
    fn test_encode_summary_v1() {
        let events = vec![
            Metric::new(
                "requests_sum",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                    count: 6,
                    sum: 12.0,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);

        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests_sum", line_protocol1.0);
        assert_eq!(
            "metric_type=summary,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "count=6i",
                "quantile_0.01=1.5",
                "quantile_0.5=2",
                "quantile_0.99=3",
                "sum=12",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }

    // Aggregated summary under protocol V2 (`count` uses the `u` suffix).
    #[test]
    fn test_encode_summary() {
        let events = vec![
            Metric::new(
                "requests_sum",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                    count: 6,
                    sum: 12.0,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);

        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests_sum", line_protocol1.0);
        assert_eq!(
            "metric_type=summary,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "count=6u",
                "quantile_0.01=1.5",
                "quantile_0.5=2",
                "quantile_0.99=3",
                "sum=12",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);
    }

    // Histogram-statistic distributions always report the 0.95 quantile, even
    // though no quantiles are passed in.
    #[test]
    fn test_encode_distribution() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
            Metric::new(
                "dense_stats",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: (0..20)
                        .map(|v| Sample {
                            value: f64::from(v),
                            rate: 1,
                        })
                        .collect(),
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "sparse_stats",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: (1..5)
                        .map(|v| Sample {
                            value: f64::from(v),
                            rate: v,
                        })
                        .collect(),
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 3);

        let line_protocol1 = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol1.0);
        assert_eq!(
            "metric_type=distribution,normal_tag=value,true_tag=true",
            line_protocol1.1
        );
        assert_fields(
            line_protocol1.2.to_string(),
            [
                "avg=1.875",
                "count=8",
                "max=3",
                "median=2",
                "min=1",
                "quantile_0.95=3",
                "sum=15",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol1.3);

        let line_protocol2 = split_line_protocol(line_protocols[1]);
        assert_eq!("ns.dense_stats", line_protocol2.0);
        assert_eq!("metric_type=distribution", line_protocol2.1);
        assert_fields(
            line_protocol2.2.to_string(),
            [
                "avg=9.5",
                "count=20",
                "max=19",
                "median=9",
                "min=0",
                "quantile_0.95=18",
                "sum=190",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol2.3);

        let line_protocol3 = split_line_protocol(line_protocols[2]);
        assert_eq!("ns.sparse_stats", line_protocol3.0);
        assert_eq!("metric_type=distribution", line_protocol3.1);
        assert_fields(
            line_protocol3.2.to_string(),
            [
                "avg=3",
                "count=10",
                "max=4",
                "median=3",
                "min=1",
                "quantile_0.95=4",
                "sum=30",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol3.3);
    }

    // A distribution without samples produces no output at all.
    #[test]
    fn test_encode_distribution_empty_stats() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vec![],
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(line_protocols.len(), 0);
    }

    // A distribution whose samples all have a zero rate also produces no output.
    #[test]
    fn test_encode_distribution_zero_counts_stats() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![1.0 => 0, 2.0 => 0],
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
        assert_eq!(line_protocols.len(), 0);
    }

    // Summary-statistic distributions use the quantiles passed to the encoder.
    #[test]
    fn test_encode_distribution_summary() {
        let events = vec![
            Metric::new(
                "requests",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                    statistic: StatisticKind::Summary,
                },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let line_protocols = encode_events(
            ProtocolVersion::V2,
            events,
            None,
            None,
            &default_summary_quantiles(),
        );
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 1);

        let line_protocol = split_line_protocol(line_protocols[0]);
        assert_eq!("ns.requests", line_protocol.0);
        assert_eq!(
            "metric_type=distribution,normal_tag=value,true_tag=true",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "avg=1.875",
                "count=8",
                "max=3",
                "median=2",
                "min=1",
                "sum=15",
                "quantile_0.50=2",
                "quantile_0.75=2",
                "quantile_0.90=3",
                "quantile_0.95=3",
                "quantile_0.99=3",
            ]
            .to_vec(),
        );
        assert_eq!("1542182950000000011", line_protocol.3);
    }

    // Configured tags are added to every line, alongside the metric's own tags;
    // the metric's namespace takes precedence over the default namespace.
    #[test]
    fn test_encode_with_some_tags() {
        crate::test_util::trace_init();

        let events = vec![
            Metric::new(
                "cpu",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 2.5 },
            )
            .with_namespace(Some("vector"))
            .with_timestamp(Some(ts())),
            Metric::new(
                "mem",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 1000.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(tags()))
            .with_timestamp(Some(ts())),
        ];

        let mut tags = HashMap::new();
        tags.insert("host".to_owned(), "local".to_owned());
        tags.insert("datacenter".to_owned(), "us-east".to_owned());

        let line_protocols = encode_events(
            ProtocolVersion::V1,
            events,
            Some("ns"),
            Some(tags).as_ref(),
            &[],
        );
        let line_protocols =
            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
        assert_eq!(line_protocols.len(), 2);
        assert_eq!(
            line_protocols[0],
            "vector.cpu,datacenter=us-east,host=local,metric_type=gauge value=2.5 1542182950000000011"
        );
        assert_eq!(
            line_protocols[1],
            "vector.mem,datacenter=us-east,host=local,metric_type=gauge,normal_tag=value,true_tag=true value=1000 1542182950000000011"
        );
    }
}
954
// Integration tests; compiled only with the `influxdb-integration-tests` feature.
// NOTE(review): these presumably need running InfluxDB v1 and v2 instances at the
// addresses provided by `test_util` — confirm against the test environment setup.
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use chrono::{SecondsFormat, Utc};
    use futures::stream;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use crate::{
        config::{SinkConfig, SinkContext},
        event::{
            Event,
            metric::{Metric, MetricKind, MetricValue},
        },
        http::HttpClient,
        sinks::influxdb::{
            InfluxDb1Settings, InfluxDb2Settings,
            metrics::{InfluxDbConfig, InfluxDbSvc, default_summary_quantiles},
            test_util::{
                BUCKET, ORG, TOKEN, address_v1, address_v2, cleanup_v1, format_timestamp,
                onboarding_v1, onboarding_v2, query_v1,
            },
        },
        test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
        tls::{self, TlsConfig},
    };

    #[tokio::test]
    async fn inserts_metrics_v1_over_https() {
        insert_metrics_v1(
            address_v1(true).as_str(),
            Some(TlsConfig {
                ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
                ..Default::default()
            }),
        )
        .await
    }

    #[tokio::test]
    async fn inserts_metrics_v1_over_http() {
        insert_metrics_v1(address_v1(false).as_str(), None).await
    }

    // Sends ten counters through a v1-configured sink, then queries the database
    // to verify each one was stored with the expected tags and value.
    async fn insert_metrics_v1(url: &str, tls: Option<TlsConfig>) {
        crate::test_util::trace_init();
        let database = onboarding_v1(url).await;

        let cx = SinkContext::default();

        let config = InfluxDbConfig {
            endpoint: url.to_string(),
            influxdb1_settings: Some(InfluxDb1Settings {
                consistency: None,
                database: database.clone(),
                retention_policy_name: Some("autogen".to_string()),
                username: None,
                password: None,
            }),
            influxdb2_settings: None,
            batch: Default::default(),
            request: Default::default(),
            tls,
            quantiles: default_summary_quantiles(),
            tags: None,
            default_namespace: None,
            acknowledgements: Default::default(),
        };

        let events: Vec<_> = (0..10).map(create_event).collect();
        let (sink, _) = config.build(cx).await.expect("error when building config");
        run_and_assert_sink_compliance(sink, stream::iter(events.clone()), &HTTP_SINK_TAGS).await;

        let res = query_v1_json(url, &format!("show series on {database}")).await;

        // Expect one series entry per event that was sent.
        assert_eq!(
            res["results"][0]["series"][0]["values"]
                .as_array()
                .unwrap()
                .len(),
            events.len()
        );

        for event in events {
            let metric = event.into_metric();
            let name = format!("{}.{}", metric.namespace().unwrap(), metric.name());
            let value = match metric.value() {
                MetricValue::Counter { value } => *value,
                _ => unreachable!(),
            };
            let timestamp = format_timestamp(metric.timestamp().unwrap(), SecondsFormat::Nanos);
            let res = query_v1_json(url, &format!("select * from {database}..\"{name}\"")).await;

            assert_eq!(
                res,
                serde_json::json! {
                    {"results": [{
                        "statement_id": 0,
                        "series": [{
                            "name": name,
                            "columns": ["time", "metric_type", "production", "region", "value"],
                            "values": [[timestamp, "counter", "true", "us-west-1", value as isize]]
                        }]
                    }]}
                }
            );
        }

        cleanup_v1(url, &database).await;
    }

    // Runs a v1 query and parses the response body as JSON.
    async fn query_v1_json(url: &str, query: &str) -> serde_json::Value {
        let string = query_v1(url, query)
            .await
            .text()
            .await
            .expect("Fetching text from InfluxDB query failed");
        serde_json::from_str(&string).expect("Error when parsing InfluxDB response JSON")
    }

    // Sends ten incremental counters (values 0..=9) under one uniquely named
    // metric through a v2-configured sink, then checks the first returned record
    // via a Flux query; the expected `_value` is "45".
    #[tokio::test]
    async fn influxdb2_metrics_put_data() {
        crate::test_util::trace_init();
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;

        let cx = SinkContext::default();

        let config = InfluxDbConfig {
            endpoint,
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            quantiles: default_summary_quantiles(),
            batch: Default::default(),
            request: Default::default(),
            tags: None,
            tls: None,
            default_namespace: None,
            acknowledgements: Default::default(),
        };

        // A nanosecond timestamp keeps the measurement name unique per run.
        let metric = format!(
            "counter-{}",
            Utc::now()
                .timestamp_nanos_opt()
                .expect("Timestamp out of range")
        );
        let mut events = Vec::new();
        for i in 0..10 {
            let event = Event::Metric(
                Metric::new(
                    metric.clone(),
                    MetricKind::Incremental,
                    MetricValue::Counter { value: i as f64 },
                )
                .with_namespace(Some("ns"))
                .with_tags(Some(metric_tags!(
                    "region" => "us-west-1",
                    "production" => "true",
                ))),
            );
            events.push(event);
        }

        let client = HttpClient::new(None, cx.proxy()).unwrap();
        let sink = InfluxDbSvc::new(config, client).unwrap();
        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        let mut body = std::collections::HashMap::new();
        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"ns.{metric}\")"));
        body.insert("type", "flux".to_owned());

        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let res = client
            .post(format!("{}/api/v2/query?org=my-org", address_v2()))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", "Token my-token")
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();

        // The response is comma-separated text: the first line is the header and
        // the second line is the first record.
        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();
        let record = lines[1].split(',').collect::<Vec<&str>>();

        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "counter"
        );
        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "production")
                .unwrap()]
            .trim(),
            "true"
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "region").unwrap()].trim(),
            "us-west-1"
        );
        assert_eq!(
            record[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            format!("ns.{}", metric)
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "value"
        );
        assert_eq!(
            record[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "45"
        );
    }

    // Builds an incremental counter named `counter-<i>` in namespace `ns` with
    // the `region` and `production` tags and the current time as its timestamp.
    fn create_event(i: i32) -> Event {
        Event::Metric(
            Metric::new(
                format!("counter-{i}"),
                MetricKind::Incremental,
                MetricValue::Counter { value: i as f64 },
            )
            .with_namespace(Some("ns"))
            .with_tags(Some(metric_tags!(
                "region" => "us-west-1",
                "production" => "true",
            )))
            .with_timestamp(Some(Utc::now())),
        )
    }
}