1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use chrono::Utc;
7use futures::StreamExt;
8use listenfd::ListenFd;
9use smallvec::SmallVec;
10use tokio_util::udp::UdpFramed;
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{
14 BytesDecoder, OctetCountingDecoder, SyslogDeserializerConfig,
15 decoding::{Deserializer, Framer},
16 },
17 config::{LegacyKey, LogNamespace},
18 configurable::configurable_component,
19 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
20 ipallowlist::IpAllowlistConfig,
21 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, path},
22};
23use vrl::event_path;
24
25#[cfg(unix)]
26use crate::sources::util::build_unix_stream_source;
27use crate::{
28 SourceSender,
29 codecs::Decoder,
30 config::{
31 DataType, GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput, log_schema,
32 },
33 event::Event,
34 internal_events::{
35 SocketBindError, SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
36 },
37 net,
38 shutdown::ShutdownSignal,
39 sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
40 tcp::TcpKeepaliveConfig,
41 tls::{MaybeTlsSettings, TlsSourceConfig},
42};
43
44#[configurable_component(source("syslog", "Collect logs sent via Syslog."))]
46#[derive(Clone, Debug)]
47pub struct SyslogConfig {
48 #[serde(flatten)]
49 mode: Mode,
50
51 #[serde(default = "crate::serde::default_max_length")]
55 #[configurable(metadata(docs::type_unit = "bytes"))]
56 max_length: usize,
57
58 host_key: Option<OptionalValuePath>,
67
68 #[configurable(metadata(docs::hidden))]
70 #[serde(default)]
71 pub log_namespace: Option<bool>,
72}
73
74#[configurable_component]
76#[derive(Clone, Debug)]
77#[serde(tag = "mode", rename_all = "snake_case")]
78#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
79#[allow(clippy::large_enum_variant)]
80pub enum Mode {
81 Tcp {
83 #[configurable(derived)]
84 address: SocketListenAddr,
85
86 #[configurable(derived)]
87 keepalive: Option<TcpKeepaliveConfig>,
88
89 #[configurable(derived)]
90 permit_origin: Option<IpAllowlistConfig>,
91
92 #[configurable(derived)]
93 tls: Option<TlsSourceConfig>,
94
95 #[configurable(metadata(docs::type_unit = "bytes"))]
99 receive_buffer_bytes: Option<usize>,
100
101 connection_limit: Option<u32>,
103 },
104
105 Udp {
107 #[configurable(derived)]
108 address: SocketListenAddr,
109
110 #[configurable(metadata(docs::type_unit = "bytes"))]
114 receive_buffer_bytes: Option<usize>,
115 },
116
117 #[cfg(unix)]
121 Unix {
122 #[configurable(metadata(docs::examples = "/path/to/socket"))]
126 path: PathBuf,
127
128 socket_file_mode: Option<u32>,
133 },
134}
135
136impl SyslogConfig {
137 #[cfg(test)]
138 pub fn from_mode(mode: Mode) -> Self {
139 Self {
140 mode,
141 host_key: None,
142 max_length: crate::serde::default_max_length(),
143 log_namespace: None,
144 }
145 }
146}
147
148impl Default for SyslogConfig {
149 fn default() -> Self {
150 Self {
151 mode: Mode::Tcp {
152 address: SocketListenAddr::SocketAddr("0.0.0.0:514".parse().unwrap()),
153 keepalive: None,
154 permit_origin: None,
155 tls: None,
156 receive_buffer_bytes: None,
157 connection_limit: None,
158 },
159 host_key: None,
160 max_length: crate::serde::default_max_length(),
161 log_namespace: None,
162 }
163 }
164}
165
166impl GenerateConfig for SyslogConfig {
167 fn generate_config() -> toml::Value {
168 toml::Value::try_from(SyslogConfig::default()).unwrap()
169 }
170}
171
172#[async_trait::async_trait]
173#[typetag::serde(name = "syslog")]
174impl SourceConfig for SyslogConfig {
175 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
176 let log_namespace = cx.log_namespace(self.log_namespace);
177 let host_key = self
178 .host_key
179 .clone()
180 .and_then(|k| k.path)
181 .or(log_schema().host_key().cloned());
182
183 match self.mode.clone() {
184 Mode::Tcp {
185 address,
186 keepalive,
187 permit_origin,
188 tls,
189 receive_buffer_bytes,
190 connection_limit,
191 } => {
192 let source = SyslogTcpSource {
193 max_length: self.max_length,
194 host_key,
195 log_namespace,
196 };
197 let shutdown_secs = Duration::from_secs(30);
198 let tls_config = tls.as_ref().map(|tls| tls.tls_config.clone());
199 let tls_client_metadata_key = tls
200 .as_ref()
201 .and_then(|tls| tls.client_metadata_key.clone())
202 .and_then(|k| k.path);
203 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
204 source.run(
205 address,
206 keepalive,
207 shutdown_secs,
208 tls,
209 tls_client_metadata_key,
210 receive_buffer_bytes,
211 None,
212 cx,
213 false.into(),
214 connection_limit,
215 permit_origin.map(Into::into),
216 SyslogConfig::NAME,
217 log_namespace,
218 )
219 }
220 Mode::Udp {
221 address,
222 receive_buffer_bytes,
223 } => Ok(udp(
224 address,
225 self.max_length,
226 host_key,
227 receive_buffer_bytes,
228 cx.shutdown,
229 log_namespace,
230 cx.out,
231 )),
232 #[cfg(unix)]
233 Mode::Unix {
234 path,
235 socket_file_mode,
236 } => {
237 let decoder = Decoder::new(
238 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(
239 self.max_length,
240 )),
241 Deserializer::Syslog(
242 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
243 ),
244 );
245
246 build_unix_stream_source(
247 path,
248 socket_file_mode,
249 decoder,
250 move |events, host| handle_events(events, &host_key, host, log_namespace),
251 cx.shutdown,
252 cx.out,
253 )
254 }
255 }
256 }
257
258 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
259 let log_namespace = global_log_namespace.merge(self.log_namespace);
260 let schema_definition = SyslogDeserializerConfig::from_source(SyslogConfig::NAME)
261 .schema_definition(log_namespace)
262 .with_standard_vector_source_metadata();
263
264 vec![SourceOutput::new_maybe_logs(
265 DataType::Log,
266 schema_definition,
267 )]
268 }
269
270 fn resources(&self) -> Vec<Resource> {
271 match self.mode.clone() {
272 Mode::Tcp { address, .. } => vec![address.as_tcp_resource()],
273 Mode::Udp { address, .. } => vec![address.as_udp_resource()],
274 #[cfg(unix)]
275 Mode::Unix { .. } => vec![],
276 }
277 }
278
279 fn can_acknowledge(&self) -> bool {
280 false
281 }
282}
283
284#[derive(Debug, Clone)]
285struct SyslogTcpSource {
286 max_length: usize,
287 host_key: Option<OwnedValuePath>,
288 log_namespace: LogNamespace,
289}
290
291impl TcpSource for SyslogTcpSource {
292 type Error = vector_lib::codecs::decoding::Error;
293 type Item = SmallVec<[Event; 1]>;
294 type Decoder = Decoder;
295 type Acker = TcpNullAcker;
296
297 fn decoder(&self) -> Self::Decoder {
298 Decoder::new(
299 Framer::OctetCounting(OctetCountingDecoder::new_with_max_length(self.max_length)),
300 Deserializer::Syslog(SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build()),
301 )
302 }
303
304 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
305 handle_events(
306 events,
307 &self.host_key,
308 Some(host.ip().to_string().into()),
309 self.log_namespace,
310 );
311 }
312
313 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
314 TcpNullAcker
315 }
316}
317
318pub fn udp(
319 addr: SocketListenAddr,
320 _max_length: usize,
321 host_key: Option<OwnedValuePath>,
322 receive_buffer_bytes: Option<usize>,
323 shutdown: ShutdownSignal,
324 log_namespace: LogNamespace,
325 mut out: SourceSender,
326) -> super::Source {
327 Box::pin(async move {
328 let listenfd = ListenFd::from_env();
329 let socket = try_bind_udp_socket(addr, listenfd).await.map_err(|error| {
330 emit!(SocketBindError {
331 mode: SocketMode::Udp,
332 error: &error,
333 })
334 })?;
335
336 if let Some(receive_buffer_bytes) = receive_buffer_bytes
337 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
338 {
339 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
340 }
341
342 info!(
343 message = "Listening.",
344 addr = %addr,
345 r#type = "udp"
346 );
347
348 let bytes_received = register!(BytesReceived::from(Protocol::UDP));
349
350 let mut stream = UdpFramed::new(
351 socket,
352 Decoder::new(
353 Framer::Bytes(BytesDecoder::new()),
354 Deserializer::Syslog(
355 SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build(),
356 ),
357 ),
358 )
359 .take_until(shutdown)
360 .filter_map(|frame| {
361 let host_key = host_key.clone();
362 let bytes_received = bytes_received.clone();
363 async move {
364 match frame {
365 Ok(((mut events, byte_size), received_from)) => {
366 let count = events.len();
367 bytes_received.emit(ByteSize(byte_size));
368 emit!(SocketEventsReceived {
369 mode: SocketMode::Udp,
370 byte_size: events.estimated_json_encoded_size_of(),
371 count,
372 });
373 let received_from = received_from.ip().to_string().into();
374 handle_events(&mut events, &host_key, Some(received_from), log_namespace);
375 Some(events.remove(0))
376 }
377 Err(error) => {
378 emit!(SocketReceiveError {
379 mode: SocketMode::Udp,
380 error: &error,
381 });
382 None
383 }
384 }
385 }
386 })
387 .boxed();
388
389 match out.send_event_stream(&mut stream).await {
390 Ok(()) => {
391 debug!("Finished sending.");
392 Ok(())
393 }
394 Err(_) => {
395 let (count, _) = stream.size_hint();
396 emit!(StreamClosedError { count });
397 Err(())
398 }
399 }
400 })
401}
402
403fn handle_events(
404 events: &mut [Event],
405 host_key: &Option<OwnedValuePath>,
406 default_host: Option<Bytes>,
407 log_namespace: LogNamespace,
408) {
409 for event in events {
410 enrich_syslog_event(event, host_key, default_host.clone(), log_namespace);
411 }
412}
413
414fn enrich_syslog_event(
415 event: &mut Event,
416 host_key: &Option<OwnedValuePath>,
417 default_host: Option<Bytes>,
418 log_namespace: LogNamespace,
419) {
420 let log = event.as_mut_log();
421
422 if let Some(default_host) = &default_host {
423 log_namespace.insert_source_metadata(
424 SyslogConfig::NAME,
425 log,
426 Some(LegacyKey::Overwrite(path!("source_ip"))),
427 path!("source_ip"),
428 default_host.clone(),
429 );
430 }
431
432 let parsed_hostname = log
433 .get(event_path!("hostname"))
434 .map(|hostname| hostname.coerce_to_bytes());
435
436 if let Some(parsed_host) = parsed_hostname.or(default_host) {
437 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
438
439 log_namespace.insert_source_metadata(
440 SyslogConfig::NAME,
441 log,
442 legacy_host_key,
443 path!("host"),
444 parsed_host,
445 );
446 }
447
448 log_namespace.insert_standard_vector_source_metadata(log, SyslogConfig::NAME, Utc::now());
449
450 if log_namespace == LogNamespace::Legacy {
451 let timestamp = log
452 .get(event_path!("timestamp"))
453 .and_then(|timestamp| timestamp.as_timestamp().cloned())
454 .unwrap_or_else(Utc::now);
455 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
456 }
457
458 trace!(
459 message = "Processing one event.",
460 event = ?event
461 );
462}
463
464#[cfg(test)]
465mod test {
466 use std::{collections::HashMap, fmt, str::FromStr};
467
468 use chrono::prelude::*;
469 use indoc::indoc;
470 use rand::{Rng, rng};
471 use serde::Deserialize;
472 use tokio::time::{Duration, Instant, sleep};
473 use tokio_util::codec::BytesCodec;
474 use vector_lib::{
475 assert_event_data_eq,
476 codecs::decoding::format::Deserializer,
477 config::ComponentKey,
478 lookup::{OwnedTargetPath, PathPrefix, event_path, owned_value_path},
479 schema::Definition,
480 };
481 use vrl::value::{Kind, ObjectMap, Value, kind::Collection};
482
483 use super::*;
484 use crate::{
485 config::log_schema,
486 event::{Event, LogEvent},
487 test_util::{
488 CountReceiver,
489 addr::next_addr,
490 components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
491 random_maps, random_string, send_encodable, send_lines, wait_for_tcp,
492 },
493 };
494
495 fn event_from_bytes(
496 host_key: &str,
497 default_host: Option<Bytes>,
498 bytes: Bytes,
499 log_namespace: LogNamespace,
500 ) -> Option<Event> {
501 let parser = SyslogDeserializerConfig::from_source(SyslogConfig::NAME).build();
502 let mut events = parser.parse(bytes, LogNamespace::Legacy).ok()?;
503 handle_events(
504 &mut events,
505 &Some(owned_value_path!(host_key)),
506 default_host,
507 log_namespace,
508 );
509 Some(events.remove(0))
510 }
511
512 #[test]
513 fn generate_config() {
514 crate::test_util::test_generate_config::<SyslogConfig>();
515 }
516
517 #[test]
518 fn output_schema_definition_vector_namespace() {
519 let config = SyslogConfig {
520 log_namespace: Some(true),
521 ..Default::default()
522 };
523
524 let definitions = config
525 .outputs(LogNamespace::Vector)
526 .remove(0)
527 .schema_definition(true);
528
529 let expected_definition =
530 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
531 .with_meaning(OwnedTargetPath::event_root(), "message")
532 .with_metadata_field(
533 &owned_value_path!("vector", "source_type"),
534 Kind::bytes(),
535 None,
536 )
537 .with_metadata_field(
538 &owned_value_path!("vector", "ingest_timestamp"),
539 Kind::timestamp(),
540 None,
541 )
542 .with_metadata_field(
543 &owned_value_path!("syslog", "timestamp"),
544 Kind::timestamp(),
545 Some("timestamp"),
546 )
547 .with_metadata_field(
548 &owned_value_path!("syslog", "hostname"),
549 Kind::bytes().or_undefined(),
550 Some("host"),
551 )
552 .with_metadata_field(
553 &owned_value_path!("syslog", "source_ip"),
554 Kind::bytes().or_undefined(),
555 None,
556 )
557 .with_metadata_field(
558 &owned_value_path!("syslog", "severity"),
559 Kind::bytes().or_undefined(),
560 Some("severity"),
561 )
562 .with_metadata_field(
563 &owned_value_path!("syslog", "facility"),
564 Kind::bytes().or_undefined(),
565 None,
566 )
567 .with_metadata_field(
568 &owned_value_path!("syslog", "version"),
569 Kind::integer().or_undefined(),
570 None,
571 )
572 .with_metadata_field(
573 &owned_value_path!("syslog", "appname"),
574 Kind::bytes().or_undefined(),
575 Some("service"),
576 )
577 .with_metadata_field(
578 &owned_value_path!("syslog", "msgid"),
579 Kind::bytes().or_undefined(),
580 None,
581 )
582 .with_metadata_field(
583 &owned_value_path!("syslog", "procid"),
584 Kind::integer().or_bytes().or_undefined(),
585 None,
586 )
587 .with_metadata_field(
588 &owned_value_path!("syslog", "structured_data"),
589 Kind::object(Collection::from_unknown(Kind::object(
590 Collection::from_unknown(Kind::bytes()),
591 ))),
592 None,
593 )
594 .with_metadata_field(
595 &owned_value_path!("syslog", "tls_client_metadata"),
596 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
597 None,
598 );
599
600 assert_eq!(definitions, Some(expected_definition));
601 }
602
603 #[test]
604 fn output_schema_definition_legacy_namespace() {
605 let config = SyslogConfig::default();
606
607 let definitions = config
608 .outputs(LogNamespace::Legacy)
609 .remove(0)
610 .schema_definition(true);
611
612 let expected_definition = Definition::new_with_default_metadata(
613 Kind::object(Collection::empty()),
614 [LogNamespace::Legacy],
615 )
616 .with_event_field(
617 &owned_value_path!("message"),
618 Kind::bytes(),
619 Some("message"),
620 )
621 .with_event_field(
622 &owned_value_path!("timestamp"),
623 Kind::timestamp(),
624 Some("timestamp"),
625 )
626 .with_event_field(
627 &owned_value_path!("hostname"),
628 Kind::bytes().or_undefined(),
629 Some("host"),
630 )
631 .with_event_field(
632 &owned_value_path!("source_ip"),
633 Kind::bytes().or_undefined(),
634 None,
635 )
636 .with_event_field(
637 &owned_value_path!("severity"),
638 Kind::bytes().or_undefined(),
639 Some("severity"),
640 )
641 .with_event_field(
642 &owned_value_path!("facility"),
643 Kind::bytes().or_undefined(),
644 None,
645 )
646 .with_event_field(
647 &owned_value_path!("version"),
648 Kind::integer().or_undefined(),
649 None,
650 )
651 .with_event_field(
652 &owned_value_path!("appname"),
653 Kind::bytes().or_undefined(),
654 Some("service"),
655 )
656 .with_event_field(
657 &owned_value_path!("msgid"),
658 Kind::bytes().or_undefined(),
659 None,
660 )
661 .with_event_field(
662 &owned_value_path!("procid"),
663 Kind::integer().or_bytes().or_undefined(),
664 None,
665 )
666 .unknown_fields(Kind::object(Collection::from_unknown(Kind::bytes())))
667 .with_standard_vector_source_metadata();
668
669 assert_eq!(definitions, Some(expected_definition));
670 }
671
672 #[test]
673 fn config_tcp() {
674 let config: SyslogConfig = serde_yaml::from_str(indoc! {
675 r#"
676 mode: tcp
677 address: "127.0.0.1:1235"
678 "#,
679 })
680 .unwrap();
681 assert!(matches!(config.mode, Mode::Tcp { .. }));
682 }
683
684 #[test]
685 fn config_tcp_with_receive_buffer_size() {
686 let config: SyslogConfig = serde_yaml::from_str(indoc! {
687 r#"
688 mode: tcp
689 address: "127.0.0.1:1235"
690 receive_buffer_bytes: 256
691 "#,
692 })
693 .unwrap();
694
695 let receive_buffer_bytes = match config.mode {
696 Mode::Tcp {
697 receive_buffer_bytes,
698 ..
699 } => receive_buffer_bytes,
700 _ => panic!("expected Mode::Tcp"),
701 };
702
703 assert_eq!(receive_buffer_bytes, Some(256));
704 }
705
706 #[test]
707 fn config_tcp_keepalive_empty() {
708 let config: SyslogConfig = serde_yaml::from_str(indoc! {
709 r#"
710 mode: tcp
711 address: "127.0.0.1:1235"
712 "#,
713 })
714 .unwrap();
715
716 let keepalive = match config.mode {
717 Mode::Tcp { keepalive, .. } => keepalive,
718 _ => panic!("expected Mode::Tcp"),
719 };
720
721 assert_eq!(keepalive, None);
722 }
723
724 #[test]
725 fn config_tcp_keepalive_full() {
726 let config: SyslogConfig = serde_yaml::from_str(indoc! {
727 r#"
728 mode: tcp
729 address: "127.0.0.1:1235"
730 keepalive:
731 time_secs: 7200
732 "#,
733 })
734 .unwrap();
735
736 let keepalive = match config.mode {
737 Mode::Tcp { keepalive, .. } => keepalive,
738 _ => panic!("expected Mode::Tcp"),
739 };
740
741 let keepalive = keepalive.expect("keepalive config not set");
742
743 assert_eq!(keepalive.time_secs, Some(7200));
744 }
745
746 #[test]
747 fn config_udp() {
748 let config: SyslogConfig = serde_yaml::from_str(indoc! {
749 r#"
750 mode: udp
751 address: "127.0.0.1:1235"
752 max_length: 32187
753 "#,
754 })
755 .unwrap();
756 assert!(matches!(config.mode, Mode::Udp { .. }));
757 }
758
759 #[test]
760 fn config_udp_with_receive_buffer_size() {
761 let config: SyslogConfig = serde_yaml::from_str(indoc! {
762 r#"
763 mode: udp
764 address: "127.0.0.1:1235"
765 max_length: 32187
766 receive_buffer_bytes: 256
767 "#,
768 })
769 .unwrap();
770
771 let receive_buffer_bytes = match config.mode {
772 Mode::Udp {
773 receive_buffer_bytes,
774 ..
775 } => receive_buffer_bytes,
776 _ => panic!("expected Mode::Udp"),
777 };
778
779 assert_eq!(receive_buffer_bytes, Some(256));
780 }
781
782 #[cfg(unix)]
783 #[test]
784 fn config_unix() {
785 let config: SyslogConfig = serde_yaml::from_str(indoc! {
786 r#"
787 mode: unix
788 path: "127.0.0.1:1235"
789 "#,
790 })
791 .unwrap();
792 assert!(matches!(config.mode, Mode::Unix { .. }));
793 }
794
795 #[cfg(unix)]
796 #[test]
797 fn config_unix_permissions() {
798 let config: SyslogConfig = serde_yaml::from_str(indoc! {
799 r#"
800 mode: unix
801 path: "127.0.0.1:1235"
802 socket_file_mode: 511
803 "#,
804 })
805 .unwrap();
806 let socket_file_mode = match config.mode {
807 Mode::Unix {
808 path: _,
809 socket_file_mode,
810 } => socket_file_mode,
811 _ => panic!("expected Mode::Unix"),
812 };
813
814 assert_eq!(socket_file_mode, Some(0o777));
815 }
816
817 #[test]
818 fn syslog_ng_network_syslog_protocol() {
819 let msg = "i am foobar";
821 let raw = format!(
822 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {}{} {}"#,
823 r#"[meta sequenceId="1" sysUpTime="37" language="EN"]"#,
824 r#"[origin ip="192.168.0.1" software="test"]"#,
825 msg
826 );
827
828 let mut expected = Event::Log(LogEvent::from(msg));
829
830 {
831 let expected = expected.as_mut_log();
832 expected.insert(
833 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
834 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
835 .single()
836 .expect("invalid timestamp"),
837 );
838 expected.insert(
839 log_schema().source_type_key_target_path().unwrap(),
840 "syslog",
841 );
842 expected.insert("host", "74794bfb6795");
843 expected.insert("hostname", "74794bfb6795");
844
845 expected.insert("meta.sequenceId", "1");
846 expected.insert("meta.sysUpTime", "37");
847 expected.insert("meta.language", "EN");
848 expected.insert("origin.software", "test");
849 expected.insert("origin.ip", "192.168.0.1");
850
851 expected.insert("severity", "notice");
852 expected.insert("facility", "user");
853 expected.insert("version", 1);
854 expected.insert("appname", "root");
855 expected.insert("procid", 8449);
856 expected.insert("source_ip", "192.168.0.254");
857 }
858
859 assert_event_data_eq!(
860 event_from_bytes(
861 "host",
862 Some(Bytes::from("192.168.0.254")),
863 raw.into(),
864 LogNamespace::Legacy
865 )
866 .unwrap(),
867 expected
868 );
869 }
870
871 #[test]
872 fn handles_incorrect_sd_element() {
873 let msg = "qwerty";
874 let raw = format!(
875 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
876 r"[incorrect x]", msg
877 );
878
879 let mut expected = Event::Log(LogEvent::from(msg));
880 {
881 let expected = expected.as_mut_log();
882 expected.insert(
883 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
884 Utc.with_ymd_and_hms(2019, 2, 13, 19, 48, 34)
885 .single()
886 .expect("invalid timestamp"),
887 );
888 expected.insert(
889 log_schema().host_key().unwrap().to_string().as_str(),
890 "74794bfb6795",
891 );
892 expected.insert("hostname", "74794bfb6795");
893 expected.insert(
894 log_schema().source_type_key_target_path().unwrap(),
895 "syslog",
896 );
897 expected.insert("severity", "notice");
898 expected.insert("facility", "user");
899 expected.insert("version", 1);
900 expected.insert("appname", "root");
901 expected.insert("procid", 8449);
902 expected.insert("source_ip", "192.168.0.254");
903 }
904
905 let event = event_from_bytes(
906 "host",
907 Some(Bytes::from("192.168.0.254")),
908 raw.into(),
909 LogNamespace::Legacy,
910 )
911 .unwrap();
912 assert_event_data_eq!(event, expected);
913
914 let raw = format!(
915 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#,
916 r"[incorrect x=]", msg
917 );
918
919 let event = event_from_bytes(
920 "host",
921 Some(Bytes::from("192.168.0.254")),
922 raw.into(),
923 LogNamespace::Legacy,
924 )
925 .unwrap();
926 assert_event_data_eq!(event, expected);
927 }
928
929 #[test]
930 fn handles_empty_sd_element() {
931 fn there_is_map_called_empty(event: Event) -> bool {
932 event
933 .as_log()
934 .get("empty")
935 .expect("empty exists")
936 .is_object()
937 }
938
939 let msg = format!(
940 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
941 r"[empty]"
942 );
943
944 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
945 assert!(there_is_map_called_empty(event));
946
947 let msg = format!(
948 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
949 r#"[non_empty x="1"][empty]"#
950 );
951
952 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
953 assert!(there_is_map_called_empty(event));
954
955 let msg = format!(
956 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
957 r#"[empty][non_empty x="1"]"#
958 );
959
960 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
961 assert!(there_is_map_called_empty(event));
962
963 let msg = format!(
964 r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} qwerty"#,
965 r#"[empty not_really="testing the test"]"#
966 );
967
968 let event = event_from_bytes("host", None, msg.into(), LogNamespace::Legacy).unwrap();
969 assert!(there_is_map_called_empty(event));
970 }
971
972 #[test]
973 fn handles_weird_whitespace() {
974 let raw = r#"
976 <13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar
977 "#;
978 let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#;
979
980 assert_event_data_eq!(
981 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap(),
982 event_from_bytes(
983 "host",
984 None,
985 cleaned.to_owned().into(),
986 LogNamespace::Legacy
987 )
988 .unwrap()
989 );
990 }
991
992 #[test]
993 fn handles_dots_in_sdata() {
994 let raw =
995 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin foo.bar="baz"] hello"#;
996 let event =
997 event_from_bytes("host", None, raw.to_owned().into(), LogNamespace::Legacy).unwrap();
998 assert_eq!(
999 event.as_log().get(r#"origin."foo.bar""#),
1000 Some(&Value::from("baz"))
1001 );
1002 }
1003
1004 #[test]
1005 fn syslog_ng_default_network() {
1006 let msg = "i am foobar";
1007 let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {msg}"#);
1008 let event = event_from_bytes(
1009 "host",
1010 Some(Bytes::from("192.168.0.254")),
1011 raw.into(),
1012 LogNamespace::Legacy,
1013 )
1014 .unwrap();
1015
1016 let mut expected = Event::Log(LogEvent::from(msg));
1017 {
1018 let value = event.as_log().get("timestamp").unwrap();
1019 let year = value.as_timestamp().unwrap().naive_local().year();
1020
1021 let expected = expected.as_mut_log();
1022 let expected_date: DateTime<Utc> = Local
1023 .with_ymd_and_hms(year, 2, 13, 20, 7, 26)
1024 .single()
1025 .expect("invalid timestamp")
1026 .into();
1027
1028 expected.insert(
1029 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1030 expected_date,
1031 );
1032 expected.insert(
1033 log_schema().host_key().unwrap().to_string().as_str(),
1034 "74794bfb6795",
1035 );
1036 expected.insert(
1037 log_schema().source_type_key_target_path().unwrap(),
1038 "syslog",
1039 );
1040 expected.insert("hostname", "74794bfb6795");
1041 expected.insert("severity", "notice");
1042 expected.insert("facility", "user");
1043 expected.insert("appname", "root");
1044 expected.insert("procid", 8539);
1045 expected.insert("source_ip", "192.168.0.254");
1046 }
1047
1048 assert_event_data_eq!(event, expected);
1049 }
1050
1051 #[test]
1052 fn rsyslog_omfwd_tcp_default() {
1053 let msg = "start";
1054 let raw = format!(
1055 r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {msg}"#
1056 );
1057 let event = event_from_bytes(
1058 "host",
1059 Some(Bytes::from("192.168.0.254")),
1060 raw.into(),
1061 LogNamespace::Legacy,
1062 )
1063 .unwrap();
1064
1065 let mut expected = Event::Log(LogEvent::from(msg));
1066 {
1067 let value = event.as_log().get("timestamp").unwrap();
1068 let year = value.as_timestamp().unwrap().naive_local().year();
1069
1070 let expected = expected.as_mut_log();
1071 let expected_date: DateTime<Utc> = Local
1072 .with_ymd_and_hms(year, 2, 13, 21, 31, 56)
1073 .single()
1074 .expect("invalid timestamp")
1075 .into();
1076 expected.insert(
1077 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1078 expected_date,
1079 );
1080 expected.insert(
1081 log_schema().source_type_key_target_path().unwrap(),
1082 "syslog",
1083 );
1084 expected.insert("host", "74794bfb6795");
1085 expected.insert("hostname", "74794bfb6795");
1086 expected.insert("severity", "info");
1087 expected.insert("facility", "local7");
1088 expected.insert("appname", "liblogging-stdlog");
1089 expected.insert("origin.software", "rsyslogd");
1090 expected.insert("origin.swVersion", "8.24.0");
1091 expected.insert("source_ip", "192.168.0.254");
1092 expected.insert(event_path!("origin", "x-pid"), "8979");
1093 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1094 }
1095
1096 assert_event_data_eq!(event, expected);
1097 }
1098
1099 #[test]
1100 fn rsyslog_omfwd_tcp_forward_format() {
1101 let msg = "start";
1102 let raw = format!(
1103 r#"<190>2019-02-13T21:53:30.605850+00:00 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="9043" x-info="http://www.rsyslog.com"] {msg}"#
1104 );
1105
1106 let mut expected = Event::Log(LogEvent::from(msg));
1107 {
1108 let expected = expected.as_mut_log();
1109 expected.insert(
1110 (PathPrefix::Event, log_schema().timestamp_key().unwrap()),
1111 Utc.with_ymd_and_hms(2019, 2, 13, 21, 53, 30)
1112 .single()
1113 .and_then(|t| t.with_nanosecond(605_850 * 1000))
1114 .expect("invalid timestamp"),
1115 );
1116 expected.insert(
1117 log_schema().source_type_key_target_path().unwrap(),
1118 "syslog",
1119 );
1120 expected.insert("host", "74794bfb6795");
1121 expected.insert("hostname", "74794bfb6795");
1122 expected.insert("severity", "info");
1123 expected.insert("facility", "local7");
1124 expected.insert("appname", "liblogging-stdlog");
1125 expected.insert("origin.software", "rsyslogd");
1126 expected.insert("origin.swVersion", "8.24.0");
1127 expected.insert(event_path!("origin", "x-pid"), "9043");
1128 expected.insert(event_path!("origin", "x-info"), "http://www.rsyslog.com");
1129 }
1130
1131 assert_event_data_eq!(
1132 event_from_bytes("host", None, raw.into(), LogNamespace::Legacy).unwrap(),
1133 expected
1134 );
1135 }
1136
1137 #[tokio::test]
1138 async fn test_tcp_syslog() {
1139 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1140 let num_messages: usize = 10000;
1141 let (_guard, in_addr) = next_addr();
1142
1143 let config = SyslogConfig::from_mode(Mode::Tcp {
1145 address: in_addr.into(),
1146 permit_origin: None,
1147 keepalive: None,
1148 tls: None,
1149 receive_buffer_bytes: None,
1150 connection_limit: None,
1151 });
1152
1153 let key = ComponentKey::from("in");
1154 let (tx, rx) = SourceSender::new_test();
1155 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1156 let shutdown_complete = shutdown.shutdown_tripwire();
1157
1158 let source = config
1159 .build(context)
1160 .await
1161 .expect("source should not fail to build");
1162 tokio::spawn(source);
1163
1164 wait_for_tcp(in_addr).await;
1166
1167 let output_events = CountReceiver::receive_events(rx);
1168
1169 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1171 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1172 .collect();
1173
1174 let input_lines: Vec<String> =
1175 input_messages.iter().map(|msg| msg.to_string()).collect();
1176
1177 send_lines(in_addr, input_lines).await.unwrap();
1178
1179 sleep(Duration::from_secs(2)).await;
1181
1182 shutdown
1184 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1185 .await;
1186 shutdown_complete.await;
1187
1188 let output_events = output_events.await;
1189 assert_eq!(output_events.len(), num_messages);
1190
1191 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1192 .into_iter()
1193 .map(|mut e| {
1194 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1197 })
1198 .collect();
1199 assert_eq!(output_messages, input_messages);
1200 })
1201 .await;
1202 }
1203
1204 #[tokio::test]
1205 async fn test_udp_syslog() {
1206 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1207 let num_messages: usize = 1000;
1208 let (_guard, in_addr) = next_addr();
1209
1210 let config = SyslogConfig::from_mode(Mode::Udp {
1212 address: in_addr.into(),
1213 receive_buffer_bytes: Some(4 * 1024 * 1024),
1214 });
1215
1216 let key = ComponentKey::from("in");
1217 let (tx, rx) = SourceSender::new_test();
1218 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1219 let shutdown_complete = shutdown.shutdown_tripwire();
1220
1221 let source = config
1222 .build(context)
1223 .await
1224 .expect("source should not fail to build");
1225 tokio::spawn(source);
1226
1227 sleep(Duration::from_millis(150)).await;
1229
1230 let output_events = CountReceiver::receive_events(rx);
1231
1232 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1234 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1235 .collect();
1236
1237 let input_lines: Vec<String> =
1238 input_messages.iter().map(|msg| msg.to_string()).collect();
1239
1240 let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
1241 for line in input_lines {
1242 socket.send_to(line.as_bytes(), in_addr).await.unwrap();
1243 }
1244
1245 sleep(Duration::from_secs(2)).await;
1247
1248 shutdown
1250 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1251 .await;
1252 shutdown_complete.await;
1253
1254 let output_events = output_events.await;
1255 assert_eq!(output_events.len(), num_messages);
1256
1257 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1258 .into_iter()
1259 .map(|mut e| {
1260 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1263 })
1264 .collect();
1265 assert_eq!(output_messages, input_messages);
1266 })
1267 .await;
1268 }
1269
1270 #[cfg(unix)]
1271 #[tokio::test]
1272 async fn test_unix_stream_syslog() {
1273 use std::os::unix::net::UnixStream as StdUnixStream;
1274
1275 use futures_util::{SinkExt, stream};
1276 use tokio::{io::AsyncWriteExt, net::UnixStream};
1277 use tokio_util::codec::{FramedWrite, LinesCodec};
1278
1279 use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
1280
1281 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1282 let num_messages: usize = 1;
1283 let in_path = tempfile::tempdir().unwrap().keep().join("stream_test");
1284
1285 let config = SyslogConfig::from_mode(Mode::Unix {
1287 path: in_path.clone(),
1288 socket_file_mode: None,
1289 });
1290
1291 let key = ComponentKey::from("in");
1292 let (tx, rx) = SourceSender::new_test();
1293 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1294 let shutdown_complete = shutdown.shutdown_tripwire();
1295
1296 let source = config
1297 .build(context)
1298 .await
1299 .expect("source should not fail to build");
1300 tokio::spawn(source);
1301
1302 while StdUnixStream::connect(&in_path).is_err() {
1304 tokio::task::yield_now().await;
1305 }
1306
1307 let output_events = CountReceiver::receive_events(rx);
1308
1309 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1311 .map(|i| SyslogMessageRfc5424::random(i, 30, 4, 3, 3))
1312 .collect();
1313
1314 let stream = UnixStream::connect(&in_path).await.unwrap();
1315 let mut sink = FramedWrite::new(stream, LinesCodec::new());
1316
1317 let lines: Vec<String> = input_messages.iter().map(|msg| msg.to_string()).collect();
1318 let mut lines = stream::iter(lines).map(Ok);
1319 sink.send_all(&mut lines).await.unwrap();
1320
1321 let stream = sink.get_mut();
1322 stream.shutdown().await.unwrap();
1323
1324 sleep(Duration::from_secs(1)).await;
1326
1327 shutdown
1328 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1329 .await;
1330 shutdown_complete.await;
1331
1332 let output_events = output_events.await;
1333 assert_eq!(output_events.len(), num_messages);
1334
1335 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1336 .into_iter()
1337 .map(|mut e| {
1338 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1341 })
1342 .collect();
1343 assert_eq!(output_messages, input_messages);
1344 })
1345 .await;
1346 }
1347
1348 #[tokio::test]
1349 async fn test_octet_counting_syslog() {
1350 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
1351 let num_messages: usize = 10000;
1352 let (_guard, in_addr) = next_addr();
1353
1354 let config = SyslogConfig::from_mode(Mode::Tcp {
1356 address: in_addr.into(),
1357 permit_origin: None,
1358 keepalive: None,
1359 tls: None,
1360 receive_buffer_bytes: None,
1361 connection_limit: None,
1362 });
1363
1364 let key = ComponentKey::from("in");
1365 let (tx, rx) = SourceSender::new_test();
1366 let (context, shutdown) = SourceContext::new_shutdown(&key, tx);
1367 let shutdown_complete = shutdown.shutdown_tripwire();
1368
1369 let source = config
1370 .build(context)
1371 .await
1372 .expect("source should not fail to build");
1373 tokio::spawn(source);
1374
1375 wait_for_tcp(in_addr).await;
1377
1378 let output_events = CountReceiver::receive_events(rx);
1379
1380 let input_messages: Vec<SyslogMessageRfc5424> = (0..num_messages)
1382 .map(|i| {
1383 let mut msg = SyslogMessageRfc5424::random(i, 30, 4, 3, 3);
1384 msg.message.push('\n');
1385 msg.message.push_str(&random_string(30));
1386 msg
1387 })
1388 .collect();
1389
1390 let codec = BytesCodec::new();
1391 let input_lines: Vec<Bytes> = input_messages
1392 .iter()
1393 .map(|msg| {
1394 let s = msg.to_string();
1395 format!("{} {}", s.len(), s).into()
1396 })
1397 .collect();
1398
1399 send_encodable(in_addr, codec, input_lines).await.unwrap();
1400
1401 sleep(Duration::from_secs(2)).await;
1403
1404 shutdown
1406 .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
1407 .await;
1408 shutdown_complete.await;
1409
1410 let output_events = output_events.await;
1411 assert_eq!(output_events.len(), num_messages);
1412
1413 let output_messages: Vec<SyslogMessageRfc5424> = output_events
1414 .into_iter()
1415 .map(|mut e| {
1416 e.as_mut_log().remove("hostname"); e.as_mut_log().remove("source_ip"); e.into()
1419 })
1420 .collect();
1421 assert_eq!(output_messages, input_messages);
1422 })
1423 .await;
1424 }
1425
1426 #[derive(Deserialize, PartialEq, Clone, Debug)]
1427 struct SyslogMessageRfc5424 {
1428 msgid: String,
1429 severity: Severity,
1430 facility: Facility,
1431 version: u8,
1432 timestamp: String,
1433 host: String,
1434 source_type: String,
1435 appname: String,
1436 procid: usize,
1437 message: String,
1438 #[serde(flatten)]
1439 structured_data: StructuredData,
1440 }
1441
1442 impl SyslogMessageRfc5424 {
1443 fn random(
1444 id: usize,
1445 msg_len: usize,
1446 field_len: usize,
1447 max_map_size: usize,
1448 max_children: usize,
1449 ) -> Self {
1450 let msg = random_string(msg_len);
1451 let structured_data = random_structured_data(max_map_size, max_children, field_len);
1452
1453 let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
1454 Self {
1457 msgid: format!("test{id}"),
1458 severity: Severity::LOG_INFO,
1459 facility: Facility::LOG_USER,
1460 version: 1,
1461 timestamp,
1462 host: "hogwarts".to_owned(),
1463 source_type: "syslog".to_owned(),
1464 appname: "harry".to_owned(),
1465 procid: rng().random_range(0..32768),
1466 structured_data,
1467 message: msg,
1468 }
1469 }
1470 }
1471
1472 impl fmt::Display for SyslogMessageRfc5424 {
1473 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1474 write!(
1475 f,
1476 "<{}>{} {} {} {} {} {} {} {}",
1477 encode_priority(self.severity, self.facility),
1478 self.version,
1479 self.timestamp,
1480 self.host,
1481 self.appname,
1482 self.procid,
1483 self.msgid,
1484 format_structured_data_rfc5424(&self.structured_data),
1485 self.message
1486 )
1487 }
1488 }
1489
1490 impl From<Event> for SyslogMessageRfc5424 {
1491 fn from(e: Event) -> Self {
1492 let (value, _) = e.into_log().into_parts();
1493 let mut fields = value.into_object().unwrap();
1494
1495 Self {
1496 msgid: fields.remove("msgid").map(value_to_string).unwrap(),
1497 severity: fields
1498 .remove("severity")
1499 .map(value_to_string)
1500 .and_then(|s| Severity::from_str(s.as_str()))
1501 .unwrap(),
1502 facility: fields
1503 .remove("facility")
1504 .map(value_to_string)
1505 .and_then(|s| Facility::from_str(s.as_str()))
1506 .unwrap(),
1507 version: fields
1508 .remove("version")
1509 .map(value_to_string)
1510 .map(|s| u8::from_str(s.as_str()).unwrap())
1511 .unwrap(),
1512 timestamp: fields.remove("timestamp").map(value_to_string).unwrap(),
1513 host: fields.remove("host").map(value_to_string).unwrap(),
1514 source_type: fields.remove("source_type").map(value_to_string).unwrap(),
1515 appname: fields.remove("appname").map(value_to_string).unwrap(),
1516 procid: fields
1517 .remove("procid")
1518 .map(value_to_string)
1519 .map(|s| usize::from_str(s.as_str()).unwrap())
1520 .unwrap(),
1521 message: fields.remove("message").map(value_to_string).unwrap(),
1522 structured_data: structured_data_from_fields(fields),
1523 }
1524 }
1525 }
1526
1527 fn structured_data_from_fields(fields: ObjectMap) -> StructuredData {
1528 let mut structured_data = StructuredData::default();
1529
1530 for (key, value) in fields.into_iter() {
1531 let subfields = value
1532 .into_object()
1533 .unwrap()
1534 .into_iter()
1535 .map(|(k, v)| (k.into(), value_to_string(v)))
1536 .collect();
1537
1538 structured_data.insert(key.into(), subfields);
1539 }
1540
1541 structured_data
1542 }
1543
1544 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1545 #[derive(Copy, Clone, Deserialize, PartialEq, Eq, Debug)]
1546 pub enum Severity {
1547 #[serde(rename(deserialize = "emergency"))]
1548 LOG_EMERG,
1549 #[serde(rename(deserialize = "alert"))]
1550 LOG_ALERT,
1551 #[serde(rename(deserialize = "critical"))]
1552 LOG_CRIT,
1553 #[serde(rename(deserialize = "error"))]
1554 LOG_ERR,
1555 #[serde(rename(deserialize = "warn"))]
1556 LOG_WARNING,
1557 #[serde(rename(deserialize = "notice"))]
1558 LOG_NOTICE,
1559 #[serde(rename(deserialize = "info"))]
1560 LOG_INFO,
1561 #[serde(rename(deserialize = "debug"))]
1562 LOG_DEBUG,
1563 }
1564
1565 impl Severity {
1566 fn from_str(s: &str) -> Option<Self> {
1567 match s {
1568 "emergency" => Some(Self::LOG_EMERG),
1569 "alert" => Some(Self::LOG_ALERT),
1570 "critical" => Some(Self::LOG_CRIT),
1571 "error" => Some(Self::LOG_ERR),
1572 "warn" => Some(Self::LOG_WARNING),
1573 "notice" => Some(Self::LOG_NOTICE),
1574 "info" => Some(Self::LOG_INFO),
1575 "debug" => Some(Self::LOG_DEBUG),
1576
1577 x => {
1578 #[allow(clippy::print_stdout)]
1579 {
1580 println!("converting severity str, got {x}");
1581 }
1582 None
1583 }
1584 }
1585 }
1586 }
1587
1588 #[allow(non_camel_case_types, clippy::upper_case_acronyms)]
1589 #[derive(Copy, Clone, PartialEq, Eq, Deserialize, Debug)]
1590 pub enum Facility {
1591 #[serde(rename(deserialize = "kernel"))]
1592 LOG_KERN = 0 << 3,
1593 #[serde(rename(deserialize = "user"))]
1594 LOG_USER = 1 << 3,
1595 #[serde(rename(deserialize = "mail"))]
1596 LOG_MAIL = 2 << 3,
1597 #[serde(rename(deserialize = "daemon"))]
1598 LOG_DAEMON = 3 << 3,
1599 #[serde(rename(deserialize = "auth"))]
1600 LOG_AUTH = 4 << 3,
1601 #[serde(rename(deserialize = "syslog"))]
1602 LOG_SYSLOG = 5 << 3,
1603 }
1604
1605 impl Facility {
1606 fn from_str(s: &str) -> Option<Self> {
1607 match s {
1608 "kernel" => Some(Self::LOG_KERN),
1609 "user" => Some(Self::LOG_USER),
1610 "mail" => Some(Self::LOG_MAIL),
1611 "daemon" => Some(Self::LOG_DAEMON),
1612 "auth" => Some(Self::LOG_AUTH),
1613 "syslog" => Some(Self::LOG_SYSLOG),
1614 _ => None,
1615 }
1616 }
1617 }
1618
1619 type StructuredData = HashMap<String, HashMap<String, String>>;
1620
1621 fn random_structured_data(
1622 max_map_size: usize,
1623 max_children: usize,
1624 field_len: usize,
1625 ) -> StructuredData {
1626 let amount = rng().random_range(0..max_children);
1627
1628 random_maps(max_map_size, field_len)
1629 .filter(|m| !m.is_empty()) .take(amount)
1631 .enumerate()
1632 .map(|(i, map)| (format!("id{i}"), map))
1633 .collect()
1634 }
1635
1636 fn format_structured_data_rfc5424(data: &StructuredData) -> String {
1637 if data.is_empty() {
1638 "-".to_string()
1639 } else {
1640 let mut res = String::new();
1641 for (id, params) in data {
1642 res = res + "[" + id;
1643 for (name, value) in params {
1644 res = res + " " + name + "=\"" + value + "\"";
1645 }
1646 res += "]";
1647 }
1648
1649 res
1650 }
1651 }
1652
1653 const fn encode_priority(severity: Severity, facility: Facility) -> u8 {
1654 facility as u8 | severity as u8
1655 }
1656
1657 fn value_to_string(v: Value) -> String {
1658 if v.is_bytes() {
1659 let buf = v.as_bytes().unwrap();
1660 String::from_utf8_lossy(buf).to_string()
1661 } else if v.is_timestamp() {
1662 let ts = v.as_timestamp().unwrap();
1663 ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1664 } else {
1665 v.to_string()
1666 }
1667 }
1668}