1use std::{
2 collections::HashSet,
3 error, fmt,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::Body;
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20 configurable::configurable_component,
21 lookup::{
22 OwnedTargetPath,
23 lookup_v2::{OptionalTargetPath, OwnedSegment},
24 owned_value_path,
25 },
26};
27use vrl::value::{Kind, kind::Collection};
28
29use crate::{
30 config::{
31 DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
32 },
33 cpu_time::spawn_timed,
34 event::Event,
35 http::HttpClient,
36 internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
37 schema,
38 transforms::{TaskTransform, Transform},
39};
40
// Names of the metadata fields this transform can add. They double as the
// user-facing values accepted in the `fields` option and as the final segment of
// each added log path / metric tag name.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
const ROLE_NAME_KEY: &str = "role-name";
// Parent key under which requested instance tags are inserted (one child per tag).
const TAGS_KEY: &str = "tags";

// Metadata service request paths for fields that map to a single fixed resource.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// Lists attached IAM role names, one per line.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The instance MAC address; used to build the per-interface subnet/VPC paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON identity document supplying account/AMI/instance/type/region values.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields enabled when `fields` is not configured. Note that `account-id` is
// deliberately absent, so it must be requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Endpoint used (with a PUT) to obtain a session token for subsequent requests.
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Request header that carries the session token on every metadata GET.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
89
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
145
/// Metadata service endpoint used when `endpoint` is not configured.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
149
/// Delay between background metadata refreshes when not configured (10 seconds).
const fn default_refresh_interval_secs() -> Duration {
    Duration::new(10, 0)
}
153
/// Per-request timeout for metadata service calls when not configured (1 second).
const fn default_refresh_timeout_secs() -> Duration {
    Duration::from_millis(1_000)
}
157
158fn default_fields() -> Vec<String> {
159 DEFAULT_FIELD_ALLOWLIST
160 .iter()
161 .map(|s| s.to_string())
162 .collect()
163}
164
/// No instance tags are fetched unless explicitly requested.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
168
/// By default a failed initial metadata fetch fails the transform build.
const fn default_required() -> bool {
    true
}
172
/// The event-processing half of the transform.
///
/// It only reads `state`; the background `MetadataClient` worker spawned in
/// `build` replaces the snapshot wholesale on each successful refresh.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    /// Latest metadata snapshot: the destination key and raw value of each field.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
177
/// Where one metadata value is written on each kind of event.
#[derive(Debug, Clone)]
struct MetadataKey {
    /// Target path used when inserting the value into a log event.
    log_path: OwnedTargetPath,
    /// Tag name used when setting the value on a metric event.
    metric_tag: String,
}
183
/// The full set of destination keys, one per supported metadata field, already
/// resolved against the configured namespace (see `Keys::new`).
///
/// `role_name_key` and `tags_key` are base keys. `refresh_metadata` appends an
/// index (role names) or a tag name (tags) to them for each value.
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    role_name_key: MetadataKey,
    tags_key: MetadataKey,
}
201
// Example config generation (`vector generate`) is derived from `Ec2Metadata::default()`.
impl_generate_config_from_default!(Ec2Metadata);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "aws_ec2_metadata")]
206impl TransformConfig for Ec2Metadata {
207 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
208 let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
209
210 let keys = Keys::new(self.namespace.clone());
211 let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
212 let refresh_interval = self.refresh_interval_secs;
213 let fields = self.fields.clone();
214 let tags = self.tags.clone();
215 let refresh_timeout = self.refresh_timeout_secs;
216 let required = self.required;
217
218 let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
219 let http_client = HttpClient::new(None, &proxy)?;
220
221 let mut client = MetadataClient::new(
222 http_client,
223 host,
224 keys,
225 Arc::clone(&state),
226 refresh_interval,
227 refresh_timeout,
228 fields,
229 tags,
230 );
231
232 if let Err(error) = client.refresh_metadata().await {
234 if required {
235 return Err(error);
236 } else {
237 emit!(AwsEc2MetadataRefreshError { error });
238 }
239 }
240
241 spawn_timed(
246 async move {
247 client.run().await;
248 }
249 .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
251 context.cpu_ns.clone(),
252 );
253
254 Ok(Transform::event_task(Ec2MetadataTransform { state }))
255 }
256
257 fn input(&self) -> Input {
258 Input::new(DataType::Metric | DataType::Log)
259 }
260
261 fn outputs(
262 &self,
263 _: &TransformContext,
264 input_definitions: &[(OutputId, schema::Definition)],
265 ) -> Vec<TransformOutput> {
266 let added_keys = Keys::new(self.namespace.clone());
267
268 let paths = [
269 &added_keys.account_id_key.log_path,
270 &added_keys.ami_id_key.log_path,
271 &added_keys.availability_zone_key.log_path,
272 &added_keys.instance_id_key.log_path,
273 &added_keys.instance_type_key.log_path,
274 &added_keys.local_hostname_key.log_path,
275 &added_keys.local_ipv4_key.log_path,
276 &added_keys.public_hostname_key.log_path,
277 &added_keys.public_ipv4_key.log_path,
278 &added_keys.region_key.log_path,
279 &added_keys.subnet_id_key.log_path,
280 &added_keys.vpc_id_key.log_path,
281 &added_keys.role_name_key.log_path,
282 &added_keys.tags_key.log_path,
283 ];
284
285 let schema_definition = input_definitions
286 .iter()
287 .map(|(output, definition)| {
288 let mut schema_definition = definition.clone();
289
290 if !schema_definition.event_kind().contains_object() {
292 *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
293 }
294
295 for path in paths {
296 schema_definition =
297 schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
298 }
299
300 (output.clone(), schema_definition)
301 })
302 .collect();
303
304 vec![TransformOutput::new(
305 DataType::Metric | DataType::Log,
306 schema_definition,
307 )]
308 }
309}
310
311impl TaskTransform<Event> for Ec2MetadataTransform {
312 fn transform(
313 self: Box<Self>,
314 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
315 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
316 where
317 Self: 'static,
318 {
319 let mut inner = self;
320 Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
321 }
322}
323
324impl Ec2MetadataTransform {
325 fn transform_one(&mut self, mut event: Event) -> Event {
326 let state = self.state.load();
327 match event {
328 Event::Log(ref mut log) => {
329 state.iter().for_each(|(k, v)| {
330 log.insert(&k.log_path, v.clone());
331 });
332 }
333 Event::Metric(ref mut metric) => {
334 state.iter().for_each(|(k, v)| {
335 metric
336 .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
337 });
338 }
339 Event::Trace(_) => panic!("Traces are not supported."),
340 }
341 event
342 }
343}
344
/// Background worker that queries the EC2 instance metadata service and
/// publishes the results into the `state` shared with `Ec2MetadataTransform`.
struct MetadataClient {
    /// HTTP client used for both token and metadata requests.
    client: HttpClient<Body>,
    /// Base URI of the metadata service; each request replaces its path and query.
    host: Uri,
    /// Cached session token together with the instant after which it is refetched.
    token: Option<(Bytes, Instant)>,
    /// Destination keys for each field, resolved against the configured namespace.
    keys: Keys,
    /// Snapshot slot read by the transform; replaced wholesale on each successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    /// Delay between consecutive refreshes in `run`.
    refresh_interval: Duration,
    /// Timeout applied to individual metadata service requests.
    refresh_timeout: Duration,
    /// Enabled metadata field names (see the `*_KEY` constants).
    fields: HashSet<String>,
    /// Instance tag names to fetch.
    tags: HashSet<String>,
}
356
/// Subset of the instance identity document (camelCase JSON keys).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
// `architecture`, `private_ip` and `version` are deserialized but never read by
// `refresh_metadata`, which would otherwise trigger dead-code warnings.
#[allow(dead_code)] struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
370
371impl MetadataClient {
372 #[allow(clippy::too_many_arguments)]
373 pub fn new(
374 client: HttpClient<Body>,
375 host: Uri,
376 keys: Keys,
377 state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
378 refresh_interval: Duration,
379 refresh_timeout: Duration,
380 fields: Vec<String>,
381 tags: Vec<String>,
382 ) -> Self {
383 Self {
384 client,
385 host,
386 token: None,
387 keys,
388 state,
389 refresh_interval,
390 refresh_timeout,
391 fields: fields.into_iter().collect(),
392 tags: tags.into_iter().collect(),
393 }
394 }
395
396 async fn run(&mut self) {
397 loop {
398 match self.refresh_metadata().await {
399 Ok(_) => {
400 emit!(AwsEc2MetadataRefreshSuccessful);
401 }
402 Err(error) => {
403 emit!(AwsEc2MetadataRefreshError { error });
404 }
405 }
406
407 sleep(self.refresh_interval).await;
408 }
409 }
410
411 pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
412 if let Some((token, next_refresh)) = self.token.clone() {
413 if next_refresh > Instant::now() {
417 return Ok(token);
418 }
419 }
420
421 let mut parts = self.host.clone().into_parts();
422 parts.path_and_query = Some(API_TOKEN.clone());
423 let uri = Uri::from_parts(parts)?;
424
425 let req = Request::put(uri)
426 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
427 .body(Body::empty())?;
428
429 let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
430 .await?
431 .map_err(crate::Error::from)
432 .and_then(|res| match res.status() {
433 StatusCode::OK => Ok(res),
434 status_code => Err(UnexpectedHttpStatusError {
435 status: status_code,
436 }
437 .into()),
438 })?;
439
440 let token = http_body::Body::collect(res.into_body()).await?.to_bytes();
441
442 let next_refresh = Instant::now() + Duration::from_secs(21600);
443 self.token = Some((token.clone(), next_refresh));
444
445 Ok(token)
446 }
447
448 pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
449 self.get_metadata(&DYNAMIC_DOCUMENT)
450 .await?
451 .map(|body| {
452 serde_json::from_slice(&body[..])
453 .context(ParseIdentityDocumentSnafu {})
454 .map_err(Into::into)
455 })
456 .transpose()
457 }
458
459 pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
460 let mut new_state = vec![];
461
462 if let Some(document) = self.get_document().await? {
464 if self.fields.contains(ACCOUNT_ID_KEY) {
465 new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
466 }
467
468 if self.fields.contains(AMI_ID_KEY) {
469 new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
470 }
471
472 if self.fields.contains(INSTANCE_ID_KEY) {
473 new_state.push((
474 self.keys.instance_id_key.clone(),
475 document.instance_id.into(),
476 ));
477 }
478
479 if self.fields.contains(INSTANCE_TYPE_KEY) {
480 new_state.push((
481 self.keys.instance_type_key.clone(),
482 document.instance_type.into(),
483 ));
484 }
485
486 if self.fields.contains(REGION_KEY) {
487 new_state.push((self.keys.region_key.clone(), document.region.into()));
488 }
489
490 if self.fields.contains(AVAILABILITY_ZONE_KEY)
491 && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
492 {
493 new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
494 }
495
496 if self.fields.contains(LOCAL_HOSTNAME_KEY)
497 && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
498 {
499 new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
500 }
501
502 if self.fields.contains(LOCAL_IPV4_KEY)
503 && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
504 {
505 new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
506 }
507
508 if self.fields.contains(PUBLIC_HOSTNAME_KEY)
509 && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
510 {
511 new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
512 }
513
514 if self.fields.contains(PUBLIC_IPV4_KEY)
515 && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
516 {
517 new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
518 }
519
520 if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
521 && let Some(mac) = self.get_metadata(&MAC).await?
522 {
523 let mac = String::from_utf8_lossy(&mac[..]);
524
525 if self.fields.contains(SUBNET_ID_KEY) {
526 let subnet_path =
527 format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
528
529 let subnet_path = subnet_path.parse().context(ParsePathSnafu {
530 value: subnet_path.clone(),
531 })?;
532
533 if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
534 new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
535 }
536 }
537
538 if self.fields.contains(VPC_ID_KEY) {
539 let vpc_path =
540 format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
541
542 let vpc_path = vpc_path.parse().context(ParsePathSnafu {
543 value: vpc_path.clone(),
544 })?;
545
546 if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
547 new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
548 }
549 }
550 }
551
552 if self.fields.contains(ROLE_NAME_KEY)
553 && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
554 {
555 let role_names = String::from_utf8_lossy(&role_names[..]);
556
557 for (i, role_name) in role_names.lines().enumerate() {
558 new_state.push((
559 MetadataKey {
560 log_path: self
561 .keys
562 .role_name_key
563 .log_path
564 .with_index_appended(i as isize),
565 metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
566 },
567 role_name.to_string().into(),
568 ));
569 }
570 }
571
572 for tag in self.tags.clone() {
573 let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
574
575 let tag_path = tag_path.parse().context(ParsePathSnafu {
576 value: tag_path.clone(),
577 })?;
578
579 if let Some(tag_content) = self.get_metadata(&tag_path).await? {
580 new_state.push((
581 MetadataKey {
582 log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
583 metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
584 },
585 tag_content,
586 ));
587 }
588 }
589
590 self.state.store(Arc::new(new_state));
591 }
592
593 Ok(())
594 }
595
596 async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
597 let token = self
598 .get_token()
599 .await
600 .with_context(|_| FetchTokenSnafu {})?;
601
602 let mut parts = self.host.clone().into_parts();
603
604 parts.path_and_query = Some(path.clone());
605
606 let uri = Uri::from_parts(parts)?;
607
608 debug!(message = "Sending metadata request.", %uri);
609
610 let req = Request::get(uri)
611 .header(TOKEN_HEADER.as_ref(), token.as_ref())
612 .body(Body::empty())?;
613
614 match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
615 .await?
616 .map_err(crate::Error::from)
617 .and_then(|res| match res.status() {
618 StatusCode::OK => Ok(Some(res)),
619 StatusCode::NOT_FOUND => Ok(None),
620 status_code => Err(UnexpectedHttpStatusError {
621 status: status_code,
622 }
623 .into()),
624 })? {
625 Some(res) => {
626 let body = http_body::Body::collect(res.into_body()).await?.to_bytes();
627 Ok(Some(body))
628 }
629 None => Ok(None),
630 }
631 }
632}
633
634fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
639 let mut output = String::new();
640 for segment in &namespace.path.segments {
641 if !output.is_empty() {
642 output += ".";
643 }
644 match segment {
645 OwnedSegment::Field(field) => {
646 output += field;
647 }
648 OwnedSegment::Index(i) => {
649 output += &i.to_string();
650 }
651 }
652 }
653 output
654}
655
656fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
657 if let Some(namespace) = namespace {
658 MetadataKey {
659 log_path: namespace.with_field_appended(key),
660 metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
661 }
662 } else {
663 MetadataKey {
664 log_path: OwnedTargetPath::event(owned_value_path!(key)),
665 metric_tag: key.to_owned(),
666 }
667 }
668}
669
670impl Keys {
671 pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
672 let namespace = namespace.and_then(|namespace| namespace.path);
673
674 Keys {
675 account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
676 ami_id_key: create_key(&namespace, AMI_ID_KEY),
677 availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
678 instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
679 instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
680 local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
681 local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
682 public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
683 public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
684 region_key: create_key(&namespace, REGION_KEY),
685 subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
686 vpc_id_key: create_key(&namespace, VPC_ID_KEY),
687 role_name_key: create_key(&namespace, ROLE_NAME_KEY),
688 tags_key: create_key(&namespace, TAGS_KEY),
689 }
690 }
691}
692
/// The metadata service answered with a status code the client does not handle.
#[derive(Debug)]
struct UnexpectedHttpStatusError {
    /// The status code that was received.
    status: http::StatusCode,
}
697
698impl fmt::Display for UnexpectedHttpStatusError {
699 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
700 write!(f, "got unexpected status code: {}", self.status)
701 }
702}
703
// Marker impl so the error can be boxed into `crate::Error`; it has no source.
impl error::Error for UnexpectedHttpStatusError {}
705
/// Errors raised while talking to the metadata service, with context attached.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    /// Obtaining the session token failed (wraps timeout, transport and status errors).
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    /// The identity document was not valid JSON of the expected shape.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    /// A dynamically built request path (MAC- or tag-based) was not a valid URI path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
718
#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    // A non-object (bytes) input event kind must come out as an object, because
    // the transform inserts fields into every log event.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let transform_config = Ec2Metadata {
            namespace: Some(OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into()),
            ..Default::default()
        };

        let input_definition =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);

        let mut outputs = transform_config.outputs(
            &Default::default(),
            &[(OutputId::dummy(), input_definition)],
        );
        assert_eq!(outputs.len(), 1);
        let output = outputs.pop().unwrap();
        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(actual_schema_def.event_kind().is_object());
    }
}
749
// Integration tests that run against a mock EC2 metadata service (address taken
// from `EC2_METADATA_ADDRESS`); only built with the dedicated feature enabled.
#[cfg(feature = "aws-ec2-metadata-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        assert_event_data_eq,
        lookup::{
            PathPrefix, event_path,
            lookup_v2::{OwnedSegment, OwnedValuePath},
        },
    };
    use vrl::value::{ObjectMap, Value};
    use warp::Filter;

    use super::*;
    use crate::{
        event::{LogEvent, Metric, metric},
        test_util::{addr::next_addr, components::assert_transform_compliance},
        transforms::test::create_topology,
    };

    // Mock metadata service address; defaults to a local mock on port 1338.
    fn ec2_metadata_address() -> String {
        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
    }

    // Log fields (and values served by the mock) expected with all default
    // fields plus `account-id` and the `Name`/`Test` tags enabled.
    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
        vec![
            (
                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
                "us-east-1a",
            ),
            (
                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
                "192.0.2.54",
            ),
            (
                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
                "ec2-192-0-2-54.compute-1.amazonaws.com",
            ),
            (
                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
                "172.16.34.43",
            ),
            (
                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
                "ip-172-16-34-43.ec2.internal",
            ),
            (
                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
                "i-1234567890abcdef0",
            ),
            (
                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
                "0123456789",
            ),
            (
                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
                "ami-0b69ea66ff7391e80",
            ),
            (
                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
                "m4.xlarge",
            ),
            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
            (
                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
                "subnet-0ac62554",
            ),
            (owned_value_path!("role-name", 0), "baskinc-role"),
            (owned_value_path!("tags", "Name"), "test-instance"),
            (owned_value_path!("tags", "Test"), "test-tag"),
        ]
    }

    // Metric tags (and values) expected for the same configuration as above.
    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
        vec![
            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
            (PUBLIC_IPV4_KEY, "192.0.2.54"),
            (
                PUBLIC_HOSTNAME_KEY,
                "ec2-192-0-2-54.compute-1.amazonaws.com",
            ),
            (LOCAL_IPV4_KEY, "172.16.34.43"),
            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
            (ACCOUNT_ID_KEY, "0123456789"),
            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
            (INSTANCE_TYPE_KEY, "m4.xlarge"),
            (REGION_KEY, "us-east-1"),
            (VPC_ID_KEY, "vpc-d295a6a7"),
            (SUBNET_ID_KEY, "subnet-0ac62554"),
            ("role-name[0]", "baskinc-role"),
            ("tags[Name]", "test-instance"),
            ("tags[Test]", "test-tag"),
        ]
    }

    // A minimal counter metric with no tags, used as enrichment input.
    fn make_metric() -> Metric {
        Metric::new(
            "event",
            metric::MetricKind::Incremental,
            metric::MetricValue::Counter { value: 1.0 },
        )
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }

    #[tokio::test]
    async fn enrich_log() {
        assert_transform_compliance(async {
            let mut fields = default_fields();
            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());

            // `MISSING_TAG` is not served by the mock and must simply be skipped.
            let tags = vec![
                String::from("Name"),
                String::from("Test"),
                String::from("MISSING_TAG"),
            ];

            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields,
                tags,
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // Give the transform time to finish its metadata fetch before sending events.
            sleep(Duration::from_secs(1)).await;

            let log = LogEvent::default();
            let mut expected_log = log.clone();
            for (k, v) in expected_log_fields().iter().cloned() {
                expected_log.insert((PathPrefix::Event, &k), v);
            }

            tx.send(log.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_log(), expected_log);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // A server slower than `refresh_timeout_secs` must make a required build fail
    // with the token-fetch timeout error.
    #[tokio::test(flavor = "multi_thread")]
    async fn timeout() {
        let (_guard, addr) = next_addr();

        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
            tokio::time::sleep(Duration::from_secs(3)).await;
            Ok("I waited 3 seconds!")
        }

        let slow = warp::any().and_then(sleepy);
        let server = warp::serve(slow).bind(addr);
        let _server = tokio::spawn(server);

        let config = Ec2Metadata {
            endpoint: format!("http://{addr}"),
            refresh_timeout_secs: Duration::from_secs(1),
            ..Default::default()
        };

        match config.build(&TransformContext::default()).await {
            Ok(_) => panic!("expected timeout failure"),
            // The first request made is the token fetch, so that is what times out.
            Err(err) => assert_eq!(
                err.to_string(),
                "Unable to fetch metadata authentication token: deadline has elapsed."
            ),
        }
    }

    // With `required: false`, the same failing initial fetch must not fail the build.
    #[tokio::test(flavor = "multi_thread")]
    async fn not_required() {
        let (_guard, addr) = next_addr();

        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
            tokio::time::sleep(Duration::from_secs(3)).await;
            Ok("I waited 3 seconds!")
        }

        let slow = warp::any().and_then(sleepy);
        let server = warp::serve(slow).bind(addr);
        let _server = tokio::spawn(server);

        let config = Ec2Metadata {
            endpoint: format!("http://{addr}"),
            refresh_timeout_secs: Duration::from_secs(1),
            required: false,
            ..Default::default()
        };

        assert!(
            config.build(&TransformContext::default()).await.is_ok(),
            "expected no failure because 'required' config value set to false"
        );
    }

    #[tokio::test]
    async fn enrich_metric() {
        assert_transform_compliance(async {
            let mut fields = default_fields();
            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());

            let tags = vec![
                String::from("Name"),
                String::from("Test"),
                String::from("MISSING_TAG"),
            ];

            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields,
                tags,
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // Give the transform time to finish its metadata fetch before sending events.
            sleep(Duration::from_secs(1)).await;

            let metric = make_metric();
            let mut expected_metric = metric.clone();
            for (k, v) in expected_metric_fields().iter() {
                expected_metric.replace_tag(k.to_string(), v.to_string());
            }

            tx.send(metric.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_metric(), expected_metric);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Only the explicitly listed fields (plus requested tags) are added to logs.
    #[tokio::test]
    async fn fields_log() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // Give the transform time to finish its metadata fetch before sending events.
            sleep(Duration::from_secs(1)).await;

            let log = LogEvent::default();
            let mut expected_log = log.clone();
            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
            expected_log.insert(
                format!("\"{TAGS_KEY}\"").as_str(),
                ObjectMap::from([
                    ("Name".into(), Value::from("test-instance")),
                    ("Test".into(), Value::from("test-tag")),
                ]),
            );

            tx.send(log.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_log(), expected_log);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Only the explicitly listed fields (plus requested tags) are added to metrics.
    #[tokio::test]
    async fn fields_metric() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // Give the transform time to finish its metadata fetch before sending events.
            sleep(Duration::from_secs(1)).await;

            let metric = make_metric();
            let mut expected_metric = metric.clone();
            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
            expected_metric.replace_tag(
                format!("{}[{}]", TAGS_KEY, "Name"),
                "test-instance".to_string(),
            );
            expected_metric
                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());

            tx.send(metric.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_metric(), expected_metric);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn namespace_log() {
        // A nested namespace prefixes every added log field.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // Give the transform time to finish its metadata fetch before sending events.
                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();

                assert_eq!(
                    event.as_log().get("ec2.metadata.\"availability-zone\""),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        // An empty namespace behaves like no namespace: fields land at the event root.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // Give the transform time to finish its metadata fetch before sending events.
                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }

    #[tokio::test]
    async fn namespace_metric() {
        // A nested namespace becomes a dotted prefix on every metric tag name.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // Give the transform time to finish its metadata fetch before sending events.
                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event
                        .as_metric()
                        .tag_value("ec2.metadata.availability-zone"),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        // An empty namespace leaves metric tag names unprefixed.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // Give the transform time to finish its metadata fetch before sending events.
                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }
}