Skip to main content

vector/sources/
syslog.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{
14        BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15        decoding::{Deserializer, Framer},
16    },
17    config::{LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
20    ipallowlist::IpAllowlistConfig,
21    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
22};
23use vrl::event_path;
24
25#[cfg(unix)]
26use crate::sources::util::build_unix_stream_source;
27use crate::{
28    SourceSender,
29    codecs::Decoder,
30    config::{
31        DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
32    },
33    event::Event,
34    internal_events::{
35        SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
36    },
37    net,
38    shutdown::ShutdownSignal,
39    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
40    tcp::TcpKeepaliveConfig,
41    tls::{MaybeTlsSettings, TlsSourceConfig},
42};
43
44/// Configuration for the `syslog` source.
45#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
46#[derive(Clone, Debug)]
47pub struct SyslogConfig {
48    #[serde(flatten)]
49    mode: Mode,
50
51    /// The maximum buffer size of incoming messages, in bytes.
52    ///
53    /// Messages larger than this are truncated.
54    #[serde(default = "crate::serde::default_max_length")]
55    #[configurable(metadata(docs::type_unit = "bytes"))]
56    max_length: usize,
57
58    /// Overrides the name of the log field used to add the peer host to each event.
59    ///
60    /// If using TCP or UDP, the value is the peer host's address, including the port. For example, `1.2.3.4:9000`. If using
61    /// UDS, the value is the socket path itself.
62    ///
63    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
64    ///
65    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
66    host_key: Option<OptionalValuePath>,
67
68    /// The namespace to use for logs. This overrides the global setting.
69    #[configurable(metadata(docs::hidden))]
70    #[serde(default)]
71    pub log_namespace: Option<bool>,
72}
73
74/// Listener mode for the `syslog` source.
75#[configurable_component]
76#[derive(Clone, Debug)]
77#[serde(tag = "mode", rename_all = "snake_case")]
78#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
79#[allow(clippy::large_enum_variant)]
80pub enum Mode {
81    /// Listen on TCP.
82    Tcp {
83        #[configurable(derived)]
84        address: SocketListenAddr,
85
86        #[configurable(derived)]
87        keepalive: Option<TcpKeepaliveConfig>,
88
89        #[configurable(derived)]
90        permit_origin: Option<IpAllowlistConfig>,
91
92        #[configurable(derived)]
93        tls: Option<TlsSourceConfig>,
94
95        /// The size of the receive buffer used for each connection.
96        ///
97        /// This should not typically needed to be changed.
98        #[configurable(metadata(docs::type_unit = "bytes"))]
99        receive_buffer_bytes: Option<usize>,
100
101        /// The maximum number of TCP connections that are allowed at any given time.
102        connection_limit: Option<u32>,
103    },
104
105    /// Listen on UDP.
106    Udp {
107        #[configurable(derived)]
108        address: SocketListenAddr,
109
110        /// The size of the receive buffer used for the listening socket.
111        ///
112        /// This should not typically needed to be changed.
113        #[configurable(metadata(docs::type_unit = "bytes"))]
114        receive_buffer_bytes: Option<usize>,
115    },
116
117    /// Listen on UDS (Unix domain socket). This only supports Unix stream sockets.
118    ///
119    /// For Unix datagram sockets, use the `socket` source instead.
120    #[cfg(unix)]
121    Unix {
122        /// The Unix socket path.
123        ///
124        /// This should be an absolute path.
125        #[configurable(metadata(docs::examples = "/path/to/socket"))]
126        path: PathBuf,
127
128        /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
129        ///
130        /// The file mode value can be specified in any numeric format supported by your configuration
131        /// language, but it is most intuitive to use an octal number.
132        socket_file_mode: Option<u32>,
133    },
134}
135
136impl SyslogConfig {
137    #[cfg(test)]
138    pub fn from_mode(mode: Mode) -> Self {
139        Self {
140            mode,
141            host_key: None,
142            max_length: crate::serde::default_max_length(),
143            log_namespace: None,
144        }
145    }
146}
147
148impl Default for SyslogConfig {
149    fn default() -> Self {
150        Self {
151            mode: Mode::Tcp {
152                address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
153                keepalive: None,
154                permit_origin: None,
155                tls: None,
156                receive_buffer_bytes: None,
157                connection_limit: None,
158            },
159            host_key: None,
160            max_length: crate::serde::default_max_length(),
161            log_namespace: None,
162        }
163    }
164}
165
166impl GenerateConfig for SyslogConfig {
167    fn generate_config() -> toml::Value {
168        toml::Value::try_from(SyslogConfig::default()).unwrap()
169    }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "syslog")]
174impl SourceConfig for SyslogConfig {
175    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
176        let log_namespace = cx.log_namespace(self.log_namespace);
177        let host_key = self
178            .host_key
179            .clone()
180            .and_then(|k| k.path)
181            .or(log_schema().host_key().cloned());
182
183        match self.mode.clone() {
184            Mode::Tcp {
185                address,
186                keepalive,
187                permit_origin,
188                tls,
189                receive_buffer_bytes,
190                connection_limit,
191            } => {
192                let source = SyslogTcpSource {
193                    max_length: self.max_length,
194                    host_key,
195                    log_namespace,
196                };
197                let shutdown_secs = Duration::from_secs(30);
198                let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
199                let tls_client_metadata_key = tls
200                    .as_ref()
201                    .and_then(|tls| tls.client_metadata_key.clone())
202                    .and_then(|k| k.path);
203                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
204                source.run(
205                    address,
206                    keepalive,
207                    shutdown_secs,
208                    tls,
209                    tls_client_metadata_key,
210                    receive_buffer_bytes,
211                    None,
212                    cx,
213                    false.into(),
214                    connection_limit,
215                    permit_origin.map(Into::into),
216                    SyslogConfig::NAME,
217                    log_namespace,
218                )
219            }
220            Mode::Udp {
221                address,
222                receive_buffer_bytes,
223            } => Ok(udp(
224                address,
225                self.max_length,
226                host_key,
227                receive_buffer_bytes,
228                cx.shutdown,
229                log_namespace,
230                cx.out,
231            )),
232            #[cfg(unix)]
233            Mode::Unix {
234                path,
235                socket_file_mode,
236            } => {
237                let decoder = Decoder::new(
238                    Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
239                        self.max_length,
240                    )),
241                    Deserializer::Syslog(
242                        SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
243                    ),
244                );
245
246                build_unix_stream_source(
247                    path,
248                    socket_file_mode,
249                    decoder,
250                    move |events, host| handle_events(events, &host_key, host, log_namespace),
251                    cx.shutdown,
252                    cx.out,
253                )
254            }
255        }
256    }
257
258    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
259        let log_namespace = global_log_namespace.merge(self.log_namespace);
260        let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
261            .schema_definition(log_namespace)
262            .with_standard_vector_source_metadata();
263
264        vec![SourceOutput::new_maybe_logs(
265            DataType::Log,
266            schema_definition,
267        )]
268    }
269
270    fn resources(&self) -> Vec<Resource> {
271        match self.mode.clone() {
272            Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
273            Mode::Udp { address, .. } => vec![address.as_udp_resource()],
274            #[cfg(unix)]
275            Mode::Unix { .. } => vec![],
276        }
277    }
278
279    fn can_acknowledge(&self) -> bool {
280        false
281    }
282}
283
284#[derive(Debug, Clone)]
285struct SyslogTcpSource {
286    max_length: usize,
287    host_key: Option<OwnedValuePath>,
288    log_namespace: LogNamespace,
289}
290
291impl TcpSource for SyslogTcpSource {
292    type Error = vector_lib::codecs::decoding::Error;
293    type Item = SmallVec<[Event; 1]>;
294    type Decoder = Decoder;
295    type Acker = TcpNullAcker;
296
297    fn decoder(&self) -> Self::Decoder {
298        Decoder::new(
299            Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
300            Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
301        )
302    }
303
304    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
305        handle_events(
306            events,
307            &self.host_key,
308            Some(host.ip().to_string().into()),
309            self.log_namespace,
310        );
311    }
312
313    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
314        TcpNullAcker
315    }
316}
317
318pub fn udp(
319    addr: SocketListenAddr,
320    _max_length: usize,
321    host_key: Option<OwnedValuePath>,
322    receive_buffer_bytes: Option<usize>,
323    shutdown: ShutdownSignal,
324    log_namespace: LogNamespace,
325    mut out: SourceSender,
326) -> super::Source {
327    Box::pin(async move {
328        let listenfd = ListenFd::from_env();
329        let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
330            emit!(SocketBindError {
331                mode: SocketMode::Udp,
332                error: &error,
333            })
334        })?;
335
336        if let Some(receive_buffer_bytes) = receive_buffer_bytes
337            && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
338        {
339            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
340        }
341
342        info!(
343            message = "Listening.",
344            addr = %addr,
345            r#type = "udp"
346        );
347
348        let bytes_received = register!(BytesReceived::from(Protocol::UDP));
349
350        let mut stream = UdpFramed::new(
351            socket,
352            Decoder::new(
353                Framer::Bytes(BytesDecoder::new()),
354                Deserializer::Syslog(
355                    SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
356                ),
357            ),
358        )
359        .take_until(shutdown)
360        .filter_map(|frame| {
361            let host_key = host_key.clone();
362            let bytes_received = bytes_received.clone();
363            async move {
364                match frame {
365                    Ok(((mut events, byte_size), received_from)) => {
366                        let count = events.len();
367                        bytes_received.emit(ByteSize(byte_size));
368                        emit!(SocketEventsReceived {
369                            mode: SocketMode::Udp,
370                            byte_size: events.estimated_json_encoded_size_of(),
371                            count,
372                        });
373                        let received_from = received_from.ip().to_string().into();
374                        handle_events(&mut events, &host_key, Some(received_from), log_namespace);
375                        Some(events.remove(0))
376                    }
377                    Err(error) => {
378                        emit!(SocketReceiveError {
379                            mode: SocketMode::Udp,
380                            error: &error,
381                        });
382                        None
383                    }
384                }
385            }
386        })
387        .boxed();
388
389        match out.send_event_stream(&mut stream).await {
390            Ok(()) => {
391                debug!("Finished sending.");
392                Ok(())
393            }
394            Err(_) => {
395                let (count, _) = stream.size_hint();
396                emit!(StreamClosedError { count });
397                Err(())
398            }
399        }
400    })
401}
402
403fn handle_events(
404    events: &mut [Event],
405    host_key: &Option<OwnedValuePath>,
406    default_host: Option<Bytes>,
407    log_namespace: LogNamespace,
408) {
409    for event in events {
410        enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
411    }
412}
413
414fn enrich_syslog_event(
415    event: &mut Event,
416    host_key: &Option<OwnedValuePath>,
417    default_host: Option<Bytes>,
418    log_namespace: LogNamespace,
419) {
420    let log = event.as_mut_log();
421
422    if let Some(default_host) = &default_host {
423        log_namespace.insert_source_metadata(
424            SyslogConfig::NAME,
425            log,
426            Some(LegacyKey::Overwrite(path!("source_ip"))),
427            path!("source_ip"),
428            default_host.clone(),
429        );
430    }
431
432    let parsed_hostname = log
433        .get(event_path!("hostname"))
434        .map(|hostname| hostname.coerce_to_bytes());
435
436    if let Some(parsed_host) = parsed_hostname.or(default_host) {
437        let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
438
439        log_namespace.insert_source_metadata(
440            SyslogConfig::NAME,
441            log,
442            legacy_host_key,
443            path!("host"),
444            parsed_host,
445        );
446    }
447
448    log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
449
450    if log_namespace == LogNamespace::Legacy {
451        let timestamp = log
452            .get(event_path!("timestamp"))
453            .and_then(|timestamp| timestamp.as_timestamp().cloned())
454            .unwrap_or_else(Utc::now);
455        log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
456    }
457
458    trace!(
459        message = "Processing one event.",
460        event = ?event
461    );
462}
463
464#[cfg(test)]
465mod test {
466    use std::{collections::HashMap, fmt, str::FromStr};
467
468    use chrono::prelude::*;
469    use indoc::indoc;
470    use rand::{Rng, rng};
471    use serde::Deserialize;
472    use tokio::time::{Duration, Instant, sleep};
473    use tokio_util::codec::BytesCodec;
474    use vector_lib::{
475        assert_event_data_eq,
476        codecs::decoding::format::Deserializer,
477        config::ComponentKey,
478        lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
479        schema::Definition,
480    };
481    use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
482
483    use super::*;
484    use crate::{
485        config::log_schema,
486        event::{Event, LogEvent},
487        test_util::{
488            CountReceiver,
489            addr::next_addr,
490            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
491            random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
492        },
493    };
494
495    fn event_from_bytes(
496        host_key: &str,
497        default_host: Option<Bytes>,
498        bytes: Bytes,
499        log_namespace: LogNamespace,
500    ) -> Option<Event> {
501        let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
502        let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
503        handle_events(
504            &mut events,
505            &Some(owned_value_path!(host_key)),
506            default_host,
507            log_namespace,
508        );
509        Some(events.remove(0))
510    }
511
512    #[test]
513    fn generate_config() {
514        crate::test_util::test_generate_config::<SyslogConfig>();
515    }
516
517    #[test]
518    fn output_schema_definition_vector_namespace() {
519        let config = SyslogConfig {
520            log_namespace: Some(true),
521            ..Default::default()
522        };
523
524        let definitions = config
525            .outputs(LogNamespace::Vector)
526            .remove(0)
527            .schema_definition(true);
528
529        let expected_definition =
530            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
531                .with_meaning(OwnedTargetPath::event_root(), "message")
532                .with_metadata_field(
533                    &owned_value_path!("vector", "source_type"),
534                    Kind::bytes(),
535                    None,
536                )
537                .with_metadata_field(
538                    &owned_value_path!("vector", "ingest_timestamp"),
539                    Kind::timestamp(),
540                    None,
541                )
542                .with_metadata_field(
543                    &owned_value_path!("syslog", "timestamp"),
544                    Kind::timestamp(),
545                    Some("timestamp"),
546                )
547                .with_metadata_field(
548                    &owned_value_path!("syslog", "hostname"),
549                    Kind::bytes().or_undefined(),
550                    Some("host"),
551                )
552                .with_metadata_field(
553                    &owned_value_path!("syslog", "source_ip"),
554                    Kind::bytes().or_undefined(),
555                    None,
556                )
557                .with_metadata_field(
558                    &owned_value_path!("syslog", "severity"),
559                    Kind::bytes().or_undefined(),
560                    Some("severity"),
561                )
562                .with_metadata_field(
563                    &owned_value_path!("syslog", "facility"),
564                    Kind::bytes().or_undefined(),
565                    None,
566                )
567                .with_metadata_field(
568                    &owned_value_path!("syslog", "version"),
569                    Kind::integer().or_undefined(),
570                    None,
571                )
572                .with_metadata_field(
573                    &owned_value_path!("syslog", "appname"),
574                    Kind::bytes().or_undefined(),
575                    Some("service"),
576                )
577                .with_metadata_field(
578                    &owned_value_path!("syslog", "msgid"),
579                    Kind::bytes().or_undefined(),
580                    None,
581                )
582                .with_metadata_field(
583                    &owned_value_path!("syslog", "procid"),
584                    Kind::integer().or_bytes().or_undefined(),
585                    None,
586                )
587                .with_metadata_field(
588                    &owned_value_path!("syslog", "structured_data"),
589                    Kind::object(Collection::from_unknown(Kind::object(
590                        Collection::from_unknown(Kind::bytes()),
591                    ))),
592                    None,
593                )
594                .with_metadata_field(
595                    &owned_value_path!("syslog", "tls_client_metadata"),
596                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
597                    None,
598                );
599
600        assert_eq!(definitions, Some(expected_definition));
601    }
602
603    #[test]
604    fn output_schema_definition_legacy_namespace() {
605        let config = SyslogConfig::default();
606
607        let definitions = config
608            .outputs(LogNamespace::Legacy)
609            .remove(0)
610            .schema_definition(true);
611
612        let expected_definition = Definition::new_with_default_metadata(
613            Kind::object(Collection::empty()),
614            [LogNamespace::Legacy],
615        )
616        .with_event_field(
617            &owned_value_path!("message"),
618            Kind::bytes(),
619            Some("message"),
620        )
621        .with_event_field(
622            &owned_value_path!("timestamp"),
623            Kind::timestamp(),
624            Some("timestamp"),
625        )
626        .with_event_field(
627            &owned_value_path!("hostname"),
628            Kind::bytes().or_undefined(),
629            Some("host"),
630        )
631        .with_event_field(
632            &owned_value_path!("source_ip"),
633            Kind::bytes().or_undefined(),
634            None,
635        )
636        .with_event_field(
637            &owned_value_path!("severity"),
638            Kind::bytes().or_undefined(),
639            Some("severity"),
640        )
641        .with_event_field(
642            &owned_value_path!("facility"),
643            Kind::bytes().or_undefined(),
644            None,
645        )
646        .with_event_field(
647            &owned_value_path!("version"),
648            Kind::integer().or_undefined(),
649            None,
650        )
651        .with_event_field(
652            &owned_value_path!("appname"),
653            Kind::bytes().or_undefined(),
654            Some("service"),
655        )
656        .with_event_field(
657            &owned_value_path!("msgid"),
658            Kind::bytes().or_undefined(),
659            None,
660        )
661        .with_event_field(
662            &owned_value_path!("procid"),
663            Kind::integer().or_bytes().or_undefined(),
664            None,
665        )
666        .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
667        .with_standard_vector_source_metadata();
668
669        assert_eq!(definitions, Some(expected_definition));
670    }
671
672    #[test]
673    fn config_tcp() {
674        let config: SyslogConfig = serde_yaml::from_str(indoc! {
675            r#"
676            mode: tcp
677            address: "127.0.0.1:1235"
678            "#,
679        })
680        .unwrap();
681        assert!(matches!(config.mode, Mode::Tcp { .. }));
682    }
683
684    #[test]
685    fn config_tcp_with_receive_buffer_size() {
686        let config: SyslogConfig = serde_yaml::from_str(indoc! {
687            r#"
688            mode: tcp
689            address: "127.0.0.1:1235"
690            receive_buffer_bytes: 256
691            "#,
692        })
693        .unwrap();
694
695        let receive_buffer_bytes = match config.mode {
696            Mode::Tcp {
697                receive_buffer_bytes,
698                ..
699            } => receive_buffer_bytes,
700            _ => panic!("expected Mode::Tcp"),
701        };
702
703        assert_eq!(receive_buffer_bytes, Some(256));
704    }
705
706    #[test]
707    fn config_tcp_keepalive_empty() {
708        let config: SyslogConfig = serde_yaml::from_str(indoc! {
709            r#"
710            mode: tcp
711            address: "127.0.0.1:1235"
712            "#,
713        })
714        .unwrap();
715
716        let keepalive = match config.mode {
717            Mode::Tcp { keepalive, .. } => keepalive,
718            _ => panic!("expected Mode::Tcp"),
719        };
720
721        assert_eq!(keepalive, None);
722    }
723
724    #[test]
725    fn config_tcp_keepalive_full() {
726        let config: SyslogConfig = serde_yaml::from_str(indoc! {
727            r#"
728            mode: tcp
729            address: "127.0.0.1:1235"
730            keepalive:
731              time_secs: 7200
732            "#,
733        })
734        .unwrap();
735
736        let keepalive = match config.mode {
737            Mode::Tcp { keepalive, .. } => keepalive,
738            _ => panic!("expected Mode::Tcp"),
739        };
740
741        let keepalive = keepalive.expect("keepalive config not set");
742
743        assert_eq!(keepalive.time_secs, Some(7200));
744    }
745
746    #[test]
747    fn config_udp() {
748        let config: SyslogConfig = serde_yaml::from_str(indoc! {
749            r#"
750            mode: udp
751            address: "127.0.0.1:1235"
752            max_length: 32187
753            "#,
754        })
755        .unwrap();
756        assert!(matches!(config.mode, Mode::Udp { .. }));
757    }
758
759    #[test]
760    fn config_udp_with_receive_buffer_size() {
761        let config: SyslogConfig = serde_yaml::from_str(indoc! {
762            r#"
763            mode: udp
764            address: "127.0.0.1:1235"
765            max_length: 32187
766            receive_buffer_bytes: 256
767            "#,
768        })
769        .unwrap();
770
771        let receive_buffer_bytes = match config.mode {
772            Mode::Udp {
773                receive_buffer_bytes,
774                ..
775            } => receive_buffer_bytes,
776            _ => panic!("expected Mode::Udp"),
777        };
778
779        assert_eq!(receive_buffer_bytes, Some(256));
780    }
781
782    #[cfg(unix)]
783    #[test]
784    fn config_unix() {
785        let config: SyslogConfig = serde_yaml::from_str(indoc! {
786            r#"
787            mode: unix
788            path: "127.0.0.1:1235"
789            "#,
790        })
791        .unwrap();
792        assert!(matches!(config.mode, Mode::Unix { .. }));
793    }
794
795    #[cfg(unix)]
796    #[test]
797    fn config_unix_permissions() {
798        let config: SyslogConfig = serde_yaml::from_str(indoc! {
799            r#"
800            mode: unix
801            path: "127.0.0.1:1235"
802            socket_file_mode: 511
803            "#,
804        })
805        .unwrap();
806        let socket_file_mode = match config.mode {
807            Mode::Unix {
808                path: _,
809                socket_file_mode,
810            } => socket_file_mode,
811            _ => panic!("expected Mode::Unix"),
812        };
813
814        assert_eq!(socket_file_mode, Some(0o777));
815    }
816
817    #[test]
818    fn syslog_ng_network_syslog_protocol() {
819        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
820        let msg = "i am foobar";
821        let raw = format!(
822            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
823            r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
824            r#"[origin ip="192.168.0.1" software="test"]"#,
825            msg
826        );
827
828        let mut expected = Event::Log(LogEvent::from(msg));
829
830        {
831            let expected = expected.as_mut_log();
832            expected.insert(
833                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
834                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
835                    .single()
836                    .expect("invalid timestamp"),
837            );
838            expected.insert(
839                log_schema().source_type_key_target_path().unwrap(),
840                "syslog",
841            );
842            expected.insert("host", "74794bfb6795");
843            expected.insert("hostname", "74794bfb6795");
844
845            expected.insert("meta.sequenceId", "1");
846            expected.insert("meta.sysUpTime", "37");
847            expected.insert("meta.language", "EN");
848            expected.insert("origin.software", "test");
849            expected.insert("origin.ip", "192.168.0.1");
850
851            expected.insert("severity", "notice");
852            expected.insert("facility", "user");
853            expected.insert("version", 1);
854            expected.insert("appname", "root");
855            expected.insert("procid", 8449);
856            expected.insert("source_ip", "192.168.0.254");
857        }
858
859        assert_event_data_eq!(
860            event_from_bytes(
861                "host",
862                Some(Bytes::from("192.168.0.254")),
863                raw.into(),
864                LogNamespace::Legacy
865            )
866            .unwrap(),
867            expected
868        );
869    }
870
871    #[test]
872    fn handles_incorrect_sd_element() {
873        let msg = "qwerty";
874        let raw = format!(
875            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
876            r"[incorrect x]", msg
877        );
878
879        let mut expected = Event::Log(LogEvent::from(msg));
880        {
881            let expected = expected.as_mut_log();
882            expected.insert(
883                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
884                Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
885                    .single()
886                    .expect("invalid timestamp"),
887            );
888            expected.insert(
889                log_schema().host_key().unwrap().to_string().as_str(),
890                "74794bfb6795",
891            );
892            expected.insert("hostname", "74794bfb6795");
893            expected.insert(
894                log_schema().source_type_key_target_path().unwrap(),
895                "syslog",
896            );
897            expected.insert("severity", "notice");
898            expected.insert("facility", "user");
899            expected.insert("version", 1);
900            expected.insert("appname", "root");
901            expected.insert("procid", 8449);
902            expected.insert("source_ip", "192.168.0.254");
903        }
904
905        let event = event_from_bytes(
906            "host",
907            Some(Bytes::from("192.168.0.254")),
908            raw.into(),
909            LogNamespace::Legacy,
910        )
911        .unwrap();
912        assert_event_data_eq!(event, expected);
913
914        let raw = format!(
915            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
916            r"[incorrect x=]", msg
917        );
918
919        let event = event_from_bytes(
920            "host",
921            Some(Bytes::from("192.168.0.254")),
922            raw.into(),
923            LogNamespace::Legacy,
924        )
925        .unwrap();
926        assert_event_data_eq!(event, expected);
927    }
928
929    #[test]
930    fn handles_empty_sd_element() {
931        fn there_is_map_called_empty(event: Event) -> bool {
932            event
933                .as_log()
934                .get("empty")
935                .expect("empty exists")
936                .is_object()
937        }
938
939        let msg = format!(
940            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
941            r"[empty]"
942        );
943
944        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
945        assert!(there_is_map_called_empty(event));
946
947        let msg = format!(
948            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
949            r#"[non_empty x="1"][empty]"#
950        );
951
952        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
953        assert!(there_is_map_called_empty(event));
954
955        let msg = format!(
956            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
957            r#"[empty][non_empty x="1"]"#
958        );
959
960        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
961        assert!(there_is_map_called_empty(event));
962
963        let msg = format!(
964            r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
965            r#"[empty not_really="testing the test"]"#
966        );
967
968        let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
969        assert!(there_is_map_called_empty(event));
970    }
971
972    #[test]
973    fn handles_weird_whitespace() {
974        // this should also match rsyslog omfwd with template=RSYSLOG_SyslogProtocol23Format
975        let raw = r#"
976            <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
977            "#;
978        let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
979
980        assert_event_data_eq!(
981            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
982            event_from_bytes(
983                "host",
984                None,
985                cleaned.to_owned().into(),
986                LogNamespace::Legacy
987            )
988            .unwrap()
989        );
990    }
991
992    #[test]
993    fn handles_dots_in_sdata() {
994        let raw =
995            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin foo.bar="baz"] hello"#;
996        let event =
997            event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
998        assert_eq!(
999            event.as_log().get(r#"origin."foo.bar""#),
1000            Some(&Value::from("baz"))
1001        );
1002    }
1003
1004    #[test]
1005    fn syslog_ng_default_network() {
1006        let msg = "i am foobar";
1007        let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
1008        let event = event_from_bytes(
1009            "host",
1010            Some(Bytes::from("192.168.0.254")),
1011            raw.into(),
1012            LogNamespace::Legacy,
1013        )
1014        .unwrap();
1015
1016        let mut expected = Event::Log(LogEvent::from(msg));
1017        {
1018            let value = event.as_log().get("timestamp").unwrap();
1019            let year = value.as_timestamp().unwrap().naive_local().year();
1020
1021            let expected = expected.as_mut_log();
1022            let expected_date: DateTime<Utc> = Local
1023                .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1024                .single()
1025                .expect("invalid timestamp")
1026                .into();
1027
1028            expected.insert(
1029                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1030                expected_date,
1031            );
1032            expected.insert(
1033                log_schema().host_key().unwrap().to_string().as_str(),
1034                "74794bfb6795",
1035            );
1036            expected.insert(
1037                log_schema().source_type_key_target_path().unwrap(),
1038                "syslog",
1039            );
1040            expected.insert("hostname", "74794bfb6795");
1041            expected.insert("severity", "notice");
1042            expected.insert("facility", "user");
1043            expected.insert("appname", "root");
1044            expected.insert("procid", 8539);
1045            expected.insert("source_ip", "192.168.0.254");
1046        }
1047
1048        assert_event_data_eq!(event, expected);
1049    }
1050
1051    #[test]
1052    fn rsyslog_omfwd_tcp_default() {
1053        let msg = "start";
1054        let raw = format!(
1055            r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1056        );
1057        let event = event_from_bytes(
1058            "host",
1059            Some(Bytes::from("192.168.0.254")),
1060            raw.into(),
1061            LogNamespace::Legacy,
1062        )
1063        .unwrap();
1064
1065        let mut expected = Event::Log(LogEvent::from(msg));
1066        {
1067            let value = event.as_log().get("timestamp").unwrap();
1068            let year = value.as_timestamp().unwrap().naive_local().year();
1069
1070            let expected = expected.as_mut_log();
1071            let expected_date: DateTime<Utc> = Local
1072                .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1073                .single()
1074                .expect("invalid timestamp")
1075                .into();
1076            expected.insert(
1077                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1078                expected_date,
1079            );
1080            expected.insert(
1081                log_schema().source_type_key_target_path().unwrap(),
1082                "syslog",
1083            );
1084            expected.insert("host", "74794bfb6795");
1085            expected.insert("hostname", "74794bfb6795");
1086            expected.insert("severity", "info");
1087            expected.insert("facility", "local7");
1088            expected.insert("appname", "liblogging-stdlog");
1089            expected.insert("origin.software", "rsyslogd");
1090            expected.insert("origin.swVersion", "8.24.0");
1091            expected.insert("source_ip", "192.168.0.254");
1092            expected.insert(event_path!("origin", "x-pid"), "8979");
1093            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1094        }
1095
1096        assert_event_data_eq!(event, expected);
1097    }
1098
1099    #[test]
1100    fn rsyslog_omfwd_tcp_forward_format() {
1101        let msg = "start";
1102        let raw = format!(
1103            r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog:  [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1104        );
1105
1106        let mut expected = Event::Log(LogEvent::from(msg));
1107        {
1108            let expected = expected.as_mut_log();
1109            expected.insert(
1110                (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1111                Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1112                    .single()
1113                    .and_then(|t| t.with_nanosecond(605_850 * 1000))
1114                    .expect("invalid timestamp"),
1115            );
1116            expected.insert(
1117                log_schema().source_type_key_target_path().unwrap(),
1118                "syslog",
1119            );
1120            expected.insert("host", "74794bfb6795");
1121            expected.insert("hostname", "74794bfb6795");
1122            expected.insert("severity", "info");
1123            expected.insert("facility", "local7");
1124            expected.insert("appname", "liblogging-stdlog");
1125            expected.insert("origin.software", "rsyslogd");
1126            expected.insert("origin.swVersion", "8.24.0");
1127            expected.insert(event_path!("origin", "x-pid"), "9043");
1128            expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1129        }
1130
1131        assert_event_data_eq!(
1132            event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1133            expected
1134        );
1135    }
1136
1137    #[tokio::test]
1138    async fn test_tcp_syslog() {
1139        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1140            let num_messages: usize = 10000;
1141            let (_guard, in_addr) = next_addr();
1142
1143            // Create and spawn the source.
1144            let config = SyslogConfig::from_mode(Mode::Tcp {
1145                address: in_addr.into(),
1146                permit_origin: None,
1147                keepalive: None,
1148                tls: None,
1149                receive_buffer_bytes: None,
1150                connection_limit: None,
1151            });
1152
1153            let key = ComponentKey::from("in");
1154            let (tx, rx) = SourceSender::new_test();
1155            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1156            let shutdown_complete = shutdown.shutdown_tripwire();
1157
1158            let source = config
1159                .build(context)
1160                .await
1161                .expect("source should not fail to build");
1162            tokio::spawn(source);
1163
1164            // Wait for source to become ready to accept traffic.
1165            wait_for_tcp(in_addr).await;
1166
1167            let output_events = CountReceiver::receive_events(rx);
1168
1169            // Now craft and send syslog messages to the source, and collect them on the other side.
1170            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1171                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1172                .collect();
1173
1174            let input_lines: Vec<String> =
1175                input_messages.iter().map(|msg| msg.to_string()).collect();
1176
1177            send_lines(in_addr, input_lines).await.unwrap();
1178
1179            // Wait a short period of time to ensure the messages get sent.
1180            sleep(Duration::from_secs(2)).await;
1181
1182            // Shutdown the source, and make sure we've got all the messages we sent in.
1183            shutdown
1184                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1185                .await;
1186            shutdown_complete.await;
1187
1188            let output_events = output_events.await;
1189            assert_eq!(output_events.len(), num_messages);
1190
1191            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1192                .into_iter()
1193                .map(|mut e| {
1194                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1195                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1196                    e.into()
1197                })
1198                .collect();
1199            assert_eq!(output_messages, input_messages);
1200        })
1201        .await;
1202    }
1203
1204    #[tokio::test]
1205    async fn test_udp_syslog() {
1206        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1207            let num_messages: usize = 1000;
1208            let (_guard, in_addr) = next_addr();
1209
1210            // Create and spawn the source.
1211            let config = SyslogConfig::from_mode(Mode::Udp {
1212                address: in_addr.into(),
1213                receive_buffer_bytes: Some(4 * 1024 * 1024),
1214            });
1215
1216            let key = ComponentKey::from("in");
1217            let (tx, rx) = SourceSender::new_test();
1218            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1219            let shutdown_complete = shutdown.shutdown_tripwire();
1220
1221            let source = config
1222                .build(context)
1223                .await
1224                .expect("source should not fail to build");
1225            tokio::spawn(source);
1226
1227            // Give UDP a brief moment to start listening.
1228            sleep(Duration::from_millis(150)).await;
1229
1230            let output_events = CountReceiver::receive_events(rx);
1231
1232            // Craft and send syslog messages as individual UDP datagrams.
1233            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1234                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1235                .collect();
1236
1237            let input_lines: Vec<String> =
1238                input_messages.iter().map(|msg| msg.to_string()).collect();
1239
1240            let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
1241            for line in input_lines {
1242                socket.send_to(line.as_bytes(), in_addr).await.unwrap();
1243            }
1244
1245            // Wait a short period of time to ensure the messages get sent.
1246            sleep(Duration::from_secs(2)).await;
1247
1248            // Shutdown the source, and make sure we've got all the messages we sent in.
1249            shutdown
1250                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1251                .await;
1252            shutdown_complete.await;
1253
1254            let output_events = output_events.await;
1255            assert_eq!(output_events.len(), num_messages);
1256
1257            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1258                .into_iter()
1259                .map(|mut e| {
1260                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1261                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1262                    e.into()
1263                })
1264                .collect();
1265            assert_eq!(output_messages, input_messages);
1266        })
1267        .await;
1268    }
1269
1270    #[cfg(unix)]
1271    #[tokio::test]
1272    async fn test_unix_stream_syslog() {
1273        use std::os::unix::net::UnixStream as StdUnixStream;
1274
1275        use futures_util::{SinkExt, stream};
1276        use tokio::{io::AsyncWriteExt, net::UnixStream};
1277        use tokio_util::codec::{FramedWrite, LinesCodec};
1278
1279        use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1280
1281        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1282            let num_messages: usize = 1;
1283            let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1284
1285            // Create and spawn the source.
1286            let config = SyslogConfig::from_mode(Mode::Unix {
1287                path: in_path.clone(),
1288                socket_file_mode: None,
1289            });
1290
1291            let key = ComponentKey::from("in");
1292            let (tx, rx) = SourceSender::new_test();
1293            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1294            let shutdown_complete = shutdown.shutdown_tripwire();
1295
1296            let source = config
1297                .build(context)
1298                .await
1299                .expect("source should not fail to build");
1300            tokio::spawn(source);
1301
1302            // Wait for source to become ready to accept traffic.
1303            while StdUnixStream::connect(&in_path).is_err() {
1304                tokio::task::yield_now().await;
1305            }
1306
1307            let output_events = CountReceiver::receive_events(rx);
1308
1309            // Now craft and send syslog messages to the source, and collect them on the other side.
1310            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1311                .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1312                .collect();
1313
1314            let stream = UnixStream::connect(&in_path).await.unwrap();
1315            let mut sink = FramedWrite::new(stream, LinesCodec::new());
1316
1317            let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1318            let mut lines = stream::iter(lines).map(Ok);
1319            sink.send_all(&mut lines).await.unwrap();
1320
1321            let stream = sink.get_mut();
1322            stream.shutdown().await.unwrap();
1323
1324            // Wait a short period of time to ensure the messages get sent.
1325            sleep(Duration::from_secs(1)).await;
1326
1327            shutdown
1328                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1329                .await;
1330            shutdown_complete.await;
1331
1332            let output_events = output_events.await;
1333            assert_eq!(output_events.len(), num_messages);
1334
1335            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1336                .into_iter()
1337                .map(|mut e| {
1338                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1339                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1340                    e.into()
1341                })
1342                .collect();
1343            assert_eq!(output_messages, input_messages);
1344        })
1345        .await;
1346    }
1347
1348    #[tokio::test]
1349    async fn test_octet_counting_syslog() {
1350        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1351            let num_messages: usize = 10000;
1352            let (_guard, in_addr) = next_addr();
1353
1354            // Create and spawn the source.
1355            let config = SyslogConfig::from_mode(Mode::Tcp {
1356                address: in_addr.into(),
1357                permit_origin: None,
1358                keepalive: None,
1359                tls: None,
1360                receive_buffer_bytes: None,
1361                connection_limit: None,
1362            });
1363
1364            let key = ComponentKey::from("in");
1365            let (tx, rx) = SourceSender::new_test();
1366            let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1367            let shutdown_complete = shutdown.shutdown_tripwire();
1368
1369            let source = config
1370                .build(context)
1371                .await
1372                .expect("source should not fail to build");
1373            tokio::spawn(source);
1374
1375            // Wait for source to become ready to accept traffic.
1376            wait_for_tcp(in_addr).await;
1377
1378            let output_events = CountReceiver::receive_events(rx);
1379
1380            // Now craft and send syslog messages to the source, and collect them on the other side.
1381            let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1382                .map(|i| {
1383                    let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1384                    msg.message.push('\n');
1385                    msg.message.push_str(&random_string(30));
1386                    msg
1387                })
1388                .collect();
1389
1390            let codec = BytesCodec::new();
1391            let input_lines: Vec<Bytes> = input_messages
1392                .iter()
1393                .map(|msg| {
1394                    let s = msg.to_string();
1395                    format!("{} {}", s.len(), s).into()
1396                })
1397                .collect();
1398
1399            send_encodable(in_addr, codec, input_lines).await.unwrap();
1400
1401            // Wait a short period of time to ensure the messages get sent.
1402            sleep(Duration::from_secs(2)).await;
1403
1404            // Shutdown the source, and make sure we've got all the messages we sent in.
1405            shutdown
1406                .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1407                .await;
1408            shutdown_complete.await;
1409
1410            let output_events = output_events.await;
1411            assert_eq!(output_events.len(), num_messages);
1412
1413            let output_messages: Vec<SyslogMessageRfc5424> = output_events
1414                .into_iter()
1415                .map(|mut e| {
1416                    e.as_mut_log().remove("hostname"); // Vector adds this field which will cause a parse error.
1417                    e.as_mut_log().remove("source_ip"); // Vector adds this field which will cause a parse error.
1418                    e.into()
1419                })
1420                .collect();
1421            assert_eq!(output_messages, input_messages);
1422        })
1423        .await;
1424    }
1425
    /// Test-side model of an RFC 5424 syslog message.
    ///
    /// The same value is rendered to wire format through its `Display` impl and
    /// rebuilt from a decoded `Event` through its `From<Event>` impl, so a sent
    /// message and a received one can be compared with `==`.
    #[derive(Deserialize, PartialEq, Clone, Debug)]
    struct SyslogMessageRfc5424 {
        msgid: String,
        severity: Severity,
        facility: Facility,
        version: u8,
        // Kept as text; compared verbatim against the rendered event timestamp.
        timestamp: String,
        host: String,
        // Not written by `Display`; read back from the event's `source_type` field.
        source_type: String,
        appname: String,
        procid: usize,
        message: String,
        // Element id -> (parameter name -> value). When built from an event, this
        // holds every field left over after the named fields above are removed.
        #[serde(flatten)]
        structured_data: StructuredData,
    }
