1use std::net::SocketAddr;
2
3use crate::{
4 config::{
5 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
6 SourceContext, SourceOutput,
7 },
8 http::KeepaliveConfig,
9 serde::bool_or_struct,
10 sources::{
11 Source,
12 http_server::{build_param_matcher, remove_duplicates},
13 opentelemetry::{
14 grpc::Service,
15 http::{build_warp_filter, run_http_server},
16 },
17 util::grpc::{GrpcKeepaliveConfig, run_grpc_server_with_routes},
18 },
19};
20use futures::FutureExt;
21use futures_util::{TryFutureExt, future::join};
22use tonic::transport::server::RoutesBuilder;
23use vector_config::indexmap::IndexSet;
24use vector_lib::{
25 codecs::decoding::{OtlpDeserializer, OtlpSignalType},
26 config::{LegacyKey, LogNamespace, log_schema},
27 configurable::configurable_component,
28 internal_event::{BytesReceived, EventsReceived, Protocol},
29 lookup::{OwnedTargetPath, owned_value_path},
30 opentelemetry::{
31 logs::{
32 ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
33 RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
34 },
35 proto::collector::{
36 logs::v1::logs_service_server::LogsServiceServer,
37 metrics::v1::metrics_service_server::MetricsServiceServer,
38 trace::v1::trace_service_server::TraceServiceServer,
39 },
40 },
41 schema::Definition,
42 tls::{MaybeTlsSettings, TlsEnableableConfig},
43};
44use vrl::value::{Kind, kind::Collection};
45
46pub const LOGS: &str = "logs";
47pub const METRICS: &str = "metrics";
48pub const TRACES: &str = "traces";
49
50#[configurable_component]
52#[derive(Clone, Debug, Default, PartialEq, Eq)]
53#[serde(deny_unknown_fields)]
54pub struct OtlpDecodingConfig {
55 #[serde(default)]
60 pub logs: bool,
61
62 #[serde(default)]
67 pub metrics: bool,
68
69 #[serde(default)]
74 pub traces: bool,
75}
76
77impl From<bool> for OtlpDecodingConfig {
78 fn from(value: bool) -> Self {
84 Self {
85 logs: value,
86 metrics: value,
87 traces: value,
88 }
89 }
90}
91
92impl OtlpDecodingConfig {
93 pub const fn any_enabled(&self) -> bool {
95 self.logs || self.metrics || self.traces
96 }
97
98 pub const fn all_enabled(&self) -> bool {
100 self.logs && self.metrics && self.traces
101 }
102
103 pub const fn is_mixed(&self) -> bool {
105 self.any_enabled() && !self.all_enabled()
106 }
107}
108
109#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
111#[derive(Clone, Debug)]
112#[serde(deny_unknown_fields)]
113pub struct OpentelemetryConfig {
114 #[configurable(derived)]
115 pub grpc: GrpcConfig,
116
117 #[configurable(derived)]
118 pub http: HttpConfig,
119
120 #[configurable(derived)]
121 #[serde(default, deserialize_with = "bool_or_struct")]
122 pub acknowledgements: SourceAcknowledgementsConfig,
123
124 #[configurable(metadata(docs::hidden))]
126 #[serde(default)]
127 pub log_namespace: Option<bool>,
128
129 #[serde(default, deserialize_with = "bool_or_struct")]
158 pub use_otlp_decoding: OtlpDecodingConfig,
159}
160
161#[configurable_component]
163#[configurable(metadata(docs::examples = "example_grpc_config()"))]
164#[derive(Clone, Debug)]
165#[serde(deny_unknown_fields)]
166pub struct GrpcConfig {
167 #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
171 pub address: SocketAddr,
172
173 #[configurable(derived)]
174 #[serde(default, skip_serializing_if = "Option::is_none")]
175 pub tls: Option<TlsEnableableConfig>,
176
177 #[configurable(derived)]
178 #[serde(default)]
179 pub keepalive: GrpcKeepaliveConfig,
180}
181
182fn example_grpc_config() -> GrpcConfig {
183 GrpcConfig {
184 address: "0.0.0.0:4317".parse().unwrap(),
185 tls: None,
186 keepalive: GrpcKeepaliveConfig::default(),
187 }
188}
189
190#[configurable_component]
192#[configurable(metadata(docs::examples = "example_http_config()"))]
193#[derive(Clone, Debug)]
194#[serde(deny_unknown_fields)]
195pub struct HttpConfig {
196 #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
200 pub address: SocketAddr,
201
202 #[configurable(derived)]
203 #[serde(default, skip_serializing_if = "Option::is_none")]
204 pub tls: Option<TlsEnableableConfig>,
205
206 #[configurable(derived)]
207 #[serde(default)]
208 pub keepalive: KeepaliveConfig,
209
210 #[serde(default)]
219 #[configurable(metadata(docs::examples = "User-Agent"))]
220 #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
221 #[configurable(metadata(docs::examples = "X-*"))]
222 #[configurable(metadata(docs::examples = "*"))]
223 pub headers: Vec<String>,
224}
225
226fn example_http_config() -> HttpConfig {
227 HttpConfig {
228 address: "0.0.0.0:4318".parse().unwrap(),
229 tls: None,
230 keepalive: KeepaliveConfig::default(),
231 headers: vec![],
232 }
233}
234
235impl GenerateConfig for OpentelemetryConfig {
236 fn generate_config() -> toml::Value {
237 toml::Value::try_from(Self {
238 grpc: example_grpc_config(),
239 http: example_http_config(),
240 acknowledgements: Default::default(),
241 log_namespace: None,
242 use_otlp_decoding: OtlpDecodingConfig::default(),
243 })
244 .unwrap()
245 }
246}
247
248impl OpentelemetryConfig {
249 pub(crate) fn get_signal_deserializer(
250 &self,
251 signal_type: OtlpSignalType,
252 ) -> vector_common::Result<Option<OtlpDeserializer>> {
253 let should_use_otlp = match signal_type {
254 OtlpSignalType::Logs => self.use_otlp_decoding.logs,
255 OtlpSignalType::Metrics => self.use_otlp_decoding.metrics,
256 OtlpSignalType::Traces => self.use_otlp_decoding.traces,
257 };
258
259 if should_use_otlp {
260 Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
261 signal_type,
262 ]))))
263 } else {
264 Ok(None)
265 }
266 }
267}
268
269#[async_trait::async_trait]
270#[typetag::serde(name = "opentelemetry")]
271impl SourceConfig for OpentelemetryConfig {
272 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
273 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
274 let events_received = register!(EventsReceived);
275 let log_namespace = cx.log_namespace(self.log_namespace);
276
277 let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
278
279 if self.use_otlp_decoding.is_mixed() {
281 info!(
282 message = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format.",
283 logs_otlp = self.use_otlp_decoding.logs,
284 metrics_otlp = self.use_otlp_decoding.metrics,
285 traces_otlp = self.use_otlp_decoding.traces,
286 );
287 }
288
289 let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
290 let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
291 let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
292
293 let log_service = LogsServiceServer::new(Service {
297 pipeline: cx.out.clone(),
298 acknowledgements,
299 log_namespace,
300 events_received: events_received.clone(),
301 deserializer: logs_deserializer.clone(),
302 })
303 .max_decoding_message_size(usize::MAX);
304
305 let metrics_service = MetricsServiceServer::new(Service {
306 pipeline: cx.out.clone(),
307 acknowledgements,
308 log_namespace,
309 events_received: events_received.clone(),
310 deserializer: metrics_deserializer.clone(),
311 })
312 .max_decoding_message_size(usize::MAX);
313
314 let trace_service = TraceServiceServer::new(Service {
315 pipeline: cx.out.clone(),
316 acknowledgements,
317 log_namespace,
318 events_received: events_received.clone(),
319 deserializer: traces_deserializer.clone(),
320 })
321 .max_decoding_message_size(usize::MAX);
322
323 let mut builder = RoutesBuilder::default();
324 builder
325 .add_service(log_service)
326 .add_service(metrics_service)
327 .add_service(trace_service);
328
329 let grpc_source = run_grpc_server_with_routes(
330 self.grpc.address,
331 grpc_tls_settings,
332 builder.routes(),
333 self.grpc.keepalive.clone(),
334 cx.shutdown.clone(),
335 )
336 .map_err(|error| {
337 error!(message = "OpenTelemetry source gRPC server failed.", %error);
338 });
339
340 let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
341 let protocol = http_tls_settings.http_protocol_name();
342 let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
343 let headers =
344 build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
345
346 let filters = build_warp_filter(
347 acknowledgements,
348 log_namespace,
349 cx.out,
350 bytes_received,
351 events_received,
352 headers,
353 logs_deserializer,
354 metrics_deserializer,
355 traces_deserializer,
356 );
357
358 let http_source = run_http_server(
359 self.http.address,
360 http_tls_settings,
361 filters,
362 cx.shutdown,
363 self.http.keepalive.clone(),
364 )
365 .map_err(|error| {
366 error!(message = "OpenTelemetry source HTTP server failed.", %error);
367 });
368
369 Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
370 }
371
372 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
375 let log_namespace = global_log_namespace.merge(self.log_namespace);
376 let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
377 .with_source_metadata(
378 Self::NAME,
379 Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
380 &owned_value_path!(RESOURCE_KEY),
381 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
382 None,
383 )
384 .with_source_metadata(
385 Self::NAME,
386 Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
387 &owned_value_path!(ATTRIBUTES_KEY),
388 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
389 None,
390 )
391 .with_source_metadata(
392 Self::NAME,
393 Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
394 &owned_value_path!(TRACE_ID_KEY),
395 Kind::bytes().or_undefined(),
396 None,
397 )
398 .with_source_metadata(
399 Self::NAME,
400 Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
401 &owned_value_path!(SPAN_ID_KEY),
402 Kind::bytes().or_undefined(),
403 None,
404 )
405 .with_source_metadata(
406 Self::NAME,
407 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
408 &owned_value_path!(SEVERITY_TEXT_KEY),
409 Kind::bytes().or_undefined(),
410 Some("severity"),
411 )
412 .with_source_metadata(
413 Self::NAME,
414 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
415 &owned_value_path!(SEVERITY_NUMBER_KEY),
416 Kind::integer().or_undefined(),
417 None,
418 )
419 .with_source_metadata(
420 Self::NAME,
421 Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
422 &owned_value_path!(FLAGS_KEY),
423 Kind::integer().or_undefined(),
424 None,
425 )
426 .with_source_metadata(
427 Self::NAME,
428 Some(LegacyKey::Overwrite(owned_value_path!(
429 DROPPED_ATTRIBUTES_COUNT_KEY
430 ))),
431 &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
432 Kind::integer(),
433 None,
434 )
435 .with_source_metadata(
436 Self::NAME,
437 Some(LegacyKey::Overwrite(owned_value_path!(
438 OBSERVED_TIMESTAMP_KEY
439 ))),
440 &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
441 Kind::timestamp(),
442 None,
443 )
444 .with_source_metadata(
445 Self::NAME,
446 None,
447 &owned_value_path!("timestamp"),
448 Kind::timestamp(),
449 Some("timestamp"),
450 )
451 .with_standard_vector_source_metadata();
452
453 let schema_definition = match log_namespace {
454 LogNamespace::Vector => {
455 schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
456 }
457 LogNamespace::Legacy => {
458 schema_definition.with_meaning(log_schema().owned_message_path(), "message")
459 }
460 };
461
462 let logs_output = if self.use_otlp_decoding.logs {
463 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS)
464 } else {
465 SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS)
466 };
467
468 let metrics_output = if self.use_otlp_decoding.metrics {
469 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
470 } else {
471 SourceOutput::new_metrics().with_port(METRICS)
472 };
473
474 vec![
475 logs_output,
476 metrics_output,
477 SourceOutput::new_traces().with_port(TRACES),
478 ]
479 }
480
481 fn resources(&self) -> Vec<Resource> {
482 vec![
483 Resource::tcp(self.grpc.address),
484 Resource::tcp(self.http.address),
485 ]
486 }
487
488 fn can_acknowledge(&self) -> bool {
489 true
490 }
491}