Skip to main content

vector/sinks/influxdb/
metrics.rs

1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{SinkExt, future::BoxFuture, stream};
5use tower::Service;
6use vector_lib::{
7    ByteSizeOf, EstimatedJsonEncodedSizeOf,
8    configurable::configurable_component,
9    event::metric::{MetricSketch, MetricTags, Quantile},
10};
11
12use crate::{
13    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
14    event::{
15        Event, KeyString,
16        metric::{Metric, MetricValue, Sample, StatisticKind},
17    },
18    http::HttpClient,
19    internal_events::InfluxdbEncodingError,
20    sinks::{
21        Healthcheck, VectorSink,
22        influxdb::{
23            Field, InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, encode_timestamp,
24            healthcheck, influx_line_protocol, influxdb_settings,
25        },
26        util::{
27            BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
28            buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
29            encode_namespace,
30            http::{HttpBatchService, HttpRetryLogic},
31            statistic::{DistributionStatistic, validate_quantiles},
32        },
33    },
34    tls::{TlsConfig, TlsSettings},
35};
36
37#[derive(Clone)]
38struct InfluxDbSvc {
39    config: InfluxDbConfig,
40    protocol_version: ProtocolVersion,
41    inner: HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>>,
42}
43
44#[derive(Clone, Copy, Debug, Default)]
45pub struct InfluxDbDefaultBatchSettings;
46
47impl SinkBatchSettings for InfluxDbDefaultBatchSettings {
48    const MAX_EVENTS: Option<usize> = Some(20);
49    const MAX_BYTES: Option<usize> = None;
50    const TIMEOUT_SECS: f64 = 1.0;
51}
52
53/// Configuration for the `influxdb_metrics` sink.
54#[configurable_component(sink("influxdb_metrics", "Deliver metric event data to InfluxDB."))]
55#[derive(Clone, Debug, Default)]
56#[serde(deny_unknown_fields)]
57pub struct InfluxDbConfig {
58    /// Sets the default namespace for any metrics sent.
59    ///
60    /// This namespace is only used if a metric has no existing namespace. When a namespace is
61    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
62    #[serde(alias = "namespace")]
63    #[configurable(metadata(docs::examples = "service"))]
64    pub default_namespace: Option<String>,
65
66    /// The endpoint to send data to.
67    ///
68    /// This should be a full HTTP URI, including the scheme, host, and port.
69    #[configurable(metadata(docs::examples = "http://localhost:8086/"))]
70    pub endpoint: String,
71
72    #[serde(flatten)]
73    pub influxdb1_settings: Option<InfluxDb1Settings>,
74
75    #[serde(flatten)]
76    pub influxdb2_settings: Option<InfluxDb2Settings>,
77
78    #[configurable(derived)]
79    #[serde(default)]
80    pub batch: BatchConfig<InfluxDbDefaultBatchSettings>,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub request: TowerRequestConfig,
85
86    /// A map of additional tags, in the key/value pair format, to add to each measurement.
87    #[configurable(metadata(docs::additional_props_description = "A tag key/value pair."))]
88    #[configurable(metadata(docs::examples = "example_tags()"))]
89    pub tags: Option<HashMap<String, String>>,
90
91    #[configurable(derived)]
92    pub tls: Option<TlsConfig>,
93
94    /// The list of quantiles to calculate when sending distribution metrics.
95    #[serde(default = "default_summary_quantiles")]
96    pub quantiles: Vec<f64>,
97
98    #[configurable(derived)]
99    #[serde(
100        default,
101        deserialize_with = "crate::serde::bool_or_struct",
102        skip_serializing_if = "crate::serde::is_default"
103    )]
104    acknowledgements: AcknowledgementsConfig,
105}
106
107pub fn default_summary_quantiles() -> Vec<f64> {
108    vec![0.5, 0.75, 0.9, 0.95, 0.99]
109}
110
111pub fn example_tags() -> HashMap<String, String> {
112    HashMap::from([("region".to_string(), "us-west-1".to_string())])
113}
114
115impl_generate_config_from_default!(InfluxDbConfig);
116
117#[async_trait::async_trait]
118#[typetag::serde(name = "influxdb_metrics")]
119impl SinkConfig for InfluxDbConfig {
120    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
121        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
122        let client = HttpClient::new(tls_settings, cx.proxy())?;
123        let healthcheck = healthcheck(
124            self.clone().endpoint,
125            self.clone().influxdb1_settings,
126            self.clone().influxdb2_settings,
127            client.clone(),
128        )?;
129        validate_quantiles(&self.quantiles)?;
130        let sink = InfluxDbSvc::new(self.clone(), client)?;
131        Ok((sink, healthcheck))
132    }
133
134    fn input(&self) -> Input {
135        Input::metric()
136    }
137
138    fn acknowledgements(&self) -> &AcknowledgementsConfig {
139        &self.acknowledgements
140    }
141}
142
143impl InfluxDbSvc {
144    pub fn new(config: InfluxDbConfig, client: HttpClient) -> crate::Result<VectorSink> {
145        let settings = influxdb_settings(
146            config.influxdb1_settings.clone(),
147            config.influxdb2_settings.clone(),
148        )?;
149
150        let endpoint = config.endpoint.clone();
151        let token = settings.token();
152        let protocol_version = settings.protocol_version();
153
154        let batch = config.batch.into_batch_settings()?;
155        let request = config.request.into_settings();
156
157        let uri = settings.write_uri(endpoint)?;
158
159        let http_service = HttpBatchService::new(client, create_build_request(uri, token.inner()));
160
161        let influxdb_http_service = InfluxDbSvc {
162            config,
163            protocol_version,
164            inner: http_service,
165        };
166        let mut normalizer = MetricNormalizer::<InfluxMetricNormalize>::default();
167
168        let sink = request
169            .batch_sink(
170                HttpRetryLogic::default(),
171                influxdb_http_service,
172                MetricsBuffer::new(batch.size),
173                batch.timeout,
174            )
175            .with_flat_map(move |event: Event| {
176                stream::iter({
177                    let byte_size = event.size_of();
178                    let json_size = event.estimated_json_encoded_size_of();
179
180                    normalizer
181                        .normalize(event.into_metric())
182                        .map(|metric| Ok(EncodedEvent::new(metric, byte_size, json_size)))
183                })
184            })
185            .sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error, internal_log_rate_limit = false));
186
187        #[allow(deprecated)]
188        Ok(VectorSink::from_event_sink(sink))
189    }
190}
191
192impl Service<Vec<Metric>> for InfluxDbSvc {
193    type Response = http::Response<Bytes>;
194    type Error = crate::Error;
195    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
196
197    // Emission of Error internal event is handled upstream by the caller
198    fn poll_ready(&mut self, cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
199        self.inner.poll_ready(cx)
200    }
201
202    // Emission of Error internal event is handled upstream by the caller
203    fn call(&mut self, items: Vec<Metric>) -> Self::Future {
204        let input = encode_events(
205            self.protocol_version,
206            items,
207            self.config.default_namespace.as_deref(),
208            self.config.tags.as_ref(),
209            &self.config.quantiles,
210        );
211        let body = input.freeze();
212
213        self.inner.call(body)
214    }
215}
216
217fn create_build_request(
218    uri: http::Uri,
219    token: &str,
220) -> impl Fn(Bytes) -> BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>
221+ Sync
222+ Send
223+ 'static
224+ use<> {
225    let auth = format!("Token {token}");
226    move |body| {
227        Box::pin(ready(
228            hyper::Request::post(uri.clone())
229                .header("Content-Type", "text/plain")
230                .header("Authorization", auth.clone())
231                .body(body)
232                .map_err(Into::into),
233        ))
234    }
235}
236
237fn merge_tags(event: &Metric, tags: Option<&HashMap<String, String>>) -> Option<MetricTags> {
238    match (event.tags().cloned(), tags) {
239        (Some(mut event_tags), Some(config_tags)) => {
240            event_tags.extend(config_tags.iter().map(|(k, v)| (k.clone(), v.clone())));
241            Some(event_tags)
242        }
243        (Some(event_tags), None) => Some(event_tags),
244        (None, Some(config_tags)) => Some(
245            config_tags
246                .iter()
247                .map(|(k, v)| (k.clone(), v.clone()))
248                .collect(),
249        ),
250        (None, None) => None,
251    }
252}
253
254#[derive(Default)]
255pub struct InfluxMetricNormalize;
256
257impl MetricNormalize for InfluxMetricNormalize {
258    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
259        match (metric.kind(), &metric.value()) {
260            // Counters are disaggregated. We take the previous value from the state
261            // and emit the difference between previous and current as a Counter
262            (_, MetricValue::Counter { .. }) => state.make_incremental(metric),
263            // Convert incremental gauges into absolute ones
264            (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
265            // All others are left as-is
266            _ => Some(metric),
267        }
268    }
269}
270
271fn encode_events(
272    protocol_version: ProtocolVersion,
273    events: Vec<Metric>,
274    default_namespace: Option<&str>,
275    tags: Option<&HashMap<String, String>>,
276    quantiles: &[f64],
277) -> BytesMut {
278    let mut output = BytesMut::new();
279    let count = events.len();
280
281    for event in events.into_iter() {
282        let fullname = encode_namespace(event.namespace().or(default_namespace), '.', event.name());
283        let ts = encode_timestamp(event.timestamp());
284        let tags = merge_tags(&event, tags);
285        let (metric_type, fields) = get_type_and_fields(event.value(), quantiles);
286
287        let mut unwrapped_tags = tags.unwrap_or_default();
288        unwrapped_tags.replace("metric_type".to_owned(), metric_type.to_owned());
289
290        if let Err(error_message) = influx_line_protocol(
291            protocol_version,
292            &fullname,
293            Some(unwrapped_tags),
294            fields,
295            ts,
296            &mut output,
297        ) {
298            emit!(InfluxdbEncodingError {
299                error_message,
300                count,
301            });
302        };
303    }
304
305    // remove last '\n'
306    if !output.is_empty() {
307        output.truncate(output.len() - 1);
308    }
309    output
310}
311
312fn get_type_and_fields(
313    value: &MetricValue,
314    quantiles: &[f64],
315) -> (&'static str, Option<HashMap<KeyString, Field>>) {
316    match value {
317        MetricValue::Counter { value } => ("counter", Some(to_fields(*value))),
318        MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))),
319        MetricValue::Set { values } => ("set", Some(to_fields(values.len() as f64))),
320        MetricValue::AggregatedHistogram {
321            buckets,
322            count,
323            sum,
324        } => {
325            let mut fields: HashMap<KeyString, Field> = buckets
326                .iter()
327                .map(|sample| {
328                    (
329                        format!("bucket_{}", sample.upper_limit).into(),
330                        Field::UnsignedInt(sample.count),
331                    )
332                })
333                .collect();
334            fields.insert("count".into(), Field::UnsignedInt(*count));
335            fields.insert("sum".into(), Field::Float(*sum));
336
337            ("histogram", Some(fields))
338        }
339        MetricValue::AggregatedSummary {
340            quantiles,
341            count,
342            sum,
343        } => {
344            let mut fields: HashMap<KeyString, Field> = quantiles
345                .iter()
346                .map(|quantile| {
347                    (
348                        format!("quantile_{}", quantile.quantile).into(),
349                        Field::Float(quantile.value),
350                    )
351                })
352                .collect();
353            fields.insert("count".into(), Field::UnsignedInt(*count));
354            fields.insert("sum".into(), Field::Float(*sum));
355
356            ("summary", Some(fields))
357        }
358        MetricValue::Distribution { samples, statistic } => {
359            let quantiles = match statistic {
360                StatisticKind::Histogram => &[0.95] as &[_],
361                StatisticKind::Summary => quantiles,
362            };
363            let fields = encode_distribution(samples, quantiles);
364            ("distribution", fields)
365        }
366        MetricValue::Sketch { sketch } => match sketch {
367            MetricSketch::AgentDDSketch(ddsketch) => {
368                // Hard-coded quantiles because InfluxDB can't natively do anything useful with the
369                // actual bins.
370                let mut fields = [0.5, 0.75, 0.9, 0.99]
371                    .iter()
372                    .map(|q| {
373                        let quantile = Quantile {
374                            quantile: *q,
375                            value: ddsketch.quantile(*q).unwrap_or(0.0),
376                        };
377                        (
378                            quantile.to_percentile_string().into(),
379                            Field::Float(quantile.value),
380                        )
381                    })
382                    .collect::<HashMap<KeyString, _>>();
383                fields.insert(
384                    "count".into(),
385                    Field::UnsignedInt(u64::from(ddsketch.count())),
386                );
387                fields.insert(
388                    "min".into(),
389                    Field::Float(ddsketch.min().unwrap_or(f64::MAX)),
390                );
391                fields.insert(
392                    "max".into(),
393                    Field::Float(ddsketch.max().unwrap_or(f64::MIN)),
394                );
395                fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0)));
396                fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0)));
397
398                ("sketch", Some(fields))
399            }
400        },
401    }
402}
403
404fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option<HashMap<KeyString, Field>> {
405    let statistic = DistributionStatistic::from_samples(samples, quantiles)?;
406
407    Some(
408        [
409            ("min".into(), Field::Float(statistic.min)),
410            ("max".into(), Field::Float(statistic.max)),
411            ("median".into(), Field::Float(statistic.median)),
412            ("avg".into(), Field::Float(statistic.avg)),
413            ("sum".into(), Field::Float(statistic.sum)),
414            ("count".into(), Field::Float(statistic.count as f64)),
415        ]
416        .into_iter()
417        .chain(
418            statistic
419                .quantiles
420                .iter()
421                .map(|&(p, val)| (format!("quantile_{p:.2}").into(), Field::Float(val))),
422        )
423        .collect(),
424    )
425}
426
427fn to_fields(value: f64) -> HashMap<KeyString, Field> {
428    [("value".into(), Field::Float(value))]
429        .into_iter()
430        .collect()
431}
432
433#[cfg(test)]
434mod tests {
435    use indoc::indoc;
436    use similar_asserts::assert_eq;
437
438    use super::*;
439    use crate::{
440        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
441        sinks::influxdb::test_util::{assert_fields, split_line_protocol, tags, ts},
442    };
443
444    #[test]
445    fn generate_config() {
446        crate::test_util::test_generate_config::<InfluxDbConfig>();
447    }
448
449    #[test]
450    fn test_config_with_tags() {
451        let config = indoc! {r#"
452            namespace: "vector"
453            endpoint: "http://localhost:9999"
454            tags:
455              region: "us-west-1"
456        "#};
457
458        serde_yaml::from_str::<InfluxDbConfig>(config).unwrap();
459    }
460
461    #[test]
462    fn test_encode_counter() {
463        let events = vec![
464            Metric::new(
465                "total",
466                MetricKind::Incremental,
467                MetricValue::Counter { value: 1.5 },
468            )
469            .with_namespace(Some("ns"))
470            .with_timestamp(Some(ts())),
471            Metric::new(
472                "check",
473                MetricKind::Incremental,
474                MetricValue::Counter { value: 1.0 },
475            )
476            .with_namespace(Some("ns"))
477            .with_tags(Some(tags()))
478            .with_timestamp(Some(ts())),
479        ];
480
481        let line_protocols = encode_events(ProtocolVersion::V2, events, Some("vector"), None, &[]);
482        assert_eq!(
483            line_protocols,
484            "ns.total,metric_type=counter value=1.5 1542182950000000011\n\
485            ns.check,metric_type=counter,normal_tag=value,true_tag=true value=1 1542182950000000011"
486        );
487    }
488
489    #[test]
490    fn test_encode_gauge() {
491        let events = vec![
492            Metric::new(
493                "meter",
494                MetricKind::Incremental,
495                MetricValue::Gauge { value: -1.5 },
496            )
497            .with_namespace(Some("ns"))
498            .with_tags(Some(tags()))
499            .with_timestamp(Some(ts())),
500        ];
501
502        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
503        assert_eq!(
504            line_protocols,
505            "ns.meter,metric_type=gauge,normal_tag=value,true_tag=true value=-1.5 1542182950000000011"
506        );
507    }
508
509    #[test]
510    fn test_encode_set() {
511        let events = vec![
512            Metric::new(
513                "users",
514                MetricKind::Incremental,
515                MetricValue::Set {
516                    values: vec!["alice".into(), "bob".into()].into_iter().collect(),
517                },
518            )
519            .with_namespace(Some("ns"))
520            .with_tags(Some(tags()))
521            .with_timestamp(Some(ts())),
522        ];
523
524        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
525        assert_eq!(
526            line_protocols,
527            "ns.users,metric_type=set,normal_tag=value,true_tag=true value=2 1542182950000000011"
528        );
529    }
530
531    #[test]
532    fn test_encode_histogram_v1() {
533        let events = vec![
534            Metric::new(
535                "requests",
536                MetricKind::Absolute,
537                MetricValue::AggregatedHistogram {
538                    buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
539                    count: 6,
540                    sum: 12.5,
541                },
542            )
543            .with_namespace(Some("ns"))
544            .with_tags(Some(tags()))
545            .with_timestamp(Some(ts())),
546        ];
547
548        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
549        let line_protocols =
550            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
551        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
552        assert_eq!(line_protocols.len(), 1);
553
554        let line_protocol1 = split_line_protocol(line_protocols[0]);
555        assert_eq!("ns.requests", line_protocol1.0);
556        assert_eq!(
557            "metric_type=histogram,normal_tag=value,true_tag=true",
558            line_protocol1.1
559        );
560        assert_fields(
561            line_protocol1.2.to_string(),
562            [
563                "bucket_1=1i",
564                "bucket_2.1=2i",
565                "bucket_3=3i",
566                "count=6i",
567                "sum=12.5",
568            ]
569            .to_vec(),
570        );
571        assert_eq!("1542182950000000011", line_protocol1.3);
572    }
573
574    #[test]
575    fn test_encode_histogram() {
576        let events = vec![
577            Metric::new(
578                "requests",
579                MetricKind::Absolute,
580                MetricValue::AggregatedHistogram {
581                    buckets: vector_lib::buckets![1.0 => 1, 2.1 => 2, 3.0 => 3],
582                    count: 6,
583                    sum: 12.5,
584                },
585            )
586            .with_namespace(Some("ns"))
587            .with_tags(Some(tags()))
588            .with_timestamp(Some(ts())),
589        ];
590
591        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
592        let line_protocols =
593            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
594        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
595        assert_eq!(line_protocols.len(), 1);
596
597        let line_protocol1 = split_line_protocol(line_protocols[0]);
598        assert_eq!("ns.requests", line_protocol1.0);
599        assert_eq!(
600            "metric_type=histogram,normal_tag=value,true_tag=true",
601            line_protocol1.1
602        );
603        assert_fields(
604            line_protocol1.2.to_string(),
605            [
606                "bucket_1=1u",
607                "bucket_2.1=2u",
608                "bucket_3=3u",
609                "count=6u",
610                "sum=12.5",
611            ]
612            .to_vec(),
613        );
614        assert_eq!("1542182950000000011", line_protocol1.3);
615    }
616
617    #[test]
618    fn test_encode_summary_v1() {
619        let events = vec![
620            Metric::new(
621                "requests_sum",
622                MetricKind::Absolute,
623                MetricValue::AggregatedSummary {
624                    quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
625                    count: 6,
626                    sum: 12.0,
627                },
628            )
629            .with_namespace(Some("ns"))
630            .with_tags(Some(tags()))
631            .with_timestamp(Some(ts())),
632        ];
633
634        let line_protocols = encode_events(ProtocolVersion::V1, events, None, None, &[]);
635        let line_protocols =
636            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
637        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
638        assert_eq!(line_protocols.len(), 1);
639
640        let line_protocol1 = split_line_protocol(line_protocols[0]);
641        assert_eq!("ns.requests_sum", line_protocol1.0);
642        assert_eq!(
643            "metric_type=summary,normal_tag=value,true_tag=true",
644            line_protocol1.1
645        );
646        assert_fields(
647            line_protocol1.2.to_string(),
648            [
649                "count=6i",
650                "quantile_0.01=1.5",
651                "quantile_0.5=2",
652                "quantile_0.99=3",
653                "sum=12",
654            ]
655            .to_vec(),
656        );
657        assert_eq!("1542182950000000011", line_protocol1.3);
658    }
659
660    #[test]
661    fn test_encode_summary() {
662        let events = vec![
663            Metric::new(
664                "requests_sum",
665                MetricKind::Absolute,
666                MetricValue::AggregatedSummary {
667                    quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
668                    count: 6,
669                    sum: 12.0,
670                },
671            )
672            .with_namespace(Some("ns"))
673            .with_tags(Some(tags()))
674            .with_timestamp(Some(ts())),
675        ];
676
677        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
678        let line_protocols =
679            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
680        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
681        assert_eq!(line_protocols.len(), 1);
682
683        let line_protocol1 = split_line_protocol(line_protocols[0]);
684        assert_eq!("ns.requests_sum", line_protocol1.0);
685        assert_eq!(
686            "metric_type=summary,normal_tag=value,true_tag=true",
687            line_protocol1.1
688        );
689        assert_fields(
690            line_protocol1.2.to_string(),
691            [
692                "count=6u",
693                "quantile_0.01=1.5",
694                "quantile_0.5=2",
695                "quantile_0.99=3",
696                "sum=12",
697            ]
698            .to_vec(),
699        );
700        assert_eq!("1542182950000000011", line_protocol1.3);
701    }
702
703    #[test]
704    fn test_encode_distribution() {
705        let events = vec![
706            Metric::new(
707                "requests",
708                MetricKind::Incremental,
709                MetricValue::Distribution {
710                    samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
711                    statistic: StatisticKind::Histogram,
712                },
713            )
714            .with_namespace(Some("ns"))
715            .with_tags(Some(tags()))
716            .with_timestamp(Some(ts())),
717            Metric::new(
718                "dense_stats",
719                MetricKind::Incremental,
720                MetricValue::Distribution {
721                    samples: (0..20)
722                        .map(|v| Sample {
723                            value: f64::from(v),
724                            rate: 1,
725                        })
726                        .collect(),
727                    statistic: StatisticKind::Histogram,
728                },
729            )
730            .with_namespace(Some("ns"))
731            .with_timestamp(Some(ts())),
732            Metric::new(
733                "sparse_stats",
734                MetricKind::Incremental,
735                MetricValue::Distribution {
736                    samples: (1..5)
737                        .map(|v| Sample {
738                            value: f64::from(v),
739                            rate: v,
740                        })
741                        .collect(),
742                    statistic: StatisticKind::Histogram,
743                },
744            )
745            .with_namespace(Some("ns"))
746            .with_timestamp(Some(ts())),
747        ];
748
749        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
750        let line_protocols =
751            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
752        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
753        assert_eq!(line_protocols.len(), 3);
754
755        let line_protocol1 = split_line_protocol(line_protocols[0]);
756        assert_eq!("ns.requests", line_protocol1.0);
757        assert_eq!(
758            "metric_type=distribution,normal_tag=value,true_tag=true",
759            line_protocol1.1
760        );
761        assert_fields(
762            line_protocol1.2.to_string(),
763            [
764                "avg=1.875",
765                "count=8",
766                "max=3",
767                "median=2",
768                "min=1",
769                "quantile_0.95=3",
770                "sum=15",
771            ]
772            .to_vec(),
773        );
774        assert_eq!("1542182950000000011", line_protocol1.3);
775
776        let line_protocol2 = split_line_protocol(line_protocols[1]);
777        assert_eq!("ns.dense_stats", line_protocol2.0);
778        assert_eq!("metric_type=distribution", line_protocol2.1);
779        assert_fields(
780            line_protocol2.2.to_string(),
781            [
782                "avg=9.5",
783                "count=20",
784                "max=19",
785                "median=9",
786                "min=0",
787                "quantile_0.95=18",
788                "sum=190",
789            ]
790            .to_vec(),
791        );
792        assert_eq!("1542182950000000011", line_protocol2.3);
793
794        let line_protocol3 = split_line_protocol(line_protocols[2]);
795        assert_eq!("ns.sparse_stats", line_protocol3.0);
796        assert_eq!("metric_type=distribution", line_protocol3.1);
797        assert_fields(
798            line_protocol3.2.to_string(),
799            [
800                "avg=3",
801                "count=10",
802                "max=4",
803                "median=3",
804                "min=1",
805                "quantile_0.95=4",
806                "sum=30",
807            ]
808            .to_vec(),
809        );
810        assert_eq!("1542182950000000011", line_protocol3.3);
811    }
812
813    #[test]
814    fn test_encode_distribution_empty_stats() {
815        let events = vec![
816            Metric::new(
817                "requests",
818                MetricKind::Incremental,
819                MetricValue::Distribution {
820                    samples: vec![],
821                    statistic: StatisticKind::Histogram,
822                },
823            )
824            .with_namespace(Some("ns"))
825            .with_tags(Some(tags()))
826            .with_timestamp(Some(ts())),
827        ];
828
829        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
830        assert_eq!(line_protocols.len(), 0);
831    }
832
833    #[test]
834    fn test_encode_distribution_zero_counts_stats() {
835        let events = vec![
836            Metric::new(
837                "requests",
838                MetricKind::Incremental,
839                MetricValue::Distribution {
840                    samples: vector_lib::samples![1.0 => 0, 2.0 => 0],
841                    statistic: StatisticKind::Histogram,
842                },
843            )
844            .with_namespace(Some("ns"))
845            .with_tags(Some(tags()))
846            .with_timestamp(Some(ts())),
847        ];
848
849        let line_protocols = encode_events(ProtocolVersion::V2, events, None, None, &[]);
850        assert_eq!(line_protocols.len(), 0);
851    }
852
853    #[test]
854    fn test_encode_distribution_summary() {
855        let events = vec![
856            Metric::new(
857                "requests",
858                MetricKind::Incremental,
859                MetricValue::Distribution {
860                    samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
861                    statistic: StatisticKind::Summary,
862                },
863            )
864            .with_namespace(Some("ns"))
865            .with_tags(Some(tags()))
866            .with_timestamp(Some(ts())),
867        ];
868
869        let line_protocols = encode_events(
870            ProtocolVersion::V2,
871            events,
872            None,
873            None,
874            &default_summary_quantiles(),
875        );
876        let line_protocols =
877            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
878        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
879        assert_eq!(line_protocols.len(), 1);
880
881        let line_protocol = split_line_protocol(line_protocols[0]);
882        assert_eq!("ns.requests", line_protocol.0);
883        assert_eq!(
884            "metric_type=distribution,normal_tag=value,true_tag=true",
885            line_protocol.1
886        );
887        assert_fields(
888            line_protocol.2.to_string(),
889            [
890                "avg=1.875",
891                "count=8",
892                "max=3",
893                "median=2",
894                "min=1",
895                "sum=15",
896                "quantile_0.50=2",
897                "quantile_0.75=2",
898                "quantile_0.90=3",
899                "quantile_0.95=3",
900                "quantile_0.99=3",
901            ]
902            .to_vec(),
903        );
904        assert_eq!("1542182950000000011", line_protocol.3);
905    }
906
907    #[test]
908    fn test_encode_with_some_tags() {
909        crate::test_util::trace_init();
910
911        let events = vec![
912            Metric::new(
913                "cpu",
914                MetricKind::Absolute,
915                MetricValue::Gauge { value: 2.5 },
916            )
917            .with_namespace(Some("vector"))
918            .with_timestamp(Some(ts())),
919            Metric::new(
920                "mem",
921                MetricKind::Absolute,
922                MetricValue::Gauge { value: 1000.0 },
923            )
924            .with_namespace(Some("vector"))
925            .with_tags(Some(tags()))
926            .with_timestamp(Some(ts())),
927        ];
928
929        let mut tags = HashMap::new();
930        tags.insert("host".to_owned(), "local".to_owned());
931        tags.insert("datacenter".to_owned(), "us-east".to_owned());
932
933        let line_protocols = encode_events(
934            ProtocolVersion::V1,
935            events,
936            Some("ns"),
937            Some(tags).as_ref(),
938            &[],
939        );
940        let line_protocols =
941            String::from_utf8(line_protocols.freeze().as_ref().to_owned()).unwrap();
942        let line_protocols: Vec<&str> = line_protocols.split('\n').collect();
943        assert_eq!(line_protocols.len(), 2);
944        assert_eq!(
945            line_protocols[0],
946            "vector.cpu,datacenter=us-east,host=local,metric_type=gauge value=2.5 1542182950000000011"
947        );
948        assert_eq!(
949            line_protocols[1],
950            "vector.mem,datacenter=us-east,host=local,metric_type=gauge,normal_tag=value,true_tag=true value=1000 1542182950000000011"
951        );
952    }
953}
954
955#[cfg(feature = "influxdb-integration-tests")]
956#[cfg(test)]
957mod integration_tests {
958    use chrono::{SecondsFormat, Utc};
959    use futures::stream;
960    use similar_asserts::assert_eq;
961    use vector_lib::metric_tags;
962
963    use crate::{
964        config::{SinkConfig, SinkContext},
965        event::{
966            Event,
967            metric::{Metric, MetricKind, MetricValue},
968        },
969        http::HttpClient,
970        sinks::influxdb::{
971            InfluxDb1Settings, InfluxDb2Settings,
972            metrics::{InfluxDbConfig, InfluxDbSvc, default_summary_quantiles},
973            test_util::{
974                BUCKET, ORG, TOKEN, address_v1, address_v2, cleanup_v1, format_timestamp,
975                onboarding_v1, onboarding_v2, query_v1,
976            },
977        },
978        test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
979        tls::{self, TlsConfig},
980    };
981
982    #[tokio::test]
983    async fn inserts_metrics_v1_over_https() {
984        insert_metrics_v1(
985            address_v1(true).as_str(),
986            Some(TlsConfig {
987                ca_file: Some(tls::TEST_PEM_CA_PATH.into()),
988                ..Default::default()
989            }),
990        )
991        .await
992    }
993
994    #[tokio::test]
995    async fn inserts_metrics_v1_over_http() {
996        insert_metrics_v1(address_v1(false).as_str(), None).await
997    }
998
999    async fn insert_metrics_v1(url: &str, tls: Option<TlsConfig>) {
1000        crate::test_util::trace_init();
1001        let database = onboarding_v1(url).await;
1002
1003        let cx = SinkContext::default();
1004
1005        let config = InfluxDbConfig {
1006            endpoint: url.to_string(),
1007            influxdb1_settings: Some(InfluxDb1Settings {
1008                consistency: None,
1009                database: database.clone(),
1010                retention_policy_name: Some("autogen".to_string()),
1011                username: None,
1012                password: None,
1013            }),
1014            influxdb2_settings: None,
1015            batch: Default::default(),
1016            request: Default::default(),
1017            tls,
1018            quantiles: default_summary_quantiles(),
1019            tags: None,
1020            default_namespace: None,
1021            acknowledgements: Default::default(),
1022        };
1023
1024        let events: Vec<_> = (0..10).map(create_event).collect();
1025        let (sink, _) = config.build(cx).await.expect("error when building config");
1026        run_and_assert_sink_compliance(sink, stream::iter(events.clone()), &HTTP_SINK_TAGS).await;
1027
1028        let res = query_v1_json(url, &format!("show series on {database}")).await;
1029
1030        //
1031        // {"results":[{"statement_id":0,"series":[{"columns":["key"],"values":
1032        //  [
1033        //    ["ns.counter-0,metric_type=counter,production=true,region=us-west-1"],
1034        //    ["ns.counter-1,metric_type=counter,production=true,region=us-west-1"],
1035        //    ["ns.counter-2,metric_type=counter,production=true,region=us-west-1"],
1036        //    ["ns.counter-3,metric_type=counter,production=true,region=us-west-1"],
1037        //    ["ns.counter-4,metric_type=counter,production=true,region=us-west-1"],
1038        //    ["ns.counter-5,metric_type=counter,production=true,region=us-west-1"],
1039        //    ["ns.counter-6,metric_type=counter,production=true,region=us-west-1"],
1040        //    ["ns.counter-7,metric_type=counter,production=true,region=us-west-1"],
1041        //    ["ns.counter-8,metric_type=counter,production=true,region=us-west-1"],
1042        //    ["ns.counter-9,metric_type=counter,production=true,region=us-west-1"]
1043        //  ]}]}]}\n
1044        //
1045
1046        assert_eq!(
1047            res["results"][0]["series"][0]["values"]
1048                .as_array()
1049                .unwrap()
1050                .len(),
1051            events.len()
1052        );
1053
1054        for event in events {
1055            let metric = event.into_metric();
1056            let name = format!("{}.{}", metric.namespace().unwrap(), metric.name());
1057            let value = match metric.value() {
1058                MetricValue::Counter { value } => *value,
1059                _ => unreachable!(),
1060            };
1061            let timestamp = format_timestamp(metric.timestamp().unwrap(), SecondsFormat::Nanos);
1062            let res = query_v1_json(url, &format!("select * from {database}..\"{name}\"")).await;
1063
1064            assert_eq!(
1065                res,
1066                serde_json::json! {
1067                    {"results": [{
1068                        "statement_id": 0,
1069                        "series": [{
1070                            "name": name,
1071                            "columns": ["time", "metric_type", "production", "region", "value"],
1072                            "values": [[timestamp, "counter", "true", "us-west-1", value as isize]]
1073                        }]
1074                    }]}
1075                }
1076            );
1077        }
1078
1079        cleanup_v1(url, &database).await;
1080    }
1081
1082    async fn query_v1_json(url: &str, query: &str) -> serde_json::Value {
1083        let string = query_v1(url, query)
1084            .await
1085            .text()
1086            .await
1087            .expect("Fetching text from InfluxDB query failed");
1088        serde_json::from_str(&string).expect("Error when parsing InfluxDB response JSON")
1089    }
1090
1091    #[tokio::test]
1092    async fn influxdb2_metrics_put_data() {
1093        crate::test_util::trace_init();
1094        let endpoint = address_v2();
1095        onboarding_v2(&endpoint).await;
1096
1097        let cx = SinkContext::default();
1098
1099        let config = InfluxDbConfig {
1100            endpoint,
1101            influxdb1_settings: None,
1102            influxdb2_settings: Some(InfluxDb2Settings {
1103                org: ORG.to_string(),
1104                bucket: BUCKET.to_string(),
1105                token: TOKEN.to_string().into(),
1106            }),
1107            quantiles: default_summary_quantiles(),
1108            batch: Default::default(),
1109            request: Default::default(),
1110            tags: None,
1111            tls: None,
1112            default_namespace: None,
1113            acknowledgements: Default::default(),
1114        };
1115
1116        let metric = format!(
1117            "counter-{}",
1118            Utc::now()
1119                .timestamp_nanos_opt()
1120                .expect("Timestamp out of range")
1121        );
1122        let mut events = Vec::new();
1123        for i in 0..10 {
1124            let event = Event::Metric(
1125                Metric::new(
1126                    metric.clone(),
1127                    MetricKind::Incremental,
1128                    MetricValue::Counter { value: i as f64 },
1129                )
1130                .with_namespace(Some("ns"))
1131                .with_tags(Some(metric_tags!(
1132                    "region" => "us-west-1",
1133                    "production" => "true",
1134                ))),
1135            );
1136            events.push(event);
1137        }
1138
1139        let client = HttpClient::new(None, cx.proxy()).unwrap();
1140        let sink = InfluxDbSvc::new(config, client).unwrap();
1141        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
1142
1143        let mut body = std::collections::HashMap::new();
1144        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"ns.{metric}\")"));
1145        body.insert("type", "flux".to_owned());
1146
1147        let client = reqwest::Client::builder()
1148            .danger_accept_invalid_certs(true)
1149            .build()
1150            .unwrap();
1151
1152        let res = client
1153            .post(format!("{}/api/v2/query?org=my-org", address_v2()))
1154            .json(&body)
1155            .header("accept", "application/json")
1156            .header("Authorization", "Token my-token")
1157            .send()
1158            .await
1159            .unwrap();
1160        let string = res.text().await.unwrap();
1161
1162        let lines = string.split('\n').collect::<Vec<&str>>();
1163        let header = lines[0].split(',').collect::<Vec<&str>>();
1164        let record = lines[1].split(',').collect::<Vec<&str>>();
1165
1166        assert_eq!(
1167            record[header
1168                .iter()
1169                .position(|&r| r.trim() == "metric_type")
1170                .unwrap()]
1171            .trim(),
1172            "counter"
1173        );
1174        assert_eq!(
1175            record[header
1176                .iter()
1177                .position(|&r| r.trim() == "production")
1178                .unwrap()]
1179            .trim(),
1180            "true"
1181        );
1182        assert_eq!(
1183            record[header.iter().position(|&r| r.trim() == "region").unwrap()].trim(),
1184            "us-west-1"
1185        );
1186        assert_eq!(
1187            record[header
1188                .iter()
1189                .position(|&r| r.trim() == "_measurement")
1190                .unwrap()]
1191            .trim(),
1192            format!("ns.{}", metric)
1193        );
1194        assert_eq!(
1195            record[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
1196            "value"
1197        );
1198        assert_eq!(
1199            record[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
1200            "45"
1201        );
1202    }
1203
1204    fn create_event(i: i32) -> Event {
1205        Event::Metric(
1206            Metric::new(
1207                format!("counter-{i}"),
1208                MetricKind::Incremental,
1209                MetricValue::Counter { value: i as f64 },
1210            )
1211            .with_namespace(Some("ns"))
1212            .with_tags(Some(metric_tags!(
1213                "region" => "us-west-1",
1214                "production" => "true",
1215            )))
1216            .with_timestamp(Some(Utc::now())),
1217        )
1218    }
1219}