// File: vector/sinks/influxdb/mod.rs

1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::{
13    configurable::configurable_component,
14    event::{KeyString, MetricTags},
15    sensitive_string::SensitiveString,
16};
17
18use crate::http::HttpClient;
19
20pub(in crate::sinks) enum Field {
21    /// string
22    String(String),
23    /// float
24    Float(f64),
25    /// unsigned integer
26    /// Influx can support 64 bit integers if compiled with a flag, see:
27    /// <https://github.com/influxdata/influxdb/issues/7801#issuecomment-466801839>
28    UnsignedInt(u64),
29    /// integer
30    Int(i64),
31    /// boolean
32    Bool(bool),
33}
34
35#[derive(Clone, Copy, Debug)]
36pub(in crate::sinks) enum ProtocolVersion {
37    V1,
38    V2,
39}
40
41#[derive(Debug, Snafu)]
42enum ConfigError {
43    #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
44    MissingConfiguration,
45    #[snafu(display(
46        "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
47        v1_settings,
48        v2_settings
49    ))]
50    BothConfiguration {
51        v1_settings: InfluxDb1Settings,
52        v2_settings: InfluxDb2Settings,
53    },
54}
55
56/// Configuration settings for InfluxDB v0.x/v1.x.
57#[configurable_component]
58#[derive(Clone, Debug)]
59pub struct InfluxDb1Settings {
60    /// The name of the database to write into.
61    ///
62    /// Only relevant when using InfluxDB v0.x/v1.x.
63    #[configurable(metadata(docs::examples = "vector-database"))]
64    #[configurable(metadata(docs::examples = "iot-store"))]
65    database: String,
66
67    /// The consistency level to use for writes.
68    ///
69    /// Only relevant when using InfluxDB v0.x/v1.x.
70    #[configurable(metadata(docs::examples = "any"))]
71    #[configurable(metadata(docs::examples = "one"))]
72    #[configurable(metadata(docs::examples = "quorum"))]
73    #[configurable(metadata(docs::examples = "all"))]
74    consistency: Option<String>,
75
76    /// The target retention policy for writes.
77    ///
78    /// Only relevant when using InfluxDB v0.x/v1.x.
79    #[configurable(metadata(docs::examples = "autogen"))]
80    #[configurable(metadata(docs::examples = "one_day_only"))]
81    retention_policy_name: Option<String>,
82
83    /// The username to authenticate with.
84    ///
85    /// Only relevant when using InfluxDB v0.x/v1.x.
86    #[configurable(metadata(docs::examples = "todd"))]
87    #[configurable(metadata(docs::examples = "vector-source"))]
88    username: Option<String>,
89
90    /// The password to authenticate with.
91    ///
92    /// Only relevant when using InfluxDB v0.x/v1.x.
93    #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
94    #[configurable(metadata(docs::examples = "influxdb4ever"))]
95    password: Option<SensitiveString>,
96}
97
98/// Configuration settings for InfluxDB v2.x.
99#[configurable_component]
100#[derive(Clone, Debug)]
101pub struct InfluxDb2Settings {
102    /// The name of the organization to write into.
103    ///
104    /// Only relevant when using InfluxDB v2.x and above.
105    #[configurable(metadata(docs::examples = "my-org"))]
106    #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
107    org: String,
108
109    /// The name of the bucket to write into.
110    ///
111    /// Only relevant when using InfluxDB v2.x and above.
112    #[configurable(metadata(docs::examples = "vector-bucket"))]
113    #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
114    bucket: String,
115
116    /// The [token][token_docs] to authenticate with.
117    ///
118    /// Only relevant when using InfluxDB v2.x and above.
119    ///
120    /// [token_docs]: https://v2.docs.influxdata.com/v2.0/security/tokens/
121    #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
122    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
123    token: SensitiveString,
124}
125
126trait InfluxDbSettings: std::fmt::Debug {
127    fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
128    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
129    fn token(&self) -> SensitiveString;
130    fn protocol_version(&self) -> ProtocolVersion;
131}
132
133impl InfluxDbSettings for InfluxDb1Settings {
134    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
135        encode_uri(
136            &endpoint,
137            "write",
138            &[
139                ("consistency", self.consistency.clone()),
140                ("db", Some(self.database.clone())),
141                ("rp", self.retention_policy_name.clone()),
142                ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
143                ("u", self.username.clone()),
144                ("precision", Some("ns".to_owned())),
145            ],
146        )
147    }
148
149    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
150        encode_uri(&endpoint, "ping", &[])
151    }
152
153    fn token(&self) -> SensitiveString {
154        SensitiveString::default()
155    }
156
157    fn protocol_version(&self) -> ProtocolVersion {
158        ProtocolVersion::V1
159    }
160}
161
162impl InfluxDbSettings for InfluxDb2Settings {
163    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
164        encode_uri(
165            &endpoint,
166            "api/v2/write",
167            &[
168                ("org", Some(self.org.clone())),
169                ("bucket", Some(self.bucket.clone())),
170                ("precision", Some("ns".to_owned())),
171            ],
172        )
173    }
174
175    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
176        encode_uri(&endpoint, "ping", &[])
177    }
178
179    fn token(&self) -> SensitiveString {
180        self.token.clone()
181    }
182
183    fn protocol_version(&self) -> ProtocolVersion {
184        ProtocolVersion::V2
185    }
186}
187
188fn influxdb_settings(
189    influxdb1_settings: Option<InfluxDb1Settings>,
190    influxdb2_settings: Option<InfluxDb2Settings>,
191) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
192    match (influxdb1_settings, influxdb2_settings) {
193        (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
194            v1_settings,
195            v2_settings,
196        }
197        .into()),
198        (None, None) => Err(ConfigError::MissingConfiguration.into()),
199        (Some(settings), _) => Ok(Box::new(settings)),
200        (_, Some(settings)) => Ok(Box::new(settings)),
201    }
202}
203
204// V1: https://docs.influxdata.com/influxdb/v1.7/tools/api/#ping-http-endpoint
205// V2: https://v2.docs.influxdata.com/v2.0/api/#operation/GetHealth
206fn healthcheck(
207    endpoint: String,
208    influxdb1_settings: Option<InfluxDb1Settings>,
209    influxdb2_settings: Option<InfluxDb2Settings>,
210    mut client: HttpClient,
211) -> crate::Result<super::Healthcheck> {
212    let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
213
214    let uri = settings.healthcheck_uri(endpoint)?;
215
216    let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
217
218    Ok(async move {
219        client
220            .call(request)
221            .await
222            .map_err(|error| error.into())
223            .and_then(|response| match response.status() {
224                StatusCode::OK => Ok(()),
225                StatusCode::NO_CONTENT => Ok(()),
226                other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
227            })
228    }
229    .boxed())
230}
231
232// https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
233pub(in crate::sinks) fn influx_line_protocol(
234    protocol_version: ProtocolVersion,
235    measurement: &str,
236    tags: Option<MetricTags>,
237    fields: Option<HashMap<KeyString, Field>>,
238    timestamp: i64,
239    line_protocol: &mut BytesMut,
240) -> Result<(), &'static str> {
241    // Fields
242    let unwrapped_fields = fields.unwrap_or_default();
243    // LineProtocol should have a field
244    if unwrapped_fields.is_empty() {
245        return Err("fields must not be empty");
246    }
247
248    encode_string(measurement, line_protocol);
249
250    // Tags are optional
251    let unwrapped_tags = tags.unwrap_or_default();
252    if !unwrapped_tags.is_empty() {
253        line_protocol.put_u8(b',');
254        encode_tags(unwrapped_tags, line_protocol);
255    }
256    line_protocol.put_u8(b' ');
257
258    // Fields
259    encode_fields(protocol_version, unwrapped_fields, line_protocol);
260    line_protocol.put_u8(b' ');
261
262    // Timestamp
263    line_protocol.put_slice(&timestamp.to_string().into_bytes());
264    line_protocol.put_u8(b'\n');
265    Ok(())
266}
267
268fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
269    let original_len = output.len();
270    // `tags` is already sorted
271    for (key, value) in tags.iter_single() {
272        if key.is_empty() || value.is_empty() {
273            continue;
274        }
275        encode_string(key, output);
276        output.put_u8(b'=');
277        encode_string(value, output);
278        output.put_u8(b',');
279    }
280
281    // remove last ','
282    if output.len() > original_len {
283        output.truncate(output.len() - 1);
284    }
285}
286
287fn encode_fields(
288    protocol_version: ProtocolVersion,
289    fields: HashMap<KeyString, Field>,
290    output: &mut BytesMut,
291) {
292    let original_len = output.len();
293    for (key, value) in fields.into_iter() {
294        encode_string(&key, output);
295        output.put_u8(b'=');
296        match value {
297            Field::String(s) => {
298                output.put_u8(b'"');
299                for c in s.chars() {
300                    if "\\\"".contains(c) {
301                        output.put_u8(b'\\');
302                    }
303                    let mut c_buffer: [u8; 4] = [0; 4];
304                    output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
305                }
306                output.put_u8(b'"');
307            }
308            Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
309            Field::UnsignedInt(i) => {
310                output.put_slice(&i.to_string().into_bytes());
311                let c = match protocol_version {
312                    ProtocolVersion::V1 => 'i',
313                    ProtocolVersion::V2 => 'u',
314                };
315                let mut c_buffer: [u8; 4] = [0; 4];
316                output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
317            }
318            Field::Int(i) => {
319                output.put_slice(&i.to_string().into_bytes());
320                output.put_u8(b'i');
321            }
322            Field::Bool(b) => {
323                output.put_slice(&b.to_string().into_bytes());
324            }
325        };
326        output.put_u8(b',');
327    }
328
329    // remove last ','
330    if output.len() > original_len {
331        output.truncate(output.len() - 1);
332    }
333}
334
335fn encode_string(key: &str, output: &mut BytesMut) {
336    for c in key.chars() {
337        if "\\, =".contains(c) {
338            output.put_u8(b'\\');
339        }
340        let mut c_buffer: [u8; 4] = [0; 4];
341        output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
342    }
343}
344
345pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
346    if let Some(ts) = timestamp {
347        ts.timestamp_nanos_opt().unwrap()
348    } else {
349        encode_timestamp(Some(Utc::now()))
350    }
351}
352
353pub(in crate::sinks) fn encode_uri(
354    endpoint: &str,
355    path: &str,
356    pairs: &[(&str, Option<String>)],
357) -> crate::Result<Uri> {
358    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
359
360    for pair in pairs {
361        if let Some(v) = &pair.1 {
362            serializer.append_pair(pair.0, v);
363        }
364    }
365
366    let mut url = if endpoint.ends_with('/') {
367        format!("{}{}?{}", endpoint, path, serializer.finish())
368    } else {
369        format!("{}/{}?{}", endpoint, path, serializer.finish())
370    };
371
372    if url.ends_with('?') {
373        url.pop();
374    }
375
376    Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
377}
378
379#[cfg(test)]
380#[allow(dead_code)]
381pub mod test_util {
382    use std::{fs::File, io::Read};
383
384    use chrono::{DateTime, SecondsFormat, Timelike, Utc, offset::TimeZone};
385    use vector_lib::metric_tags;
386
387    use super::*;
388    use crate::tls;
389
390    pub(crate) const ORG: &str = "my-org";
391    pub(crate) const BUCKET: &str = "my-bucket";
392    pub(crate) const TOKEN: &str = "my-token";
393
394    pub(crate) fn next_database() -> String {
395        format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
396    }
397
398    pub(crate) fn ts() -> DateTime<Utc> {
399        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
400            .single()
401            .and_then(|t| t.with_nanosecond(11))
402            .expect("invalid timestamp")
403    }
404
405    pub(crate) fn tags() -> MetricTags {
406        metric_tags!(
407            "normal_tag" => "value",
408            "true_tag" => "true",
409            "empty_tag" => "",
410        )
411    }
412
413    pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
414        let encoded_fields: Vec<&str> = value.split(',').collect();
415
416        assert_eq!(fields.len(), encoded_fields.len());
417
418        for field in fields.into_iter() {
419            assert!(
420                encoded_fields.contains(&field),
421                "Fields: {value} has to have: {field}"
422            )
423        }
424    }
425
426    pub(crate) fn address_v1(secure: bool) -> String {
427        if secure {
428            std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
429                .unwrap_or_else(|_| "http://localhost:8087".into())
430        } else {
431            std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
432                .unwrap_or_else(|_| "http://localhost:8086".into())
433        }
434    }
435
436    pub(crate) fn address_v2() -> String {
437        std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
438    }
439
440    // ns.requests,metric_type=distribution,normal_tag=value,true_tag=true avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15 1542182950000000011
441    //
442    // =>
443    //
444    // ns.requests
445    // metric_type=distribution,normal_tag=value,true_tag=true
446    // avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15
447    // 1542182950000000011
448    //
449    pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
450        let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
451        // tags and timestamp may not be present
452        let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
453        let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
454
455        (measurement, tags, fields.to_string(), ts)
456    }
457
458    fn client() -> reqwest::Client {
459        let mut test_ca = Vec::<u8>::new();
460        File::open(tls::TEST_PEM_CA_PATH)
461            .unwrap()
462            .read_to_end(&mut test_ca)
463            .unwrap();
464        let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
465
466        reqwest::Client::builder()
467            .add_root_certificate(test_ca)
468            .build()
469            .unwrap()
470    }
471
472    pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
473        client()
474            .get(format!("{endpoint}/query"))
475            .query(&[("q", query)])
476            .send()
477            .await
478            .unwrap()
479    }
480
481    pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
482        let database = next_database();
483        let status = query_v1(endpoint, &format!("create database {database}"))
484            .await
485            .status();
486        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
487        // Some times InfluxDB will return OK before it can actually
488        // accept writes to the database, leading to test failures. Test
489        // this with empty writes and loop if it reports the database
490        // does not exist yet.
491        crate::test_util::wait_for(|| {
492            let write_url = format!("{}/write?db={}", endpoint, &database);
493            async move {
494                match client()
495                    .post(&write_url)
496                    .header("Content-Type", "text/plain")
497                    .header("Authorization", &format!("Token {TOKEN}"))
498                    .body("")
499                    .send()
500                    .await
501                    .unwrap()
502                    .status()
503                {
504                    http::StatusCode::NO_CONTENT => true,
505                    http::StatusCode::NOT_FOUND => false,
506                    status => panic!("Unexpected status: {status}"),
507                }
508            }
509        })
510        .await;
511        database
512    }
513
514    pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
515        let status = query_v1(endpoint, &format!("drop database {database}"))
516            .await
517            .status();
518        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
519    }
520
521    pub(crate) async fn onboarding_v2(endpoint: &str) {
522        let mut body = std::collections::HashMap::new();
523        body.insert("username", "my-user");
524        body.insert("password", "my-password");
525        body.insert("org", ORG);
526        body.insert("bucket", BUCKET);
527        body.insert("token", TOKEN);
528
529        let client = reqwest::Client::builder()
530            .danger_accept_invalid_certs(true)
531            .build()
532            .unwrap();
533
534        let res = client
535            .post(format!("{endpoint}/api/v2/setup"))
536            .json(&body)
537            .header("accept", "application/json")
538            .send()
539            .await
540            .unwrap();
541
542        let status = res.status();
543
544        assert!(
545            status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
546            "UnexpectedStatus: {status}"
547        );
548    }
549
550    pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
551        strip_timestamp(timestamp.to_rfc3339_opts(format, true))
552    }
553
554    // InfluxDB strips off trailing zeros in timestamps in metrics
555    fn strip_timestamp(timestamp: String) -> String {
556        #[expect(
557            clippy::string_slice,
558            reason = "last two bytes are always ASCII ('0Z' or '.Z'), guaranteed char boundaries"
559        )]
560        let strip_one = || format!("{}Z", &timestamp[..timestamp.len() - 2]);
561        match timestamp {
562            _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
563            _ if timestamp.ends_with(".Z") => strip_one(),
564            _ => timestamp,
565        }
566    }
567}
568
569#[cfg(test)]
570mod tests {
571    use serde::{Deserialize, Serialize};
572
573    use super::*;
574    use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
575
576    #[derive(Deserialize, Serialize, Debug, Clone, Default)]
577    #[serde(deny_unknown_fields)]
578    pub struct InfluxDbTestConfig {
579        #[serde(flatten)]
580        pub influxdb1_settings: Option<InfluxDb1Settings>,
581        #[serde(flatten)]
582        pub influxdb2_settings: Option<InfluxDb2Settings>,
583    }
584
585    #[test]
586    fn test_influxdb_settings_both() {
587        let config = indoc::indoc! {r#"
588        bucket: "my-bucket"
589        org: "my-org"
590        token: "my-token"
591        database: "my-database"
592        "#};
593        let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
594        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
595        assert_eq!(
596            settings.expect_err("expected error").to_string(),
597            "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
598        );
599    }
600
601    #[test]
602    fn test_influxdb_settings_missing() {
603        let config = "{}";
604        let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
605        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
606        assert_eq!(
607            settings.expect_err("expected error").to_string(),
608            "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
609        );
610    }
611
612    #[test]
613    fn test_influxdb1_settings() {
614        let config = indoc::indoc! {r#"
615        database: "my-database"
616        "#};
617        let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
618        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
619    }
620
621    #[test]
622    fn test_influxdb2_settings() {
623        let config = indoc::indoc! {r#"
624        bucket: "my-bucket"
625        org: "my-org"
626        token: "my-token"
627        "#};
628        let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
629        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
630    }
631
632    #[test]
633    fn test_influxdb1_test_write_uri() {
634        let settings = InfluxDb1Settings {
635            consistency: Some("quorum".to_owned()),
636            database: "vector_db".to_owned(),
637            retention_policy_name: Some("autogen".to_owned()),
638            username: Some("writer".to_owned()),
639            password: Some("secret".to_owned().into()),
640        };
641
642        let uri = settings
643            .write_uri("http://localhost:8086".to_owned())
644            .unwrap();
645        assert_eq!(
646            "http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns",
647            uri.to_string()
648        )
649    }
650
651    #[test]
652    fn test_influxdb2_test_write_uri() {
653        let settings = InfluxDb2Settings {
654            org: "my-org".to_owned(),
655            bucket: "my-bucket".to_owned(),
656            token: "my-token".to_owned().into(),
657        };
658
659        let uri = settings
660            .write_uri("http://localhost:9999".to_owned())
661            .unwrap();
662        assert_eq!(
663            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
664            uri.to_string()
665        )
666    }
667
668    #[test]
669    fn test_influxdb1_test_healthcheck_uri() {
670        let settings = InfluxDb1Settings {
671            consistency: Some("quorum".to_owned()),
672            database: "vector_db".to_owned(),
673            retention_policy_name: Some("autogen".to_owned()),
674            username: Some("writer".to_owned()),
675            password: Some("secret".to_owned().into()),
676        };
677
678        let uri = settings
679            .healthcheck_uri("http://localhost:8086".to_owned())
680            .unwrap();
681        assert_eq!("http://localhost:8086/ping", uri.to_string())
682    }
683
684    #[test]
685    fn test_influxdb2_test_healthcheck_uri() {
686        let settings = InfluxDb2Settings {
687            org: "my-org".to_owned(),
688            bucket: "my-bucket".to_owned(),
689            token: "my-token".to_owned().into(),
690        };
691
692        let uri = settings
693            .healthcheck_uri("http://localhost:9999".to_owned())
694            .unwrap();
695        assert_eq!("http://localhost:9999/ping", uri.to_string())
696    }
697
698    #[test]
699    fn test_encode_tags() {
700        let mut value = BytesMut::new();
701        encode_tags(tags(), &mut value);
702
703        assert_eq!(value, "normal_tag=value,true_tag=true");
704
705        let tags_to_escape = vec![
706            ("tag".to_owned(), "val=ue".to_owned()),
707            ("name escape".to_owned(), "true".to_owned()),
708            ("value_escape".to_owned(), "value escape".to_owned()),
709            ("a_first_place".to_owned(), "10".to_owned()),
710        ]
711        .into_iter()
712        .collect();
713
714        let mut value = BytesMut::new();
715        encode_tags(tags_to_escape, &mut value);
716        assert_eq!(
717            value,
718            "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
719        );
720    }
721
722    #[test]
723    fn tags_order() {
724        let mut value = BytesMut::new();
725        encode_tags(
726            vec![
727                ("a", "value"),
728                ("b", "value"),
729                ("c", "value"),
730                ("d", "value"),
731                ("e", "value"),
732            ]
733            .into_iter()
734            .map(|(k, v)| (k.to_owned(), v.to_owned()))
735            .collect(),
736            &mut value,
737        );
738        assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
739    }
740
741    #[test]
742    fn test_encode_fields_v1() {
743        let fields = vec![
744            ("field_string".into(), Field::String("string value".into())),
745            (
746                "field_string_escape".into(),
747                Field::String("string\\val\"ue".into()),
748            ),
749            ("field_float".into(), Field::Float(123.45)),
750            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
751            ("field_int".into(), Field::Int(657646)),
752            ("field_bool_true".into(), Field::Bool(true)),
753            ("field_bool_false".into(), Field::Bool(false)),
754            ("escape key".into(), Field::Float(10.0)),
755        ]
756        .into_iter()
757        .collect();
758
759        let mut value = BytesMut::new();
760        encode_fields(ProtocolVersion::V1, fields, &mut value);
761        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
762        assert_fields(
763            value,
764            [
765                "escape\\ key=10",
766                "field_float=123.45",
767                "field_string=\"string value\"",
768                "field_string_escape=\"string\\\\val\\\"ue\"",
769                "field_unsigned_int=657i",
770                "field_int=657646i",
771                "field_bool_true=true",
772                "field_bool_false=false",
773            ]
774            .to_vec(),
775        )
776    }
777
778    #[test]
779    fn test_encode_fields() {
780        let fields = vec![
781            ("field_string".into(), Field::String("string value".into())),
782            (
783                "field_string_escape".into(),
784                Field::String("string\\val\"ue".into()),
785            ),
786            ("field_float".into(), Field::Float(123.45)),
787            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
788            ("field_int".into(), Field::Int(657646)),
789            ("field_bool_true".into(), Field::Bool(true)),
790            ("field_bool_false".into(), Field::Bool(false)),
791            ("escape key".into(), Field::Float(10.0)),
792        ]
793        .into_iter()
794        .collect();
795
796        let mut value = BytesMut::new();
797        encode_fields(ProtocolVersion::V2, fields, &mut value);
798        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
799        assert_fields(
800            value,
801            [
802                "escape\\ key=10",
803                "field_float=123.45",
804                "field_string=\"string value\"",
805                "field_string_escape=\"string\\\\val\\\"ue\"",
806                "field_unsigned_int=657u",
807                "field_int=657646i",
808                "field_bool_true=true",
809                "field_bool_false=false",
810            ]
811            .to_vec(),
812        )
813    }
814
815    #[test]
816    fn test_encode_string() {
817        let mut value = BytesMut::new();
818        encode_string("measurement_name", &mut value);
819        assert_eq!(value, "measurement_name");
820
821        let mut value = BytesMut::new();
822        encode_string("measurement name", &mut value);
823        assert_eq!(value, "measurement\\ name");
824
825        let mut value = BytesMut::new();
826        encode_string("measurement=name", &mut value);
827        assert_eq!(value, "measurement\\=name");
828
829        let mut value = BytesMut::new();
830        encode_string("measurement,name", &mut value);
831        assert_eq!(value, "measurement\\,name");
832    }
833
834    #[test]
835    fn test_encode_timestamp() {
836        let start = Utc::now()
837            .timestamp_nanos_opt()
838            .expect("Timestamp out of range");
839        assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
840        assert!(encode_timestamp(None) >= start)
841    }
842
843    #[test]
844    fn test_encode_uri_valid() {
845        let uri = encode_uri(
846            "http://localhost:9999",
847            "api/v2/write",
848            &[
849                ("org", Some("my-org".to_owned())),
850                ("bucket", Some("my-bucket".to_owned())),
851                ("precision", Some("ns".to_owned())),
852            ],
853        )
854        .unwrap();
855        assert_eq!(
856            uri,
857            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
858        );
859
860        let uri = encode_uri(
861            "http://localhost:9999/",
862            "api/v2/write",
863            &[
864                ("org", Some("my-org".to_owned())),
865                ("bucket", Some("my-bucket".to_owned())),
866            ],
867        )
868        .unwrap();
869        assert_eq!(
870            uri,
871            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
872        );
873
874        let uri = encode_uri(
875            "http://localhost:9999",
876            "api/v2/write",
877            &[
878                ("org", Some("Organization name".to_owned())),
879                ("bucket", Some("Bucket=name".to_owned())),
880                ("none", None),
881            ],
882        )
883        .unwrap();
884        assert_eq!(
885            uri,
886            "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
887        );
888    }
889
890    #[test]
891    fn test_encode_uri_invalid() {
892        encode_uri(
893            "localhost:9999",
894            "api/v2/write",
895            &[
896                ("org", Some("my-org".to_owned())),
897                ("bucket", Some("my-bucket".to_owned())),
898            ],
899        )
900        .unwrap_err();
901    }
902}
903
904#[cfg(feature = "influxdb-integration-tests")]
905#[cfg(test)]
906mod integration_tests {
907    use crate::{
908        config::ProxyConfig,
909        http::HttpClient,
910        sinks::influxdb::{
911            InfluxDb1Settings, InfluxDb2Settings, healthcheck,
912            test_util::{BUCKET, ORG, TOKEN, address_v1, address_v2, next_database, onboarding_v2},
913        },
914    };
915
916    #[tokio::test]
917    async fn influxdb2_healthchecks_ok() {
918        let endpoint = address_v2();
919        onboarding_v2(&endpoint).await;
920
921        let endpoint = address_v2();
922        let influxdb1_settings = None;
923        let influxdb2_settings = Some(InfluxDb2Settings {
924            org: ORG.to_string(),
925            bucket: BUCKET.to_string(),
926            token: TOKEN.to_string().into(),
927        });
928        let proxy = ProxyConfig::default();
929        let client = HttpClient::new(None, &proxy).unwrap();
930
931        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
932            .unwrap()
933            .await
934            .unwrap()
935    }
936
937    #[tokio::test]
938    #[should_panic]
939    async fn influxdb2_healthchecks_fail() {
940        let endpoint = "http://127.0.0.1:9999".to_string();
941        onboarding_v2(&endpoint).await;
942
943        let influxdb1_settings = None;
944        let influxdb2_settings = Some(InfluxDb2Settings {
945            org: ORG.to_string(),
946            bucket: BUCKET.to_string(),
947            token: TOKEN.to_string().into(),
948        });
949        let proxy = ProxyConfig::default();
950        let client = HttpClient::new(None, &proxy).unwrap();
951
952        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
953            .unwrap()
954            .await
955            .unwrap();
956    }
957
958    #[tokio::test]
959    async fn influxdb1_healthchecks_ok() {
960        let endpoint = address_v1(false);
961
962        let influxdb1_settings = Some(InfluxDb1Settings {
963            database: next_database(),
964            consistency: None,
965            retention_policy_name: None,
966            username: None,
967            password: None,
968        });
969        let influxdb2_settings = None;
970        let proxy = ProxyConfig::default();
971        let client = HttpClient::new(None, &proxy).unwrap();
972
973        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
974            .unwrap()
975            .await
976            .unwrap();
977    }
978
979    #[tokio::test]
980    #[should_panic]
981    async fn influxdb1_healthchecks_fail() {
982        let endpoint = "http://127.0.0.1:8086".to_string();
983        let influxdb1_settings = Some(InfluxDb1Settings {
984            database: next_database(),
985            consistency: None,
986            retention_policy_name: None,
987            username: None,
988            password: None,
989        });
990        let influxdb2_settings = None;
991        let proxy = ProxyConfig::default();
992        let client = HttpClient::new(None, &proxy).unwrap();
993
994        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
995            .unwrap()
996            .await
997            .unwrap();
998    }
999}