// vector/sources/opentelemetry/config.rs

1use std::net::SocketAddr;
2
3use crate::{
4    config::{
5        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
6        SourceContext, SourceOutput,
7    },
8    http::KeepaliveConfig,
9    serde::bool_or_struct,
10    sources::{
11        Source,
12        http_server::{build_param_matcher, remove_duplicates},
13        opentelemetry::{
14            grpc::Service,
15            http::{build_warp_filter, run_http_server},
16        },
17        util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes},
18    },
19};
20use futures::FutureExt;
21use futures_util::{TryFutureExt, future::join};
22use tonic::transport::server::RoutesBuilder;
23use vector_config::indexmap::IndexSet;
24use vector_lib::{
25    codecs::decoding::{OtlpDeserializer, OtlpSignalType},
26    config::{LegacyKey, LogNamespace, log_schema},
27    configurable::configurable_component,
28    internal_event::{BytesReceived, EventsReceived, Protocol},
29    lookup::{OwnedTargetPath, owned_value_path},
30    opentelemetry::{
31        logs::{
32            ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
33            RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
34        },
35        proto::collector::{
36            logs::v1::logs_service_server::LogsServiceServer,
37            metrics::v1::metrics_service_server::MetricsServiceServer,
38            trace::v1::trace_service_server::TraceServiceServer,
39        },
40    },
41    schema::Definition,
42    tls::{MaybeTlsSettings, TlsEnableableConfig},
43};
44use vrl::value::{Kind, kind::Collection};
45
/// Port name attached to the logs output of the `opentelemetry` source.
pub const LOGS: &str = "logs";
/// Port name attached to the metrics output of the `opentelemetry` source.
pub const METRICS: &str = "metrics";
/// Port name attached to the traces output of the `opentelemetry` source.
pub const TRACES: &str = "traces";
49
50/// Configuration for OTLP decoding behavior.
51#[configurable_component]
52#[derive(Clone, Debug, Default, PartialEq, Eq)]
53#[serde(deny_unknown_fields)]
54pub struct OtlpDecodingConfig {
55    /// Whether to use OTLP decoding for logs.
56    ///
57    /// When `true`, logs preserve their OTLP format.
58    /// When `false` (default), logs are converted to Vector's native format.
59    #[serde(default)]
60    pub logs: bool,
61
62    /// Whether to use OTLP decoding for metrics.
63    ///
64    /// When `true`, metrics preserve their OTLP format but are processed as logs.
65    /// When `false` (default), metrics are converted to Vector's native metric format.
66    #[serde(default)]
67    pub metrics: bool,
68
69    /// Whether to use OTLP decoding for traces.
70    ///
71    /// When `true`, traces preserve their OTLP format.
72    /// When `false` (default), traces are converted to Vector's native format.
73    #[serde(default)]
74    pub traces: bool,
75}
76
77impl From<bool> for OtlpDecodingConfig {
78    /// Converts a boolean value to an OtlpDecodingConfig.
79    ///
80    /// This provides backward compatibility with the previous boolean configuration.
81    /// - `true` enables OTLP decoding for all signals
82    /// - `false` disables OTLP decoding for all signals (uses Vector native format)
83    fn from(value: bool) -> Self {
84        Self {
85            logs: value,
86            metrics: value,
87            traces: value,
88        }
89    }
90}
91
92impl OtlpDecodingConfig {
93    /// Returns true if any signal is configured to use OTLP decoding.
94    pub const fn any_enabled(&self) -> bool {
95        self.logs || self.metrics || self.traces
96    }
97
98    /// Returns true if all signals are configured to use OTLP decoding.
99    pub const fn all_enabled(&self) -> bool {
100        self.logs && self.metrics && self.traces
101    }
102
103    /// Returns true if signals have mixed configuration (some enabled, some disabled).
104    pub const fn is_mixed(&self) -> bool {
105        self.any_enabled() && !self.all_enabled()
106    }
107}
108
109/// Configuration for the `opentelemetry` source.
110#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
111#[derive(Clone, Debug)]
112#[serde(deny_unknown_fields)]
113pub struct OpentelemetryConfig {
114    #[configurable(derived)]
115    pub grpc: GrpcConfig,
116
117    #[configurable(derived)]
118    pub http: HttpConfig,
119
120    #[configurable(derived)]
121    #[serde(default, deserialize_with = "bool_or_struct")]
122    pub acknowledgements: SourceAcknowledgementsConfig,
123
124    /// The namespace to use for logs. This overrides the global setting.
125    #[configurable(metadata(docs::hidden))]
126    #[serde(default)]
127    pub log_namespace: Option<bool>,
128
129    /// Configuration for OTLP decoding behavior.
130    ///
131    /// This configuration controls how OpenTelemetry Protocol (OTLP) data is decoded for each
132    /// signal type (logs, metrics, traces). When a signal is configured to use OTLP decoding, the raw OTLP format is
133    /// preserved, allowing the data to be forwarded to downstream OTLP collectors without transformation.
134    /// Otherwise, the signal is converted to Vector's native event format.
135    ///
136    /// Simple boolean form:
137    ///
138    /// ```yaml
139    /// use_otlp_decoding: true  # All signals preserve OTLP format
140    /// # or
141    /// use_otlp_decoding: false # All signals use Vector native format (default)
142    /// ```
143    ///
144    /// Per-signal configuration:
145    ///
146    /// ```yaml
147    /// use_otlp_decoding:
148    ///   logs: false     # Convert to Vector native format
149    ///   metrics: false  # Convert to Vector native format
150    ///   traces: true    # Preserve OTLP format
151    /// ```
152    ///
153    /// **Note:** When OTLP decoding is enabled for metrics:
154    /// - Metrics are parsed as logs while preserving the OTLP format
155    /// - Vector's metric transforms will NOT be compatible with this output
156    /// - The events can be forwarded directly (passthrough) to a downstream OTLP collector
157    #[serde(default, deserialize_with = "bool_or_struct")]
158    pub use_otlp_decoding: OtlpDecodingConfig,
159}
160
161/// Configuration for the `opentelemetry` gRPC server.
162#[configurable_component]
163#[configurable(metadata(docs::examples = "example_grpc_config()"))]
164#[derive(Clone, Debug)]
165#[serde(deny_unknown_fields)]
166pub struct GrpcConfig {
167    /// The socket address to listen for connections on.
168    ///
169    /// It _must_ include a port.
170    #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
171    pub address: SocketAddr,
172
173    #[configurable(derived)]
174    #[serde(default, skip_serializing_if = "Option::is_none")]
175    pub tls: Option<TlsEnableableConfig>,
176
177    #[configurable(derived)]
178    #[serde(default)]
179    pub keepalive: GrpcKeepaliveConfig,
180}
181
182fn example_grpc_config() -> GrpcConfig {
183    GrpcConfig {
184        address: "0.0.0.0:4317".parse().unwrap(),
185        tls: None,
186        keepalive: GrpcKeepaliveConfig::default(),
187    }
188}
189
190/// Configuration for the `opentelemetry` HTTP server.
191#[configurable_component]
192#[configurable(metadata(docs::examples = "example_http_config()"))]
193#[derive(Clone, Debug)]
194#[serde(deny_unknown_fields)]
195pub struct HttpConfig {
196    /// The socket address to listen for connections on.
197    ///
198    /// It _must_ include a port.
199    #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
200    pub address: SocketAddr,
201
202    #[configurable(derived)]
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub tls: Option<TlsEnableableConfig>,
205
206    #[configurable(derived)]
207    #[serde(default)]
208    pub keepalive: KeepaliveConfig,
209
210    /// A list of HTTP headers to include in the event.
211    ///
212    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
213    ///
214    /// Specifying "*" results in all headers included in the event.
215    ///
216    /// For log events in legacy namespace mode, headers are not included if a field with a conflicting name exists.
217    /// For metrics and traces, headers are always added to event metadata.
218    #[serde(default)]
219    #[configurable(metadata(docs::examples = "User-Agent"))]
220    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
221    #[configurable(metadata(docs::examples = "X-*"))]
222    #[configurable(metadata(docs::examples = "*"))]
223    pub headers: Vec<String>,
224}
225
226fn example_http_config() -> HttpConfig {
227    HttpConfig {
228        address: "0.0.0.0:4318".parse().unwrap(),
229        tls: None,
230        keepalive: KeepaliveConfig::default(),
231        headers: vec![],
232    }
233}
234
235impl GenerateConfig for OpentelemetryConfig {
236    fn generate_config() -> toml::Value {
237        toml::Value::try_from(Self {
238            grpc: example_grpc_config(),
239            http: example_http_config(),
240            acknowledgements: Default::default(),
241            log_namespace: None,
242            use_otlp_decoding: OtlpDecodingConfig::default(),
243        })
244        .unwrap()
245    }
246}
247
248impl OpentelemetryConfig {
249    pub(crate) fn get_signal_deserializer(
250        &self,
251        signal_type: OtlpSignalType,
252    ) -> vector_common::Result<Option<OtlpDeserializer>> {
253        let should_use_otlp = match signal_type {
254            OtlpSignalType::Logs => self.use_otlp_decoding.logs,
255            OtlpSignalType::Metrics => self.use_otlp_decoding.metrics,
256            OtlpSignalType::Traces => self.use_otlp_decoding.traces,
257        };
258
259        if should_use_otlp {
260            Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
261                signal_type,
262            ]))))
263        } else {
264            Ok(None)
265        }
266    }
267}
268
269#[async_trait::async_trait]
270#[typetag::serde(name = "opentelemetry")]
271impl SourceConfig for OpentelemetryConfig {
272    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
273        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
274        let events_received = register!(EventsReceived);
275        let log_namespace = cx.log_namespace(self.log_namespace);
276
277        let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
278
279        // Log info message when using mixed OTLP decoding formats
280        if self.use_otlp_decoding.is_mixed() {
281            info!(
282                message = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format.",
283                logs_otlp = self.use_otlp_decoding.logs,
284                metrics_otlp = self.use_otlp_decoding.metrics,
285                traces_otlp = self.use_otlp_decoding.traces,
286            );
287        }
288
289        let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
290        let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
291        let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
292
293        // Compression negotiation (gzip, zstd) is handled centrally by
294        // `DecompressionAndMetricsLayer` in `sources::util::grpc`, so these
295        // services deliberately do not call `.accept_compressed(..)`.
296        let log_service = LogsServiceServer::new(Service {
297            pipeline: cx.out.clone(),
298            acknowledgements,
299            log_namespace,
300            events_received: events_received.clone(),
301            deserializer: logs_deserializer.clone(),
302        })
303        .max_decoding_message_size(usize::MAX);
304
305        let metrics_service = MetricsServiceServer::new(Service {
306            pipeline: cx.out.clone(),
307            acknowledgements,
308            log_namespace,
309            events_received: events_received.clone(),
310            deserializer: metrics_deserializer.clone(),
311        })
312        .max_decoding_message_size(usize::MAX);
313
314        let trace_service = TraceServiceServer::new(Service {
315            pipeline: cx.out.clone(),
316            acknowledgements,
317            log_namespace,
318            events_received: events_received.clone(),
319            deserializer: traces_deserializer.clone(),
320        })
321        .max_decoding_message_size(usize::MAX);
322
323        let mut builder = RoutesBuilder::default();
324        builder
325            .add_service(log_service)
326            .add_service(metrics_service)
327            .add_service(trace_service);
328
329        let grpc_source = run_grpc_server_with_routes(
330            self.grpc.address,
331            grpc_tls_settings,
332            builder.routes(),
333            self.grpc.keepalive.clone(),
334            cx.shutdown.clone(),
335        )
336        .map_err(|error| {
337            error!(message = "OpenTelemetry source gRPC server failed.", %error);
338        });
339
340        let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
341        let protocol = http_tls_settings.http_protocol_name();
342        let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
343        let headers =
344            build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
345
346        let filters = build_warp_filter(
347            acknowledgements,
348            log_namespace,
349            cx.out,
350            bytes_received,
351            events_received,
352            headers,
353            logs_deserializer,
354            metrics_deserializer,
355            traces_deserializer,
356        );
357
358        let http_source = run_http_server(
359            self.http.address,
360            http_tls_settings,
361            filters,
362            cx.shutdown,
363            self.http.keepalive.clone(),
364        )
365        .map_err(|error| {
366            error!(message = "OpenTelemetry source HTTP server failed.", %error);
367        });
368
369        Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
370    }
371
372    // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number",
373    // as both are optional and can be converted to/from.
374    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
375        let log_namespace = global_log_namespace.merge(self.log_namespace);
376        let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
377            .with_source_metadata(
378                Self::NAME,
379                Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
380                &owned_value_path!(RESOURCE_KEY),
381                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
382                None,
383            )
384            .with_source_metadata(
385                Self::NAME,
386                Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
387                &owned_value_path!(ATTRIBUTES_KEY),
388                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
389                None,
390            )
391            .with_source_metadata(
392                Self::NAME,
393                Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
394                &owned_value_path!(TRACE_ID_KEY),
395                Kind::bytes().or_undefined(),
396                None,
397            )
398            .with_source_metadata(
399                Self::NAME,
400                Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
401                &owned_value_path!(SPAN_ID_KEY),
402                Kind::bytes().or_undefined(),
403                None,
404            )
405            .with_source_metadata(
406                Self::NAME,
407                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
408                &owned_value_path!(SEVERITY_TEXT_KEY),
409                Kind::bytes().or_undefined(),
410                Some("severity"),
411            )
412            .with_source_metadata(
413                Self::NAME,
414                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
415                &owned_value_path!(SEVERITY_NUMBER_KEY),
416                Kind::integer().or_undefined(),
417                None,
418            )
419            .with_source_metadata(
420                Self::NAME,
421                Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
422                &owned_value_path!(FLAGS_KEY),
423                Kind::integer().or_undefined(),
424                None,
425            )
426            .with_source_metadata(
427                Self::NAME,
428                Some(LegacyKey::Overwrite(owned_value_path!(
429                    DROPPED_ATTRIBUTES_COUNT_KEY
430                ))),
431                &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
432                Kind::integer(),
433                None,
434            )
435            .with_source_metadata(
436                Self::NAME,
437                Some(LegacyKey::Overwrite(owned_value_path!(
438                    OBSERVED_TIMESTAMP_KEY
439                ))),
440                &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
441                Kind::timestamp(),
442                None,
443            )
444            .with_source_metadata(
445                Self::NAME,
446                None,
447                &owned_value_path!("timestamp"),
448                Kind::timestamp(),
449                Some("timestamp"),
450            )
451            .with_standard_vector_source_metadata();
452
453        let schema_definition = match log_namespace {
454            LogNamespace::Vector => {
455                schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
456            }
457            LogNamespace::Legacy => {
458                schema_definition.with_meaning(log_schema().owned_message_path(), "message")
459            }
460        };
461
462        let logs_output = if self.use_otlp_decoding.logs {
463            SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS)
464        } else {
465            SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS)
466        };
467
468        let metrics_output = if self.use_otlp_decoding.metrics {
469            SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
470        } else {
471            SourceOutput::new_metrics().with_port(METRICS)
472        };
473
474        vec![
475            logs_output,
476            metrics_output,
477            SourceOutput::new_traces().with_port(TRACES),
478        ]
479    }
480
481    fn resources(&self) -> Vec<Resource> {
482        vec![
483            Resource::tcp(self.grpc.address),
484            Resource::tcp(self.http.address),
485        ]
486    }
487
488    fn can_acknowledge(&self) -> bool {
489        true
490    }
491}