1use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "parquet")]
12use super::format::{ParquetSerializer, ParquetSerializerConfig};
13#[cfg(feature = "syslog")]
14use super::format::{SyslogSerializer, SyslogSerializerConfig};
15use super::{
16 chunking::Chunker,
17 format::{
18 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
19 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
20 GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
21 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
22 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
23 RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
24 },
25 framing::{
26 CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
27 VarintLengthDelimitedEncoderConfig,
28 },
29};
30
/// Configuration that selects which serializer is used to encode events.
///
/// The variant is picked through the `codec` tag, whose values are the
/// snake_case variant names (see the `serde` attribute on this enum).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Encodes events with the Avro serializer.
    Avro {
        /// Options for the Avro serializer; its `schema` is used to build it.
        avro: AvroSerializerOptions,
    },

    /// Encodes events with the CEF serializer.
    Cef(
        /// Options for the CEF serializer.
        CefSerializerConfig,
    ),

    /// Encodes events with the CSV serializer.
    Csv(CsvSerializerConfig),

    /// Encodes events with the GELF serializer.
    Gelf(GelfSerializerConfig),

    /// Encodes events with the JSON serializer.
    ///
    /// This is the variant returned by `SerializerConfig::default()`.
    Json(JsonSerializerConfig),

    /// Encodes events with the logfmt serializer. Takes no options.
    Logfmt,

    /// Encodes events with the native serializer. Takes no options.
    Native,

    /// Encodes events with the native JSON serializer. Takes no options.
    NativeJson,

    /// Encodes events with the OTLP serializer. Takes no options.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Encodes events with the Protobuf serializer.
    Protobuf(ProtobufSerializerConfig),

    /// Encodes events with the raw-message serializer. Takes no options.
    RawMessage,

    /// Encodes events with the text serializer.
    Text(TextSerializerConfig),

    /// Encodes events with the syslog serializer.
    ///
    /// Only available when the `syslog` feature is enabled.
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializerConfig),
}
141
142impl Default for SerializerConfig {
143 fn default() -> Self {
144 Self::Json(JsonSerializerConfig::default())
145 }
146}
147
/// Configuration that selects which serializer is used to encode events in
/// batches.
///
/// Only compiled when the `arrow` feature is enabled. The variant is picked
/// through the `codec` tag (see the `serde` attribute on this enum).
#[cfg(feature = "arrow")]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum BatchSerializerConfig {
    /// Encodes batches with the Arrow stream serializer (`codec = "arrow_stream"`).
    #[serde(rename = "arrow_stream")]
    ArrowStream(ArrowStreamSerializerConfig),
    /// Encodes batches with the Parquet serializer (`codec = "parquet"`).
    ///
    /// Only available when the `parquet` feature is enabled as well.
    #[cfg(feature = "parquet")]
    #[serde(rename = "parquet")]
    Parquet(ParquetSerializerConfig),
}
175
#[cfg(feature = "arrow")]
impl BatchSerializerConfig {
    /// Constructs the batch serializer described by this configuration.
    ///
    /// The stored options are cloned and handed to the serializer constructor.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the serializer constructor.
    pub fn build_batch_serializer(
        &self,
    ) -> Result<super::BatchSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let built = match self {
            Self::ArrowStream(options) => {
                super::BatchSerializer::Arrow(ArrowStreamSerializer::new(options.clone())?)
            }
            #[cfg(feature = "parquet")]
            Self::Parquet(options) => {
                super::BatchSerializer::Parquet(Box::new(ParquetSerializer::new(options.clone())?))
            }
        };
        Ok(built)
    }

    /// Returns the event data type accepted by the selected batch serializer,
    /// as reported by its configuration.
    pub fn input_type(&self) -> DataType {
        match self {
            Self::ArrowStream(options) => options.input_type(),
            #[cfg(feature = "parquet")]
            Self::Parquet(options) => options.input_type(),
        }
    }

    /// Returns the schema requirement of the selected batch serializer, as
    /// reported by its configuration.
    pub fn schema_requirement(&self) -> schema::Requirement {
        match self {
            Self::ArrowStream(options) => options.schema_requirement(),
            #[cfg(feature = "parquet")]
            Self::Parquet(options) => options.schema_requirement(),
        }
    }
}
213
214impl From<AvroSerializerConfig> for SerializerConfig {
215 fn from(config: AvroSerializerConfig) -> Self {
216 Self::Avro { avro: config.avro }
217 }
218}
219
220impl From<CefSerializerConfig> for SerializerConfig {
221 fn from(config: CefSerializerConfig) -> Self {
222 Self::Cef(config)
223 }
224}
225
226impl From<CsvSerializerConfig> for SerializerConfig {
227 fn from(config: CsvSerializerConfig) -> Self {
228 Self::Csv(config)
229 }
230}
231
232impl From<GelfSerializerConfig> for SerializerConfig {
233 fn from(config: GelfSerializerConfig) -> Self {
234 Self::Gelf(config)
235 }
236}
237
238impl From<JsonSerializerConfig> for SerializerConfig {
239 fn from(config: JsonSerializerConfig) -> Self {
240 Self::Json(config)
241 }
242}
243
244impl From<LogfmtSerializerConfig> for SerializerConfig {
245 fn from(_: LogfmtSerializerConfig) -> Self {
246 Self::Logfmt
247 }
248}
249
250impl From<NativeSerializerConfig> for SerializerConfig {
251 fn from(_: NativeSerializerConfig) -> Self {
252 Self::Native
253 }
254}
255
256impl From<NativeJsonSerializerConfig> for SerializerConfig {
257 fn from(_: NativeJsonSerializerConfig) -> Self {
258 Self::NativeJson
259 }
260}
261
/// Maps the OTLP configuration to the `Otlp` variant (`opentelemetry` feature only).
///
/// The variant stores no data, so the incoming value is simply dropped.
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    fn from(_otlp_config: OtlpSerializerConfig) -> Self {
        SerializerConfig::Otlp
    }
}
268
269impl From<ProtobufSerializerConfig> for SerializerConfig {
270 fn from(config: ProtobufSerializerConfig) -> Self {
271 Self::Protobuf(config)
272 }
273}
274
275impl From<RawMessageSerializerConfig> for SerializerConfig {
276 fn from(_: RawMessageSerializerConfig) -> Self {
277 Self::RawMessage
278 }
279}
280
281impl From<TextSerializerConfig> for SerializerConfig {
282 fn from(config: TextSerializerConfig) -> Self {
283 Self::Text(config)
284 }
285}
286
287impl SerializerConfig {
288 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
290 match self {
291 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
292 AvroSerializerConfig::new(avro.schema.clone()).build()?,
293 )),
294 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
295 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
296 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
297 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
298 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
299 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
300 SerializerConfig::NativeJson => {
301 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
302 }
303 #[cfg(feature = "opentelemetry")]
304 SerializerConfig::Otlp => {
305 Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
306 }
307 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
308 SerializerConfig::RawMessage => {
309 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
310 }
311 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
312 #[cfg(feature = "syslog")]
313 SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
314 }
315 }
316
317 pub fn default_stream_framing(&self) -> FramingConfig {
319 match self {
320 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
332 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
333 }
334 #[cfg(feature = "opentelemetry")]
335 SerializerConfig::Otlp => FramingConfig::Bytes,
336 SerializerConfig::Protobuf(_) => {
337 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
338 }
339 SerializerConfig::Cef(_)
340 | SerializerConfig::Csv(_)
341 | SerializerConfig::Json(_)
342 | SerializerConfig::Logfmt
343 | SerializerConfig::NativeJson
344 | SerializerConfig::RawMessage
345 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
346 #[cfg(feature = "syslog")]
347 SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
348 SerializerConfig::Gelf(_) => {
349 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
350 }
351 }
352 }
353
354 pub fn input_type(&self) -> DataType {
356 match self {
357 SerializerConfig::Avro { avro } => {
358 AvroSerializerConfig::new(avro.schema.clone()).input_type()
359 }
360 SerializerConfig::Cef(config) => config.input_type(),
361 SerializerConfig::Csv(config) => config.input_type(),
362 SerializerConfig::Gelf(config) => config.input_type(),
363 SerializerConfig::Json(config) => config.input_type(),
364 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
365 SerializerConfig::Native => NativeSerializerConfig.input_type(),
366 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
367 #[cfg(feature = "opentelemetry")]
368 SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
369 SerializerConfig::Protobuf(config) => config.input_type(),
370 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
371 SerializerConfig::Text(config) => config.input_type(),
372 #[cfg(feature = "syslog")]
373 SerializerConfig::Syslog(config) => config.input_type(),
374 }
375 }
376
377 pub fn schema_requirement(&self) -> schema::Requirement {
379 match self {
380 SerializerConfig::Avro { avro } => {
381 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
382 }
383 SerializerConfig::Cef(config) => config.schema_requirement(),
384 SerializerConfig::Csv(config) => config.schema_requirement(),
385 SerializerConfig::Gelf(config) => config.schema_requirement(),
386 SerializerConfig::Json(config) => config.schema_requirement(),
387 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
388 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
389 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
390 #[cfg(feature = "opentelemetry")]
391 SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
392 SerializerConfig::Protobuf(config) => config.schema_requirement(),
393 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
394 SerializerConfig::Text(config) => config.schema_requirement(),
395 #[cfg(feature = "syslog")]
396 SerializerConfig::Syslog(config) => config.schema_requirement(),
397 }
398 }
399}
400
/// A built serializer, one variant per supported codec.
///
/// Encoding is dispatched to the wrapped serializer through the
/// `tokio_util::codec::Encoder<Event>` implementation for this type.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Wraps an `AvroSerializer`.
    Avro(AvroSerializer),
    /// Wraps a `CefSerializer`.
    Cef(CefSerializer),
    /// Wraps a `CsvSerializer`.
    Csv(CsvSerializer),
    /// Wraps a `GelfSerializer`.
    Gelf(GelfSerializer),
    /// Wraps a `JsonSerializer`.
    Json(JsonSerializer),
    /// Wraps a `LogfmtSerializer`.
    Logfmt(LogfmtSerializer),
    /// Wraps a `NativeSerializer`.
    Native(NativeSerializer),
    /// Wraps a `NativeJsonSerializer`.
    NativeJson(NativeJsonSerializer),
    /// Wraps an `OtlpSerializer` (only with the `opentelemetry` feature).
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Wraps a `ProtobufSerializer`.
    Protobuf(ProtobufSerializer),
    /// Wraps a `RawMessageSerializer`.
    RawMessage(RawMessageSerializer),
    /// Wraps a `TextSerializer`.
    Text(TextSerializer),
    /// Wraps a `SyslogSerializer` (only with the `syslog` feature).
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializer),
}
433
434impl Serializer {
435 pub fn supports_json(&self) -> bool {
437 match self {
438 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
439 Serializer::Avro(_)
440 | Serializer::Cef(_)
441 | Serializer::Csv(_)
442 | Serializer::Logfmt(_)
443 | Serializer::Text(_)
444 | Serializer::Native(_)
445 | Serializer::Protobuf(_)
446 | Serializer::RawMessage(_) => false,
447 #[cfg(feature = "syslog")]
448 Serializer::Syslog(_) => false,
449 #[cfg(feature = "opentelemetry")]
450 Serializer::Otlp(_) => false,
451 }
452 }
453
454 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
461 match self {
462 Serializer::Gelf(serializer) => serializer.to_json_value(event),
463 Serializer::Json(serializer) => serializer.to_json_value(event),
464 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
465 Serializer::Avro(_)
466 | Serializer::Cef(_)
467 | Serializer::Csv(_)
468 | Serializer::Logfmt(_)
469 | Serializer::Text(_)
470 | Serializer::Native(_)
471 | Serializer::Protobuf(_)
472 | Serializer::RawMessage(_) => {
473 panic!("Serializer does not support JSON")
474 }
475 #[cfg(feature = "syslog")]
476 Serializer::Syslog(_) => {
477 panic!("Serializer does not support JSON")
478 }
479 #[cfg(feature = "opentelemetry")]
480 Serializer::Otlp(_) => {
481 panic!("Serializer does not support JSON")
482 }
483 }
484 }
485
486 pub fn chunker(&self) -> Option<Chunker> {
488 match self {
489 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
490 _ => None,
491 }
492 }
493
494 pub const fn is_binary(&self) -> bool {
499 match self {
500 Serializer::RawMessage(_)
501 | Serializer::Avro(_)
502 | Serializer::Native(_)
503 | Serializer::Protobuf(_) => true,
504 #[cfg(feature = "opentelemetry")]
505 Serializer::Otlp(_) => true,
506 #[cfg(feature = "syslog")]
507 Serializer::Syslog(_) => false,
508 Serializer::Cef(_)
509 | Serializer::Csv(_)
510 | Serializer::Logfmt(_)
511 | Serializer::Gelf(_)
512 | Serializer::Json(_)
513 | Serializer::Text(_)
514 | Serializer::NativeJson(_) => false,
515 }
516 }
517}
518
519impl From<AvroSerializer> for Serializer {
520 fn from(serializer: AvroSerializer) -> Self {
521 Self::Avro(serializer)
522 }
523}
524
525impl From<CefSerializer> for Serializer {
526 fn from(serializer: CefSerializer) -> Self {
527 Self::Cef(serializer)
528 }
529}
530
531impl From<CsvSerializer> for Serializer {
532 fn from(serializer: CsvSerializer) -> Self {
533 Self::Csv(serializer)
534 }
535}
536
537impl From<GelfSerializer> for Serializer {
538 fn from(serializer: GelfSerializer) -> Self {
539 Self::Gelf(serializer)
540 }
541}
542
543impl From<JsonSerializer> for Serializer {
544 fn from(serializer: JsonSerializer) -> Self {
545 Self::Json(serializer)
546 }
547}
548
549impl From<LogfmtSerializer> for Serializer {
550 fn from(serializer: LogfmtSerializer) -> Self {
551 Self::Logfmt(serializer)
552 }
553}
554
555impl From<NativeSerializer> for Serializer {
556 fn from(serializer: NativeSerializer) -> Self {
557 Self::Native(serializer)
558 }
559}
560
561impl From<NativeJsonSerializer> for Serializer {
562 fn from(serializer: NativeJsonSerializer) -> Self {
563 Self::NativeJson(serializer)
564 }
565}
566
/// Wraps an OTLP serializer in the `Otlp` variant (`opentelemetry` feature only).
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    fn from(inner: OtlpSerializer) -> Self {
        Serializer::Otlp(inner)
    }
}
573
574impl From<ProtobufSerializer> for Serializer {
575 fn from(serializer: ProtobufSerializer) -> Self {
576 Self::Protobuf(serializer)
577 }
578}
579
580impl From<RawMessageSerializer> for Serializer {
581 fn from(serializer: RawMessageSerializer) -> Self {
582 Self::RawMessage(serializer)
583 }
584}
585
586impl From<TextSerializer> for Serializer {
587 fn from(serializer: TextSerializer) -> Self {
588 Self::Text(serializer)
589 }
590}
/// Wraps a syslog serializer in the `Syslog` variant (`syslog` feature only).
#[cfg(feature = "syslog")]
impl From<SyslogSerializer> for Serializer {
    fn from(inner: SyslogSerializer) -> Self {
        Serializer::Syslog(inner)
    }
}
597
598impl tokio_util::codec::Encoder<Event> for Serializer {
599 type Error = vector_common::Error;
600
601 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
602 match self {
603 Serializer::Avro(serializer) => serializer.encode(event, buffer),
604 Serializer::Cef(serializer) => serializer.encode(event, buffer),
605 Serializer::Csv(serializer) => serializer.encode(event, buffer),
606 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
607 Serializer::Json(serializer) => serializer.encode(event, buffer),
608 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
609 Serializer::Native(serializer) => serializer.encode(event, buffer),
610 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
611 #[cfg(feature = "opentelemetry")]
612 Serializer::Otlp(serializer) => serializer.encode(event, buffer),
613 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
614 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
615 Serializer::Text(serializer) => serializer.encode(event, buffer),
616 #[cfg(feature = "syslog")]
617 Serializer::Syslog(serializer) => serializer.encode(event, buffer),
618 }
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects the JSON codec.
    #[test]
    fn test_serializer_config_default() {
        let default_config = SerializerConfig::default();
        assert!(matches!(default_config, SerializerConfig::Json(_)));
    }

    /// JSON is not classified as binary; native and raw message are.
    #[test]
    fn test_serializer_is_binary() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(!json.is_binary());

        let native = Serializer::Native(NativeSerializerConfig.build());
        assert!(native.is_binary());

        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());
        assert!(raw_message.is_binary());
    }

    /// JSON supports conversion to a JSON value; text does not.
    #[test]
    fn test_serializer_supports_json() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(json.supports_json());

        let text = Serializer::Text(TextSerializerConfig::default().build());
        assert!(!text.supports_json());
    }

    /// Building a JSON configuration succeeds and yields a JSON serializer.
    #[test]
    fn test_serializer_config_build() {
        let json_config = SerializerConfig::Json(JsonSerializerConfig::default());
        let built = json_config.build();
        assert!(built.is_ok());
        assert!(matches!(built, Ok(Serializer::Json(_))));
    }

    /// JSON defaults to newline-delimited framing, native to length-delimited.
    #[test]
    fn test_serializer_config_default_framing() {
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}