1441
1442    impl SyslogMessageRfc5424 {
1443        fn random(
1444            id: usize,
1445            msg_len: usize,
1446            field_len: usize,
1447            max_map_size: usize,
1448            max_children: usize,
1449        ) -> Self {
1450            let msg = random_string(msg_len);
1451            let structured_data = random_structured_data(max_map_size, max_children, field_len);
1452
1453            let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1454            //"secfrac" can contain up to 6 digits, but TCP sinks uses `AutoSi`
1455
1456            Self {
1457                msgid: format!("test{id}"),
1458                severity: Severity::LOG_INFO,
1459                facility: Facility::LOG_USER,
1460                version: 1,
1461                timestamp,
1462                host: "hogwarts".to_owned(),
1463                source_type: "syslog".to_owned(),
1464                appname: "harry".to_owned(),
1465                procid: rng().random_range(0..32768),
1466                structured_data,
1467                message: msg,
1468            }
1469        }
1470    }
1471
1472    impl fmt::Display for SyslogMessageRfc5424 {
1473        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1474            write!(
1475                f,
1476                "<{}>{} {} {} {} {} {} {} {}",
1477                encode_priority(self.severity, self.facility),
1478                self.version,
1479                self.timestamp,
1480                self.host,
1481                self.appname,
1482                self.procid,
1483                self.msgid,
1484                format_structured_data_rfc5424(&self.structured_data),
1485                self.message
1486            )
1487        }
1488    }
1489
1490    impl From<Event> for SyslogMessageRfc5424 {
1491        fn from(e: Event) -> Self {
1492            let (value, _) = e.into_log().into_parts();
1493            let mut fields = value.into_object().unwrap();
1494
1495            Self {
1496                msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1497                severity: fields
1498                    .remove("severity")
1499                    .map(value_to_string)
1500                    .and_then(|s| Severity::from_str(s.as_str()))
1501                    .unwrap(),
1502                facility: fields
1503                    .remove("facility")
1504                    .map(value_to_string)
1505                    .and_then(|s| Facility::from_str(s.as_str()))
1506                    .unwrap(),
1507                version: fields
1508                    .remove("version")
1509                    .map(value_to_string)
1510                    .map(|s| u8::from_str(s.as_str()).unwrap())
1511                    .unwrap(),
1512                timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1513                host: fields.remove("host").map(value_to_string).unwrap(),
1514                source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1515                appname: fields.remove("appname").map(value_to_string).unwrap(),
1516                procid: fields
1517                    .remove("procid")
1518                    .map(value_to_string)
1519                    .map(|s| usize::from_str(s.as_str()).unwrap())
1520                    .unwrap(),
1521                message: fields.remove("message").map(value_to_string).unwrap(),
1522                structured_data: structured_data_from_fields(fields),
1523            }
1524        }
1525    }
1526
1527    fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1528        let mut structured_data = StructuredData::default();
1529
1530        for (key, value) in fields.into_iter() {
1531            let subfields = value
1532                .into_object()
1533                .unwrap()
1534                .into_iter()
1535                .map(|(k, v)| (k.into(), value_to_string(v)))
1536                .collect();
1537
1538            structured_data.insert(key.into(), subfields);
1539        }
1540
1541        structured_data
1542    }
1543
    /// Syslog severity levels used by the test message model.
    ///
    /// Variants carry no explicit discriminants, so they number from 0 in
    /// declaration order; `encode_priority` ORs that number into the priority value.
    /// Each variant deserializes from the lowercase name given in its `serde` rename.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
    pub enum Severity {
        #[serde(rename(deserialize = "emergency"))]
        LOG_EMERG,
        #[serde(rename(deserialize = "alert"))]
        LOG_ALERT,
        #[serde(rename(deserialize = "critical"))]
        LOG_CRIT,
        #[serde(rename(deserialize = "error"))]
        LOG_ERR,
        #[serde(rename(deserialize = "warn"))]
        LOG_WARNING,
        #[serde(rename(deserialize = "notice"))]
        LOG_NOTICE,
        #[serde(rename(deserialize = "info"))]
        LOG_INFO,
        #[serde(rename(deserialize = "debug"))]
        LOG_DEBUG,
    }
