1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::{
13 configurable::configurable_component,
14 event::{KeyString, MetricTags},
15 sensitive_string::SensitiveString,
16};
17
18use crate::http::HttpClient;
19
20pub(in crate::sinks) enum Field {
21 String(String),
23 Float(f64),
25 UnsignedInt(u64),
29 Int(i64),
31 Bool(bool),
33}
34
35#[derive(Clone, Copy, Debug)]
36pub(in crate::sinks) enum ProtocolVersion {
37 V1,
38 V2,
39}
40
41#[derive(Debug, Snafu)]
42enum ConfigError {
43 #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
44 MissingConfiguration,
45 #[snafu(display(
46 "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
47 v1_settings,
48 v2_settings
49 ))]
50 BothConfiguration {
51 v1_settings: InfluxDb1Settings,
52 v2_settings: InfluxDb2Settings,
53 },
54}
55
56#[configurable_component]
58#[derive(Clone, Debug)]
59pub struct InfluxDb1Settings {
60 #[configurable(metadata(docs::examples = "vector-database"))]
64 #[configurable(metadata(docs::examples = "iot-store"))]
65 database: String,
66
67 #[configurable(metadata(docs::examples = "any"))]
71 #[configurable(metadata(docs::examples = "one"))]
72 #[configurable(metadata(docs::examples = "quorum"))]
73 #[configurable(metadata(docs::examples = "all"))]
74 consistency: Option<String>,
75
76 #[configurable(metadata(docs::examples = "autogen"))]
80 #[configurable(metadata(docs::examples = "one_day_only"))]
81 retention_policy_name: Option<String>,
82
83 #[configurable(metadata(docs::examples = "todd"))]
87 #[configurable(metadata(docs::examples = "vector-source"))]
88 username: Option<String>,
89
90 #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
94 #[configurable(metadata(docs::examples = "influxdb4ever"))]
95 password: Option<SensitiveString>,
96}
97
98#[configurable_component]
100#[derive(Clone, Debug)]
101pub struct InfluxDb2Settings {
102 #[configurable(metadata(docs::examples = "my-org"))]
106 #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
107 org: String,
108
109 #[configurable(metadata(docs::examples = "vector-bucket"))]
113 #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
114 bucket: String,
115
116 #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
122 #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
123 token: SensitiveString,
124}
125
126trait InfluxDbSettings: std::fmt::Debug {
127 fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
128 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
129 fn token(&self) -> SensitiveString;
130 fn protocol_version(&self) -> ProtocolVersion;
131}
132
133impl InfluxDbSettings for InfluxDb1Settings {
134 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
135 encode_uri(
136 &endpoint,
137 "write",
138 &[
139 ("consistency", self.consistency.clone()),
140 ("db", Some(self.database.clone())),
141 ("rp", self.retention_policy_name.clone()),
142 ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
143 ("u", self.username.clone()),
144 ("precision", Some("ns".to_owned())),
145 ],
146 )
147 }
148
149 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
150 encode_uri(&endpoint, "ping", &[])
151 }
152
153 fn token(&self) -> SensitiveString {
154 SensitiveString::default()
155 }
156
157 fn protocol_version(&self) -> ProtocolVersion {
158 ProtocolVersion::V1
159 }
160}
161
162impl InfluxDbSettings for InfluxDb2Settings {
163 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
164 encode_uri(
165 &endpoint,
166 "api/v2/write",
167 &[
168 ("org", Some(self.org.clone())),
169 ("bucket", Some(self.bucket.clone())),
170 ("precision", Some("ns".to_owned())),
171 ],
172 )
173 }
174
175 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
176 encode_uri(&endpoint, "ping", &[])
177 }
178
179 fn token(&self) -> SensitiveString {
180 self.token.clone()
181 }
182
183 fn protocol_version(&self) -> ProtocolVersion {
184 ProtocolVersion::V2
185 }
186}
187
188fn influxdb_settings(
189 influxdb1_settings: Option<InfluxDb1Settings>,
190 influxdb2_settings: Option<InfluxDb2Settings>,
191) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
192 match (influxdb1_settings, influxdb2_settings) {
193 (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
194 v1_settings,
195 v2_settings,
196 }
197 .into()),
198 (None, None) => Err(ConfigError::MissingConfiguration.into()),
199 (Some(settings), _) => Ok(Box::new(settings)),
200 (_, Some(settings)) => Ok(Box::new(settings)),
201 }
202}
203
204fn healthcheck(
207 endpoint: String,
208 influxdb1_settings: Option<InfluxDb1Settings>,
209 influxdb2_settings: Option<InfluxDb2Settings>,
210 mut client: HttpClient,
211) -> crate::Result<super::Healthcheck> {
212 let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
213
214 let uri = settings.healthcheck_uri(endpoint)?;
215
216 let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
217
218 Ok(async move {
219 client
220 .call(request)
221 .await
222 .map_err(|error| error.into())
223 .and_then(|response| match response.status() {
224 StatusCode::OK => Ok(()),
225 StatusCode::NO_CONTENT => Ok(()),
226 other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
227 })
228 }
229 .boxed())
230}
231
232pub(in crate::sinks) fn influx_line_protocol(
234 protocol_version: ProtocolVersion,
235 measurement: &str,
236 tags: Option<MetricTags>,
237 fields: Option<HashMap<KeyString, Field>>,
238 timestamp: i64,
239 line_protocol: &mut BytesMut,
240) -> Result<(), &'static str> {
241 let unwrapped_fields = fields.unwrap_or_default();
243 if unwrapped_fields.is_empty() {
245 return Err("fields must not be empty");
246 }
247
248 encode_string(measurement, line_protocol);
249
250 let unwrapped_tags = tags.unwrap_or_default();
252 if !unwrapped_tags.is_empty() {
253 line_protocol.put_u8(b',');
254 encode_tags(unwrapped_tags, line_protocol);
255 }
256 line_protocol.put_u8(b' ');
257
258 encode_fields(protocol_version, unwrapped_fields, line_protocol);
260 line_protocol.put_u8(b' ');
261
262 line_protocol.put_slice(×tamp.to_string().into_bytes());
264 line_protocol.put_u8(b'\n');
265 Ok(())
266}
267
268fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
269 let original_len = output.len();
270 for (key, value) in tags.iter_single() {
272 if key.is_empty() || value.is_empty() {
273 continue;
274 }
275 encode_string(key, output);
276 output.put_u8(b'=');
277 encode_string(value, output);
278 output.put_u8(b',');
279 }
280
281 if output.len() > original_len {
283 output.truncate(output.len() - 1);
284 }
285}
286
287fn encode_fields(
288 protocol_version: ProtocolVersion,
289 fields: HashMap<KeyString, Field>,
290 output: &mut BytesMut,
291) {
292 let original_len = output.len();
293 for (key, value) in fields.into_iter() {
294 encode_string(&key, output);
295 output.put_u8(b'=');
296 match value {
297 Field::String(s) => {
298 output.put_u8(b'"');
299 for c in s.chars() {
300 if "\\\"".contains(c) {
301 output.put_u8(b'\\');
302 }
303 let mut c_buffer: [u8; 4] = [0; 4];
304 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
305 }
306 output.put_u8(b'"');
307 }
308 Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
309 Field::UnsignedInt(i) => {
310 output.put_slice(&i.to_string().into_bytes());
311 let c = match protocol_version {
312 ProtocolVersion::V1 => 'i',
313 ProtocolVersion::V2 => 'u',
314 };
315 let mut c_buffer: [u8; 4] = [0; 4];
316 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
317 }
318 Field::Int(i) => {
319 output.put_slice(&i.to_string().into_bytes());
320 output.put_u8(b'i');
321 }
322 Field::Bool(b) => {
323 output.put_slice(&b.to_string().into_bytes());
324 }
325 };
326 output.put_u8(b',');
327 }
328
329 if output.len() > original_len {
331 output.truncate(output.len() - 1);
332 }
333}
334
335fn encode_string(key: &str, output: &mut BytesMut) {
336 for c in key.chars() {
337 if "\\, =".contains(c) {
338 output.put_u8(b'\\');
339 }
340 let mut c_buffer: [u8; 4] = [0; 4];
341 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
342 }
343}
344
345pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
346 if let Some(ts) = timestamp {
347 ts.timestamp_nanos_opt().unwrap()
348 } else {
349 encode_timestamp(Some(Utc::now()))
350 }
351}
352
353pub(in crate::sinks) fn encode_uri(
354 endpoint: &str,
355 path: &str,
356 pairs: &[(&str, Option<String>)],
357) -> crate::Result<Uri> {
358 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
359
360 for pair in pairs {
361 if let Some(v) = &pair.1 {
362 serializer.append_pair(pair.0, v);
363 }
364 }
365
366 let mut url = if endpoint.ends_with('/') {
367 format!("{}{}?{}", endpoint, path, serializer.finish())
368 } else {
369 format!("{}/{}?{}", endpoint, path, serializer.finish())
370 };
371
372 if url.ends_with('?') {
373 url.pop();
374 }
375
376 Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
377}
378
379#[cfg(test)]
380#[allow(dead_code)]
381pub mod test_util {
382 use std::{fs::File, io::Read};
383
384 use chrono::{DateTime, SecondsFormat, Timelike, Utc, offset::TimeZone};
385 use vector_lib::metric_tags;
386
387 use super::*;
388 use crate::tls;
389
390 pub(crate) const ORG: &str = "my-org";
391 pub(crate) const BUCKET: &str = "my-bucket";
392 pub(crate) const TOKEN: &str = "my-token";
393
394 pub(crate) fn next_database() -> String {
395 format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
396 }
397
398 pub(crate) fn ts() -> DateTime<Utc> {
399 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
400 .single()
401 .and_then(|t| t.with_nanosecond(11))
402 .expect("invalid timestamp")
403 }
404
405 pub(crate) fn tags() -> MetricTags {
406 metric_tags!(
407 "normal_tag" => "value",
408 "true_tag" => "true",
409 "empty_tag" => "",
410 )
411 }
412
413 pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
414 let encoded_fields: Vec<&str> = value.split(',').collect();
415
416 assert_eq!(fields.len(), encoded_fields.len());
417
418 for field in fields.into_iter() {
419 assert!(
420 encoded_fields.contains(&field),
421 "Fields: {value} has to have: {field}"
422 )
423 }
424 }
425
426 pub(crate) fn address_v1(secure: bool) -> String {
427 if secure {
428 std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
429 .unwrap_or_else(|_| "http://localhost:8087".into())
430 } else {
431 std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
432 .unwrap_or_else(|_| "http://localhost:8086".into())
433 }
434 }
435
436 pub(crate) fn address_v2() -> String {
437 std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
438 }
439
440 pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
450 let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
451 let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
453 let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
454
455 (measurement, tags, fields.to_string(), ts)
456 }
457
458 fn client() -> reqwest::Client {
459 let mut test_ca = Vec::<u8>::new();
460 File::open(tls::TEST_PEM_CA_PATH)
461 .unwrap()
462 .read_to_end(&mut test_ca)
463 .unwrap();
464 let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
465
466 reqwest::Client::builder()
467 .add_root_certificate(test_ca)
468 .build()
469 .unwrap()
470 }
471
472 pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
473 client()
474 .get(format!("{endpoint}/query"))
475 .query(&[("q", query)])
476 .send()
477 .await
478 .unwrap()
479 }
480
481 pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
482 let database = next_database();
483 let status = query_v1(endpoint, &format!("create database {database}"))
484 .await
485 .status();
486 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
487 crate::test_util::wait_for(|| {
492 let write_url = format!("{}/write?db={}", endpoint, &database);
493 async move {
494 match client()
495 .post(&write_url)
496 .header("Content-Type", "text/plain")
497 .header("Authorization", &format!("Token {TOKEN}"))
498 .body("")
499 .send()
500 .await
501 .unwrap()
502 .status()
503 {
504 http::StatusCode::NO_CONTENT => true,
505 http::StatusCode::NOT_FOUND => false,
506 status => panic!("Unexpected status: {status}"),
507 }
508 }
509 })
510 .await;
511 database
512 }
513
514 pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
515 let status = query_v1(endpoint, &format!("drop database {database}"))
516 .await
517 .status();
518 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
519 }
520
521 pub(crate) async fn onboarding_v2(endpoint: &str) {
522 let mut body = std::collections::HashMap::new();
523 body.insert("username", "my-user");
524 body.insert("password", "my-password");
525 body.insert("org", ORG);
526 body.insert("bucket", BUCKET);
527 body.insert("token", TOKEN);
528
529 let client = reqwest::Client::builder()
530 .danger_accept_invalid_certs(true)
531 .build()
532 .unwrap();
533
534 let res = client
535 .post(format!("{endpoint}/api/v2/setup"))
536 .json(&body)
537 .header("accept", "application/json")
538 .send()
539 .await
540 .unwrap();
541
542 let status = res.status();
543
544 assert!(
545 status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
546 "UnexpectedStatus: {status}"
547 );
548 }
549
550 pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
551 strip_timestamp(timestamp.to_rfc3339_opts(format, true))
552 }
553
554 fn strip_timestamp(timestamp: String) -> String {
556 #[expect(
557 clippy::string_slice,
558 reason = "last two bytes are always ASCII ('0Z' or '.Z'), guaranteed char boundaries"
559 )]
560 let strip_one = || format!("{}Z", ×tamp[..timestamp.len() - 2]);
561 match timestamp {
562 _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
563 _ if timestamp.ends_with(".Z") => strip_one(),
564 _ => timestamp,
565 }
566 }
567}
568
569#[cfg(test)]
570mod tests {
571 use serde::{Deserialize, Serialize};
572
573 use super::*;
574 use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
575
576 #[derive(Deserialize, Serialize, Debug, Clone, Default)]
577 #[serde(deny_unknown_fields)]
578 pub struct InfluxDbTestConfig {
579 #[serde(flatten)]
580 pub influxdb1_settings: Option<InfluxDb1Settings>,
581 #[serde(flatten)]
582 pub influxdb2_settings: Option<InfluxDb2Settings>,
583 }
584
585 #[test]
586 fn test_influxdb_settings_both() {
587 let config = indoc::indoc! {r#"
588 bucket: "my-bucket"
589 org: "my-org"
590 token: "my-token"
591 database: "my-database"
592 "#};
593 let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
594 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
595 assert_eq!(
596 settings.expect_err("expected error").to_string(),
597 "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
598 );
599 }
600
601 #[test]
602 fn test_influxdb_settings_missing() {
603 let config = "{}";
604 let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
605 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
606 assert_eq!(
607 settings.expect_err("expected error").to_string(),
608 "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
609 );
610 }
611
612 #[test]
613 fn test_influxdb1_settings() {
614 let config = indoc::indoc! {r#"
615 database: "my-database"
616 "#};
617 let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
618 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
619 }
620
621 #[test]
622 fn test_influxdb2_settings() {
623 let config = indoc::indoc! {r#"
624 bucket: "my-bucket"
625 org: "my-org"
626 token: "my-token"
627 "#};
628 let config: InfluxDbTestConfig = serde_yaml::from_str(config).unwrap();
629 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
630 }
631
632 #[test]
633 fn test_influxdb1_test_write_uri() {
634 let settings = InfluxDb1Settings {
635 consistency: Some("quorum".to_owned()),
636 database: "vector_db".to_owned(),
637 retention_policy_name: Some("autogen".to_owned()),
638 username: Some("writer".to_owned()),
639 password: Some("secret".to_owned().into()),
640 };
641
642 let uri = settings
643 .write_uri("http://localhost:8086".to_owned())
644 .unwrap();
645 assert_eq!(
646 "http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns",
647 uri.to_string()
648 )
649 }
650
651 #[test]
652 fn test_influxdb2_test_write_uri() {
653 let settings = InfluxDb2Settings {
654 org: "my-org".to_owned(),
655 bucket: "my-bucket".to_owned(),
656 token: "my-token".to_owned().into(),
657 };
658
659 let uri = settings
660 .write_uri("http://localhost:9999".to_owned())
661 .unwrap();
662 assert_eq!(
663 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
664 uri.to_string()
665 )
666 }
667
668 #[test]
669 fn test_influxdb1_test_healthcheck_uri() {
670 let settings = InfluxDb1Settings {
671 consistency: Some("quorum".to_owned()),
672 database: "vector_db".to_owned(),
673 retention_policy_name: Some("autogen".to_owned()),
674 username: Some("writer".to_owned()),
675 password: Some("secret".to_owned().into()),
676 };
677
678 let uri = settings
679 .healthcheck_uri("http://localhost:8086".to_owned())
680 .unwrap();
681 assert_eq!("http://localhost:8086/ping", uri.to_string())
682 }
683
684 #[test]
685 fn test_influxdb2_test_healthcheck_uri() {
686 let settings = InfluxDb2Settings {
687 org: "my-org".to_owned(),
688 bucket: "my-bucket".to_owned(),
689 token: "my-token".to_owned().into(),
690 };
691
692 let uri = settings
693 .healthcheck_uri("http://localhost:9999".to_owned())
694 .unwrap();
695 assert_eq!("http://localhost:9999/ping", uri.to_string())
696 }
697
698 #[test]
699 fn test_encode_tags() {
700 let mut value = BytesMut::new();
701 encode_tags(tags(), &mut value);
702
703 assert_eq!(value, "normal_tag=value,true_tag=true");
704
705 let tags_to_escape = vec![
706 ("tag".to_owned(), "val=ue".to_owned()),
707 ("name escape".to_owned(), "true".to_owned()),
708 ("value_escape".to_owned(), "value escape".to_owned()),
709 ("a_first_place".to_owned(), "10".to_owned()),
710 ]
711 .into_iter()
712 .collect();
713
714 let mut value = BytesMut::new();
715 encode_tags(tags_to_escape, &mut value);
716 assert_eq!(
717 value,
718 "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
719 );
720 }
721
722 #[test]
723 fn tags_order() {
724 let mut value = BytesMut::new();
725 encode_tags(
726 vec![
727 ("a", "value"),
728 ("b", "value"),
729 ("c", "value"),
730 ("d", "value"),
731 ("e", "value"),
732 ]
733 .into_iter()
734 .map(|(k, v)| (k.to_owned(), v.to_owned()))
735 .collect(),
736 &mut value,
737 );
738 assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
739 }
740
741 #[test]
742 fn test_encode_fields_v1() {
743 let fields = vec![
744 ("field_string".into(), Field::String("string value".into())),
745 (
746 "field_string_escape".into(),
747 Field::String("string\\val\"ue".into()),
748 ),
749 ("field_float".into(), Field::Float(123.45)),
750 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
751 ("field_int".into(), Field::Int(657646)),
752 ("field_bool_true".into(), Field::Bool(true)),
753 ("field_bool_false".into(), Field::Bool(false)),
754 ("escape key".into(), Field::Float(10.0)),
755 ]
756 .into_iter()
757 .collect();
758
759 let mut value = BytesMut::new();
760 encode_fields(ProtocolVersion::V1, fields, &mut value);
761 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
762 assert_fields(
763 value,
764 [
765 "escape\\ key=10",
766 "field_float=123.45",
767 "field_string=\"string value\"",
768 "field_string_escape=\"string\\\\val\\\"ue\"",
769 "field_unsigned_int=657i",
770 "field_int=657646i",
771 "field_bool_true=true",
772 "field_bool_false=false",
773 ]
774 .to_vec(),
775 )
776 }
777
778 #[test]
779 fn test_encode_fields() {
780 let fields = vec![
781 ("field_string".into(), Field::String("string value".into())),
782 (
783 "field_string_escape".into(),
784 Field::String("string\\val\"ue".into()),
785 ),
786 ("field_float".into(), Field::Float(123.45)),
787 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
788 ("field_int".into(), Field::Int(657646)),
789 ("field_bool_true".into(), Field::Bool(true)),
790 ("field_bool_false".into(), Field::Bool(false)),
791 ("escape key".into(), Field::Float(10.0)),
792 ]
793 .into_iter()
794 .collect();
795
796 let mut value = BytesMut::new();
797 encode_fields(ProtocolVersion::V2, fields, &mut value);
798 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
799 assert_fields(
800 value,
801 [
802 "escape\\ key=10",
803 "field_float=123.45",
804 "field_string=\"string value\"",
805 "field_string_escape=\"string\\\\val\\\"ue\"",
806 "field_unsigned_int=657u",
807 "field_int=657646i",
808 "field_bool_true=true",
809 "field_bool_false=false",
810 ]
811 .to_vec(),
812 )
813 }
814
815 #[test]
816 fn test_encode_string() {
817 let mut value = BytesMut::new();
818 encode_string("measurement_name", &mut value);
819 assert_eq!(value, "measurement_name");
820
821 let mut value = BytesMut::new();
822 encode_string("measurement name", &mut value);
823 assert_eq!(value, "measurement\\ name");
824
825 let mut value = BytesMut::new();
826 encode_string("measurement=name", &mut value);
827 assert_eq!(value, "measurement\\=name");
828
829 let mut value = BytesMut::new();
830 encode_string("measurement,name", &mut value);
831 assert_eq!(value, "measurement\\,name");
832 }
833
834 #[test]
835 fn test_encode_timestamp() {
836 let start = Utc::now()
837 .timestamp_nanos_opt()
838 .expect("Timestamp out of range");
839 assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
840 assert!(encode_timestamp(None) >= start)
841 }
842
843 #[test]
844 fn test_encode_uri_valid() {
845 let uri = encode_uri(
846 "http://localhost:9999",
847 "api/v2/write",
848 &[
849 ("org", Some("my-org".to_owned())),
850 ("bucket", Some("my-bucket".to_owned())),
851 ("precision", Some("ns".to_owned())),
852 ],
853 )
854 .unwrap();
855 assert_eq!(
856 uri,
857 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
858 );
859
860 let uri = encode_uri(
861 "http://localhost:9999/",
862 "api/v2/write",
863 &[
864 ("org", Some("my-org".to_owned())),
865 ("bucket", Some("my-bucket".to_owned())),
866 ],
867 )
868 .unwrap();
869 assert_eq!(
870 uri,
871 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
872 );
873
874 let uri = encode_uri(
875 "http://localhost:9999",
876 "api/v2/write",
877 &[
878 ("org", Some("Organization name".to_owned())),
879 ("bucket", Some("Bucket=name".to_owned())),
880 ("none", None),
881 ],
882 )
883 .unwrap();
884 assert_eq!(
885 uri,
886 "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
887 );
888 }
889
890 #[test]
891 fn test_encode_uri_invalid() {
892 encode_uri(
893 "localhost:9999",
894 "api/v2/write",
895 &[
896 ("org", Some("my-org".to_owned())),
897 ("bucket", Some("my-bucket".to_owned())),
898 ],
899 )
900 .unwrap_err();
901 }
902}
903
904#[cfg(feature = "influxdb-integration-tests")]
905#[cfg(test)]
906mod integration_tests {
907 use crate::{
908 config::ProxyConfig,
909 http::HttpClient,
910 sinks::influxdb::{
911 InfluxDb1Settings, InfluxDb2Settings, healthcheck,
912 test_util::{BUCKET, ORG, TOKEN, address_v1, address_v2, next_database, onboarding_v2},
913 },
914 };
915
916 #[tokio::test]
917 async fn influxdb2_healthchecks_ok() {
918 let endpoint = address_v2();
919 onboarding_v2(&endpoint).await;
920
921 let endpoint = address_v2();
922 let influxdb1_settings = None;
923 let influxdb2_settings = Some(InfluxDb2Settings {
924 org: ORG.to_string(),
925 bucket: BUCKET.to_string(),
926 token: TOKEN.to_string().into(),
927 });
928 let proxy = ProxyConfig::default();
929 let client = HttpClient::new(None, &proxy).unwrap();
930
931 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
932 .unwrap()
933 .await
934 .unwrap()
935 }
936
937 #[tokio::test]
938 #[should_panic]
939 async fn influxdb2_healthchecks_fail() {
940 let endpoint = "http://127.0.0.1:9999".to_string();
941 onboarding_v2(&endpoint).await;
942
943 let influxdb1_settings = None;
944 let influxdb2_settings = Some(InfluxDb2Settings {
945 org: ORG.to_string(),
946 bucket: BUCKET.to_string(),
947 token: TOKEN.to_string().into(),
948 });
949 let proxy = ProxyConfig::default();
950 let client = HttpClient::new(None, &proxy).unwrap();
951
952 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
953 .unwrap()
954 .await
955 .unwrap();
956 }
957
958 #[tokio::test]
959 async fn influxdb1_healthchecks_ok() {
960 let endpoint = address_v1(false);
961
962 let influxdb1_settings = Some(InfluxDb1Settings {
963 database: next_database(),
964 consistency: None,
965 retention_policy_name: None,
966 username: None,
967 password: None,
968 });
969 let influxdb2_settings = None;
970 let proxy = ProxyConfig::default();
971 let client = HttpClient::new(None, &proxy).unwrap();
972
973 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
974 .unwrap()
975 .await
976 .unwrap();
977 }
978
979 #[tokio::test]
980 #[should_panic]
981 async fn influxdb1_healthchecks_fail() {
982 let endpoint = "http://127.0.0.1:8086".to_string();
983 let influxdb1_settings = Some(InfluxDb1Settings {
984 database: next_database(),
985 consistency: None,
986 retention_policy_name: None,
987 username: None,
988 password: None,
989 });
990 let influxdb2_settings = None;
991 let proxy = ProxyConfig::default();
992 let client = HttpClient::new(None, &proxy).unwrap();
993
994 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
995 .unwrap()
996 .await
997 .unwrap();
998 }
999}