Skip to main content

vector/transforms/
aws_ec2_metadata.rs

1use std::{
2    collections::HashSet,
3    error, fmt,
4    future::ready,
5    pin::Pin,
6    sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::Body;
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20    configurable::configurable_component,
21    lookup::{
22        OwnedTargetPath,
23        lookup_v2::{OptionalTargetPath, OwnedSegment},
24        owned_value_path,
25    },
26};
27use vrl::value::{Kind, kind::Collection};
28
29use crate::{
30    config::{
31        DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
32    },
33    cpu_time::spawn_timed,
34    event::Event,
35    http::HttpClient,
36    internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
37    schema,
38    transforms::{TaskTransform, Transform},
39};
40
// Names of the metadata fields this transform can add. Each one is both an entry accepted by
// the `fields` option and the final segment of the log field / metric tag that gets written.
// `TAGS_KEY` is the exception: it only names the container that instance tags are nested under.

// Values taken from the instance identity document.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const REGION_KEY: &str = "region";

// Values fetched from individual `/latest/meta-data/...` endpoints.
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";

// Values looked up through the primary network interface's MAC address.
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";

// Multi-valued fields: role names are indexed, tags are keyed by tag name.
const ROLE_NAME_KEY: &str = "role-name";
const TAGS_KEY: &str = "tags";
55
56static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
57    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
58static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
59    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
60static LOCAL_IPV4: LazyLock<PathAndQuery> =
61    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
62static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
63    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
64static PUBLIC_IPV4: LazyLock<PathAndQuery> =
65    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
66static ROLE_NAME: LazyLock<PathAndQuery> =
67    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
68static MAC: LazyLock<PathAndQuery> =
69    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
70static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
71    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
72static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
73    AMI_ID_KEY,
74    AVAILABILITY_ZONE_KEY,
75    INSTANCE_ID_KEY,
76    INSTANCE_TYPE_KEY,
77    LOCAL_HOSTNAME_KEY,
78    LOCAL_IPV4_KEY,
79    PUBLIC_HOSTNAME_KEY,
80    PUBLIC_IPV4_KEY,
81    REGION_KEY,
82    SUBNET_ID_KEY,
83    VPC_ID_KEY,
84    ROLE_NAME_KEY,
85];
86static API_TOKEN: LazyLock<PathAndQuery> =
87    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
88static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
89
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
// NOTE(review): the `///` field comments below are presumably collected by
// `configurable_component` into the generated configuration reference, so implementation
// notes in this struct are kept in plain `//` comments.
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    // Also accepted under the legacy name `host`. Parsed into a `Uri` in `build`; only its
    // scheme/authority are used, because every request replaces the path.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    // `None`, or an `OptionalTargetPath` whose `path` is `None`, puts the fields at the event
    // root (see `Keys::new` and `create_key`).
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    // (De)serialized as a whole number of seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    // Defaults to `DEFAULT_FIELD_ALLOWLIST`, which does not include `account-id`.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    // Each tag is fetched from `/latest/meta-data/tags/instance/<tag>`.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    // Applied to each individual HTTP request (token and metadata), not to a whole refresh.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    // Merged with the global proxy settings (and environment) in `build`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    // When false, a failed initial fetch is only emitted as an error event and the transform
    // starts with empty metadata, which the background refresh loop fills in later.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
145
/// Link-local address of the EC2 instance metadata service, used when `endpoint` is not set.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}

/// Default pause between metadata refreshes (ten seconds).
const fn default_refresh_interval_secs() -> Duration {
    Duration::from_millis(10_000)
}

/// Default timeout for each request to the metadata service (one second).
const fn default_refresh_timeout_secs() -> Duration {
    Duration::from_millis(1_000)
}
157
158fn default_fields() -> Vec<String> {
159    DEFAULT_FIELD_ALLOWLIST
160        .iter()
161        .map(|s| s.to_string())
162        .collect()
163}
164
/// No instance tags are fetched unless they are listed explicitly.
const fn default_tags() -> Vec<String> {
    Vec::new()
}

/// By default, `build` fails if the first metadata fetch does not succeed.
const fn default_required() -> bool {
    true
}
172
/// Event-processing half of the transform: copies the latest metadata snapshot onto each event.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    // Shared with `MetadataClient`, which stores a whole new snapshot after each successful
    // refresh. Each entry is a destination key and the raw value written there.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}