1564
1565    impl Severity {
1566        fn from_str(s: &str) -> Option<Self> {
1567            match s {
1568                "emergency" => Some(Self::LOG_EMERG),
1569                "alert" => Some(Self::LOG_ALERT),
1570                "critical" => Some(Self::LOG_CRIT),
1571                "error" => Some(Self::LOG_ERR),
1572                "warn" => Some(Self::LOG_WARNING),
1573                "notice" => Some(Self::LOG_NOTICE),
1574                "info" => Some(Self::LOG_INFO),
1575                "debug" => Some(Self::LOG_DEBUG),
1576
1577                x => {
1578                    #[allow(clippy::print_stdout)]
1579                    {
1580                        println!("converting severity str, got {x}");
1581                    }
1582                    None
1583                }
1584            }
1585        }
1586    }
1587
    /// Syslog facilities used by the test message model (only the first six codes).
    ///
    /// Each discriminant is the facility code already shifted left by 3 bits, so
    /// `encode_priority` can OR it directly with a severity number.
    /// Each variant deserializes from the lowercase name given in its `serde` rename.
    #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
    #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
    pub enum Facility {
        #[serde(rename(deserialize = "kernel"))]
        LOG_KERN = 0 << 3,
        #[serde(rename(deserialize = "user"))]
        LOG_USER = 1 << 3,
        #[serde(rename(deserialize = "mail"))]
        LOG_MAIL = 2 << 3,
        #[serde(rename(deserialize = "daemon"))]
        LOG_DAEMON = 3 << 3,
        #[serde(rename(deserialize = "auth"))]
        LOG_AUTH = 4 << 3,
        #[serde(rename(deserialize = "syslog"))]
        LOG_SYSLOG = 5 << 3,
    }
