Skip to main content

codecs/encoding/
serializer.rs

1//! Serializer configuration and implementation for encoding structured events as bytes.
2
3use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "parquet")]
12use super::format::{ParquetSerializer, ParquetSerializerConfig};
13#[cfg(feature = "syslog")]
14use super::format::{SyslogSerializer, SyslogSerializerConfig};
15use super::{
16    chunking::Chunker,
17    format::{
18        AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
19        CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
20        GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
21        LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
22        NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
23        RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
24    },
25    framing::{
26        CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
27        VarintLengthDelimitedEncoderConfig,
28    },
29};
30
31/// Serializer configuration.
32#[configurable_component]
33#[derive(Clone, Debug)]
34#[serde(tag = "codec", rename_all = "snake_case")]
35#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
36pub enum SerializerConfig {
37    /// Encodes an event as an [Apache Avro][apache_avro] message.
38    ///
39    /// [apache_avro]: https://avro.apache.org/
40    Avro {
41        /// Apache Avro-specific encoder options.
42        avro: AvroSerializerOptions,
43    },
44
45    /// Encodes an event as a CEF (Common Event Format) formatted message.
46    ///
47    Cef(
48        /// Options for the CEF encoder.
49        CefSerializerConfig,
50    ),
51
52    /// Encodes an event as a CSV message.
53    ///
54    /// This codec must be configured with fields to encode.
55    ///
56    Csv(CsvSerializerConfig),
57
58    /// Encodes an event as a [GELF][gelf] message.
59    ///
60    /// This codec is experimental for the following reason:
61    ///
62    /// The GELF specification is more strict than the actual Graylog receiver.
63    /// Vector's encoder currently adheres more strictly to the GELF spec, with
64    /// the exception that some characters such as `@`  are allowed in field names.
65    ///
66    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
67    /// by Graylog and is much more relaxed than the GELF spec.
68    ///
69    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
70    /// the codec might continue to relax the enforcement of the specification.
71    ///
72    /// [gelf]: https://docs.graylog.org/docs/gelf
73    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
74    Gelf(GelfSerializerConfig),
75
76    /// Encodes an event as [JSON][json].
77    ///
78    /// [json]: https://www.json.org/
79    Json(JsonSerializerConfig),
80
81    /// Encodes an event as a [logfmt][logfmt] message.
82    ///
83    /// [logfmt]: https://brandur.org/logfmt
84    Logfmt,
85
86    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
87    ///
88    /// This codec is **[experimental][experimental]**.
89    ///
90    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
91    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
92    Native,
93
94    /// Encodes an event in the [native JSON format][vector_native_json].
95    ///
96    /// This codec is **[experimental][experimental]**.
97    ///
98    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
99    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
100    NativeJson,
101
102    /// Encodes an event in the [OTLP (OpenTelemetry Protocol)][otlp] format.
103    ///
104    /// This codec uses protobuf encoding, which is the recommended format for OTLP.
105    /// The output is suitable for sending to OTLP-compatible endpoints with
106    /// `content-type: application/x-protobuf`.
107    ///
108    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
109    #[cfg(feature = "opentelemetry")]
110    Otlp,
111
112    /// Encodes an event as a [Protobuf][protobuf] message.
113    ///
114    /// [protobuf]: https://protobuf.dev/
115    Protobuf(ProtobufSerializerConfig),
116
117    /// No encoding.
118    ///
119    /// This encoding uses the `message` field of a log event.
120    ///
121    /// Be careful if you are modifying your log events (for example, by using a `remap`
122    /// transform) and removing the message field while doing additional parsing on it, as this
123    /// could lead to the encoding emitting empty strings for the given event.
124    RawMessage,
125
126    /// Plain text encoding.
127    ///
128    /// This encoding uses the `message` field of a log event. For metrics, it uses an
129    /// encoding that resembles the Prometheus export format.
130    ///
131    /// Be careful if you are modifying your log events (for example, by using a `remap`
132    /// transform) and removing the message field while doing additional parsing on it, as this
133    /// could lead to the encoding emitting empty strings for the given event.
134    Text(TextSerializerConfig),
135
136    /// Syslog encoding
137    /// RFC 3164 and 5424 are supported
138    #[cfg(feature = "syslog")]
139    Syslog(SyslogSerializerConfig),
140}
141
142impl Default for SerializerConfig {
143    fn default() -> Self {
144        Self::Json(JsonSerializerConfig::default())
145    }
146}
147
148/// Batch serializer configuration.
149///
150/// Only available when the `arrow` feature is enabled (the `parquet` feature
151/// implies `arrow`); all batch serializers produce columnar Arrow/Parquet output.
152#[cfg(feature = "arrow")]
153#[configurable_component]
154#[derive(Clone, Debug)]
155#[serde(tag = "codec", rename_all = "snake_case")]
156#[configurable(metadata(
157    docs::enum_tag_description = "The codec to use for batch encoding events."
158))]
159pub enum BatchSerializerConfig {
160    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
161    ///
162    /// This is the streaming variant of the Arrow IPC format, which writes
163    /// a continuous stream of record batches.
164    ///
165    /// [apache_arrow]: https://arrow.apache.org/
166    #[serde(rename = "arrow_stream")]
167    ArrowStream(ArrowStreamSerializerConfig),
168    /// Encodes events in [Apache Parquet][apache_parquet] columnar format.
169    ///
170    /// [apache_parquet]: https://parquet.apache.org/
171    #[cfg(feature = "parquet")]
172    #[serde(rename = "parquet")]
173    Parquet(ParquetSerializerConfig),
174}
175
176#[cfg(feature = "arrow")]
177impl BatchSerializerConfig {
178    /// Build the batch serializer from this configuration.
179    pub fn build_batch_serializer(
180        &self,
181    ) -> Result<super::BatchSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
182        match self {
183            BatchSerializerConfig::ArrowStream(arrow_config) => {
184                let serializer = ArrowStreamSerializer::new(arrow_config.clone())?;
185                Ok(super::BatchSerializer::Arrow(serializer))
186            }
187            #[cfg(feature = "parquet")]
188            BatchSerializerConfig::Parquet(parquet_config) => {
189                let serializer = ParquetSerializer::new(parquet_config.clone())?;
190                Ok(super::BatchSerializer::Parquet(Box::new(serializer)))
191            }
192        }
193    }
194
195    /// The data type of events that are accepted by this batch serializer.
196    pub fn input_type(&self) -> DataType {
197        match self {
198            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
199            #[cfg(feature = "parquet")]
200            BatchSerializerConfig::Parquet(parquet_config) => parquet_config.input_type(),
201        }
202    }
203
204    /// The schema required by the batch serializer.
205    pub fn schema_requirement(&self) -> schema::Requirement {
206        match self {
207            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
208            #[cfg(feature = "parquet")]
209            BatchSerializerConfig::Parquet(parquet_config) => parquet_config.schema_requirement(),
210        }
211    }
212}
213
214impl From<AvroSerializerConfig> for SerializerConfig {
215    fn from(config: AvroSerializerConfig) -> Self {
216        Self::Avro { avro: config.avro }
217    }
218}
219
220impl From<CefSerializerConfig> for SerializerConfig {
221    fn from(config: CefSerializerConfig) -> Self {
222        Self::Cef(config)
223    }
224}
225
226impl From<CsvSerializerConfig> for SerializerConfig {
227    fn from(config: CsvSerializerConfig) -> Self {
228        Self::Csv(config)
229    }
230}
231
232impl From<GelfSerializerConfig> for SerializerConfig {
233    fn from(config: GelfSerializerConfig) -> Self {
234        Self::Gelf(config)
235    }
236}
237
238impl From<JsonSerializerConfig> for SerializerConfig {
239    fn from(config: JsonSerializerConfig) -> Self {
240        Self::Json(config)
241    }
242}
243
244impl From<LogfmtSerializerConfig> for SerializerConfig {
245    fn from(_: LogfmtSerializerConfig) -> Self {
246        Self::Logfmt
247    }
248}
249
250impl From<NativeSerializerConfig> for SerializerConfig {
251    fn from(_: NativeSerializerConfig) -> Self {
252        Self::Native
253    }
254}
255
256impl From<NativeJsonSerializerConfig> for SerializerConfig {
257    fn from(_: NativeJsonSerializerConfig) -> Self {
258        Self::NativeJson
259    }
260}
261
262#[cfg(feature = "opentelemetry")]
263impl From<OtlpSerializerConfig> for SerializerConfig {
264    fn from(_: OtlpSerializerConfig) -> Self {
265        Self::Otlp
266    }
267}
268
269impl From<ProtobufSerializerConfig> for SerializerConfig {
270    fn from(config: ProtobufSerializerConfig) -> Self {
271        Self::Protobuf(config)
272    }
273}
274
275impl From<RawMessageSerializerConfig> for SerializerConfig {
276    fn from(_: RawMessageSerializerConfig) -> Self {
277        Self::RawMessage
278    }
279}
280
281impl From<TextSerializerConfig> for SerializerConfig {
282    fn from(config: TextSerializerConfig) -> Self {
283        Self::Text(config)
284    }
285}
286
287impl SerializerConfig {
288    /// Build the `Serializer` from this configuration.
289    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
290        match self {
291            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
292                AvroSerializerConfig::new(avro.schema.clone()).build()?,
293            )),
294            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
295            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
296            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
297            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
298            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
299            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
300            SerializerConfig::NativeJson => {
301                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
302            }
303            #[cfg(feature = "opentelemetry")]
304            SerializerConfig::Otlp => {
305                Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
306            }
307            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
308            SerializerConfig::RawMessage => {
309                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
310            }
311            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
312            #[cfg(feature = "syslog")]
313            SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
314        }
315    }
316
317    /// Return an appropriate default framer for the given serializer.
318    pub fn default_stream_framing(&self) -> FramingConfig {
319        match self {
320            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
321            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
322            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
323            // our length delimited framer obviously will not do.
324            //
325            // This is OK for now, because the Avro serializer is more ceremonial than anything
326            // else, existing to curry serializer config options to Pulsar's native client, not to
327            // actually serialize the bytes themselves... but we're still exposing this method and
328            // we should do so accurately, even if practically it doesn't need to be.
329            //
330            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
331            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
332                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
333            }
334            #[cfg(feature = "opentelemetry")]
335            SerializerConfig::Otlp => FramingConfig::Bytes,
336            SerializerConfig::Protobuf(_) => {
337                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
338            }
339            SerializerConfig::Cef(_)
340            | SerializerConfig::Csv(_)
341            | SerializerConfig::Json(_)
342            | SerializerConfig::Logfmt
343            | SerializerConfig::NativeJson
344            | SerializerConfig::RawMessage
345            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
346            #[cfg(feature = "syslog")]
347            SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
348            SerializerConfig::Gelf(_) => {
349                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
350            }
351        }
352    }
353
354    /// The data type of events that are accepted by this `Serializer`.
355    pub fn input_type(&self) -> DataType {
356        match self {
357            SerializerConfig::Avro { avro } => {
358                AvroSerializerConfig::new(avro.schema.clone()).input_type()
359            }
360            SerializerConfig::Cef(config) => config.input_type(),
361            SerializerConfig::Csv(config) => config.input_type(),
362            SerializerConfig::Gelf(config) => config.input_type(),
363            SerializerConfig::Json(config) => config.input_type(),
364            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
365            SerializerConfig::Native => NativeSerializerConfig.input_type(),
366            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
367            #[cfg(feature = "opentelemetry")]
368            SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
369            SerializerConfig::Protobuf(config) => config.input_type(),
370            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
371            SerializerConfig::Text(config) => config.input_type(),
372            #[cfg(feature = "syslog")]
373            SerializerConfig::Syslog(config) => config.input_type(),
374        }
375    }
376
377    /// The schema required by the serializer.
378    pub fn schema_requirement(&self) -> schema::Requirement {
379        match self {
380            SerializerConfig::Avro { avro } => {
381                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
382            }
383            SerializerConfig::Cef(config) => config.schema_requirement(),
384            SerializerConfig::Csv(config) => config.schema_requirement(),
385            SerializerConfig::Gelf(config) => config.schema_requirement(),
386            SerializerConfig::Json(config) => config.schema_requirement(),
387            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
388            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
389            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
390            #[cfg(feature = "opentelemetry")]
391            SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
392            SerializerConfig::Protobuf(config) => config.schema_requirement(),
393            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
394            SerializerConfig::Text(config) => config.schema_requirement(),
395            #[cfg(feature = "syslog")]
396            SerializerConfig::Syslog(config) => config.schema_requirement(),
397        }
398    }
399}
400
401/// Serialize structured events as bytes.
402#[derive(Debug, Clone)]
403pub enum Serializer {
404    /// Uses an `AvroSerializer` for serialization.
405    Avro(AvroSerializer),
406    /// Uses a `CefSerializer` for serialization.
407    Cef(CefSerializer),
408    /// Uses a `CsvSerializer` for serialization.
409    Csv(CsvSerializer),
410    /// Uses a `GelfSerializer` for serialization.
411    Gelf(GelfSerializer),
412    /// Uses a `JsonSerializer` for serialization.
413    Json(JsonSerializer),
414    /// Uses a `LogfmtSerializer` for serialization.
415    Logfmt(LogfmtSerializer),
416    /// Uses a `NativeSerializer` for serialization.
417    Native(NativeSerializer),
418    /// Uses a `NativeJsonSerializer` for serialization.
419    NativeJson(NativeJsonSerializer),
420    /// Uses an `OtlpSerializer` for serialization.
421    #[cfg(feature = "opentelemetry")]
422    Otlp(OtlpSerializer),
423    /// Uses a `ProtobufSerializer` for serialization.
424    Protobuf(ProtobufSerializer),
425    /// Uses a `RawMessageSerializer` for serialization.
426    RawMessage(RawMessageSerializer),
427    /// Uses a `TextSerializer` for serialization.
428    Text(TextSerializer),
429    /// Uses a `SyslogSerializer` for serialization.
430    #[cfg(feature = "syslog")]
431    Syslog(SyslogSerializer),
432}
433
434impl Serializer {
435    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
436    pub fn supports_json(&self) -> bool {
437        match self {
438            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
439            Serializer::Avro(_)
440            | Serializer::Cef(_)
441            | Serializer::Csv(_)
442            | Serializer::Logfmt(_)
443            | Serializer::Text(_)
444            | Serializer::Native(_)
445            | Serializer::Protobuf(_)
446            | Serializer::RawMessage(_) => false,
447            #[cfg(feature = "syslog")]
448            Serializer::Syslog(_) => false,
449            #[cfg(feature = "opentelemetry")]
450            Serializer::Otlp(_) => false,
451        }
452    }
453
454    /// Encode event and represent it as JSON value.
455    ///
456    /// # Panics
457    ///
458    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
459    /// if you need to determine the capability to encode to JSON at runtime.
460    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
461        match self {
462            Serializer::Gelf(serializer) => serializer.to_json_value(event),
463            Serializer::Json(serializer) => serializer.to_json_value(event),
464            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
465            Serializer::Avro(_)
466            | Serializer::Cef(_)
467            | Serializer::Csv(_)
468            | Serializer::Logfmt(_)
469            | Serializer::Text(_)
470            | Serializer::Native(_)
471            | Serializer::Protobuf(_)
472            | Serializer::RawMessage(_) => {
473                panic!("Serializer does not support JSON")
474            }
475            #[cfg(feature = "syslog")]
476            Serializer::Syslog(_) => {
477                panic!("Serializer does not support JSON")
478            }
479            #[cfg(feature = "opentelemetry")]
480            Serializer::Otlp(_) => {
481                panic!("Serializer does not support JSON")
482            }
483        }
484    }
485
486    /// Returns the chunking implementation for the serializer, if any is supported.
487    pub fn chunker(&self) -> Option<Chunker> {
488        match self {
489            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
490            _ => None,
491        }
492    }
493
494    /// Returns whether the serializer produces binary output.
495    ///
496    /// Binary serializers produce raw bytes that should not be interpreted as text,
497    /// while text serializers produce UTF-8 encoded strings.
498    pub const fn is_binary(&self) -> bool {
499        match self {
500            Serializer::RawMessage(_)
501            | Serializer::Avro(_)
502            | Serializer::Native(_)
503            | Serializer::Protobuf(_) => true,
504            #[cfg(feature = "opentelemetry")]
505            Serializer::Otlp(_) => true,
506            #[cfg(feature = "syslog")]
507            Serializer::Syslog(_) => false,
508            Serializer::Cef(_)
509            | Serializer::Csv(_)
510            | Serializer::Logfmt(_)
511            | Serializer::Gelf(_)
512            | Serializer::Json(_)
513            | Serializer::Text(_)
514            | Serializer::NativeJson(_) => false,
515        }
516    }
517}
518
519impl From<AvroSerializer> for Serializer {
520    fn from(serializer: AvroSerializer) -> Self {
521        Self::Avro(serializer)
522    }
523}
524
525impl From<CefSerializer> for Serializer {
526    fn from(serializer: CefSerializer) -> Self {
527        Self::Cef(serializer)
528    }
529}
530
531impl From<CsvSerializer> for Serializer {
532    fn from(serializer: CsvSerializer) -> Self {
533        Self::Csv(serializer)
534    }
535}
536
537impl From<GelfSerializer> for Serializer {
538    fn from(serializer: GelfSerializer) -> Self {
539        Self::Gelf(serializer)
540    }
541}
542
543impl From<JsonSerializer> for Serializer {
544    fn from(serializer: JsonSerializer) -> Self {
545        Self::Json(serializer)
546    }
547}
548
549impl From<LogfmtSerializer> for Serializer {
550    fn from(serializer: LogfmtSerializer) -> Self {
551        Self::Logfmt(serializer)
552    }
553}
554
555impl From<NativeSerializer> for Serializer {
556    fn from(serializer: NativeSerializer) -> Self {
557        Self::Native(serializer)
558    }
559}
560
561impl From<NativeJsonSerializer> for Serializer {
562    fn from(serializer: NativeJsonSerializer) -> Self {
563        Self::NativeJson(serializer)
564    }
565}
566
567#[cfg(feature = "opentelemetry")]
568impl From<OtlpSerializer> for Serializer {
569    fn from(serializer: OtlpSerializer) -> Self {
570        Self::Otlp(serializer)
571    }
572}
573
574impl From<ProtobufSerializer> for Serializer {
575    fn from(serializer: ProtobufSerializer) -> Self {
576        Self::Protobuf(serializer)
577    }
578}
579
580impl From<RawMessageSerializer> for Serializer {
581    fn from(serializer: RawMessageSerializer) -> Self {
582        Self::RawMessage(serializer)
583    }
584}
585
586impl From<TextSerializer> for Serializer {
587    fn from(serializer: TextSerializer) -> Self {
588        Self::Text(serializer)
589    }
590}
591#[cfg(feature = "syslog")]
592impl From<SyslogSerializer> for Serializer {
593    fn from(serializer: SyslogSerializer) -> Self {
594        Self::Syslog(serializer)
595    }
596}
597
598impl tokio_util::codec::Encoder<Event> for Serializer {
599    type Error = vector_common::Error;
600
601    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
602        match self {
603            Serializer::Avro(serializer) => serializer.encode(event, buffer),
604            Serializer::Cef(serializer) => serializer.encode(event, buffer),
605            Serializer::Csv(serializer) => serializer.encode(event, buffer),
606            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
607            Serializer::Json(serializer) => serializer.encode(event, buffer),
608            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
609            Serializer::Native(serializer) => serializer.encode(event, buffer),
610            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
611            #[cfg(feature = "opentelemetry")]
612            Serializer::Otlp(serializer) => serializer.encode(event, buffer),
613            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
614            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
615            Serializer::Text(serializer) => serializer.encode(event, buffer),
616            #[cfg(feature = "syslog")]
617            Serializer::Syslog(serializer) => serializer.encode(event, buffer),
618        }
619    }
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625
626    #[test]
627    fn test_serializer_config_default() {
628        // SerializerConfig should default to Json
629        let config = SerializerConfig::default();
630        assert!(matches!(config, SerializerConfig::Json(_)));
631    }
632
633    #[test]
634    fn test_serializer_is_binary() {
635        // Test that is_binary correctly identifies binary serializers
636        let json_config = JsonSerializerConfig::default();
637        let json_serializer = Serializer::Json(json_config.build());
638        assert!(!json_serializer.is_binary());
639
640        let native_serializer = Serializer::Native(NativeSerializerConfig.build());
641        assert!(native_serializer.is_binary());
642
643        let raw_message_serializer = Serializer::RawMessage(RawMessageSerializerConfig.build());
644        assert!(raw_message_serializer.is_binary());
645    }
646
647    #[test]
648    fn test_serializer_supports_json() {
649        // Test that supports_json correctly identifies JSON-capable serializers
650        let json_config = JsonSerializerConfig::default();
651        let json_serializer = Serializer::Json(json_config.build());
652        assert!(json_serializer.supports_json());
653
654        let text_config = TextSerializerConfig::default();
655        let text_serializer = Serializer::Text(text_config.build());
656        assert!(!text_serializer.supports_json());
657    }
658
659    #[test]
660    fn test_serializer_config_build() {
661        // Test that SerializerConfig can be built successfully
662        let config = SerializerConfig::Json(JsonSerializerConfig::default());
663        let serializer = config.build();
664        assert!(serializer.is_ok());
665        assert!(matches!(serializer.unwrap(), Serializer::Json(_)));
666    }
667
668    #[test]
669    fn test_serializer_config_default_framing() {
670        // Test that default framing is appropriate for each serializer type
671        let json_config = SerializerConfig::Json(JsonSerializerConfig::default());
672        assert!(matches!(
673            json_config.default_stream_framing(),
674            FramingConfig::NewlineDelimited
675        ));
676
677        let native_config = SerializerConfig::Native;
678        assert!(matches!(
679            native_config.default_stream_framing(),
680            FramingConfig::LengthDelimited(_)
681        ));
682    }
683}