/// Where one metadata value is written, for each supported event type.
#[derive(Debug, Clone)]
struct MetadataKey {
    // Target path for log events: the namespace (if any) with the field name appended.
    log_path: OwnedTargetPath,
    // Tag name for metric events: a dotted, simplified namespace plus the field name.
    metric_tag: String,
}

/// Precomputed destination keys for every metadata field, derived from the configured namespace.
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Used as a prefix: each role name is written at `<role-name>[i]`.
    role_name_key: MetadataKey,
    // Used as a prefix: each configured tag is written at `<tags>.<tag>`.
    tags_key: MetadataKey,
}
201
202impl_generate_config_from_default!(Ec2Metadata);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "aws_ec2_metadata")]
206impl TransformConfig for Ec2Metadata {
207    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
208        let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
209
210        let keys = Keys::new(self.namespace.clone());
211        let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
212        let refresh_interval = self.refresh_interval_secs;
213        let fields = self.fields.clone();
214        let tags = self.tags.clone();
215        let refresh_timeout = self.refresh_timeout_secs;
216        let required = self.required;
217
218        let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
219        let http_client = HttpClient::new(None, &proxy)?;
220
221        let mut client = MetadataClient::new(
222            http_client,
223            host,
224            keys,
225            Arc::clone(&state),
226            refresh_interval,
227            refresh_timeout,
228            fields,
229            tags,
230        );
231
232        // If initial metadata is not required, log and proceed. Otherwise return error.
233        if let Err(error) = client.refresh_metadata().await {
234            if required {
235                return Err(error);
236            } else {
237                emit!(AwsEc2MetadataRefreshError { error });
238            }
239        }
240
241        // The metadata-refresh loop runs as its own tokio task, so the main
242        // transform task's CPU-time wrapper does not see it. Spawn the
243        // background task with the same component-tagged counter so its CPU
244        // is attributed to this transform.
245        spawn_timed(
246            async move {
247                client.run().await;
248            }
249            // TODO: Once #1338 is done we can fetch the current span
250            .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
251            context.cpu_ns.clone(),
252        );
253
254        Ok(Transform::event_task(Ec2MetadataTransform { state }))
255    }
256
257    fn input(&self) -> Input {
258        Input::new(DataType::Metric | DataType::Log)
259    }
260
261    fn outputs(
262        &self,
263        _: &TransformContext,
264        input_definitions: &[(OutputId, schema::Definition)],
265    ) -> Vec<TransformOutput> {
266        let added_keys = Keys::new(self.namespace.clone());
267
268        let paths = [
269            &added_keys.account_id_key.log_path,
270            &added_keys.ami_id_key.log_path,
271            &added_keys.availability_zone_key.log_path,
272            &added_keys.instance_id_key.log_path,
273            &added_keys.instance_type_key.log_path,
274            &added_keys.local_hostname_key.log_path,
275            &added_keys.local_ipv4_key.log_path,
276            &added_keys.public_hostname_key.log_path,
277            &added_keys.public_ipv4_key.log_path,
278            &added_keys.region_key.log_path,
279            &added_keys.subnet_id_key.log_path,
280            &added_keys.vpc_id_key.log_path,
281            &added_keys.role_name_key.log_path,
282            &added_keys.tags_key.log_path,
283        ];
284
285        let schema_definition = input_definitions
286            .iter()
287            .map(|(output, definition)| {
288                let mut schema_definition = definition.clone();
289
290                // If the event is not an object, it will be converted to an object in this transform
291                if !schema_definition.event_kind().contains_object() {
292                    *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
293                }
294
295                for path in paths {
296                    schema_definition =
297                        schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
298                }
299
300                (output.clone(), schema_definition)
301            })
302            .collect();
303
304        vec![TransformOutput::new(
305            DataType::Metric | DataType::Log,
306            schema_definition,
307        )]
308    }
309}
310
311impl TaskTransform<Event> for Ec2MetadataTransform {
312    fn transform(
313        self: Box<Self>,
314        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
315    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
316    where
317        Self: 'static,
318    {
319        let mut inner = self;
320        Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
321    }
322}
323
324impl Ec2MetadataTransform {
325    fn transform_one(&mut self, mut event: Event) -> Event {
326        let state = self.state.load();
327        match event {
328            Event::Log(ref mut log) => {
329                state.iter().for_each(|(k, v)| {
330                    log.insert(&k.log_path, v.clone());
331                });
332            }
333            Event::Metric(ref mut metric) => {
334                state.iter().for_each(|(k, v)| {
335                    metric
336                        .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
337                });
338            }
339            Event::Trace(_) => panic!("Traces are not supported."),
340        }
341        event
342    }
343}
344
/// Background worker that queries the EC2 instance metadata service and publishes snapshots
/// into the `state` shared with `Ec2MetadataTransform`.
struct MetadataClient {
    client: HttpClient<Body>,
    // Base URI of the metadata service. Every request replaces its path-and-query, so only
    // the scheme and authority matter.
    host: Uri,
    // Cached session token and the instant after which it is re-fetched.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    // Pause between refreshes in `run`.
    refresh_interval: Duration,
    // Timeout applied to each individual HTTP request.
    refresh_timeout: Duration,
    // Configured field names and tag names, collected into sets for membership checks.
    fields: HashSet<String>,
    tags: HashSet<String>,
}

/// Deserialized form of `/latest/dynamic/instance-identity/document` (camelCase JSON keys).
// Every field is required: a document missing any of them fails to deserialize.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)] // `architecture`, `private_ip` and `version` are parsed but never read
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
370
371impl MetadataClient {
372    #[allow(clippy::too_many_arguments)]
373    pub fn new(
374        client: HttpClient<Body>,
375        host: Uri,
376        keys: Keys,
377        state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
378        refresh_interval: Duration,
379        refresh_timeout: Duration,
380        fields: Vec<String>,
381        tags: Vec<String>,
382    ) -> Self {
383        Self {
384            client,
385            host,
386            token: None,
387            keys,
388            state,
389            refresh_interval,
390            refresh_timeout,
391            fields: fields.into_iter().collect(),
392            tags: tags.into_iter().collect(),
393        }
394    }
395
396    async fn run(&mut self) {
397        loop {
398            match self.refresh_metadata().await {
399                Ok(_) => {
400                    emit!(AwsEc2MetadataRefreshSuccessful);
401                }
402                Err(error) => {
403                    emit!(AwsEc2MetadataRefreshError { error });
404                }
405            }
406
407            sleep(self.refresh_interval).await;
408        }
409    }
410
411    pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
412        if let Some((token, next_refresh)) = self.token.clone() {
413            // If the next refresh is greater (in the future) than
414            // the current time we can return the token since its still valid
415            // otherwise lets refresh it.
416            if next_refresh > Instant::now() {
417                return Ok(token);
418            }
419        }
420
421        let mut parts = self.host.clone().into_parts();
422        parts.path_and_query = Some(API_TOKEN.clone());
423        let uri = Uri::from_parts(parts)?;
424
425        let req = Request::put(uri)
426            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
427            .body(Body::empty())?;
428
429        let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
430            .await?
431            .map_err(crate::Error::from)
432            .and_then(|res| match res.status() {
433                StatusCode::OK => Ok(res),
434                status_code => Err(UnexpectedHttpStatusError {
435                    status: status_code,
436                }
437                .into()),
438            })?;
439
440        let token = http_body::Body::collect(res.into_body()).await?.to_bytes();
441
442        let next_refresh = Instant::now() + Duration::from_secs(21600);
443        self.token = Some((token.clone(), next_refresh));
444
445        Ok(token)
446    }
447
448    pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
449        self.get_metadata(&DYNAMIC_DOCUMENT)
450            .await?
451            .map(|body| {
452                serde_json::from_slice(&body[..])
453                    .context(ParseIdentityDocumentSnafu {})
454                    .map_err(Into::into)
455            })
456            .transpose()
457    }
458
459    pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
460        let mut new_state = vec![];
461
462        // Fetch all resources, _then_ add them to the state map.
463        if let Some(document) = self.get_document().await? {
464            if self.fields.contains(ACCOUNT_ID_KEY) {
465                new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
466            }
467
468            if self.fields.contains(AMI_ID_KEY) {
469                new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
470            }
471
472            if self.fields.contains(INSTANCE_ID_KEY) {
473                new_state.push((
474                    self.keys.instance_id_key.clone(),
475                    document.instance_id.into(),
476                ));
477            }
478
479            if self.fields.contains(INSTANCE_TYPE_KEY) {
480                new_state.push((
481                    self.keys.instance_type_key.clone(),
482                    document.instance_type.into(),
483                ));
484            }
485
486            if self.fields.contains(REGION_KEY) {
487                new_state.push((self.keys.region_key.clone(), document.region.into()));
488            }
489
490            if self.fields.contains(AVAILABILITY_ZONE_KEY)
491                && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
492            {
493                new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
494            }
495
496            if self.fields.contains(LOCAL_HOSTNAME_KEY)
497                && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
498            {
499                new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
500            }
501
502            if self.fields.contains(LOCAL_IPV4_KEY)
503                && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
504            {
505                new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
506            }
507
508            if self.fields.contains(PUBLIC_HOSTNAME_KEY)
509                && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
510            {
511                new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
512            }
513
514            if self.fields.contains(PUBLIC_IPV4_KEY)
515                && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
516            {
517                new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
518            }
519
520            if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
521                && let Some(mac) = self.get_metadata(&MAC).await?
522            {
523                let mac = String::from_utf8_lossy(&mac[..]);
524
525                if self.fields.contains(SUBNET_ID_KEY) {
526                    let subnet_path =
527                        format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
528
529                    let subnet_path = subnet_path.parse().context(ParsePathSnafu {
530                        value: subnet_path.clone(),
531                    })?;
532
533                    if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
534                        new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
535                    }
536                }
537
538                if self.fields.contains(VPC_ID_KEY) {
539                    let vpc_path =
540                        format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
541
542                    let vpc_path = vpc_path.parse().context(ParsePathSnafu {
543                        value: vpc_path.clone(),
544                    })?;
545
546                    if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
547                        new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
548                    }
549                }
550            }
551
552            if self.fields.contains(ROLE_NAME_KEY)
553                && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
554            {
555                let role_names = String::from_utf8_lossy(&role_names[..]);
556
557                for (i, role_name) in role_names.lines().enumerate() {
558                    new_state.push((
559                        MetadataKey {
560                            log_path: self
561                                .keys
562                                .role_name_key
563                                .log_path
564                                .with_index_appended(i as isize),
565                            metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
566                        },
567                        role_name.to_string().into(),
568                    ));
569                }
570            }
571
572            for tag in self.tags.clone() {
573                let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
574
575                let tag_path = tag_path.parse().context(ParsePathSnafu {
576                    value: tag_path.clone(),
577                })?;
578
579                if let Some(tag_content) = self.get_metadata(&tag_path).await? {
580                    new_state.push((
581                        MetadataKey {
582                            log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
583                            metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
584                        },
585                        tag_content,
586                    ));
587                }
588            }
589
590            self.state.store(Arc::new(new_state));
591        }
592
593        Ok(())
594    }
595
596    async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
597        let token = self
598            .get_token()
599            .await
600            .with_context(|_| FetchTokenSnafu {})?;
601
602        let mut parts = self.host.clone().into_parts();
603
604        parts.path_and_query = Some(path.clone());
605
606        let uri = Uri::from_parts(parts)?;
607
608        debug!(message = "Sending metadata request.", %uri);
609
610        let req = Request::get(uri)
611            .header(TOKEN_HEADER.as_ref(), token.as_ref())
612            .body(Body::empty())?;
613
614        match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
615            .await?
616            .map_err(crate::Error::from)
617            .and_then(|res| match res.status() {
618                StatusCode::OK => Ok(Some(res)),
619                StatusCode::NOT_FOUND => Ok(None),
620                status_code => Err(UnexpectedHttpStatusError {
621                    status: status_code,
622                }
623                .into()),
624            })? {
625            Some(res) => {
626                let body = http_body::Body::collect(res.into_body()).await?.to_bytes();
627                Ok(Some(body))
628            }
629            None => Ok(None),
630        }
631    }
632}
633
634// This creates a simplified string from the namespace. Since the namespace is technically
635// a target path, it can contain syntax that is undesirable for a metric tag (such as prefix, quotes, etc)
636// This is mainly used for backwards compatibility.
637// see: https://github.com/vectordotdev/vector/issues/14931
638fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
639    let mut output = String::new();
640    for segment in &namespace.path.segments {
641        if !output.is_empty() {
642            output += ".";
643        }
644        match segment {
645            OwnedSegment::Field(field) => {
646                output += field;
647            }
648            OwnedSegment::Index(i) => {
649                output += &i.to_string();
650            }
651        }
652    }
653    output
654}
655
656fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
657    if let Some(namespace) = namespace {
658        MetadataKey {
659            log_path: namespace.with_field_appended(key),
660            metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
661        }
662    } else {
663        MetadataKey {
664            log_path: OwnedTargetPath::event(owned_value_path!(key)),
665            metric_tag: key.to_owned(),
666        }
667    }
668}
669
670impl Keys {
671    pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
672        let namespace = namespace.and_then(|namespace| namespace.path);
673
674        Keys {
675            account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
676            ami_id_key: create_key(&namespace, AMI_ID_KEY),
677            availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
678            instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
679            instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
680            local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
681            local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
682            public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
683            public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
684            region_key: create_key(&namespace, REGION_KEY),
685            subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
686            vpc_id_key: create_key(&namespace, VPC_ID_KEY),
687            role_name_key: create_key(&namespace, ROLE_NAME_KEY),
688            tags_key: create_key(&namespace, TAGS_KEY),
689        }
690    }
691}
692
693#[derive(Debug)]
694struct UnexpectedHttpStatusError {
695    status: http::StatusCode,
696}
697
698impl fmt::Display for UnexpectedHttpStatusError {
699    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
700        write!(f, "got unexpected status code: {}", self.status)
701    }
702}
703
704impl error::Error for UnexpectedHttpStatusError {}
705
/// Errors raised while fetching or interpreting instance metadata. Each is converted into
/// `crate::Error` by `?` at its use site.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    // No session token could be obtained before a metadata request.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    // The identity document body could not be deserialized into `IdentityDocument`.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    // A dynamically built request path (subnet, VPC or tag lookup) was not a valid URI path;
    // `value` is the offending path string.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