1604
1605    impl Facility {
1606        fn from_str(s: &str) -> Option<Self> {
1607            match s {
1608                "kernel" => Some(Self::LOG_KERN),
1609                "user" => Some(Self::LOG_USER),
1610                "mail" => Some(Self::LOG_MAIL),
1611                "daemon" => Some(Self::LOG_DAEMON),
1612                "auth" => Some(Self::LOG_AUTH),
1613                "syslog" => Some(Self::LOG_SYSLOG),
1614                _ => None,
1615            }
1616        }
1617    }
1618
    /// Structured data of a test message: element id -> (parameter name -> value).
    type StructuredData = HashMap<String, HashMap<String, String>>;
1620
1621    fn random_structured_data(
1622        max_map_size: usize,
1623        max_children: usize,
1624        field_len: usize,
1625    ) -> StructuredData {
1626        let amount = rng().random_range(0..max_children);
1627
1628        random_maps(max_map_size, field_len)
1629            .filter(|m| !m.is_empty()) //syslog_rfc5424 ignores empty maps, tested separately
1630            .take(amount)
1631            .enumerate()
1632            .map(|(i, map)| (format!("id{i}"), map))
1633            .collect()
1634    }
1635
1636    fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1637        if data.is_empty() {
1638            "-".to_string()
1639        } else {
1640            let mut res = String::new();
1641            for (id, params) in data {
1642                res = res + "[" + id;
1643                for (name, value) in params {
1644                    res = res + " " + name + "=\"" + value + "\"";
1645                }
1646                res += "]";
1647            }
1648
1649            res
1650        }
1651    }
1652
1653    const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1654        facility as u8 | severity as u8
1655    }
1656
1657    fn value_to_string(v: Value) -> String {
1658        if v.is_bytes() {
1659            let buf = v.as_bytes().unwrap();
1660            String::from_utf8_lossy(buf).to_string()
1661        } else if v.is_timestamp() {
1662            let ts = v.as_timestamp().unwrap();
1663            ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1664        } else {
1665            v.to_string()
1666        }
1667    }
1668}