718
#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    /// A plain-string (non-object) input must leave the transform as an object, because
    /// metadata fields are inserted into every event.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let string_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), string_input)];

        let mut outputs = config.outputs(&Default::default(), &inputs);
        assert_eq!(outputs.len(), 1);

        let only_output = outputs.pop().unwrap();
        let definitions = only_output.schema_definitions(true);
        let schema = definitions[&OutputId::dummy()].clone();
        assert!(schema.event_kind().is_object());
    }
}
749
750#[cfg(feature = "aws-ec2-metadata-integration-tests")]
751#[cfg(test)]
752mod integration_tests {
753    use tokio::sync::mpsc;
754    use tokio_stream::wrappers::ReceiverStream;
755    use vector_lib::{
756        assert_event_data_eq,
757        lookup::{
758            PathPrefix, event_path,
759            lookup_v2::{OwnedSegment, OwnedValuePath},
760        },
761    };
762    use vrl::value::{ObjectMap, Value};
763    use warp::Filter;
764
765    use super::*;
766    use crate::{
767        event::{LogEvent, Metric, metric},
768        test_util::{addr::next_addr, components::assert_transform_compliance},
769        transforms::test::create_topology,
770    };
771
772    fn ec2_metadata_address() -> String {
773        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
774    }
775
776    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
777        vec![
778            (
779                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
780                "us-east-1a",
781            ),
782            (
783                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
784                "192.0.2.54",
785            ),
786            (
787                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
788                "ec2-192-0-2-54.compute-1.amazonaws.com",
789            ),
790            (
791                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
792                "172.16.34.43",
793            ),
794            (
795                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
796                "ip-172-16-34-43.ec2.internal",
797            ),
798            (
799                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
800                "i-1234567890abcdef0",
801            ),
802            (
803                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
804                "0123456789",
805            ),
806            (
807                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
808                "ami-0b69ea66ff7391e80",
809            ),
810            (
811                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
812                "m4.xlarge",
813            ),
814            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
815            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
816            (
817                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
818                "subnet-0ac62554",
819            ),
820            (owned_value_path!("role-name", 0), "baskinc-role"),
821            (owned_value_path!("tags", "Name"), "test-instance"),
822            (owned_value_path!("tags", "Test"), "test-tag"),
823        ]
824    }
825
826    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
827        vec![
828            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
829            (PUBLIC_IPV4_KEY, "192.0.2.54"),
830            (
831                PUBLIC_HOSTNAME_KEY,
832                "ec2-192-0-2-54.compute-1.amazonaws.com",
833            ),
834            (LOCAL_IPV4_KEY, "172.16.34.43"),
835            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
836            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
837            (ACCOUNT_ID_KEY, "0123456789"),
838            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
839            (INSTANCE_TYPE_KEY, "m4.xlarge"),
840            (REGION_KEY, "us-east-1"),
841            (VPC_ID_KEY, "vpc-d295a6a7"),
842            (SUBNET_ID_KEY, "subnet-0ac62554"),
843            ("role-name[0]", "baskinc-role"),
844            ("tags[Name]", "test-instance"),
845            ("tags[Test]", "test-tag"),
846        ]
847    }
848
849    fn make_metric() -> Metric {
850        Metric::new(
851            "event",
852            metric::MetricKind::Incremental,
853            metric::MetricValue::Counter { value: 1.0 },
854        )
855    }
856
    /// Runs the shared `test_generate_config` helper against `Ec2Metadata`
    /// (presumably checking that its example configuration can be generated).
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }
861
862    #[tokio::test]
863    async fn enrich_log() {
864        assert_transform_compliance(async {
865            let mut fields = default_fields();
866            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
867
868            let tags = vec![
869                String::from("Name"),
870                String::from("Test"),
871                String::from("MISSING_TAG"),
872            ];
873
874            let transform_config = Ec2Metadata {
875                endpoint: ec2_metadata_address(),
876                fields,
877                tags,
878                ..Default::default()
879            };
880
881            let (tx, rx) = mpsc::channel(1);
882            let (topology, mut out) =
883                create_topology(ReceiverStream::new(rx), transform_config).await;
884
885            // We need to sleep to let the background task fetch the data.
886            sleep(Duration::from_secs(1)).await;
887
888            let log = LogEvent::default();
889            let mut expected_log = log.clone();
890            for (k, v) in expected_log_fields().iter().cloned() {
891                expected_log.insert((PathPrefix::Event, &k), v);
892            }
893
894            tx.send(log.into()).await.unwrap();
895
896            let event = out.recv().await.unwrap();
897            assert_event_data_eq!(event.into_log(), expected_log);
898
899            drop(tx);
900            topology.stop().await;
901            assert_eq!(out.recv().await, None);
902        })
903        .await;
904    }
905
906    #[tokio::test(flavor = "multi_thread")]
907    async fn timeout() {
908        let (_guard, addr) = next_addr();
909
910        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
911            tokio::time::sleep(Duration::from_secs(3)).await;
912            Ok("I waited 3 seconds!")
913        }
914
915        let slow = warp::any().and_then(sleepy);
916        let server = warp::serve(slow).bind(addr);
917        let _server = tokio::spawn(server);
918
919        let config = Ec2Metadata {
920            endpoint: format!("http://{addr}"),
921            refresh_timeout_secs: Duration::from_secs(1),
922            ..Default::default()
923        };
924
925        match config.build(&TransformContext::default()).await {
926            Ok(_) => panic!("expected timeout failure"),
927            // cannot create tokio::time::error::Elapsed to compare with since constructor is
928            // private
929            Err(err) => assert_eq!(
930                err.to_string(),
931                "Unable to fetch metadata authentication token: deadline has elapsed."
932            ),
933        }
934    }
935
936    // validates the configuration setting 'required'=false allows vector to run
937    #[tokio::test(flavor = "multi_thread")]
938    async fn not_required() {
939        let (_guard, addr) = next_addr();
940
941        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
942            tokio::time::sleep(Duration::from_secs(3)).await;
943            Ok("I waited 3 seconds!")
944        }
945
946        let slow = warp::any().and_then(sleepy);
947        let server = warp::serve(slow).bind(addr);
948        let _server = tokio::spawn(server);
949
950        let config = Ec2Metadata {
951            endpoint: format!("http://{addr}"),
952            refresh_timeout_secs: Duration::from_secs(1),
953            required: false,
954            ..Default::default()
955        };
956
957        assert!(
958            config.build(&TransformContext::default()).await.is_ok(),
959            "expected no failure because 'required' config value set to false"
960        );
961    }
962
963    #[tokio::test]
964    async fn enrich_metric() {
965        assert_transform_compliance(async {
966            let mut fields = default_fields();
967            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
968
969            let tags = vec![
970                String::from("Name"),
971                String::from("Test"),
972                String::from("MISSING_TAG"),
973            ];
974
975            let transform_config = Ec2Metadata {
976                endpoint: ec2_metadata_address(),
977                fields,
978                tags,
979                ..Default::default()
980            };
981
982            let (tx, rx) = mpsc::channel(1);
983            let (topology, mut out) =
984                create_topology(ReceiverStream::new(rx), transform_config).await;
985
986            // We need to sleep to let the background task fetch the data.
987            sleep(Duration::from_secs(1)).await;
988
989            let metric = make_metric();
990            let mut expected_metric = metric.clone();
991            for (k, v) in expected_metric_fields().iter() {
992                expected_metric.replace_tag(k.to_string(), v.to_string());
993            }
994
995            tx.send(metric.into()).await.unwrap();
996
997            let event = out.recv().await.unwrap();
998            assert_event_data_eq!(event.into_metric(), expected_metric);
999
1000            drop(tx);
1001            topology.stop().await;
1002            assert_eq!(out.recv().await, None);
1003        })
1004        .await;
1005    }
1006
1007    #[tokio::test]
1008    async fn fields_log() {
1009        assert_transform_compliance(async {
1010            let transform_config = Ec2Metadata {
1011                endpoint: ec2_metadata_address(),
1012                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1013                tags: vec![
1014                    String::from("Name"),
1015                    String::from("Test"),
1016                    String::from("MISSING_TAG"),
1017                ],
1018                ..Default::default()
1019            };
1020
1021            let (tx, rx) = mpsc::channel(1);
1022            let (topology, mut out) =
1023                create_topology(ReceiverStream::new(rx), transform_config).await;
1024
1025            // We need to sleep to let the background task fetch the data.
1026            sleep(Duration::from_secs(1)).await;
1027
1028            let log = LogEvent::default();
1029            let mut expected_log = log.clone();
1030            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
1031            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
1032            expected_log.insert(
1033                format!("\"{TAGS_KEY}\"").as_str(),
1034                ObjectMap::from([
1035                    ("Name".into(), Value::from("test-instance")),
1036                    ("Test".into(), Value::from("test-tag")),
1037                ]),
1038            );
1039
1040            tx.send(log.into()).await.unwrap();
1041
1042            let event = out.recv().await.unwrap();
1043            assert_event_data_eq!(event.into_log(), expected_log);
1044
1045            drop(tx);
1046            topology.stop().await;
1047            assert_eq!(out.recv().await, None);
1048        })
1049        .await;
1050    }
1051
1052    #[tokio::test]
1053    async fn fields_metric() {
1054        assert_transform_compliance(async {
1055            let transform_config = Ec2Metadata {
1056                endpoint: ec2_metadata_address(),
1057                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1058                tags: vec![
1059                    String::from("Name"),
1060                    String::from("Test"),
1061                    String::from("MISSING_TAG"),
1062                ],
1063                ..Default::default()
1064            };
1065
1066            let (tx, rx) = mpsc::channel(1);
1067            let (topology, mut out) =
1068                create_topology(ReceiverStream::new(rx), transform_config).await;
1069
1070            // We need to sleep to let the background task fetch the data.
1071            sleep(Duration::from_secs(1)).await;
1072
1073            let metric = make_metric();
1074            let mut expected_metric = metric.clone();
1075            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
1076            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
1077            expected_metric.replace_tag(
1078                format!("{}[{}]", TAGS_KEY, "Name"),
1079                "test-instance".to_string(),
1080            );
1081            expected_metric
1082                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
1083
1084            tx.send(metric.into()).await.unwrap();
1085
1086            let event = out.recv().await.unwrap();
1087            assert_event_data_eq!(event.into_metric(), expected_metric);
1088
1089            drop(tx);
1090            topology.stop().await;
1091            assert_eq!(out.recv().await, None);
1092        })
1093        .await;
1094    }
1095
1096    #[tokio::test]
1097    async fn namespace_log() {
1098        {
1099            assert_transform_compliance(async {
1100                let transform_config = Ec2Metadata {
1101                    endpoint: ec2_metadata_address(),
1102                    namespace: Some(
1103                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1104                    ),
1105                    ..Default::default()
1106                };
1107
1108                let (tx, rx) = mpsc::channel(1);
1109                let (topology, mut out) =
1110                    create_topology(ReceiverStream::new(rx), transform_config).await;
1111
1112                // We need to sleep to let the background task fetch the data.
1113                sleep(Duration::from_secs(1)).await;
1114
1115                let log = LogEvent::default();
1116
1117                tx.send(log.into()).await.unwrap();
1118
1119                let event = out.recv().await.unwrap();
1120
1121                assert_eq!(
1122                    event.as_log().get("ec2.metadata.\"availability-zone\""),
1123                    Some(&"us-east-1a".into())
1124                );
1125
1126                drop(tx);
1127                topology.stop().await;
1128                assert_eq!(out.recv().await, None);
1129            })
1130            .await;
1131        }
1132
1133        {
1134            assert_transform_compliance(async {
1135                // Set an empty namespace to ensure we don't prepend one.
1136                let transform_config = Ec2Metadata {
1137                    endpoint: ec2_metadata_address(),
1138                    namespace: Some(OptionalTargetPath::none()),
1139                    ..Default::default()
1140                };
1141
1142                let (tx, rx) = mpsc::channel(1);
1143                let (topology, mut out) =
1144                    create_topology(ReceiverStream::new(rx), transform_config).await;
1145
1146                // We need to sleep to let the background task fetch the data.
1147                sleep(Duration::from_secs(1)).await;
1148
1149                let log = LogEvent::default();
1150
1151                tx.send(log.into()).await.unwrap();
1152
1153                let event = out.recv().await.unwrap();
1154                assert_eq!(
1155                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
1156                    Some(&"us-east-1a".into())
1157                );
1158
1159                drop(tx);
1160                topology.stop().await;
1161                assert_eq!(out.recv().await, None);
1162            })
1163            .await;
1164        }
1165    }
1166
1167    #[tokio::test]
1168    async fn namespace_metric() {
1169        {
1170            assert_transform_compliance(async {
1171                let transform_config = Ec2Metadata {
1172                    endpoint: ec2_metadata_address(),
1173                    namespace: Some(
1174                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1175                    ),
1176                    ..Default::default()
1177                };
1178
1179                let (tx, rx) = mpsc::channel(1);
1180                let (topology, mut out) =
1181                    create_topology(ReceiverStream::new(rx), transform_config).await;
1182
1183                // We need to sleep to let the background task fetch the data.
1184                sleep(Duration::from_secs(1)).await;
1185
1186                let metric = make_metric();
1187
1188                tx.send(metric.into()).await.unwrap();
1189
1190                let event = out.recv().await.unwrap();
1191                assert_eq!(
1192                    event
1193                        .as_metric()
1194                        .tag_value("ec2.metadata.availability-zone"),
1195                    Some("us-east-1a".to_string())
1196                );
1197
1198                drop(tx);
1199                topology.stop().await;
1200                assert_eq!(out.recv().await, None);
1201            })
1202            .await;
1203        }
1204
1205        {
1206            assert_transform_compliance(async {
1207                // Set an empty namespace to ensure we don't prepend one.
1208                let transform_config = Ec2Metadata {
1209                    endpoint: ec2_metadata_address(),
1210                    namespace: Some(OptionalTargetPath::none()),
1211                    ..Default::default()
1212                };
1213
1214                let (tx, rx) = mpsc::channel(1);
1215                let (topology, mut out) =
1216                    create_topology(ReceiverStream::new(rx), transform_config).await;
1217
1218                // We need to sleep to let the background task fetch the data.
1219                sleep(Duration::from_secs(1)).await;
1220
1221                let metric = make_metric();
1222
1223                tx.send(metric.into()).await.unwrap();
1224
1225                let event = out.recv().await.unwrap();
1226                assert_eq!(
1227                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
1228                    Some("us-east-1a".to_string())
1229                );
1230
1231                drop(tx);
1232                topology.stop().await;
1233                assert_eq!(out.recv().await, None);
1234            })
1235            .await;
1236        }
1237    }
1238}